Reputation: 24442
I'm evaluating Event Sourcing with Apache Kafka Streams to see how viable it is for complex scenarios. As with relational databases, I have come across some cases where atomicity/transactionality is essential:
Shopping app with two services:
Flow:
OrderService publishes an OrderCreated event (with productId, orderId, userId info)
ProductService gets the OrderCreated event and queries its KafkaStreams Store (ProductStockStore) to check if there is stock for the product. If there is stock it publishes an OrderUpdated event (also with productId, orderId, userId info)
The point is that this event would be listened to by a ProductService Kafka Streams processor, which would process it to decrease the stock. So far so good.
But imagine two orders for the same product arriving close together (there is a concrete walk-through in the update below). The obvious problem is that our materialized view (the store) should be updated directly when we process the first OrderUpdated event. However, the only way I know of to update the Kafka Streams store is to publish another event (OrderUpdated) to be processed by the Kafka Stream, so the availability check and the stock update can't be performed transactionally.
I would appreciate ideas to deal with scenarios like this.
UPDATE: I'll try to clarify the problematic part:
ProductService has a Kafka Streams Store, ProductStock with this stock (productId=1, quantity=1)
OrderService publishes two OrderPlaced events on the orders topic:
Event1 (key=product1, productId=product1, quantity=1, eventType="OrderPlaced")
Event2 (key=product1, productId=product1, quantity=1, eventType="OrderPlaced")
ProductService has a consumer on the orders topic. For simplicity, let's assume a single partition to ensure messages are consumed in order. This consumer executes the following logic:
if("OrderPlaced".equals(event.get("eventType"))){
Order order = new Order();
order.setId((String)event.get("orderId"));
order.setProductId((Integer)(event.get("productId")));
order.setUid(event.get("uid").toString());
// QUERY PRODUCTSTOCK TO CHECK AVAILABILITY
Integer productStock = getProductStock(order.getProductId());
if(productStock > 0) {
Map<String, Object> event = new HashMap<>();
event.put("name", "ProductReserved");
event.put("orderId", order.getId());
event.put("productId", order.getProductId());
// WRITES A PRODUCT RESERVED EVENT TO orders topic
orderProcessor.output().send(MessageBuilder.withPayload(event).build(), 500);
}else{
//XXX CANCEL ORDER
}
}
ProductService also has a Kafka Streams processor that is responsible for updating the stock:
KStream<Integer, JsonNode> stream = kStreamBuilder.stream(integerSerde, jsonSerde, "orders");
stream.xxx().yyy(() -> {...}, "ProductsStock");
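For illustration, the placeholders could be filled in roughly like this (just a sketch assuming the newer StreamsBuilder API; seeding the stock with 0 and reading the quantity from the event are simplifications of my example above):

import com.fasterxml.jackson.databind.JsonNode;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.streams.state.KeyValueStore;

StreamsBuilder builder = new StreamsBuilder();
KStream<Integer, JsonNode> orders =
        builder.stream("orders", Consumed.with(Serdes.Integer(), jsonSerde));

KTable<Integer, Integer> productsStock = orders
        .groupByKey(Grouped.with(Serdes.Integer(), jsonSerde))
        .aggregate(
                () -> 0, // simplification: real code would seed initial stock from a products topic
                (productId, event, stock) ->
                        "ProductReserved".equals(event.path("name").asText())
                                ? stock - event.path("quantity").asInt(1)
                                : stock,
                Materialized.<Integer, Integer, KeyValueStore<Bytes, byte[]>>as("ProductsStock")
                        .withKeySerde(Serdes.Integer())
                        .withValueSerde(Serdes.Integer()));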
Event1 would be processed first and since there is still 1 available product it would generate the ProductReserved event.
Now it's Event2's turn. If it is consumed by the ProductService consumer BEFORE the ProductService Kafka Streams processor has processed the ProductReserved event generated by Event1, the consumer will still see that the ProductStock store holds quantity 1 for product1, and will generate another ProductReserved event for Event2, producing an inconsistency in the system.
Upvotes: 3
Views: 1519
Reputation: 156
This answer is a little late for your original question, but let me answer anyway for completeness.
There are a number of ways to solve this problem, but I would encourage addressing it in an event-driven way. This means you (a) validate there is enough stock to process the order and (b) reserve the stock, all within a single KStreams operation. The trick is to rekey by productId; that way you know orders for the same product will be executed sequentially on the same thread (so you can't get into the situation where Order1 and Order2 reserve stock of the same product twice).
There is a post that discusses how to do this: https://www.confluent.io/blog/building-a-microservices-ecosystem-with-kafka-streams-and-ksql/
Maybe more usefully, there is also some sample code showing how it can be done: https://github.com/confluentinc/kafka-streams-examples/blob/1cbcaddd85457b39ee6e9050164dc619b08e9e7d/src/main/java/io/confluent/examples/streams/microservices/InventoryService.java#L76
Note how in this KStreams code the first line rekeys by productId, then a Transformer is used to (a) validate there is sufficient stock to process the order and (b) reserve the stock required by updating the state store. This is done atomically, using Kafka's Transactions feature.
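To make the pattern concrete, here is a minimal sketch of that shape (simplified from the linked example; the JsonNode events and jsonSerde are borrowed from your question, while the String keys, topic names and a store holding the available stock per product are assumptions for the example):

import com.fasterxml.jackson.databind.JsonNode;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;

StreamsBuilder builder = new StreamsBuilder();

// local store holding the available stock per productId
builder.addStateStore(Stores.keyValueStoreBuilder(
        Stores.persistentKeyValueStore("ProductsStock"),
        Serdes.String(), Serdes.Integer()));

builder.stream("orders", Consumed.with(Serdes.String(), jsonSerde))
        // (a) rekey by productId so all orders for one product hit the same partition/thread
        .selectKey((key, event) -> event.path("productId").asText())
        // (b) validate and reserve in one operation against the local store
        .transform(() -> new Transformer<String, JsonNode, KeyValue<String, String>>() {
            private KeyValueStore<String, Integer> stock;

            @Override
            @SuppressWarnings("unchecked")
            public void init(ProcessorContext context) {
                stock = (KeyValueStore<String, Integer>) context.getStateStore("ProductsStock");
            }

            @Override
            public KeyValue<String, String> transform(String productId, JsonNode order) {
                Integer available = stock.get(productId);
                int requested = order.path("quantity").asInt(1); // default 1, as in your example events
                if (available != null && available >= requested) {
                    stock.put(productId, available - requested); // reserve; no other thread sees this product
                    return KeyValue.pair(order.path("orderId").asText(), "ProductReserved");
                }
                return KeyValue.pair(order.path("orderId").asText(), "OrderRejected");
            }

            @Override
            public void close() { }
        }, "ProductsStock")
        .to("order-validations", Produced.with(Serdes.String(), Serdes.String()));

With processing.guarantee set to exactly_once in the StreamsConfig, the state-store update and the emitted validation record are committed atomically, which is the Transactions feature mentioned above.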
Upvotes: 4
Reputation: 4314
This same problem is typical when assuring consistency in any distributed system. Instead of going for strong consistency, the process manager/saga pattern is typically used. This is somewhat similar to the two-phase commit in distributed transactions, but implemented explicitly in application code. It goes like this:
The Order Service asks the Product Service to reserve N items. The Product Service either accepts the command and reduces stock, or rejects the command if it doesn't have enough items available.
Upon a positive reply to the command, the Order Service can now emit an OrderCreated event (although I'd call it OrderPlaced, as "placed" sounds more idiomatic to the domain and "created" is more generic, but that's a detail).
The Product Service either listens for OrderPlaced events, or an explicit ConfirmReservation command is sent to it. Alternatively, if something else happened (e.g. failing to clear funds), an appropriate event can be emitted or a CancelReservation command sent explicitly to the Product Service.
To cater for exceptional circumstances, the Product Service may also have a scheduler (in Kafka Streams, punctuation can come in handy for this) to cancel reservations that weren't confirmed or aborted within a timeout period.
The technicalities of orchestrating the two services, handling the error conditions and running the compensating actions (cancelling the reservation in this case) can live in the services directly, or in an explicit Process Manager component to segregate this responsibility. Personally I'd go for an explicit Process Manager, which could be implemented using the Kafka Streams Processor API.
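To illustrate the punctuation idea, here is a minimal sketch of such a scheduler using the Processor API (the "PendingReservations" store, the event names and the timeouts are made-up assumptions for the example, not identifiers from any library):

import java.time.Duration;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;

public class ReservationTimeoutProcessor implements Processor<String, JsonNode, String, JsonNode> {

    private static final long RESERVATION_TIMEOUT_MS = Duration.ofMinutes(30).toMillis();

    private ProcessorContext<String, JsonNode> context;
    private KeyValueStore<String, Long> pending; // orderId -> time the stock was reserved

    @Override
    public void init(ProcessorContext<String, JsonNode> context) {
        this.context = context;
        this.pending = context.getStateStore("PendingReservations");
        // wall-clock punctuation: periodically cancel reservations past the timeout
        context.schedule(Duration.ofMinutes(1), PunctuationType.WALL_CLOCK_TIME, now -> {
            try (KeyValueIterator<String, Long> it = pending.all()) {
                while (it.hasNext()) {
                    KeyValue<String, Long> entry = it.next();
                    if (now - entry.value > RESERVATION_TIMEOUT_MS) {
                        // emit a CancelReservation command and forget the reservation
                        context.forward(new Record<>(entry.key, cancelCommand(entry.key), now));
                        pending.delete(entry.key);
                    }
                }
            }
        });
    }

    @Override
    public void process(Record<String, JsonNode> record) {
        String name = record.value().path("name").asText();
        if ("ProductReserved".equals(name)) {
            pending.put(record.key(), record.timestamp());   // start the clock
        } else if ("OrderConfirmed".equals(name) || "ReservationCancelled".equals(name)) {
            pending.delete(record.key());                    // reservation settled in time
        }
    }

    private JsonNode cancelCommand(String orderId) {
        ObjectNode cmd = JsonNodeFactory.instance.objectNode();
        cmd.put("name", "CancelReservation");
        cmd.put("orderId", orderId);
        return cmd;
    }
}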
Upvotes: 4