Kristof Jozsa

Reputation: 7462

Kafka Streams handling timeout on a cluster

In a Kafka-based distributed JVM application running as several instances, I need to act on the event of "not receiving" a certain message on a specific Kafka topic within a configurable amount of time (the timeout value is driven by business logic and is subject to change).

How can I accomplish this in a cluster-safe way?

Upvotes: 2

Views: 91

Answers (1)

mike1234569

Reputation: 666

Is the goal to trace the latency of the end-to-end (E2E) flow, or is there some trigger that causes a second message to be expected within a configurable time?

If tracking latency, some options include:

  1. Add a timestamp to the message. When the message is received, the latency can be calculated and used.
  2. Add a UUID, a timestamp, and the current component to the message, and delegate message tracking to a separate service partitioned on the UUID.
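A minimal sketch of option 1, assuming Java 16+ and a hypothetical `TracedMessage` envelope (the record, its fields, and `LatencyCheck` are illustrative names, not from any library). It computes latency from the embedded timestamp and compares it with a configurable limit. Kafka records also carry their own timestamp (`ConsumerRecord.timestamp()`, which is the producer's create time under the default configuration) that could serve the same purpose. Either way, the result is only as accurate as the clock synchronization between the producer and consumer hosts.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical message envelope; the field names are illustrative only.
record TracedMessage(String uuid, String component, Instant createdAt, String payload) {}

final class LatencyCheck {
    private LatencyCheck() {}

    // Time between the producer stamping the message and the consumer receiving it.
    static Duration latency(TracedMessage msg, Instant receivedAt) {
        return Duration.between(msg.createdAt(), receivedAt);
    }

    // True if the message took longer than the configurable limit to arrive.
    static boolean isLate(TracedMessage msg, Instant receivedAt, Duration limit) {
        return latency(msg, receivedAt).compareTo(limit) > 0;
    }
}
```

On receipt, a consumer would call something like `LatencyCheck.isLate(msg, Instant.now(), configuredLimit)` and act on the result.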

If some trigger causes a second message to be expected, some options include:

  1. Partition the relevant topic in a way that guarantees the expected message will either arrive or not arrive at exactly one JVM (similar to option 2 in the previous list). This allows the list of expected messages to be kept in memory: remove each expected message when it is received, and every N seconds handle the ones that were 'not received'.
  2. Keep track of the expected messages in a data store (DB/distributed cache). When a message is received, remove its record. Periodically, handle the 'not received' messages.
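A minimal in-memory sketch of option 1, assuming partitioning already guarantees that the trigger and its expected message for a given id are handled by the same JVM. `ExpectedMessageTracker` and its methods are hypothetical names. Deadlines live in a `ConcurrentHashMap`, and the sweep uses the conditional `remove(key, value)` so that an id removed by `received` in the meantime is not also reported as expired.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical tracker for a single JVM that owns all ids routed to it.
final class ExpectedMessageTracker {
    // id -> instant by which the expected message must have arrived
    private final Map<String, Instant> deadlines = new ConcurrentHashMap<>();

    // Trigger seen: the follow-up message for `id` must arrive within `timeout`.
    void expect(String id, Instant now, Duration timeout) {
        deadlines.put(id, now.plus(timeout));
    }

    // Expected message arrived; returns false if the id was not (or no longer) tracked.
    boolean received(String id) {
        return deadlines.remove(id) != null;
    }

    // Run every N seconds: removes and returns the ids whose deadline has passed.
    List<String> sweepExpired(Instant now) {
        List<String> expired = new ArrayList<>();
        for (Map.Entry<String, Instant> e : deadlines.entrySet()) {
            // Conditional remove: skips ids that received() removed in the meantime.
            if (!e.getValue().isAfter(now) && deadlines.remove(e.getKey(), e.getValue())) {
                expired.add(e.getKey());
            }
        }
        return expired;
    }
}
```

In a running service, `sweepExpired(Instant.now())` would be called every N seconds, for example via `ScheduledExecutorService.scheduleAtFixedRate`. Because the state is in memory, it is lost if the instance crashes, and it is not available to another instance when a rebalance assigns the partition elsewhere; option 2 avoids this by keeping the same mapping in a shared data store.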

Edit:

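Given the details in the comment, one way to approach this is with a callback-style design. Messages can be routed to a specific consumer instance by setting the record key (the partition key): records with the same key go to the same partition, and within a consumer group each partition is consumed by only one instance at a time. By adding an intermediate topic/service partitioned on the UUID, this can be achieved as follows: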
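With the Java client, the key is the second argument of `new ProducerRecord<>(topic, key, value)`, and the default partitioner hashes the serialized key (murmur2) modulo the partition count. The sketch below only illustrates the resulting property, that a given UUID always maps to the same partition; `KeyRouting` is a hypothetical name, and `Arrays.hashCode` is a stand-in hash, so its partition numbers will not match a real cluster.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Illustrative only: Kafka's default partitioner uses murmur2, not Arrays.hashCode.
final class KeyRouting {
    private KeyRouting() {}

    // Deterministic key -> partition mapping: same key and partition count, same partition.
    static int partitionFor(String key, int numPartitions) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        // Clear the sign bit so the modulo result is never negative.
        return (Arrays.hashCode(keyBytes) & 0x7fffffff) % numPartitions;
    }
}
```

Note that the mapping depends on the partition count, so adding partitions to the topic changes which partition (and therefore which instance) a given UUID lands on.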

  1. Send message A to ttl_routing_service, keyed by its UUID. Message A should include the UUID, a TTL, where to send the message (the functional topic), and what to do on expiry.
  2. The routing service picks up the message, records its metadata (e.g. the TTL and what to do on timeout) in a local cache or starts a delayed coroutine, and then routes message A, including the UUID, to the functional topic.
  3. When processing of message A completes, a message carrying the same UUID can be sent to ttl_routing_service. Because it has the same key, it reaches the same routing-service instance, which cancels the coroutine or removes the cached record.
  4. If the record is not removed before the TTL elapses, the 'what to do on expiry' action is performed.
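A sketch of the routing service's timeout bookkeeping (steps 2 to 4), assuming Java and using a `ScheduledExecutorService` in place of the delayed coroutine. `TtlTracker`, `register`, and `complete` are hypothetical names; consuming ttl_routing_service and forwarding message A to the functional topic are left out. Each entry is removed atomically by whichever happens first, completion or expiry, so the expiry action runs at most once and never for a UUID whose completion arrived first.

```java
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical in-memory core of ttl_routing_service for the UUIDs this instance owns.
final class TtlTracker implements AutoCloseable {
    // Created before scheduling so the expiry task can check it is still the current entry.
    private static final class Pending {
        volatile ScheduledFuture<?> future;
    }

    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();
    private final Map<String, Pending> pending = new ConcurrentHashMap<>();

    // Step 2: track the UUID and schedule its expiry action (the "delayed coroutine").
    void register(String uuid, Duration ttl, Runnable onExpiry) {
        Pending p = new Pending();
        Pending previous = pending.put(uuid, p);
        if (previous != null && previous.future != null) {
            previous.future.cancel(false); // a re-registration replaces the old deadline
        }
        p.future = scheduler.schedule(() -> {
            // Step 4: run the action only if step 3 has not removed this entry first.
            if (pending.remove(uuid, p)) {
                onExpiry.run();
            }
        }, ttl.toMillis(), TimeUnit.MILLISECONDS);
    }

    // Step 3: completion for this UUID arrived; returns true if it was still pending.
    boolean complete(String uuid) {
        Pending p = pending.remove(uuid);
        if (p == null) {
            return false;
        }
        ScheduledFuture<?> f = p.future;
        if (f != null) {
            f.cancel(false); // the task would also skip itself, since its entry is gone
        }
        return true;
    }

    int pendingCount() {
        return pending.size();
    }

    @Override
    public void close() {
        scheduler.shutdownNow();
    }
}
```

The consumer of ttl_routing_service would call `register` for each message A and `complete` for each completion message. As with the local-cache variant above, pending entries live only in memory, so they are lost if the instance restarts.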

Upvotes: 1
