chtefi
3/28/2016 - 8:39 PM

Kafka Producer/Consumer (0.9)

A simple producer writing 100000 keyed messages to an "elo" topic, and two consumers in the same "elo-group" group reading them back in parallel, using the new 0.9 clients from Scala.

Sample output: each consumer thread first lists the topics and the 10 partitions of "elo", then prints the records it polls; the two consumers of the group end up reading disjoint partitions.

Set(elo, __consumer_offsets)
Set(elo, __consumer_offsets)
[Partition(topic = elo, partition = 8, leader = 0, replicas = [0,], isr = [0,], Partition(topic = elo, partition = 2, leader = 0, replicas = [0,], isr = [0,], Partition(topic = elo, partition = 5, leader = 0, replicas = [0,], isr = [0,], Partition(topic = elo, partition = 4, leader = 0, replicas = [0,], isr = [0,], Partition(topic = elo, partition = 7, leader = 0, replicas = [0,], isr = [0,], Partition(topic = elo, partition = 1, leader = 0, replicas = [0,], isr = [0,], Partition(topic = elo, partition = 9, leader = 0, replicas = [0,], isr = [0,], Partition(topic = elo, partition = 3, leader = 0, replicas = [0,], isr = [0,], Partition(topic = elo, partition = 6, leader = 0, replicas = [0,], isr = [0,], Partition(topic = elo, partition = 0, leader = 0, replicas = [0,], isr = [0,]]
[Partition(topic = elo, partition = 8, leader = 0, replicas = [0,], isr = [0,], Partition(topic = elo, partition = 2, leader = 0, replicas = [0,], isr = [0,], Partition(topic = elo, partition = 5, leader = 0, replicas = [0,], isr = [0,], Partition(topic = elo, partition = 4, leader = 0, replicas = [0,], isr = [0,], Partition(topic = elo, partition = 7, leader = 0, replicas = [0,], isr = [0,], Partition(topic = elo, partition = 1, leader = 0, replicas = [0,], isr = [0,], Partition(topic = elo, partition = 9, leader = 0, replicas = [0,], isr = [0,], Partition(topic = elo, partition = 3, leader = 0, replicas = [0,], isr = [0,], Partition(topic = elo, partition = 6, leader = 0, replicas = [0,], isr = [0,], Partition(topic = elo, partition = 0, leader = 0, replicas = [0,], isr = [0,]]
first one: 1 : ConsumerRecord(topic = elo, partition = 4, offset = 52976, key = 1013, value = 1013)
second one: 1 : ConsumerRecord(topic = elo, partition = 8, offset = 53579, key = 1018, value = 1018)
...
second one: 12 : ConsumerRecord(topic = elo, partition = 6, offset = 53779, key = 1328, value = 1328)
first one: 13 : ConsumerRecord(topic = elo, partition = 0, offset = 55155, key = 1331, value = 1331)
second one: 13 : ConsumerRecord(topic = elo, partition = 9, offset = 53122, key = 1332, value = 1332)
second one: 13 : ConsumerRecord(topic = elo, partition = 9, offset = 53123, key = 1333, value = 1333)
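
A topic like this one (10 partitions, replication factor 1) can be created with the kafka-topics.sh script shipped in the Kafka 0.9 distribution, assuming a local ZooKeeper on its default port:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 10 --topic elo
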
import java.util.Properties
import java.util.concurrent.Future
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord, RecordMetadata}

object Producer extends App {
  def props(host: String) = {
    val props = new Properties()
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerSerializer")
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, host)
    props
  }

  val producer = new KafkaProducer[Integer, String](props("localhost:9092"))

  // Send 100000 records to the "elo" topic, keyed by the counter so they spread over the partitions.
  // Blocking on the future after each send keeps the example simple (an asynchronous variant follows below).
  1 to 100000 foreach { i =>
    print(s"produce $i ...")
    val future: Future[RecordMetadata] = producer.send(new ProducerRecord[Integer, String]("elo", i, Integer.toString(i)))
    val result: RecordMetadata = future.get() // blocks until the broker has acknowledged the record
    println(s"offset ${result.offset} / partition ${result.partition} / topic ${result.topic}")
    Thread.sleep(100)
  }

  producer.close()
}
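
Blocking on every future means at most one record in flight, which caps throughput. A non-blocking sketch (same topic and serializers as above; the AsyncProducer name is only for the example) passes a Callback to send() and lets the producer batch records:

import java.util.Properties
import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerConfig, ProducerRecord, RecordMetadata}

object AsyncProducer extends App {
  val props = new Properties()
  props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
  props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerSerializer")
  props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")

  val producer = new KafkaProducer[Integer, String](props)

  1 to 100000 foreach { i =>
    // send() returns immediately; the callback runs on the producer's I/O thread once the broker answers
    producer.send(new ProducerRecord[Integer, String]("elo", i, Integer.toString(i)), new Callback {
      override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit =
        if (exception != null) exception.printStackTrace()
        else println(s"acked $i -> partition ${metadata.partition} / offset ${metadata.offset}")
    })
  }

  producer.close() // flushes the pending batches before exiting
}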
import java.util
import java.util.Properties
import java.util.concurrent.Executors
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecords, KafkaConsumer}

import scala.collection.JavaConverters._

object Consumer extends App {
  def props(host: String) = {
    val props = new Properties()
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "elo-group")
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerDeserializer")
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, host) // same "bootstrap.servers" key, but from ConsumerConfig, not ProducerConfig
    props
  }

  // A small pool to run the consumers in parallel, one thread each (a KafkaConsumer is not thread-safe); two are started below
  val tpool = Executors.newFixedThreadPool(3)

  def createConsumerThread(id: String) = new Runnable {
    override def run(): Unit = {
      val consumer = new KafkaConsumer[Integer, String](props("localhost:9092"))

      // List every topic the cluster knows about, then dump the partitions of "elo"
      val topics = consumer.listTopics.asScala
      println(topics.keys)
      topics foreach {
        case (name, list) if name.equals("elo") => println(list)
        case _ =>
      }

      consumer.subscribe(util.Arrays.asList("elo"))

      // Poll forever; i counts the polls, not the records
      var i = 0
      while (true) {
        val records: ConsumerRecords[Integer, String] = consumer.poll(1000)
        records.asScala.foreach(x => println(s"${id}: ${i} : $x"))
        i += 1
      }
    }
  }

  tpool.submit(createConsumerThread("first one"))
  tpool.submit(createConsumerThread("second one"))

}
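
The consumers above rely on the default auto-commit of offsets. Here is a sketch of a variant (the ManualCommitConsumer name and the shutdown-hook approach are only for the example) that disables auto-commit, commits only after the records of a poll have been processed, and shuts down cleanly through wakeup():

import java.util
import java.util.Properties
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.common.errors.WakeupException

import scala.collection.JavaConverters._

object ManualCommitConsumer extends App {
  val props = new Properties()
  props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
  props.put(ConsumerConfig.GROUP_ID_CONFIG, "elo-group")
  props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerDeserializer")
  props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
  props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false") // we commit ourselves

  val consumer = new KafkaConsumer[Integer, String](props)
  consumer.subscribe(util.Arrays.asList("elo"))

  // wakeup() is the only KafkaConsumer method that is safe to call from another thread:
  // it makes a blocked poll() throw a WakeupException
  sys.addShutdownHook(consumer.wakeup())

  try {
    while (true) {
      val records = consumer.poll(1000)
      records.asScala.foreach(r => println(s"${r.partition}/${r.offset}: ${r.value}"))
      consumer.commitSync() // commit the offsets of everything we just processed
    }
  } catch {
    case _: WakeupException => // expected on shutdown
  } finally {
    consumer.close()
  }
}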
libraryDependencies += "org.apache.kafka" %% "kafka" % "0.9.0.0"
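
The new producer/consumer classes actually live in the pure-Java kafka-clients artifact, which the full kafka jar above pulls in transitively; depending on it directly would also be enough for these two programs:

libraryDependencies += "org.apache.kafka" % "kafka-clients" % "0.9.0.0"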