hivefans
5/5/2016 - 12:02 PM

Kafka Offset Range Provider/Storage Engine

Reads and commits Kafka consumer-group offsets so that a Spark Streaming direct Kafka stream resumes from where it left off after a restart.

package com.blackberry.bdp.korpse

import scala.collection.parallel.ParIterable
import scala.collection.mutable
import scala.collection.mutable.{ Map => MutableMap }

import org.apache.spark.streaming.kafka.OffsetRange

import kafka.api._
import kafka.api.OffsetResponse
import kafka.api.PartitionOffsetsResponse
import kafka.cluster.Broker
import kafka.network.BlockingChannel
import kafka.common.ErrorMapping
import kafka.common.{ TopicAndPartition, OffsetAndMetadata }

import org.slf4j.{ Logger, LoggerFactory }

import java.io.IOException

/**
 * A broker address.
 *
 * @param host broker host name or IP address
 * @param port broker port
 */
case class HostAndPort(
  host: String,
  port: Int)

/**
 * Presumably signals that no broker could be reached; it is not thrown
 * anywhere in the code shown alongside it.
 *
 * @param str exception message
 */
case class AllBrokersFailed(
  str: String) extends Exception(str)

/**
 * Raised when a Kafka response carries a correlation ID that differs from the
 * one sent with the request it is supposed to answer.
 *
 * @param str message naming the request and response correlation IDs
 */
case class CorrelationMismatchException(
  str: String) extends Exception(str)

/**
 * Factory for [[Korpse]] plus parsers for its comma-separated configuration strings.
 */
object Korpse {

  /**
   * Builds a [[Korpse]] from raw configuration strings.
   *
   * Constructing a Korpse immediately contacts the seed brokers to discover
   * partition leaders.
   *
   * @param seedBrokersStr comma-separated `host:port` list, e.g. `"k1:9092, k2:9092"`
   * @param topicsStr comma-separated topic names
   * @param consumerGroupId consumer group whose offsets are fetched and committed
   * @param kafkaClientId client ID sent with each Kafka request
   * @param defaultOffset time sent in an OffsetRequest for partitions without a
   *        committed offset (the usage text in Main: -2 earliest, -1 latest)
   * @throws IllegalArgumentException if either string has no usable entries or a
   *         broker entry is not of the form `host:port`
   */
  def apply(seedBrokersStr: String,
    topicsStr: String,
    consumerGroupId: String,
    kafkaClientId: String,
    defaultOffset: Long = -1): Korpse = {
    new Korpse(parseBrokerString(seedBrokersStr),
      parseTopicsString(topicsStr),
      consumerGroupId,
      kafkaClientId,
      defaultOffset)
  }

  /**
   * Splits a comma-separated topic list, trimming whitespace and dropping empty
   * entries (e.g. the middle entry of `"a,,b"`).
   *
   * An empty result is rejected instead of returned because the list becomes the
   * topic list of a TopicMetadataRequest (NOTE(review): in the Kafka protocol an
   * empty list there is understood to mean "all topics" — confirm for the broker
   * version in use).
   *
   * @throws IllegalArgumentException if no topic names are present
   */
  def parseTopicsString(topicsStr: String): Seq[String] = {
    val topics = splitList(topicsStr)
    require(topics.nonEmpty, s"no topic names found in '$topicsStr'")
    topics
  }

  /**
   * Parses a comma-separated list of `host:port` entries. Whitespace around
   * separators is ignored, and so are empty entries. Any text after a second `:`
   * in an entry is ignored.
   *
   * @throws IllegalArgumentException if no entries are present or an entry lacks
   *         a host or a numeric port
   */
  def parseBrokerString(seedBrokerStr: String): Seq[HostAndPort] = {
    val brokers = splitList(seedBrokerStr).map(parseHostAndPort)
    require(brokers.nonEmpty, s"no seed brokers found in '$seedBrokerStr'")
    brokers
  }

  /** Splits on commas, trims each entry and drops entries that are empty after trimming. */
  private def splitList(csv: String): List[String] =
    csv.split(',').map(_.trim).filter(_.nonEmpty).toList

