Xiang Zhang

Reputation: 2973

How to assign and check the event time of a message in Apache Beam

My source is KafkaIO.read(). I want to use a ParDo to decode the messages coming from Kafka and use one field of each message as its event time. How can I do this? I couldn't find any example.

Upvotes: 1

Views: 316

Answers (1)

Stefan Repcek

Reputation: 2603

First, implement a custom timestamp policy by extending TimestampPolicy<KeyT, ValueT>.

For example:

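import java.util.Optional;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.io.kafka.TimestampPolicy;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.joda.time.Instant;

public class CustomFieldTimePolicy extends TimestampPolicy<String, Foo> {

    protected Instant currentWatermark;

    public CustomFieldTimePolicy(Optional<Instant> previousWatermark) {
        // Resume from the checkpointed watermark, or start from the minimum timestamp.
        currentWatermark = previousWatermark.orElse(BoundedWindow.TIMESTAMP_MIN_VALUE);
    }

    @Override
    public Instant getTimestampForRecord(PartitionContext ctx, KafkaRecord<String, Foo> record) {
        // Use the timestamp field of the decoded value as the event time.
        Instant eventTime = new Instant(record.getKV().getValue().getTimestamp());
        // Only move the watermark forward, so an out-of-order record cannot pull it back.
        if (eventTime.isAfter(currentWatermark)) {
            currentWatermark = eventTime;
        }
        return eventTime;
    }

    @Override
    public Instant getWatermark(PartitionContext ctx) {
        return currentWatermark;
    }
}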

Then pass your custom TimestampPolicy when setting up the KafkaIO source, using the functional interface TimestampPolicyFactory:

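KafkaIO.<String, Foo>read()
        // Bootstrap servers are given as host:port, without a URL scheme.
        .withBootstrapServers("localhost:9092")
        .withTopic("foo")
        .withKeyDeserializer(StringDeserializer.class)
        // If you use Avro. The raw cast is needed because KafkaAvroDeserializer implements Deserializer<Object>.
        .withValueDeserializerAndCoder((Class) KafkaAvroDeserializer.class, AvroCoder.of(Foo.class))
        .withTimestampPolicyFactory((tp, previousWatermark) -> new CustomFieldTimePolicy(previousWatermark))
        // Newer Beam releases deprecate this method in favor of withConsumerConfigUpdates.
        .updateConsumerProperties(kafkaProperties)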

The following line creates a new timestamp policy for each partition. The factory receives the partition and the previously checkpointed watermark (see the TimestampPolicyFactory documentation):

.withTimestampPolicyFactory((tp, previousWatermark) -> new CustomFieldTimePolicy(previousWatermark))
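
If you want to try out how such a policy treats out-of-order records without pulling in Beam, here is a minimal plain-Java sketch. It is only a model, not Beam code: the class name WatermarkSketch and its methods are hypothetical, java.time.Instant stands in for Joda's Instant, and Instant.EPOCH stands in for BoundedWindow.TIMESTAMP_MIN_VALUE. It assumes a variant of the policy that only ever moves the watermark forward.

```java
import java.time.Instant;
import java.util.Optional;

// Hypothetical stand-in for a per-partition timestamp policy (no Beam classes involved).
class WatermarkSketch {
    private Instant currentWatermark;

    WatermarkSketch(Optional<Instant> previousWatermark) {
        // Resume from a checkpointed watermark if there is one.
        // Instant.EPOCH is an assumption standing in for Beam's minimum timestamp.
        this.currentWatermark = previousWatermark.orElse(Instant.EPOCH);
    }

    // Returns the event time of a record whose timestamp field holds epoch milliseconds.
    Instant timestampForRecord(long eventTimeMillis) {
        Instant eventTime = Instant.ofEpochMilli(eventTimeMillis);
        // Advance the watermark only when the record is newer than anything seen so far.
        if (eventTime.isAfter(currentWatermark)) {
            currentWatermark = eventTime;
        }
        return eventTime;
    }

    Instant watermark() {
        return currentWatermark;
    }

    public static void main(String[] args) {
        WatermarkSketch policy = new WatermarkSketch(Optional.empty());
        // The third record arrives out of order, behind the second one.
        long[] eventTimes = {1_000L, 3_000L, 2_000L};
        for (long millis : eventTimes) {
            Instant ts = policy.timestampForRecord(millis);
            System.out.println("record=" + ts + " watermark=" + policy.watermark());
        }
    }
}
```

The out-of-order record keeps its own event time while the watermark stays where it was, so downstream it would count as late data relative to that watermark.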

Upvotes: 1
