Kafka Producer/Consumer (0.9)
Set(elo, __consumer_offsets)
Set(elo, __consumer_offsets)
[Partition(topic = elo, partition = 8, leader = 0, replicas = [0,], isr = [0,], Partition(topic = elo, partition = 2, leader = 0, replicas = [0,], isr = [0,], Partition(topic = elo, partition = 5, leader = 0, replicas = [0,], isr = [0,], Partition(topic = elo, partition = 4, leader = 0, replicas = [0,], isr = [0,], Partition(topic = elo, partition = 7, leader = 0, replicas = [0,], isr = [0,], Partition(topic = elo, partition = 1, leader = 0, replicas = [0,], isr = [0,], Partition(topic = elo, partition = 9, leader = 0, replicas = [0,], isr = [0,], Partition(topic = elo, partition = 3, leader = 0, replicas = [0,], isr = [0,], Partition(topic = elo, partition = 6, leader = 0, replicas = [0,], isr = [0,], Partition(topic = elo, partition = 0, leader = 0, replicas = [0,], isr = [0,]]
[Partition(topic = elo, partition = 8, leader = 0, replicas = [0,], isr = [0,], Partition(topic = elo, partition = 2, leader = 0, replicas = [0,], isr = [0,], Partition(topic = elo, partition = 5, leader = 0, replicas = [0,], isr = [0,], Partition(topic = elo, partition = 4, leader = 0, replicas = [0,], isr = [0,], Partition(topic = elo, partition = 7, leader = 0, replicas = [0,], isr = [0,], Partition(topic = elo, partition = 1, leader = 0, replicas = [0,], isr = [0,], Partition(topic = elo, partition = 9, leader = 0, replicas = [0,], isr = [0,], Partition(topic = elo, partition = 3, leader = 0, replicas = [0,], isr = [0,], Partition(topic = elo, partition = 6, leader = 0, replicas = [0,], isr = [0,], Partition(topic = elo, partition = 0, leader = 0, replicas = [0,], isr = [0,]]
first one: 1 : ConsumerRecord(topic = elo, partition = 4, offset = 52976, key = 1013, value = 1013)
second one: 1 : ConsumerRecord(topic = elo, partition = 8, offset = 53579, key = 1018, value = 1018)
...
second one: 12 : ConsumerRecord(topic = elo, partition = 6, offset = 53779, key = 1328, value = 1328)
first one: 13 : ConsumerRecord(topic = elo, partition = 0, offset = 55155, key = 1331, value = 1331)
second one: 13 : ConsumerRecord(topic = elo, partition = 9, offset = 53122, key = 1332, value = 1332)
second one: 13 : ConsumerRecord(topic = elo, partition = 9, offset = 53123, key = 1333, value = 1333)
import java.util.Properties
import java.util.concurrent.Future
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord, RecordMetadata}
object Producer extends App {
def props(host: String) = {
val props = new Properties()
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerSerializer")
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, host)
props
}
val producer = new KafkaProducer[Integer, String](props("localhost:9092"))
1 to 100000 foreach { i =>
print(s"produce $i ...")
val future: Future[RecordMetadata] = producer.send(new ProducerRecord[Integer, String]("elo", i, Integer.toString(i)))
val result: RecordMetadata = future.get()
println(s"offset ${result.offset} / partition ${result.partition} / topic ${result.topic}")
Thread.sleep(100)
}
}
import java.util
import java.util.Properties
import java.util.concurrent.{Executors}
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecords, KafkaConsumer}
import org.apache.kafka.clients.producer.ProducerConfig
import scala.collection.JavaConverters._
object Consumer extends App {
def props(host: String) = {
val props = new Properties()
props.put(ConsumerConfig.GROUP_ID_CONFIG, "elo-group")
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerDeserializer")
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, host)
props
}
// Let's simulate 3 consumers in parallel
val tpool = Executors.newFixedThreadPool(3)
def createConsumerThread(id: String) = new Runnable {
override def run(): Unit = {
val consumer = new KafkaConsumer[Integer, String](props("localhost:9092"))
val topics = consumer.listTopics.asScala
println(topics.keys)
topics foreach {
case (name, list) if name.equals("elo") => println(list)
case _ =>
}
consumer.subscribe(util.Arrays.asList("elo"))
var i = 0
while (true) {
val records: ConsumerRecords[Integer, String] = consumer.poll(1000)
records.asScala.foreach(x => println(s"${id}: ${i} : $x"))
i += 1
}
}
}
tpool.submit(createConsumerThread("first one"))
tpool.submit(createConsumerThread("second one"))
}
libraryDependencies += "org.apache.kafka" %% "kafka" % "0.9.0.0"