Kafka and Spark
kafka.api.OffsetRequest.EarliestTime() => finds the beginning of the data in the logs and starts streaming from there.
kafka.api.OffsetRequest.LatestTime() => streams only new messages.
ADT in dena kafka:
Map[TopicAndPartition, Range]
TopicAndPartition is a case class: TopicAndPartition(topic: String, partition: Int)
Range => (Offset, Offset)
Offset => Long
Dataset => a distributed collection of data.
DataFrame => a Dataset organized into named columns; conceptually equivalent to a table in a relational database.
Kafka: to post a value to a topic, use this code:
/** Publishes a single audit event to the Kafka audit topic.
  *
  * The event is serialized with `auditEventBytes` and sent as the value of a
  * `ProducerRecord` built with the (topic, value) constructor, so the record has no key.
  * The returned `Task` completes when the producer's send callback fires: with
  * `-\/(exception)` if the send failed, otherwise with `\/-(())`.
  *
  * @param producer the Kafka producer used to send the record
  * @param ae       the audit event to publish
  * @return a Task that completes once the producer reports the outcome of the send
  */
def sendT(producer: KProducer)(ae: AuditEvent): Task[Unit] = Task.delay {
  val bytes = auditEventBytes(ae)
  // (topic, value) constructor: the record key is left null.
  new ProducerRecord[Array[Byte], Array[Byte]](auditTopic, bytes)
}.flatMap { record =>
  // producer.send returns a java.util.concurrent.Future; Task.async adapts the
  // callback-based API into a monadic Task instead of blocking on that Future.
  Task.async[Unit] { cb =>
    try {
      producer.send(record, new Callback {
        override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit =
          if (exception != null) cb(-\/(exception)) else cb(\/-(()))
      })
      ()
    } catch {
      // KafkaProducer.send can also throw synchronously (e.g. serialization errors or a
      // metadata timeout). Task.async does not catch exceptions thrown by its register
      // function, so route them to the callback to surface them as a failed Task.
      case scala.util.control.NonFatal(e) => cb(-\/(e))
    }
  }
}
// The function above is used in the code below:
import java.nio.ByteBuffer
import java.util.UUID
import org.apache.kafka.clients.producer.{Callback, ProducerRecord, RecordMetadata}
import scalaz.concurrent.Task
import scalaz.{-\/, \/-}
object SimpleAudit {

  /** Wraps an already-serialized payload in an [[EventEnvelope]].
    *
    * The envelope is stamped with:
    * - the current wall-clock time, from `System.currentTimeMillis()`
    * - a random UUID event id
    * - source type `"service"`, source `"janitor"` and format `"avro"`
    * - an event name and version taken from `AuditEvent`'s class schema
    *
    * NOTE(review): `getEventVersion` is not part of Avro's `Schema` API. It is presumably a
    * project extension returning an Option; the version defaults to 1 via `getOrElse(1)`.
    * Confirm against its definition.
    *
    * @param payload the serialized event bytes to embed
    * @return the built envelope
    */
  def wrapInEnvelope(payload: Array[Byte]): EventEnvelope = {
    EventEnvelope.newBuilder().
      setPayload(ByteBuffer.wrap(payload)).
      setTimestamp(System.currentTimeMillis()).
      setEventId(UUID.randomUUID().toString).
      setSourceType("service").
      setSource("janitor").
      setFormat("avro").
      setEventVersion(AuditEvent.getClassSchema.getEventVersion.getOrElse(1).toInt).
      setEventName(AuditEvent.getClassSchema.getFullName).
      build()
  }

  /** Serializes an audit event and wraps it in an envelope, returning the envelope's bytes.
    *
    * The event is encoded with `AvroUtil.recordToBytes` and wrapped via [[wrapInEnvelope]].
    * The envelope is then encoded with `AvroUtil.envToBytes`.
    *
    * @param ae the audit event to serialize
    * @return the serialized envelope
    */
  def auditEventBytes(ae: AuditEvent): Array[Byte] = {
    val payload: Array[Byte] = AvroUtil.recordToBytes(ae)
    val envelope: EventEnvelope = wrapInEnvelope(payload)
    AvroUtil.envToBytes(envelope)
  }

  /** Publishes a single audit event to the Kafka audit topic.
    *
    * The event is serialized with [[auditEventBytes]] and sent as the value of a keyless
    * `ProducerRecord`. The returned `Task` completes when the producer's send callback
    * fires: with `-\/(exception)` on failure, otherwise with `\/-(())`.
    *
    * NOTE(review): `auditTopic` is not defined in this object. It is presumably defined or
    * imported elsewhere; confirm.
    *
    * @param producer the Kafka producer used to send the record
    * @param ae       the audit event to publish
    * @return a Task that completes once the producer reports the outcome of the send
    */
  def sendT(producer: KProducer)(ae: AuditEvent): Task[Unit] = Task.delay {
    val bytes = auditEventBytes(ae)
    // (topic, value) constructor: the record key is left null.
    new ProducerRecord[Array[Byte], Array[Byte]](auditTopic, bytes)
  }.flatMap { record =>
    Task.async[Unit] { cb =>
      try {
        producer.send(record, new Callback {
          override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit =
            if (exception != null) cb(-\/(exception)) else cb(\/-(()))
        })
        ()
      } catch {
        // KafkaProducer.send can also throw synchronously (e.g. serialization errors or a
        // metadata timeout). Task.async does not catch exceptions thrown by its register
        // function, so route them to the callback to surface them as a failed Task.
        case scala.util.control.NonFatal(e) => cb(-\/(e))
      }
    }
  }
}
// To serialize:
/** Encodes an [[EventEnvelope]] as Avro binary using the schema of its generated class.
  *
  * @param envelope the envelope to encode
  * @return the Avro binary encoding of `envelope`
  */
def envToBytes(envelope: EventEnvelope): Array[Byte] = {
  val datumWriter = new SpecificDatumWriter[EventEnvelope](classOf[EventEnvelope])
  val buffer = new ByteArrayOutputStream()
  // Per the Avro EncoderFactory docs, the "direct" encoder writes straight to `buffer`
  // without its own buffering; flushing it simply flushes the underlying stream.
  val encoder = EncoderFactory.get().directBinaryEncoder(buffer, null)
  datumWriter.write(envelope, encoder)
  encoder.flush()
  buffer.toByteArray
}