chtefi
7/19/2017 - 9:19 AM

Kafka Streams - Custom timestamp extractor, from a `long` field named "timestamp"

Kafka Streams - Custom timestamp extractor, from a long field named "timestamp"

/**
 * Timestamp extractor for records that carry their timestamp inside an Avro value.
 *
 * <p>The record value must be a {@code GenericRecord} whose schema declares a field named
 * {@code "timestamp"} of Avro type {@code LONG}. When that holds and the field actually
 * contains a {@code Long}, that number is returned unchanged as the record timestamp.
 *
 * <p>In every other case (null value, non-Avro value, missing field, field of another
 * type, field left unset) the extractor falls back to the timestamp carried by the
 * {@code ConsumerRecord} itself.
 */
public class InValueTimestampExtractor implements TimestampExtractor {

  /** Name of the Avro field expected to hold the timestamp. */
  private static final String TIMESTAMP_FIELD = "timestamp";

  /**
   * Returns the timestamp stored in the record's Avro value, or the record's own
   * timestamp when the value does not provide a usable one.
   *
   * @param record the consumed record; must not be {@code null}
   * @return the content of the value's {@code "timestamp"} LONG field when present,
   *     otherwise {@code record.timestamp()}
   * @throws NullPointerException if {@code record} is {@code null}; there is no
   *     timestamp to fall back to in that case
   */
  @Override
  public long extract(ConsumerRecord<Object, Object> record) {
    // instanceof is false for a null value, so no separate null check is needed.
    Object payload = record.value();
    if (payload instanceof GenericRecord) {
      GenericRecord avro = (GenericRecord) payload;
      Schema.Field field = avro.getSchema().getField(TIMESTAMP_FIELD);
      if (field != null && field.schema().getType() == Schema.Type.LONG) {
        // The schema type alone does not guarantee the slot is populated: an unset
        // field yields null, and blindly unboxing it would throw instead of falling
        // back to the record timestamp.
        Object raw = avro.get(field.pos());
        if (raw instanceof Long) {
          return (Long) raw;
        }
      }
    }
    return record.timestamp();
  }
}