/*
 * How to use Kafka in tests (+ schemarepo)
 *
 * Author: chtefi
 * Date: 12/14/2016 - 2:11 AM
 */

import java.nio.file.Files

import kafka.admin.AdminUtils
import kafka.server.{KafkaConfig, KafkaServerStartable}
import kafka.utils.ZkUtils
import org.apache.curator.test.TestingServer
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.schemarepo.{InMemoryRepository, SubjectConfig, ValidatorFactory}
import org.springframework.util.SocketUtils
import collection.JavaConversions._

/**
 * Companion factory so callers can write `KafkaInfrastructure(topics)` without `new`.
 */
object KafkaInfrastructure {

  /**
   * Builds a new [[KafkaInfrastructure]] from the given arguments.
   *
   * @param topics   topic names forwarded to the class constructor
   * @param subjects subject names forwarded to the class constructor (empty by default)
   * @return a freshly constructed infrastructure instance
   */
  def apply(topics: Set[String], subjects: Set[String] = Set()): KafkaInfrastructure = {
    new KafkaInfrastructure(topics, subjects)
  }
}

/**
 * Embedded ZooKeeper + Kafka broker for tests, plus in-memory schema-repo subjects.
 *
 * Constructing an instance creates a ZooKeeper `TestingServer`, starts a Kafka broker
 * pointed at it (both on ports obtained from `SocketUtils.findAvailableTcpPort()`), and
 * creates every requested topic. Call [[shutdown]] when the test is done.
 *
 * @param topics      names of the topics to create once the broker is up
 * @param subjectsSet names of the schema-repo subjects to register lazily (empty by default)
 */
class KafkaInfrastructure(topics: Set[String], subjectsSet: Set[String] = Set()) {

  // Curator ZooKeeper test server bound to a free TCP port.
  val zkServer = new TestingServer(SocketUtils.findAvailableTcpPort())

  // Temporary directory handed to the broker as its log directory.
  val logsDir = Files.createTempDirectory("test-kafka").toFile
  // deleteOnExit only removes an *empty* directory, so shutdown() also deletes it recursively.
  logsDir.deleteOnExit()

  val kafkaServer = new KafkaServerStartable(KafkaConfig(Map(
    "broker.id" -> 1,
    "zookeeper.connect" -> zkServer.getConnectString,
    "log.dir" -> logsDir.getAbsolutePath,
    "port" -> SocketUtils.findAvailableTcpPort()
    // "host.name" -> "localhost",
    // "batch.size" -> 1,
  )))
  kafkaServer.startup()

  // "host:port" string used as `bootstrap.servers` by consume().
  val broker = kafkaServer.serverConfig.hostName + ":" + kafkaServer.serverConfig.port

  // Creates every requested topic with 10 partitions and a replication factor of 1.
  // A single ZkUtils handle is shared by all createTopic calls and closed afterwards,
  // instead of opening (and leaking) one ZooKeeper client per topic.
  private def createTopics(): Unit = {
    val zkUtils = ZkUtils(zkServer.getConnectString, 30000, 30000, isZkSecurityEnabled = false)
    try topics.foreach(AdminUtils.createTopic(zkUtils, _, 10, 1))
    finally zkUtils.close()
  }
  createTopics()

  /** Subject name -> registered subject; each subject lives in its own `InMemoryRepository`. */
  lazy val subjects = subjectsSet.map(s => s -> new InMemoryRepository(ValidatorFactory.EMPTY).register(s, SubjectConfig.emptyConfig())).toMap

  /**
   * Convenience accessor for the first registered subject.
   * Throws `NoSuchElementException` when `subjectsSet` is empty.
   */
  lazy val subject = subjects.head._2

  /**
   * Subscribes a short-lived consumer to `topic`, performs one `poll` and returns the result.
   *
   * The consumer uses group id "testid", byte-array deserializers and
   * `auto.offset.reset = earliest`. It is closed even when subscribing or polling throws.
   *
   * @param topic   topic to read from
   * @param timeout value passed to `KafkaConsumer.poll` (milliseconds), 2000 by default
   * @return whatever the single `poll` call returned (possibly no records)
   */
  def consume(topic: String, timeout: Int = 2000) = {
    // NOTE(review): key/value type parameters are left to inference exactly as before so the
    // method's inferred return type is unchanged; consider annotating them as Array[Byte].
    val consumer = new KafkaConsumer(Map(
      "bootstrap.servers" -> broker,
      "key.deserializer" -> "org.apache.kafka.common.serialization.ByteArrayDeserializer",
      "value.deserializer" -> "org.apache.kafka.common.serialization.ByteArrayDeserializer",
      "group.id" -> "testid",
      "auto.offset.reset" -> "earliest"
    ))

    try {
      consumer.subscribe(List(topic))
      consumer.poll(timeout)
    } finally consumer.close()
  }

  /**
   * Stops the Kafka broker, then the ZooKeeper server, then removes the broker's log directory.
   * Each step runs even if the previous one threw, so a failed broker shutdown does not
   * leave ZooKeeper running or the temporary directory behind.
   */
  def shutdown() = {
    try kafkaServer.shutdown()
    finally {
      try zkServer.stop()
      finally deleteRecursively(logsDir)
    }
  }

  // Best-effort recursive delete: children first, then the entry itself. Failures are ignored.
  private def deleteRecursively(file: java.io.File): Unit = {
    // listFiles() returns null for non-directories (and on I/O error), hence the Option wrapper.
    Option(file.listFiles()).foreach(_.foreach(deleteRecursively))
    file.delete()
  }
}