Produce async to allow higher throughput.

This commit is contained in:
Christian 2020-04-27 17:54:47 -04:00
parent f878788ef7
commit d10563f24e
1 changed files with 30 additions and 17 deletions

View File

@ -1,8 +1,10 @@
package com.scribd.streams.kafka.player
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
import com.google.common.util.concurrent.RateLimiter
import com.typesafe.scalalogging.Logger
import org.apache.kafka.clients.producer.{Producer, ProducerRecord}
import org.apache.kafka.clients.producer.{Producer, ProducerRecord, RecordMetadata}
import scala.io.BufferedSource
@ -12,6 +14,9 @@ import scala.io.BufferedSource
object FilePlayer {
lazy val logger: Logger = Logger(getClass)
var generatedMessageCount = new AtomicInteger(0)
var playing = new AtomicBoolean(false)
/**
* Starts playing the given message source onto the topic.
*
@ -22,33 +27,41 @@ object FilePlayer {
* @param topic the topic to play messages onto
*/
def play(producer: Producer[String, String], messageSource: BufferedSource, numMessages: Int, rateLimiter: RateLimiter, topic: String): Unit = {
var generatedMessageCount = 0
playing.set(true)
logger.info(s"Will play $numMessages messages. Rate limited to ${rateLimiter.getRate} messages per second on $topic.")
messageSource.getLines().foreach(m => {
if (!playing.get()) return
if (!m.isEmpty) {
// only produce the desired number of messages per second.
rateLimiter.acquire(1)
val producerRecord = new ProducerRecord[String, String](topic, m)
val recordMetadata = producer.send(producerRecord).get()
generatedMessageCount += 1
if (generatedMessageCount % 1000 == 0) {
logger.info(s"Played $generatedMessageCount events so far.")
logger.info(s"Last timestamp played is ${recordMetadata.timestamp()} at offset ${recordMetadata.offset()} on partition ${recordMetadata.partition()}")
}
}
// stop if we've played the desired number of events.
if (generatedMessageCount >= numMessages) {
logger.info(s"Played $generatedMessageCount messages. Stopping.")
return
producer.send(producerRecord, sendCallback(numMessages))
}
})
}
private def sendCallback(numMessages: Int)(recordMetadata: RecordMetadata, exception: Exception): Unit = {
if (exception != null) {
exception.printStackTrace()
throw exception
}
val g = generatedMessageCount.incrementAndGet()
if (g % 1000 == 0) {
logger.info(s"Played $g events so far.")
logger.info(s"Last timestamp played is ${recordMetadata.timestamp()} at offset ${recordMetadata.offset()} on partition ${recordMetadata.partition()}")
}
if (g >= numMessages) {
playing.set(false)
logger.info(s"Played $g messages. Stopping.")
return
}
}
}