Reputation: 16465
I have a use case: I need to read and aggregate messages from a Kafka topic at regular intervals and publish the result to a different topic. Local storage is not an option. This is how I am planning to address it; any suggestions for improvement are welcome.
To schedule the aggregation and publishing of the Kafka messages, I plan to use the completionInterval option of the Aggregator EIP. Here is the code:
@Autowired
ObjectMapper objectMapper;

JacksonDataFormat jacksonDataFormat;

@PostConstruct
public void initialize() {
    // objectMapper.setPropertyNamingStrategy(PropertyNamingStrategy.SNAKE_CASE);
    jacksonDataFormat = new JacksonDataFormat(objectMapper, EventMessage.class);
}
and the route:
public void configure() throws Exception {
    from("kafka:localhost:9092?topic=item-events" +
            "&groupId=aggregator-group-id&autoCommitIntervalMs=25000&autoOffsetReset=earliest&consumersCount=1")
        .routeId("kafkapoller")
        .unmarshal(jacksonDataFormat)
        .aggregate(body().method("getItemId"), new EventAggregationStrategy()).completionInterval(20000)
        .marshal().json(JsonLibrary.Jackson)
        .to("kafka:localhost:9092?topic=item-events-aggregated&serializerClass=org.apache.kafka.common.serialization.ByteArraySerializer");
}
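For reference, a sketch of the kind of EventAggregationStrategy I have in mind: it just collects the unmarshalled EventMessage bodies for one itemId into a list, which the route then marshals to a JSON array. This is only a rough sketch (it assumes the Camel 2.x org.apache.camel.processor.aggregate.AggregationStrategy interface and my own EventMessage POJO):

import java.util.ArrayList;
import java.util.List;

import org.apache.camel.Exchange;
import org.apache.camel.processor.aggregate.AggregationStrategy;

public class EventAggregationStrategy implements AggregationStrategy {

    @Override
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        EventMessage incoming = newExchange.getIn().getBody(EventMessage.class);

        if (oldExchange == null) {
            // first message for this itemId: start a new list
            List<EventMessage> events = new ArrayList<>();
            events.add(incoming);
            newExchange.getIn().setBody(events);
            return newExchange;
        }

        // subsequent messages: append to the list built so far
        @SuppressWarnings("unchecked")
        List<EventMessage> events = oldExchange.getIn().getBody(List.class);
        events.add(incoming);
        return oldExchange;
    }
}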
Upvotes: 7
Views: 1891
Reputation: 7067
This looks OK. Things to keep in mind:

- A persistent AggregationRepository, so you can store and replay the in-flight messages. You can also replay the messages you lost from Kafka itself, but this would be my biggest operational concern.
- An AggregateController, to let you externally force completion of the exchange, so you can do things like issue a shutdown to Camel and then call the controller to complete the in-flight exchanges.
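A minimal sketch of wiring both into your aggregate step. Assumptions on my part: camel-sql's JdbcAggregationRepository as the persistent store (since local storage is ruled out), a Spring-managed DataSource and PlatformTransactionManager, the repository name "item_aggregation" (camel-sql expects the backing tables for that name to already exist), a simple() correlation expression standing in for yours, and flushPendingAggregations() as a hypothetical hook you would call before shutdown:

import javax.sql.DataSource;

import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.jackson.JacksonDataFormat;
import org.apache.camel.model.dataformat.JsonLibrary;
import org.apache.camel.processor.aggregate.AggregateController;
import org.apache.camel.processor.aggregate.DefaultAggregateController;
import org.apache.camel.processor.aggregate.jdbc.JdbcAggregationRepository;
import org.springframework.transaction.PlatformTransactionManager;

public class AggregationRoute extends RouteBuilder {

    private final DataSource dataSource;                 // provided by Spring
    private final PlatformTransactionManager txManager;  // provided by Spring
    private final JacksonDataFormat jacksonDataFormat;   // the data format from the question
    private final AggregateController controller = new DefaultAggregateController();

    public AggregationRoute(DataSource dataSource, PlatformTransactionManager txManager,
                            JacksonDataFormat jacksonDataFormat) {
        this.dataSource = dataSource;
        this.txManager = txManager;
        this.jacksonDataFormat = jacksonDataFormat;
    }

    @Override
    public void configure() throws Exception {
        // persistent store so in-flight aggregations survive a crash or restart;
        // "item_aggregation" names the backing tables camel-sql will use
        JdbcAggregationRepository repository =
                new JdbcAggregationRepository(txManager, "item_aggregation", dataSource);

        from("kafka:localhost:9092?topic=item-events&groupId=aggregator-group-id")
            .routeId("kafkapoller")
            .unmarshal(jacksonDataFormat)
            // correlate on itemId; ${body.itemId} calls getItemId() on the body
            .aggregate(simple("${body.itemId}"), new EventAggregationStrategy())
                .completionInterval(20000)
                .aggregationRepository(repository)   // persistence of in-flight groups
                .aggregateController(controller)     // handle for forcing completion
            .marshal().json(JsonLibrary.Jackson)
            .to("kafka:localhost:9092?topic=item-events-aggregated");
    }

    // call this before shutting Camel down to flush whatever is still aggregating
    public int flushPendingAggregations() {
        return controller.forceCompletionOfAllGroups();
    }
}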
Upvotes: 3