Pranav
Pranav

Reputation: 81

Run process on any one Pod in the Kubernetes deployment

I have an application that runs on multiple pods and scales out when traffic increases. One of the features of the application is that "It picks messages from Kafka and triggers email".

When multiple pods are running, all of them trigger the email as all pods are supposed to pick up the msg as per the design.

How can I restrict the email functionality to work on any one of the pods at a time?

Cluster - EKS , Programming language - Scala AKKA

Upvotes: 0

Views: 1441

Answers (2)

JavaTechnical
JavaTechnical

Reputation: 9357

How can I restrict the email functionality to work on any one of the pods at a time?

In short: Use same consumer group for all the pods that trigger the email. Usually, workloads are categorized into groups based on the work they do. Members of the same group share workload amongst themselves.

You could certainly have given Kafka consumer configurations like bootstrap.servers etc to your pods. In that configuration, give a property with name group.id to some value like email-trigger-group for example and then the workload will be shared as you would expect.

You could have used labels for your pods that trigger email. You can use the same label value for your consumer group.id for all of your pods.


We can divide the problem into two sub-problems:

1. Trigger email

This workload can be shared by multiple consumers in the group.

2. Answer requests to the frontend

Use manual consumer.assign() for the whole topic (all partitions).

The frontend would specify the timestamp from where it would want new messages from i.e. messages with timestamps > this timestamp will be retrieved from all partitions of the topic. To do this use consumer.offsetsForTimes() to get the timestamps, poll and send the messages as response.

List<TopicPartition> topicPartitions = consumer.partitionsFor("your_topic").stream().map(partitionInfo -> new TopicPartition(partitionInfo.topic(), partitionInfo.partition()).toList();
consumer.assign(topicPartitions);

// Populate the map
Map<TopicPartition, Long> partitionTimestamp = new LinkedHashMap<>();

// Add the same timestamp received from frontend for all partitions
topicPartitions.forEach(topicPartition -> partitionTimestamp.put(topicPartition, timestampFromFrontend));

// Get the offsets and seek
Map<TopicPartition,OffsetAndTimestamp> offsetsForTimes = consumer.offsetsForTimes(offsetsForTimestamp);

// Seek to the offsets
offsetsForTimes.forEach( (tp, oft) -> consumer.seek(tp, oft.offset()) );

// Poll and return
consumer.poll();

Upvotes: 3

Ghokun
Ghokun

Reputation: 3465

If you are using Kafka you can use partitions.

For each topic there can be multiple partitions. These partitions are shared between consumers.

For example:

Email Topic: Partitions[0,1,2,3,4,5]

Email Consumer Group:
   Consumer 1: Partitions[0,3]
   Consumer 2: Partitions[1,4]
   Consumer 3: Partitions[2,5]

On Scale Up Event:
   Consumer 1: Partitions[0]
   Consumer 2: Partitions[1]
   Consumer 3: Partitions[2,5]
   Consumer 4: Partitions[3]
   Consumer 5: Partitions[4]

On Scale Down Event:
   Consumer 1: Partitions[0,2,4]
   Consumer 2: Partitions[1,3,5]

This way only one consumer in particular group will consume message.

Upvotes: 0

Related Questions