  /** Parses one `host:port` entry; extra `:`-separated segments are ignored. */
  private def parseHostAndPort(entry: String): HostAndPort = {
    val parts = entry.split(':').map(_.trim)
    require(parts.length >= 2 && parts(0).nonEmpty &&
      parts(1).nonEmpty && parts(1).forall(_.isDigit),
      s"seed broker '$entry' is not of the form host:port")
    HostAndPort(parts(0), parts(1).toInt)
  }
}

/**
 * Fetches and commits consumer-group offsets for a set of Kafka topics with raw
 * `BlockingChannel` requests (OffsetFetch/OffsetCommit sent with version 1). This
 * lets a Spark direct Kafka stream resume from the last committed position.
 *
 * Constructing an instance contacts the seed brokers: the leader of every
 * partition of `topics` is discovered eagerly into [[partitions]].
 *
 * No synchronization is used; every request advances the shared mutable
 * [[correlationId]].
 *
 * @param seedBrokers brokers tried in order whenever a new channel is needed
 * @param topics topics whose partitions are tracked
 * @param consumerGroupId consumer group whose offsets are fetched and committed
 * @param kafkaClientId client ID sent with every request
 * @param defaultOffset time sent in an OffsetRequest for partitions that have no
 *        committed offset (the usage text in Main: -2 earliest, -1 latest)
 */
