Reputation: 3742
protected void consume(KafkaReadStream<String, JsonObject> consumer) {
consumer.handler(record -> {
// call a REST API to write a value to DB
doRestApi(record); // such as client.postAbs(...).send(...)
});
}
For example, records in topic like,
{"createtime":"2021-10-29 00:03:49.0"}
{"createtime":"2021-10-29 00:03:50.0"}
{"createtime":"2021-10-29 00:03:56.0"}
...
{"createtime":"2021-10-29 00:13:56.0"}
When the record {"createtime":"2021-10-29 00:03:49.0"}
was coming, the handler did a REST api call to save the createtime=2021-10-29 00:03:49.0
to mongodb.
when the second record came, createtime=2021-10-29 00:03:50.0
would be saved.
Then the third record, createtime=2021-10-29 00:03:56.0
would be saved.
And so on.
I hope these messages to be handled sequentially. As a result, the createtime in the last message 2021-10-29 00:13:56.0
would be the final value in DB.
When I ran my code, the result value was random and the former value(such as 2021-10-29 00:03:56.0
) might be the final value.
How could I make the handler to run sequentially?
Upvotes: 0
Views: 233