iniyanp
2/27/2017 - 11:05 PM

Kafka and Spark

Kafka and Spark

 kafka.api.OffsetRequest.EarliestTime() => finds the beginning of the data in the logs and starts streaming from there
 kafka.api.OffsetRequest.LatestTime()   => will only stream new messages
 
 ADT in dena kafka:
 Map[TopicAndPartition, Range]
 
 TopicAndPartition is a case class: TopicAndPartition(topic: String, partition: Int)
 
 Range => (Offset, Offset)
 Offset => Long
 
 
 Dataset   => a distributed collection of data.
 DataFrame => a Dataset organized into named columns; conceptually equivalent to a table in a relational database.
 
 Kafka: to post a value to a topic, use this code.
 
/** Publishes one audit event to `auditTopic` and completes when Kafka acknowledges it.
  *
  * The record is built lazily (inside `Task.delay`), so nothing happens until the
  * returned Task is run.
  *
  * @param producer Kafka producer used to send the record
  * @param ae       audit event to serialize and publish
  * @return a Task that succeeds with `()` once the send is acknowledged, or fails with
  *         the exception reported by Kafka (or thrown synchronously by `send`)
  */
def sendT(producer: KProducer)(ae: AuditEvent): Task[Unit] = {
  import scala.util.control.NonFatal

  Task.delay {
    // The two-argument constructor is (topic, value): the record key is left null.
    new ProducerRecord[Array[Byte], Array[Byte]](auditTopic, auditEventBytes(ae))
  }.flatMap { record =>
    // producer.send is callback-based; Task.async adapts that callback into a Task.
    Task.async[Unit] { cb =>
      try {
        producer.send(record, new Callback {
          // Kafka passes a null exception on success.
          override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit =
            if (exception != null) cb(-\/(exception)) else cb(\/-(()))
        })
        () // discard the java.util.concurrent.Future returned by send
      } catch {
        // send can also fail synchronously (e.g. serialization errors or a max.block.ms
        // timeout). Report that through cb so the Task fails instead of never completing.
        case NonFatal(e) => cb(-\/(e))
      }
    }
  }
}

//The sendT snippet above is used in the code below.


import java.nio.ByteBuffer
import java.util.UUID
import org.apache.kafka.clients.producer.{Callback, ProducerRecord, RecordMetadata}

import scalaz.concurrent.Task
import scalaz.{-\/, \/-}


/** Helpers that serialize [[AuditEvent]]s into Avro envelopes and publish them to Kafka. */
object SimpleAudit {

  /** Wraps an already-serialized payload in an [[EventEnvelope]].
    *
    * The envelope gets:
    *  - a random UUID event id;
    *  - the current wall-clock time in milliseconds;
    *  - fixed source metadata ("service" / "janitor" / "avro");
    *  - the full name and event version of the [[AuditEvent]] schema (version falls back to 1 if absent).
    *
    * `ByteBuffer.wrap` does not copy, so the envelope shares `payload`'s array.
    *
    * @param payload Avro-encoded bytes of the record being wrapped
    * @return the built envelope
    */
  def wrapInEnvelope(payload: Array[Byte]): EventEnvelope = {
    EventEnvelope.newBuilder().
      setPayload(ByteBuffer.wrap(payload)).
      setTimestamp(System.currentTimeMillis()).
      setEventId(UUID.randomUUID().toString).
      setSourceType("service").
      setSource("janitor").
      setFormat("avro").
      setEventVersion(AuditEvent.getClassSchema.getEventVersion.getOrElse(1).toInt).
      setEventName(AuditEvent.getClassSchema.getFullName).
      build()
  }

  /** Serializes an audit event, wraps it in an envelope, and serializes the envelope.
    *
    * @param ae audit event to encode
    * @return the envelope's serialized bytes, ready to be used as a Kafka record value
    */
  def auditEventBytes(ae: AuditEvent): Array[Byte] = {
    val payload: Array[Byte] = AvroUtil.recordToBytes(ae)
    val envelope: EventEnvelope = wrapInEnvelope(payload)
    AvroUtil.envToBytes(envelope)
  }

  /** Publishes one audit event to `auditTopic` and completes when Kafka acknowledges it.
    *
    * The record is built lazily (inside `Task.delay`), so nothing happens until the
    * returned Task is run.
    *
    * @param producer Kafka producer used to send the record
    * @param ae       audit event to serialize and publish
    * @return a Task that succeeds with `()` once the send is acknowledged, or fails with
    *         the exception reported by Kafka (or thrown synchronously by `send`)
    */
  def sendT(producer: KProducer)(ae: AuditEvent): Task[Unit] = {
    import scala.util.control.NonFatal

    Task.delay {
      // The two-argument constructor is (topic, value): the record key is left null.
      new ProducerRecord[Array[Byte], Array[Byte]](auditTopic, auditEventBytes(ae))
    }.flatMap { record =>
      // producer.send is callback-based; Task.async adapts that callback into a Task.
      Task.async[Unit] { cb =>
        try {
          producer.send(record, new Callback {
            // Kafka passes a null exception on success.
            override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit =
              if (exception != null) cb(-\/(exception)) else cb(\/-(()))
          })
          () // discard the java.util.concurrent.Future returned by send
        } catch {
          // send can also fail synchronously (e.g. serialization errors or a max.block.ms
          // timeout). Report that through cb so the Task fails instead of never completing.
          case NonFatal(e) => cb(-\/(e))
        }
      }
    }
  }

}

 //To serialize an EventEnvelope to bytes:
 /** Serializes an [[EventEnvelope]] to raw Avro binary.
   *
   * Only the datum is written; there is no schema or container-file header.
   *
   * @param envelope envelope to encode
   * @return the encoded bytes
   */
 def envToBytes(envelope: EventEnvelope): Array[Byte] = {
   val out = new ByteArrayOutputStream
   // Second argument is the encoder to reuse; null allocates a fresh one.
   val enc = EncoderFactory.get.directBinaryEncoder(out, null)
   val writer = new SpecificDatumWriter(classOf[EventEnvelope])
   writer.write(envelope, enc)
   // Flush the encoder, not just the stream. For this direct encoder it simply flushes `out`,
   // but with a buffered encoder, flushing only `out` would silently drop buffered bytes.
   enc.flush()
   out.toByteArray
 }