class Korpse(
    seedBrokers: Seq[HostAndPort],
    topics: Seq[String],
    consumerGroupId: String,
    kafkaClientId: String,
    defaultOffset: Long) {

  val LOG: Logger = LoggerFactory.getLogger(this.getClass)

  /** Correlation ID of the most recently sent request; advanced by every request. */
  var correlationId = 0

  /** Pause before each retry, in milliseconds. */
  val backOffMs = 500

  // NOTE(review): `retries` and `backoffExponent` are not read by this class;
  // every retry below waits `backOffMs` and retries without limit.
  val retries = 3
  val backoffExponent = 2

  // Read timeout for every BlockingChannel. This must be declared before `partitions`:
  // getPartitions() runs during construction and would otherwise see 0 here.
  private val channelReadTimeoutMs = 5000

  /** Leader broker of each partition of `topics`, discovered once at construction. */
  val partitions: Map[TopicAndPartition, Broker] = getPartitions()

  // These three fields are not read by this class.
  var partitionDefaultStarts: Option[Map[TopicAndPartition, Long]] = None
  var topicDefaultStarts: Option[Map[String, Long]] = None
  var globalDefaultStarts: Long = -1

  /** Advances [[correlationId]] and returns the new value for the next request. */
  private def nextCorrelationId(): Int = {
    correlationId += 1
    correlationId
  }

  /**
   * Throws [[CorrelationMismatchException]] unless `responseId` equals the
   * `requestId` the request was sent with.
   */
  private def checkCorrelation(requestName: String, requestId: Int,
      responseName: String, responseId: Int): Unit = {
    if (requestId != responseId)
      throw new CorrelationMismatchException(
        requestName + ": " + requestId + ", " + responseName + ": " + responseId)
  }

  /** Runs `f` on `channel`, then disconnects the channel even if `f` throws. */
  private def withChannel[T](channel: BlockingChannel)(f: BlockingChannel => T): T =
    try f(channel) finally channel.disconnect()

  /**
   * Creates and connects a channel to host:port. If connect() throws, disconnect()
   * is called so that whatever the failed attempt opened is released before
   * the exception propagates.
   */
  private def openChannel(host: String, port: Int): BlockingChannel = {
    val channel = new BlockingChannel(host, port,
      BlockingChannel.UseDefaultBufferSize,
      BlockingChannel.UseDefaultBufferSize,
      channelReadTimeoutMs)
    var connected = false
    try {
      channel.connect()
      connected = true
    } finally {
      if (!connected) channel.disconnect()
    }
    channel
  }

  /**
   * Gets a channel to a specific broker (e.g. a partition leader) without
   * redirecting to the group's offset coordinator.
   */
  private def getChannel(broker: Broker): Option[BlockingChannel] =
    getChannel(HostAndPort(broker.host, broker.port), true /* ignoreCoordinatorRedirect */)

  /**
   * Tries each seed broker in order and returns the first channel obtained.
   * Returns None only when every seed broker failed.
   */
  private def getChannel(seedBrokers: Seq[HostAndPort]): Option[BlockingChannel] =
    seedBrokers.iterator
      .map(seed => getChannel(seed))
      .collectFirst { case Some(channel) => channel }

  /**
   * Connects to `host` and sends a ConsumerMetadataRequest for `consumerGroupId`.
   *
   * - If `ignoreCoordinatorRedirect` is true, the channel to `host` itself is
   *   returned (still connected).
   * - Otherwise that channel is closed and a new channel to the coordinator
   *   named in the response is returned.
   *
   * Returns None when the response carries an error code, names no coordinator,
   * or an IOException occurs. No channel is left open in those cases.
   *
   * @throws CorrelationMismatchException if the response does not match the request
   */
  private def getChannel(host: HostAndPort,
      ignoreCoordinatorRedirect: Boolean = false): Option[BlockingChannel] = {
    println(s"Establishing channel to $host")
    try {
      val channel = openChannel(host.host, host.port)
      var handedToCaller = false
      try {
        val requestId = nextCorrelationId()
        channel.send(ConsumerMetadataRequest(consumerGroupId,
          ConsumerMetadataRequest.CurrentVersion,
          requestId,
          kafkaClientId))
        val metadataResponse = ConsumerMetadataResponse.readFrom(channel.receive.buffer)
        checkCorrelation("ConsumerMetadataRequest", requestId,
          "ConsumerMetadataResponse", metadataResponse.correlationId)
        if (metadataResponse.errorCode != ErrorMapping.NoError) {
          println("Error code: " + metadataResponse.errorCode)
          None
        } else if (ignoreCoordinatorRedirect) {
          println("Connected to " + channel.host + ":" + channel.port
            + " and not redirecting to my coordinator for offset mgmt")
          handedToCaller = true
          Some(channel)
        } else {
          // Offset management requests go to the coordinator, so the seed
          // connection is closed before the coordinator connection is opened.
          channel.disconnect()
          metadataResponse.coordinatorOpt match {
            case Some(coordinator) =>
              println(s"Redirecting to offset coordinator ${coordinator.host}:${coordinator.port}")
              Some(openChannel(coordinator.host, coordinator.port))
            case None =>
              println(s"No offset coordinator returned by ${host.host}:${host.port}")
              None
          }
        }
      } finally {
        // disconnect() is also called on the already-closed seed channel after a redirect.
        if (!handedToCaller) channel.disconnect()
      }
    } catch {
      case ioe: IOException =>
        println(s"Blocking Channel IOException on ${host.host}:${host.port}: $ioe")
        None
    }
  }

  /** A channel obtained from the seed brokers; throws if none of them answers. */
  private def seedChannelOrFail(): BlockingChannel =
    getChannel(seedBrokers).getOrElse(
      throw new Exception("Cannot get channel from seed brokers"))

  /**
   * This mapping of TopicAndPartition to Broker is used as
   * subsequent OffsetRequest's to fetch latest/earliest
   * offset must be sent to the partiton leader only.
   *
   * Partitions without a leader are logged and left out. A topic-level error
   * code in the metadata response is also logged.
   */
  private def getPartitions(): Map[TopicAndPartition, Broker] = {
    val channel = seedChannelOrFail()
    val requestId = nextCorrelationId()
    val response = withChannel(channel) { ch =>
      ch.send(new TopicMetadataRequest(topics, requestId))
      TopicMetadataResponse.readFrom(ch.receive.buffer)
    }
    checkCorrelation("TopicMetadataRequest", requestId,
      "TopicMetadataResponse", response.correlationId)
    response.topicsMetadata.flatMap { topicMetadata =>
      if (topicMetadata.errorCode != ErrorMapping.NoError)
        println("ERROR: metadata for topic " + topicMetadata.topic
          + " returned error code " + topicMetadata.errorCode)
      topicMetadata.partitionsMetadata.flatMap { partitionMetadata =>
        partitionMetadata.leader match {
          case Some(leader) =>
            println("discovered partition: " + partitionMetadata)
            List(TopicAndPartition(topicMetadata.topic, partitionMetadata.partitionId) -> leader)
          case None =>
            println("ERROR: leader not defined for " + partitionMetadata)
            Nil
        }
      }
    }.toMap
  }

  /**
   * Returns the offset to start consuming from for every partition in [[partitions]].
   * This is the group's committed offset, or the offset for `defaultOffset` when
   * none is committed. The result is always defined; failures are thrown instead.
   *
   * See code references 1 and 2, there's something weird about
   * the OffsetRequest, I would have suspected that a None would
   * have been present for the optional instead of always getting
   * back a -1.  There's the default Kafka parameter named
   * auto.offset.reset that defaults to largest (-1) which doesn't
   * seem to be relevant for using a blocking channel to make API
   * calls directly.  Needs more investigation as this workaround
   * seems hackish -- dariens
   *
   * @throws Exception if no seed broker answers, or a default offset cannot be found
   */
  def getFromOffsets(): Option[Map[TopicAndPartition, Long]] = {
    val fetchResponse = fetchCommittedOffsets()
    val fromOffsets = partitions.map { case (partition, broker) =>
      val offset = fetchResponse.requestInfo.get(partition) match {
        case Some(committed) if committed.offset != -1 =>
          println("mapping from offset for partion " + partition
            + " to " + committed.offset)
          committed.offset
        case Some(_) =>
          println(s"$partition received -1  OffsetFetchResponse "
            + s"fetching default for time $defaultOffset")
          // Code reference [1] -- why do we need to do this?!
          defaultOffsetOrFail(partition, broker,
            s"couldn't find offset for partition: $partition")
        case None =>
          // Code reference [2] -- Does this ever execute--since it appears we get a -1 when
          // There are no offsets committed
          println(partition + ": Committed offset not found, fetching default")
          defaultOffsetOrFail(partition, broker,
            "couldn't find any from offset for partition: " + partition)
      }
      partition -> offset
    }
    Option(fromOffsets)
  }

  /**
   * Sends an OffsetFetchRequest for all [[partitions]] through the group's
   * coordinator. If any partition reports "not coordinator" or "offsets load in
   * progress", it waits `backOffMs` and starts over. Retries are unbounded.
   */
  @scala.annotation.tailrec
  private def fetchCommittedOffsets(): OffsetFetchResponse = {
    val channel = seedChannelOrFail()
    val requestId = nextCorrelationId()
    val response = withChannel(channel) { ch =>
      ch.send(new OffsetFetchRequest(consumerGroupId,
        partitions.keySet.toSeq, 1,
        requestId,
        kafkaClientId))
      OffsetFetchResponse.readFrom(ch.receive.buffer)
    }
    checkCorrelation("OffsetFetchRequest", requestId,
      "OffsetFetchResponse", response.correlationId)
    val retryCause = partitions.keys.iterator
      .collect { case p if response.requestInfo.contains(p) => p -> response.requestInfo(p).error }
      .find { case (_, error) =>
        error == ErrorMapping.NotCoordinatorForConsumerCode ||
          error == ErrorMapping.OffsetsLoadInProgressCode
      }
    retryCause match {
      case None => response
      case Some((partition, error)) =>
        if (error == ErrorMapping.NotCoordinatorForConsumerCode)
          println("channel created on " + channel.host + ":" + channel.port
            + " is not coordinator for group " + consumerGroupId
            + " and client ID " + kafkaClientId + ",  retrying")
        else
          LOG.warn("offset load in processs for " + partition.topic
            + "-" + partition.partition + ", retrying")
        Thread.sleep(backOffMs)
        fetchCommittedOffsets()
    }
  }

  /** Default offset for `partition`, or an Exception carrying `failureMessage`. */
  private def defaultOffsetOrFail(partition: TopicAndPartition, broker: Broker,
      failureMessage: String): Long = {
    val offset = getDefaultFromOffset(partition, broker)
      .getOrElse(throw new Exception(failureMessage))
    println(s"$partition default offset for $defaultOffset was " + offset)
    offset
  }

  /**
   * Asks `broker` (expected to lead `partition`) for the first offset returned
   * for time `defaultOffset`.
   *
   * @return the offset, or None if the response reports an error or holds no offset
   * @throws Exception if no channel to `broker` can be obtained
   */
  def getDefaultFromOffset(partition: TopicAndPartition,
    broker: Broker): Option[Long] = {
    val channel = getChannel(broker).getOrElse(
      throw new Exception(s"Cannot get channel for broker $broker"))
    val requestId = nextCorrelationId()
    val reqInfo = Map(partition -> PartitionOffsetRequestInfo(defaultOffset, 1))
    val response = withChannel(channel) { ch =>
      ch.send(OffsetRequest(reqInfo, 1, requestId, kafkaClientId))
      OffsetResponse.readFrom(ch.receive.buffer)
    }
    checkCorrelation("OffsetRequest", requestId, "OffsetResponse", response.correlationId)
    val offset =
      if (response.hasError) None
      else response.partitionErrorAndOffsets.get(partition).flatMap(_.offsets.headOption)
    offset match {
      case Some(value) =>
        println(s"$partition fetched offset for time $defaultOffset is " + value)
      case None =>
        println(s"Trying to fetch offset request for time $defaultOffset failed")
    }
    offset
  }

  /**
   * Commits each range's `untilOffset` for `consumerGroupId` through the group's
   * coordinator.
   *
   * If the coordinator has moved, the whole commit is retried once per call,
   * after `backOffMs`; the retry recurses, so retries are unbounded. An
   * IOException during the exchange is printed and swallowed, so the commit is
   * best-effort.
   *
   * @throws Exception if no seed broker answers or the metadata is rejected as too large
   */
  def storeOffsetRanges(offsetRanges: Array[OffsetRange]): Unit = {
    val now = System.currentTimeMillis()
    val offsets: Map[TopicAndPartition, OffsetAndMetadata] = offsetRanges.map { osr =>
      println("[" + osr.topic + "-" + osr.partition + "] offset: "
        + osr.untilOffset + " added to  commit request")
      TopicAndPartition(osr.topic, osr.partition) ->
        OffsetAndMetadata(osr.untilOffset, "metadata?", now)
    }.toMap
    val channel = seedChannelOrFail()
    val requestId = nextCorrelationId()
    val commitRequest = new OffsetCommitRequest(consumerGroupId, offsets, 1,
      requestId, kafkaClientId)
    val retry =
      try {
        val commitResponse = withChannel(channel) { ch =>
          ch.send(commitRequest)
          OffsetCommitResponse.readFrom(ch.receive.buffer)
        }
        checkCorrelation("OffsetCommitRequest", requestId,
          "OffsetCommitResponse", commitResponse.correlationId)
        if (commitResponse.hasError) {
          needsCommitRetry(commitResponse, offsets.keys)
        } else {
          println("offsets and metadata stored without errors")
          false
        }
      } catch {
        case ioe: IOException =>
          println("an IOException occred: " + ioe)
          false
      }
    if (retry) {
      Thread.sleep(backOffMs)
      storeOffsetRanges(offsetRanges)
    }
  }

  /**
   * Logs the commit status of each committed partition.
   *
   * @return true if any partition reports that the consumer coordinator moved or
   *         is unavailable, so the whole commit should be retried
   * @throws Exception if any partition's metadata was rejected as too large
   */
  private def needsCommitRetry(commitResponse: OffsetCommitResponse,
      committed: Iterable[TopicAndPartition]): Boolean =
    committed.foldLeft(false) { (retry, partition) =>
      commitResponse.commitStatus.get(partition) match {
        case None =>
          println("WARN: commit status was not defined for " + partition)
          retry
        case Some(code) if code == ErrorMapping.NoError =>
          retry
        case Some(code) if code == ErrorMapping.NotCoordinatorForConsumerCode ||
            code == ErrorMapping.ConsumerCoordinatorNotAvailableCode =>
          println("[" + partition.topic + "-" + partition.partition
            + "] Consumer coordinator has moved, need to retry")
          true
        case Some(code) if code == ErrorMapping.OffsetMetadataTooLargeCode =>
          throw new Exception("WARN: partition " + partition
            + ": You must reduce the size of the metadata if you wish to retry")
        case Some(code) =>
          println("WARN: commit failed for " + partition + " with error code " + code)
          retry
      }
    }

}
package com.blackberry.bdp.kafkafilter

