Kafka Streams - Custom timestamp extractor, from a long
field named "timestamp"
/**
 * Timestamp extractor that reads the timestamp from the record's Avro value.
 *
 * <p>The value is used only when all of the following hold:
 * <ul>
 *   <li>the record value is a {@code GenericRecord};</li>
 *   <li>its schema declares a field named {@code "timestamp"} whose schema type is
 *       exactly {@code LONG} (a union such as {@code ["null", "long"]} does not qualify);</li>
 *   <li>the field actually holds a non-null {@code Long}.</li>
 * </ul>
 * In every other case the extractor returns the record's own timestamp
 * ({@code record.timestamp()}).
 *
 * <p>NOTE(review): the field value is returned as-is; presumably it must use the same
 * unit as the record's internal timestamp - confirm against the producers.
 */
public class InValueTimestampExtractor implements TimestampExtractor {

    /** Name of the Avro field expected to carry the timestamp. */
    private static final String TIMESTAMP_FIELD = "timestamp";

    /**
     * Extracts the timestamp for the given record.
     *
     * @param record the consumed record; must not be {@code null}
     * @return the {@code "timestamp"} field of the Avro value when it is usable,
     *         otherwise {@code record.timestamp()}
     * @throws NullPointerException if {@code record} is {@code null}, since there is
     *         then no internal timestamp to fall back to
     */
    @Override
    public long extract(ConsumerRecord<Object, Object> record) {
        // Fail with an explicit message instead of an anonymous NPE on the fallback path.
        if (record == null) {
            throw new NullPointerException("record must not be null");
        }

        Object payload = record.value();
        // instanceof is false for null, so a null value goes straight to the fallback.
        if (payload instanceof GenericRecord) {
            GenericRecord avroValue = (GenericRecord) payload;
            Schema.Field field = avroValue.getSchema().getField(TIMESTAMP_FIELD);
            if (field != null && field.schema().getType() == Schema.Type.LONG) {
                Object raw = avroValue.get(field.pos());
                // Guard the cast: an unset (null) or mistyped value must not throw,
                // it has to fall back to the record's internal timestamp.
                if (raw instanceof Long) {
                    return (Long) raw;
                }
            }
        }

        return record.timestamp();
    }
}