Reputation: 142
I am trying to read a Kafka topic from a startTime to an endTime. It's OK to read a few more messages outside this interval, but I definitely want to process all messages inside it. I checked the SimpleConsumer API and found getOffsetsBefore(), which gives me offsets before my startTime, but I am not sure how to get offsets for each partition after an endTime. Please help!
Upvotes: 1
Views: 4107
Reputation: 1876
The Kafka consumer API below, KafkaConsumer#offsetsForTimes, has been available since version 0.10.1:
/**
 * Look up the offsets for the given partitions by timestamp. The returned offset for each partition is the
 * earliest offset whose timestamp is greater than or equal to the given timestamp in the corresponding partition.
 *
 * This is a blocking call. The consumer does not have to be assigned the partitions.
 * If the message format version in a partition is before 0.10.0, i.e. the messages do not have timestamps, null
 * will be returned for that partition.
 *
 * Notice that this method may block indefinitely if the partition does not exist.
 *
 * @param timestampsToSearch the mapping from partition to the timestamp to look up.
 * @return a mapping from partition to the timestamp and offset of the first message with timestamp greater
 *         than or equal to the target timestamp. {@code null} will be returned for the partition if there is no
 *         such message.
 * @throws IllegalArgumentException if the target timestamp is negative.
 */
@Override
public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch) {
    for (Map.Entry<TopicPartition, Long> entry : timestampsToSearch.entrySet()) {
        // we explicitly exclude the earliest and latest offset here so the timestamp in the returned
        // OffsetAndTimestamp is always positive.
        if (entry.getValue() < 0)
            throw new IllegalArgumentException("The target time for partition " + entry.getKey() + " is " +
                    entry.getValue() + ". The target time cannot be negative.");
    }
    return fetcher.getOffsetsByTimes(timestampsToSearch, requestTimeoutMs);
}
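To address the question directly: you can call offsetsForTimes twice, once with startTime and once with endTime, seek each partition to its start offset, and stop it at the first offset at or after endTime. Below is a minimal sketch of that idea (the bootstrap server, group id, topic name, and timestamps are placeholders; it assumes kafka-clients >= 0.10.1 and broker message format >= 0.10.0, and it ignores the late-record caveat discussed in the other answer):

import java.util.*;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;

public class TimeRangeReader {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder
        props.put("group.id", "time-range-reader");       // placeholder
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        long startTime = 1500000000000L; // epoch millis, placeholder
        long endTime   = 1500003600000L; // epoch millis, placeholder

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // One timestamp query per partition of the topic.
            List<TopicPartition> partitions = new ArrayList<>();
            Map<TopicPartition, Long> startQuery = new HashMap<>();
            Map<TopicPartition, Long> endQuery = new HashMap<>();
            for (PartitionInfo info : consumer.partitionsFor("my-topic")) { // placeholder topic
                TopicPartition tp = new TopicPartition(info.topic(), info.partition());
                partitions.add(tp);
                startQuery.put(tp, startTime);
                endQuery.put(tp, endTime);
            }

            // Earliest offset with timestamp >= startTime resp. >= endTime, per partition.
            Map<TopicPartition, OffsetAndTimestamp> startOffsets = consumer.offsetsForTimes(startQuery);
            Map<TopicPartition, OffsetAndTimestamp> endOffsets = consumer.offsetsForTimes(endQuery);
            Map<TopicPartition, Long> logEnd = consumer.endOffsets(partitions);

            // Per partition: where to start reading and where to stop (exclusive).
            Map<TopicPartition, Long> seekTo = new HashMap<>();
            Map<TopicPartition, Long> stopOffsets = new HashMap<>();
            for (TopicPartition tp : partitions) {
                OffsetAndTimestamp start = startOffsets.get(tp);
                if (start == null)
                    continue; // no record at or after startTime in this partition
                OffsetAndTimestamp end = endOffsets.get(tp);
                // If no record at/after endTime exists yet, stop at the current log end.
                long stop = (end != null) ? end.offset() : logEnd.get(tp);
                if (start.offset() < stop) {
                    seekTo.put(tp, start.offset());
                    stopOffsets.put(tp, stop);
                }
            }

            consumer.assign(new ArrayList<>(seekTo.keySet()));
            seekTo.forEach(consumer::seek);

            Set<TopicPartition> pending = new HashSet<>(seekTo.keySet());
            while (!pending.isEmpty()) {
                for (ConsumerRecord<String, String> record : consumer.poll(500)) {
                    TopicPartition tp = new TopicPartition(record.topic(), record.partition());
                    // Only process records before the stop offset (the first offset at/after endTime).
                    if (pending.contains(tp) && record.offset() < stopOffsets.get(tp))
                        System.out.printf("%d@%d: %s%n", record.partition(), record.offset(), record.value());
                }
                // A partition is finished once its position reaches its stop offset.
                pending.removeIf(tp -> consumer.position(tp) >= stopOffsets.get(tp));
            }
        }
    }
}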
Upvotes: 1
Reputation: 62285
There is no guarantee about the end time, because nobody can foresee the future.
Assume you know the start offset and read all data to the end of the topic. There could still be a producer that writes a record with a timestamp belonging to your range...
Note that Kafka's record timestamps are metadata, and thus any record can have any timestamp. Brokers do not interpret this timestamp in any way (only the Streams API does). Thus, Kafka brokers only guarantee offset-based message ordering, not timestamp-based ordering. If records are not ordered by time, i.e., a record with a larger offset has a smaller timestamp than a record with a smaller offset, that record is a so-called "late record" (with regard to time), and there is no upper bound on lateness.
Only your business logic can decide how far you want to read. Given the start offset, you would just consume your messages and monitor their timestamps at the same time. Then you can either stop processing when you see the first record with a timestamp larger than your interval -- this would be the strictest approach, but it does not allow for any late-arriving records, so the probability that you "miss" some data is relatively high.
Or you apply a less restrictive upper bound and read until you see a record with a timestamp greater than the interval's upper bound + X, with X being a config parameter of your choice, as sketched below. The larger X, the smaller the probability that you miss any record.
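To make this concrete, here is a minimal sketch of the slack-window idea (an illustration, not battle-tested code). It assumes the consumer is already assigned and seeked to the start offsets, e.g. via offsetsForTimes from the other answer; slackMs is a made-up name standing in for X, and setting it to 0 gives the strict variant described above:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class SlackWindowReader {

    // Reads from the current position until a record falls past endTime + slackMs.
    static void readInterval(KafkaConsumer<String, String> consumer,
                             long startTime, long endTime, long slackMs) {
        boolean done = false;
        while (!done) {
            for (ConsumerRecord<String, String> record : consumer.poll(500)) {
                if (record.timestamp() > endTime + slackMs) {
                    // Past the slack window: assume no more in-range records will arrive.
                    // (With multiple partitions you would track this cutoff per partition.)
                    done = true;
                    break;
                }
                if (record.timestamp() >= startTime && record.timestamp() <= endTime) {
                    process(record); // in-range record, possibly a late one
                }
                // Records with endTime < timestamp <= endTime + slackMs are skipped,
                // but we keep reading in case late in-range records still show up.
            }
        }
    }

    static void process(ConsumerRecord<String, String> record) {
        System.out.printf("%d@%d: %s%n", record.partition(), record.offset(), record.value());
    }
}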
Upvotes: 0