Reputation: 344
I have an event-sourced service that listens to a Kafka topic and saves state in a relational DB.
Considering a suitable restoration strategy for this service (i.e. how to restore the DB in a disaster recovery scenario), one option would be to save the current offset in the DB, take snapshots, and restore from a snapshot. In this scenario the service would need to seek to the offset when started in 'restoration mode'.
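The key to this strategy is that the projection state and the last processed offset are updated together, so any snapshot captures a consistent (state, offset) pair. A minimal in-memory sketch of the idea (hypothetical names; in the real service the two updates would share one DB transaction):

```java
import java.util.HashMap;
import java.util.Map;

// Sketch only: state and last processed offset change together, so a
// snapshot of this object is always internally consistent.
class ProjectionWithOffset {
    private final Map<String, String> state = new HashMap<>();
    private long lastOffset = -1L;

    // In the real service this would be a single DB transaction:
    // update the projection row(s) AND the stored consumer offset.
    void apply(long offset, String key, String value) {
        state.put(key, value);
        lastOffset = offset;
    }

    long lastOffset() {
        return lastOffset;
    }

    Map<String, String> snapshotState() {
        return new HashMap<>(state);
    }
}
```

On restore, the service loads the snapshot, reads `lastOffset` from it, and seeks the consumer to `lastOffset + 1`.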
I am using Spring Cloud Stream, and was wondering if the framework provides any mechanism for seeking to an offset?
I realise another option for restoration would be to simply play all the events from scratch, but that's not an ideal option for some of my microservices.
Upvotes: 1
Views: 1960
Reputation: 13731
There is a KafkaBindingRebalanceListener interface that you can use:
import java.util.AbstractMap;
import java.util.Arrays;
import java.util.Collection;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.stream.binder.kafka.KafkaBindingRebalanceListener;
import org.springframework.stereotype.Component;

import lombok.extern.slf4j.Slf4j;

@Slf4j
@Component
public class KafkaRebalanceListener implements KafkaBindingRebalanceListener {

    // ':#{null}' (rather than ':null') defaults the property to a real null
    // instead of the literal string "null"
    @Value("${config.kafka.topics.offsets:#{null}}")
    private String topicOffsets;

    @Override
    public void onPartitionsAssigned(String bindingName, Consumer<?, ?> consumer,
            Collection<TopicPartition> partitions, boolean initial) {
        // Only seek on the first assignment after startup, not on every rebalance
        if (topicOffsets != null && initial) {
            final Optional<Map<TopicPartition, Long>> offsetsOptional = parseOffset(topicOffsets);
            if (offsetsOptional.isPresent()) {
                final Map<TopicPartition, Long> offsetsMap = offsetsOptional.get();
                partitions.forEach(tp -> {
                    if (offsetsMap.containsKey(tp)) {
                        final Long offset = offsetsMap.get(tp);
                        try {
                            log.info("Seek topic {} partition {} to offset {}", tp.topic(), tp.partition(), offset);
                            consumer.seek(tp, offset);
                        } catch (Exception e) {
                            log.error("Unable to set offset {} for topic {} and partition {}",
                                    offset, tp.topic(), tp.partition(), e);
                        }
                    }
                });
            }
        }
    }

    // Parses "topic|partition:offset|partition:offset,topic|..." into a map
    private Optional<Map<TopicPartition, Long>> parseOffset(String offsetParam) {
        if (offsetParam == null || offsetParam.isEmpty()) {
            return Optional.empty();
        }
        return Optional.of(Arrays.stream(offsetParam.split(","))
                .flatMap(slice -> {
                    String[] items = slice.split("\\|");
                    String topic = items[0];
                    return Arrays.stream(Arrays.copyOfRange(items, 1, items.length))
                            .map(r -> {
                                String[] record = r.split(":");
                                int partition = Integer.parseInt(record[0]);
                                long offset = Long.parseLong(record[1]);
                                return new AbstractMap.SimpleEntry<>(new TopicPartition(topic, partition), offset);
                            });
                }).collect(Collectors.toMap(AbstractMap.SimpleEntry::getKey, AbstractMap.SimpleEntry::getValue)));
    }
}
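The Kafka binder detects a single bean of this type automatically, so no further wiring is needed; you only supply the property, e.g. in application.yml (property name here matches the one assumed by the listener above):

```yaml
config:
  kafka:
    topics:
      offsets: "topic2|1:100|2:120|3:140,topic3|1:1000|2:1200|3:1400"
```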
The config.kafka.topics.offsets property looks like this, but you can use any format:
String topicOffsets = "topic2|1:100|2:120|3:140,topic3|1:1000|2:1200|3:1400";
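Going the other way, when taking a snapshot you would serialize the consumer's current positions into the same format. A sketch using plain types (format assumed from the example above):

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

class OffsetFormat {
    // Serializes {topic -> {partition -> offset}} into the
    // "topic|p:o|p:o,topic|p:o" format expected by the listener above.
    static String format(Map<String, Map<Integer, Long>> offsets) {
        return offsets.entrySet().stream()
                .map(t -> t.getKey() + "|" + t.getValue().entrySet().stream()
                        .map(po -> po.getKey() + ":" + po.getValue())
                        .collect(Collectors.joining("|")))
                .collect(Collectors.joining(","));
    }
}
```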
Upvotes: 2
Reputation: 6126
If you're talking about a disaster, what makes you think you can still write anything to the DB? In other words, you may end up re-processing at least one event (you have to account for that at minimum), so de-duplication is still something you have to deal with.
I understand your concern with replay (you simply don't want to replay from the beginning), but you can store periodic snapshots, which would ensure you have a relatively fixed number of events that may need to be reprocessed/de-duped.
That said, Kafka maintains the committed offset for each consumer group, so you can rely on that mechanism to ensure that the next time you start your microservice it will begin from the last successfully processed offset.
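If you rely on Kafka's committed offsets, the relevant settings are the consumer group (offsets are committed per group) and the fallback behaviour when no committed offset exists yet. A possible Spring Cloud Stream configuration (the binding name "input" is an assumption):

```yaml
spring:
  cloud:
    stream:
      bindings:
        input:
          group: my-service           # committed offsets are stored per consumer group
      kafka:
        binder:
          configuration:
            auto.offset.reset: earliest  # only applies when no committed offset exists
```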
Upvotes: 1