mirror of https://github.com/scribd/kafka-player
initial commit
commit dfdae597ee
@@ -0,0 +1,3 @@
target
.idea
example.json
@@ -0,0 +1,76 @@
# Contributor Covenant Code of Conduct

## Our Pledge

In the interest of fostering an open and welcoming environment, we as
contributors and maintainers pledge to making participation in our project and
our community a harassment-free experience for everyone, regardless of age, body
size, disability, ethnicity, sex characteristics, gender identity and expression,
level of experience, education, socio-economic status, nationality, personal
appearance, race, religion, or sexual identity and orientation.

## Our Standards

Examples of behavior that contributes to creating a positive environment
include:

* Using welcoming and inclusive language
* Being respectful of differing viewpoints and experiences
* Gracefully accepting constructive criticism
* Focusing on what is best for the community
* Showing empathy towards other community members

Examples of unacceptable behavior by participants include:

* The use of sexualized language or imagery and unwelcome sexual attention or
  advances
* Trolling, insulting/derogatory comments, and personal or political attacks
* Public or private harassment
* Publishing others' private information, such as a physical or electronic
  address, without explicit permission
* Other conduct which could reasonably be considered inappropriate in a
  professional setting

## Our Responsibilities

Project maintainers are responsible for clarifying the standards of acceptable
behavior and are expected to take appropriate and fair corrective action in
response to any instances of unacceptable behavior.

Project maintainers have the right and responsibility to remove, edit, or
reject comments, commits, code, wiki edits, issues, and other contributions
that are not aligned to this Code of Conduct, or to ban temporarily or
permanently any contributor for other behaviors that they deem inappropriate,
threatening, offensive, or harmful.

## Scope

This Code of Conduct applies both within project spaces and in public spaces
when an individual is representing the project or its community. Examples of
representing a project or community include using an official project e-mail
address, posting via an official social media account, or acting as an appointed
representative at an online or offline event. Representation of a project may be
further defined and clarified by project maintainers.

## Enforcement

Instances of abusive, harassing, or otherwise unacceptable behavior may be
reported by contacting the project team at rtyler@scribd.com. All
complaints will be reviewed and investigated and will result in a response that
is deemed necessary and appropriate to the circumstances. The project team is
obligated to maintain confidentiality with regard to the reporter of an incident.
Further details of specific enforcement policies may be posted separately.

Project maintainers who do not follow or enforce the Code of Conduct in good
faith may face temporary or permanent repercussions as determined by other
members of the project's leadership.

## Attribution

This Code of Conduct is adapted from the [Contributor Covenant][homepage], version 1.4,
available at https://www.contributor-covenant.org/version/1/4/code-of-conduct.html

[homepage]: https://www.contributor-covenant.org

For answers to common questions about this code of conduct, see
https://www.contributor-covenant.org/faq
@@ -0,0 +1,21 @@
MIT License

Copyright (c) Scribd, Inc

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
@@ -0,0 +1,66 @@
# kafka-player

Plays a file onto a Kafka topic - one line == one message.

### Build

Build the project by running `sbt assembly`. This produces an uber jar at `target/scala-2.12/kafka-player-0.1.0.jar` (the `assemblyJarName` setting in `build.sbt` appends the project version to the name).
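
Because the `assemblyJarName` setting in `build.sbt` puts the project version into the jar name, a script that hard-codes the jar path can break after a version bump. The following is a minimal sketch of a version-agnostic lookup, assuming the `target/scala-2.12` output directory mentioned above. The `find_player_jar` helper is hypothetical and not part of this repository.

```shell
#!/bin/sh
# Hypothetical helper (not part of kafka-player): prints the first
# kafka-player jar found in the given directory, or fails if none exists.
find_player_jar() {
  # $1 = directory to search; defaults to the sbt-assembly output directory.
  for jar in "${1:-target/scala-2.12}"/kafka-player*.jar; do
    # When the glob matches nothing it stays literal, so test for a real file.
    if [ -f "$jar" ]; then
      echo "$jar"
      return 0
    fi
  done
  echo "no kafka-player jar found; run 'sbt assembly' first" >&2
  return 1
}

# Usage: report a clear message instead of a confusing java error later on.
jar_file=$(find_player_jar) || echo "build the project before running the player"
```

If several jars match, the sketch simply takes the first one in glob order, which may not be the newest build.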

### Invocation

To run against a local Kafka, invoke the player with a command like the one below:

```
java -jar $jar_file \
  --message-file $message_file \
  --num-messages $num_messages \
  --messages-per-second $messages_per_second \
  --topic $topic \
  --broker-string $broker_string
```

When running against a TLS-authenticated Kafka cluster, append the parameters that specify the keystore location and keystore passphrase:

```
  --keystore-location $keystore_location \
  --keystore-passphrase $keystore_passphrase
```
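
A wrapper script may want to add the TLS parameters only when a keystore is configured. The sketch below shows one way to do that. It uses only the flags documented in this README, prints the command instead of running it, and the `player_command` function and its sample values are illustrative assumptions rather than part of the project.

```shell
#!/bin/sh
# Hypothetical dry-run helper: prints the player command, adding the TLS
# flags only when a keystore location is supplied.
player_command() {
  keystore_location=$1    # may be empty for a plaintext cluster
  keystore_passphrase=$2  # only used together with a keystore location

  # Flags every invocation needs; the values are placeholders.
  set -- --message-file example.json --topic example --broker-string localhost:9092

  if [ -n "$keystore_location" ]; then
    set -- "$@" --keystore-location "$keystore_location" \
                --keystore-passphrase "$keystore_passphrase"
  fi

  # $jar_file is printed literally, matching the placeholder used above.
  echo 'java -jar $jar_file' "$@"
}

player_command "" ""
player_command /path/to/client.keystore.jks changeit
```

The first call prints the plain command and the second one appends both keystore flags.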

#### Parameters

* `--message-file` - Required. A file containing the lines to play onto Kafka. The player plays one message per line and skips empty lines.
* `--num-messages` - The maximum number of messages to play. Playback also stops when the end of the file is reached. The default is 100,000 if not provided.
* `--messages-per-second` - The number of messages to play per second. The default is 2 per second if not provided. Float values are supported (e.g. 0.5 to produce one message every two seconds).
* `--topic` - Required. The topic to play the messages onto.
* `--broker-string` - The bootstrap broker string of the target cluster. The default is localhost:9092 if not provided.
* `--keystore-location` - The location of the keystore to use for TLS authentication. Only set this if TLS authentication is desired.
* `--keystore-passphrase` - The passphrase for the keystore. Only set this if TLS authentication is desired and a `keystore-location` is also provided.
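
In `FilePlayer.play`, empty lines are skipped and do not count toward `--num-messages`, and playback ends at the end of the file even if fewer messages were sent. To preview which lines a run would send without touching Kafka, something like the following sketch can help. The `preview_messages` helper is hypothetical and only mimics that selection logic with standard tools.

```shell
#!/bin/sh
# Hypothetical dry-run helper: prints the lines the player would send.
preview_messages() {
  # $1 = message file, $2 = maximum number of messages
  # Skip empty lines, then stop after the requested count.
  grep -v '^$' "$1" | head -n "$2"
}

# Example with a throwaway file: the empty line is skipped, so asking for
# two messages prints "first" and "second".
sample=$(mktemp)
printf 'first\n\nsecond\nthird\n' > "$sample"
preview_messages "$sample" 2
rm -f "$sample"
```

Lines that contain only whitespace are not empty, so both the player and this sketch treat them as messages.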

### Local Kafka

A docker-compose file is included for local development. It spins up a single-broker wurstmeister/kafka cluster (https://hub.docker.com/r/wurstmeister/kafka/) exposed on localhost.
Topics required for specific scenarios may be added to the `KAFKA_CREATE_TOPICS` environment variable in `docker-compose.yml`.
The required format is described in https://github.com/wurstmeister/kafka-docker#automatically-create-topics.
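
The bundled `docker-compose.yml` sets `KAFKA_CREATE_TOPICS: 'example:1:1'`. Assuming the comma-separated `name:partitions:replicas` form described at the link above, a value for several topics can be assembled as in this sketch. The `create_topics_value` helper and the `movies` topic are made up for illustration, and every topic gets one partition and one replica, matching the single-broker setup.

```shell
#!/bin/sh
# Hypothetical helper: joins topic names into a KAFKA_CREATE_TOPICS value,
# giving each topic 1 partition and 1 replica.
create_topics_value() {
  value=""
  for topic in "$@"; do
    # Prefix a comma only when the value already holds an entry.
    value="${value:+$value,}$topic:1:1"
  done
  echo "$value"
}

create_topics_value example movies   # prints example:1:1,movies:1:1
```

The printed string is what would go between the quotes of `KAFKA_CREATE_TOPICS` in `docker-compose.yml`.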

### Example

The `example` folder of this repository contains a data file called `example.json` and a launch script called `play-example.sh` to play the data file onto a local Kafka. Run the example with the following steps:

```
# start the kafka docker containers in the background - this will also create a topic called `example` on the cluster.
docker-compose up -d

# play the example data file onto the local kafka
./bin/play-example.sh
```

Once the player starts, run `kafkacat` in another terminal to verify your messages are coming in.

```
kafkacat -C -b localhost:9092 -t example
```

The example plays 10,000 messages at a rate of one message every two seconds.
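
With these settings a full run is long: the expected duration is roughly the number of messages divided by the rate, so 10,000 messages at 0.5 per second take about 20,000 seconds, or around five and a half hours. The sketch below does the same arithmetic for any pair of values. The `estimate_seconds` helper is hypothetical and not part of kafka-player.

```shell
#!/bin/sh
# Hypothetical helper: estimated playback time in whole seconds.
estimate_seconds() {
  # $1 = number of messages, $2 = messages per second (may be fractional)
  # awk performs the floating point division that shell arithmetic lacks.
  awk -v n="$1" -v r="$2" 'BEGIN { printf "%d\n", n / r }'
}

estimate_seconds 10000 0.5   # example settings, prints 20000
estimate_seconds 100000 2    # documented defaults, prints 50000
```

For a quick smoke test, lowering `num_messages` or raising `messages_per_second` in the launch script shortens the run accordingly.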

The `example.json` data file is an abridged version of the American movies dataset from https://github.com/jdorfman/awesome-json-datasets#movies, slightly transformed into a line-delimited format.
@@ -0,0 +1,16 @@
import Dependencies._

name := "kafka-player"
organization := "com.scribd"
version := "0.1.0"
scalaVersion := "2.12.10"
assemblyJarName in assembly := s"${name.value}-${version.value}.jar"

libraryDependencies ++= Seq(
  scalaTest,
  logback,
  scalaLogging,
  scallop,
  kafkaClients,
  guava
)
@@ -0,0 +1,18 @@
---
version: '3'
services:
  kafka:
    image: wurstmeister/kafka
    ports:
      - '9092:9092'
    environment:
      KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
      KAFKA_CREATE_TOPICS: 'example:1:1'
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock

  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - '2181:2181'
Binary file not shown.
@@ -0,0 +1,30 @@
#!/bin/bash

#
# Runs the `kafka-player` application against a local Kafka.
#

set -eu

scriptpath="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"

jar_file="$scriptpath/../target/scala-2.12/kafka-player-0.1.0.jar"
message_file="$scriptpath/example.json"
num_messages=10000
messages_per_second=0.5
topic=example
broker_string=localhost:9092

if [[ ! -f "$message_file" ]]; then
  echo "Unzipping example.json"
  tar -xzvf "$scriptpath/example.json.tar.gz" -C "$scriptpath/"
fi

echo "Starting kafka-player"

java -jar "$jar_file" \
  --message-file "$message_file" \
  --num-messages "$num_messages" \
  --messages-per-second "$messages_per_second" \
  --topic "$topic" \
  --broker-string "$broker_string"
@@ -0,0 +1,11 @@
import sbt._

object Dependencies {
  lazy val scalaTest = "org.scalatest" %% "scalatest" % "3.0.8" % Test

  lazy val logback = "ch.qos.logback" % "logback-classic" % "1.2.3"
  lazy val scalaLogging = "com.typesafe.scala-logging" %% "scala-logging" % "3.9.2"
  lazy val scallop = "org.rogach" %% "scallop" % "3.3.1"
  lazy val kafkaClients = "org.apache.kafka" % "kafka-clients" % "2.2.1"
  lazy val guava = "com.google.guava" % "guava" % "28.0-jre"
}
@@ -0,0 +1 @@
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.9")
@@ -0,0 +1 @@
sbt.version = 1.3.8
@@ -0,0 +1,11 @@
<configuration>
    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
        </encoder>
    </appender>

    <root level="info">
        <appender-ref ref="STDOUT" />
    </root>
</configuration>
@@ -0,0 +1,91 @@
package com.scribd.streams.kafka.player

import java.util.Properties

import com.google.common.util.concurrent.RateLimiter
import com.typesafe.scalalogging.Logger
import org.apache.kafka.clients.producer.{KafkaProducer, Producer, ProducerConfig}
import org.apache.kafka.common.serialization.StringSerializer
import org.rogach.scallop.{ScallopConf, ScallopOption}

import scala.io.Source

/**
 * Main driver for the application.
 */
object Driver {

  /**
   * Command line options for the application.
   *
   * @param arguments the raw command line arguments
   */
  class Conf(arguments: Seq[String]) extends ScallopConf(arguments) {
    val messageFile: ScallopOption[String] = opt[String](required = true,
      descr = "A file containing the lines to play onto Kafka. The player will play one message per line.")
    val numMessages: ScallopOption[Int] = opt[Int](default = Some(100000),
      descr = "The number of messages to play. The default is 100,000 if not provided.")
    val messagesPerSecond: ScallopOption[Float] = opt[Float](default = Some(2.0f),
      descr = "The number of messages to play per second. The default is 2 per second if not provided. Float values are supported (e.g. 0.5 to produce one message every two seconds).")
    val topic: ScallopOption[String] = opt[String](required = true,
      descr = "The topic to play the messages onto.")
    val brokerString: ScallopOption[String] = opt[String](default = Some("localhost:9092"),
      descr = "The bootstrap broker string for the target cluster. The default is localhost:9092 if not provided.")
    val keystoreLocation: ScallopOption[String] = opt[String](
      descr = "The location of the keystore to use for TLS authentication. Only set this if TLS authentication is desired.")
    val keystorePassphrase: ScallopOption[String] = opt[String](
      descr = "The passphrase for the keystore. Only set this if TLS authentication is desired and a `keystore-location` is also provided.")

    verify()
  }

  lazy val logger: Logger = Logger(getClass)

  def main(args: Array[String]): Unit = {
    val conf = new Conf(args)

    val rateLimiter: RateLimiter = RateLimiter.create(conf.messagesPerSecond())

    val kafkaProperties = createKafkaProperties(conf)
    val producer = createProducer(kafkaProperties)

    val messageSource = Source.fromFile(conf.messageFile())

    // Register the hook before playing so an interrupt mid-run still closes both resources.
    sys.ShutdownHookThread {
      logger.warn("Shutting down producer from shutdown hook.")
      producer.close()
      messageSource.close()
    }

    FilePlayer.play(producer, messageSource, conf.numMessages(), rateLimiter, conf.topic())

    producer.close()
    messageSource.close()
  }

  private def createKafkaProperties(conf: Conf): Properties = {
    val kafkaProperties = new Properties()

    kafkaProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, conf.brokerString())
    kafkaProperties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip")

    if (conf.keystoreLocation.isDefined) {
      kafkaProperties.put("security.protocol", "SSL")
      kafkaProperties.put("ssl.keystore.location", conf.keystoreLocation())
    }

    if (conf.keystorePassphrase.isDefined) {
      kafkaProperties.put("ssl.keystore.password", conf.keystorePassphrase())
    }

    kafkaProperties
  }

  private def createProducer(kafkaProperties: Properties): Producer[String, String] =
    new KafkaProducer[String, String](
      kafkaProperties,
      new StringSerializer(),
      new StringSerializer()
    )
}
@@ -0,0 +1,54 @@
package com.scribd.streams.kafka.player

import com.google.common.util.concurrent.RateLimiter
import com.typesafe.scalalogging.Logger
import org.apache.kafka.clients.producer.{Producer, ProducerRecord}

import scala.io.BufferedSource

/**
 * Plays files onto Kafka topics.
 */
object FilePlayer {
  lazy val logger: Logger = Logger(getClass)

  /**
   * Starts playing the given message source onto the topic.
   *
   * @param producer the producer client to send messages through
   * @param messageSource the message source to read messages from
   * @param numMessages the total number of messages to play onto the topic
   * @param rateLimiter the rate limiter to control message send rate
   * @param topic the topic to play messages onto
   */
  def play(producer: Producer[String, String], messageSource: BufferedSource, numMessages: Int, rateLimiter: RateLimiter, topic: String): Unit = {
    var generatedMessageCount = 0

    logger.info(s"Will play $numMessages messages. Rate limited to ${rateLimiter.getRate} messages per second on $topic.")

    messageSource.getLines().foreach(m => {
      if (!m.isEmpty) {
        // only produce the desired number of messages per second.
        rateLimiter.acquire(1)

        val producerRecord = new ProducerRecord[String, String](topic, m)

        val recordMetadata = producer.send(producerRecord).get()

        generatedMessageCount += 1

        if (generatedMessageCount % 1000 == 0) {
          logger.info(s"Played $generatedMessageCount events so far.")
          logger.info(s"Last timestamp played is ${recordMetadata.timestamp()} at offset ${recordMetadata.offset()} on partition ${recordMetadata.partition()}")
        }
      }

      // stop if we've played the desired number of events.
      if (generatedMessageCount >= numMessages) {
        logger.info(s"Played $generatedMessageCount messages. Stopping.")

        return
      }
    })
  }
}