import kafka.common.TopicAndPartition
import kafka.serializer.StringDecoder
import kafka.message.MessageAndMetadata
import org.apache.spark.{ SparkConf, TaskContext }
import org.apache.spark.SparkContext
import org.apache.spark.streaming.{ Seconds, StreamingContext }
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.kafka.{ OffsetRange, HasOffsetRanges }
import org.slf4j.{ Logger, LoggerFactory }
import scala.collection.mutable
import com.blackberry.bdp.korpse._

object Main {

  val LOG: Logger = LoggerFactory.getLogger(this.getClass);

  /**
   * Streams the given topics, starting at the consumer group's committed offsets
   * (or at `defaultOffset` for partitions without one).
   *
   * Each batch runs an example per-partition message count and prints it.
   * Afterwards the batch's offset ranges are committed back to Kafka through
   * [[com.blackberry.bdp.korpse.Korpse]].
   *
   * @param args see the usage text printed when fewer than six arguments are given
   */
  def main(args: Array[String]): Unit = {
    // Only args(0) .. args(5) are read below; any further arguments are ignored.
    if (args.length < 6) {
      println("Required arguments:")
      println("\t1 => metadata brokers")
      println("\t2 => topic names (comma seperated)")
      println("\t3 => batch length (seconds)")
      println("\t4 => consumer group ID")
      println("\t5 => Kafka client ID")
      println("\t6 => default offset (-2 earliest, -1 latest)")
      System.exit(1)
    }

    val seedBrokerList = args(0)
    val topics = args(1)
    val batchSeconds = parseNumber[Int]("batch length", args(2))(_.toInt)
    val consumerGroup = args(3)
    val kafkaClientId = args(4)
    val defaultOffset = parseNumber[Long]("default offset", args(5))(_.toLong)

    val ssc = new StreamingContext(new SparkConf, Seconds(batchSeconds))

    val kafkaParams = Map(
      "metadata.broker.list" -> seedBrokerList)

    val korpse = Korpse(seedBrokerList,
      topics,
      consumerGroup,
      kafkaClientId,
      defaultOffset)

    val fromOffsets = korpse.getFromOffsets().getOrElse {
      LOG.error("Unable to determine starting offsets")
      sys.exit(1)
    }

    println("\n***\nfrom offsets: " + fromOffsets + "\n***\n")

    fromOffsets.foreach { case (partition, offset) =>
      println(partition + " starting offset: " + offset)
    }

    // Updated on the driver by the transform below for each batch, and read by
    // the commit step registered at the end of main.
    var offsetRanges = Array[OffsetRange]()

    // Create a stream from our offset ranges that includes the Kafka message,
    // partition and offset. The HasOffsetRanges cast is done in the first
    // operation applied to the direct stream's RDDs.
    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, Int, Long)](ssc, kafkaParams, fromOffsets,
      (mmd: MessageAndMetadata[String, String]) => (mmd.message(), mmd.partition, mmd.offset))
      .transform { rdd =>
        offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
        rdd
      }

    // Typical application specific Spark stream processing now follows. As a
    // runnable example, count the messages received per partition in each batch.
    stream
      .filter { case (message, _, _) => message != null } // you can filter the stream
      .map { case (_, partition, _) => (partition, 1L) } // ...and apply a mapping function
      .reduceByKey(_ + _) // then reduce by key
      .foreachRDD(rdd => {
        rdd.foreach(println)
      })

    // Now we want to persist the offset ranges that we're consuming so we
    // start from where we left off if/when the job is stopped/crashes and is
    // restarted. This output operation is registered after the processing above
    // (NOTE(review): relies on Spark running a batch's output operations in
    // registration order, its default behavior).
    stream.foreachRDD { _ =>
      korpse.storeOffsetRanges(offsetRanges)
    }

    ssc.start()
    ssc.awaitTermination()
  }

  /**
   * Parses a numeric command-line argument with `parse` (after trimming). On a
   * NumberFormatException it logs an error and exits the JVM with status 1.
   */
  private def parseNumber[T](name: String, raw: String)(parse: String => T): T =
    try parse(raw.trim) catch {
      case _: NumberFormatException =>
        LOG.error(s"$name must be a number, got '$raw'")
        sys.exit(1)
    }

}