user2720864

Reputation: 8161

Adding timestamp in kafka message payload

Is there any way I can add a timestamp header to a Kafka message payload? I want to check when the message was created at the consumer end and apply custom logic based on that.

EDIT:

I'm trying to find a way to attach a custom value (basically a timestamp) to messages published by producers, so that I can consume messages for a specific time duration. Right now Kafka only guarantees that messages will be delivered in the order they were put in the queue. But in my case a previously generated record might arrive after a certain delay (so a message generated at time T1 might end up with a higher offset, 1, than another message generated at a later time T2 with offset 0). For this reason the messages will not be in the order I expect at the consumer's end, so I am basically looking for a way to consume them in an ordered way.

The current Kafka 0.8 release provides no way to attach anything other than the "Message Key" at the producer end. I found a similar topic here where it was advised to encode the timestamp in the message payload, but despite a lot of searching I couldn't find a feasible approach.

Also, I don't know whether such an approach would have any impact on the overall performance of Kafka, since it manages message offsets internally and no such API is exposed so far, as can be seen from this page.

I'd really appreciate any clue on whether this is the right way to think about it, or whether there is a more probable approach; I am all set to give it a try.

Upvotes: 2

Views: 11537

Answers (4)

Joe Atzberger

Reputation: 3306

Note, Kafka introduced timestamps to the internal representation of a message pursuant to this discussion: https://cwiki.apache.org/confluence/display/KAFKA/KIP-32+-+Add+timestamps+to+Kafka+message

and this ticket: https://issues.apache.org/jira/browse/KAFKA-2511

It is available in Kafka 0.10.0.0 and later.

The problem here is that you ingested messages in an order you no longer want. If the order matters, then you need to abandon parallelism in the relevant Producer(s). Then the problem at the Consumer level goes away.

Upvotes: 0

Leandro

Reputation: 67

This looks like it will help you achieve your goals. With little effort it lets you define and write your message headers, hiding the (de)serialization burden. The only thing you have to provide is a (de)serializer for the actual object you're sending over the wire. This implementation delays deserialization of the payload object as long as possible, which means you can (in a very performant and transparent way) deserialize the headers, check the timestamp, and only deserialize the payload (the heavy part) if and when you are sure the object is useful to you.
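The lazy-deserialization idea above can be sketched without any library: prepend a fixed-size timestamp header to the payload bytes, so the consumer can read the timestamp cheaply and decode the body only when the timestamp check passes. This is a minimal illustration, not the linked library's API; the class and method names here are hypothetical.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/* Hypothetical sketch: an envelope of [8-byte big-endian timestamp][payload],
 * letting a consumer inspect the timestamp without touching the payload. */
public class TimestampedEnvelope {

    // Wrap a payload with its creation timestamp.
    public static byte[] wrap(long timestampMillis, byte[] payload) {
        ByteBuffer buf = ByteBuffer.allocate(8 + payload.length);
        buf.putLong(timestampMillis);
        buf.put(payload);
        return buf.array();
    }

    // Cheap: reads only the first 8 bytes; the payload stays serialized.
    public static long readTimestamp(byte[] envelope) {
        return ByteBuffer.wrap(envelope).getLong();
    }

    // The expensive part, done only if the timestamp check passed.
    public static byte[] readPayload(byte[] envelope) {
        byte[] payload = new byte[envelope.length - 8];
        ByteBuffer.wrap(envelope, 8, payload.length).get(payload);
        return payload;
    }

    public static void main(String[] args) {
        byte[] body = "hello".getBytes(StandardCharsets.UTF_8);
        byte[] env = wrap(1400000000000L, body);
        System.out.println(readTimestamp(env));
        System.out.println(new String(readPayload(env), StandardCharsets.UTF_8));
    }
}
```

A consumer would call `readTimestamp` on every message, and `readPayload` only for messages inside the time window it cares about.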

Upvotes: 0

laughing_man

Reputation: 3978

You can make a class that contains your partitioning information and the timestamp when this message was created, and then use this as the key to the Kafka message. You can then use a wrapper Serde that transforms this class into a byte array and back because Kafka can only understand bytes. Then, when you receive the message at the consumer end as a bag of bytes, you can deserialize it and retrieve the timestamp and then channel that into your logic.

For example:

public class KafkaKey implements Serializable {
    private long mTimeStampInSeconds;
    /* This contains other partitioning data that will be used by the
    appropriate partitioner in Kafka. */
    private PartitionData mPartitionData;

    public KafkaKey(long timeStampInSeconds, ...) {
        /* Initialize key */
        mTimeStampInSeconds = timeStampInSeconds;
    }

    /* Simple getter for timestamp */
    public long getTimeStampInSeconds() {
        return mTimeStampInSeconds;
    }

    public static byte[] toBytes(KafkaKey kafkaKey) {
        /* Some serialization logic. */
    }

    public static KafkaKey fromBytes(byte[] bytes) throws Exception {
        /* Some deserialization logic. */
    }
}

/* Producer End */

KafkaKey kafkaKey = new KafkaKey(System.currentTimeMillis() / 1000, ... );
KeyedMessage<byte[], byte[]> kafkaMessage = new KeyedMessage<>(topic, KafkaKey.toBytes(kafkaKey), KafkaValue.toBytes(kafkaValue));

/* Consumer End */
MessageAndMetadata<byte[],byte[]> receivedMessage = (get from consumer);
KafkaKey kafkaKey = KafkaKey.fromBytes(receivedMessage.key());

long timestamp = kafkaKey.getTimeStampInSeconds();
/*
 * And happily ever after */

This will be more flexible than making specific partitions correspond to time intervals. Otherwise, you would have to keep adding partitions for different time ranges and maintain a separate, synchronized tabulation of which partition corresponds to which time range, which can get unwieldy quickly.
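Once the timestamp is recovered from the key, consuming "a specific time duration" reduces to a comparison and a sort. The sketch below uses a plain list of (timestamp, value) pairs as a stand-in for consumed messages; the class and method names are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public class TimeWindowFilter {

    /* Stand-in for a consumed message whose key carried a timestamp. */
    static class Message {
        final long timestampMillis;
        final String value;
        Message(long timestampMillis, String value) {
            this.timestampMillis = timestampMillis;
            this.value = value;
        }
    }

    // Keep only messages whose timestamp falls in [fromMillis, toMillis),
    // then sort by timestamp to restore creation order despite offset order.
    static List<Message> inWindow(List<Message> consumed, long fromMillis, long toMillis) {
        List<Message> out = new ArrayList<>();
        for (Message m : consumed) {
            if (m.timestampMillis >= fromMillis && m.timestampMillis < toMillis) {
                out.add(m);
            }
        }
        out.sort(Comparator.comparingLong(m -> m.timestampMillis));
        return out;
    }

    public static void main(String[] args) {
        // Offsets delivered the delayed message (t=100) after the newer one (t=200).
        List<Message> consumed = Arrays.asList(
            new Message(200, "b"), new Message(100, "a"), new Message(900, "outside"));
        for (Message m : inWindow(consumed, 0, 500)) {
            System.out.println(m.timestampMillis + " " + m.value);
        }
    }
}
```

This addresses the out-of-order delivery from the question: the delayed message with the earlier timestamp is emitted first, regardless of its offset.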

Upvotes: 1

Biks

Reputation: 705

If you want to consume messages for a specific time duration, I can offer a solution; however, consuming messages from that duration in an ordered way is difficult. I am looking for the same solution myself. Check the link below:

Message Sorting in Kafka Queue

Solution to fetch data for a specific time range

For time ranges T1, T2, ..., TN (where T is a range of time), divide the topic into N partitions. Then produce the messages using a custom Partitioner class, so that a message's generation time decides which partition it goes to.

Similarly, while consuming, subscribe to the exact partition(s) for the time range you want to consume.
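The partition-selection arithmetic above can be sketched as a pure function: bucket the creation timestamp by a fixed interval and take it modulo the partition count. In a real Kafka 0.8 producer this logic would live in a `Partitioner` implementation; the names and parameters here are assumptions for illustration.

```java
public class TimeBucketPartitioner {

    // Map a creation timestamp to one of numPartitions partitions,
    // rotating to the next partition every bucketMillis (e.g. one hour).
    public static int partitionFor(long timestampMillis, long bucketMillis, int numPartitions) {
        long bucket = timestampMillis / bucketMillis;
        return (int) (bucket % numPartitions);
    }

    public static void main(String[] args) {
        long hour = 3_600_000L;
        // Two messages created within the same hour land in the same partition...
        System.out.println(partitionFor(10 * hour + 5, hour, 24));
        System.out.println(partitionFor(10 * hour + 3_000_000, hour, 24));
        // ...and the next hour rotates to the next partition.
        System.out.println(partitionFor(11 * hour, hour, 24));
    }
}
```

Note that with 24 partitions and hourly buckets the partitions are reused every 24 hours, so the consumer still has to check the timestamps of what it reads, as in the answer above.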

Upvotes: 4
