Saurabh khandelwal

Reputation: 53

How to handle messages in a Kafka producer if the retries are exhausted

We have to implement the user registration module. We have two services:

  1. Identity service
  2. UserRegistration service

Now, when a user is registered through the UserRegistration service, we want to send the user details to the Identity service so the user can log in to the system. We are using Kafka to achieve this.

In our case, the UserRegistration service acts as a Kafka producer, and the flow is as follows:

  1. A request comes in to register the user.
  2. Store the User Data in the Database.
  3. Send a message to Kafka.

Cases:

  1. If the producer's request succeeds, i.e. the message is stored in the topic:

    • Return a success message to the end user.
  2. If the producer's request fails:

    • Retry sending the message until the retry limit is reached.
  3. If the retries are exhausted:

    • What to do in this situation? The message could be lost in this case.

I think that if the retries are exhausted we should store the message in the database (i.e. in a "failed-messages" table) and have a background service that loops over the failed messages and tries sending them to Kafka again at a fixed interval.
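
Roughly, what I have in mind is something like this (a minimal sketch only; the FailedMessageRepository DAO, broker address, and retry interval are placeholders, not code from our service):

import java.util.List;
import java.util.Properties;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class UserRegistrationPublisher {

    /** Hypothetical DAO over the "failed-messages" table. */
    public interface FailedMessageRepository {
        void save(String key, String value);
        List<FailedMessage> findAll();
        void delete(long id);
    }

    /** One parked message. */
    public record FailedMessage(long id, String key, String value) {}

    private final KafkaProducer<String, String> producer;
    private final FailedMessageRepository failedMessages;

    public UserRegistrationPublisher(FailedMessageRepository failedMessages) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:29092"); // placeholder
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        this.producer = new KafkaProducer<>(props);
        this.failedMessages = failedMessages;

        // Background service: retry parked messages at a fixed interval.
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleAtFixedRate(this::resendFailedMessages, 1, 1, TimeUnit.MINUTES);
    }

    /** Step 3 of the flow: called after the user row is stored in the database. */
    public void publishUserRegistered(String userId, String userJson) {
        producer.send(new ProducerRecord<>("new_user_registered", userId, userJson),
                (metadata, exception) -> {
                    if (exception != null) {
                        // The producer's internal retries are exhausted (or the error is not
                        // retriable): park the message so it is not lost.
                        failedMessages.save(userId, userJson);
                    }
                });
    }

    private void resendFailedMessages() {
        for (FailedMessage msg : failedMessages.findAll()) {
            producer.send(new ProducerRecord<>("new_user_registered", msg.key(), msg.value()),
                    (metadata, exception) -> {
                        if (exception == null) {
                            failedMessages.delete(msg.id()); // delivered, remove from the table
                        }
                    });
        }
    }
}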

Please suggest best practices for handling this case.

Thanks, Saurabh.

Upvotes: 2

Views: 7106

Answers (4)

monish001

Reputation: 690

The Transactional Outbox pattern, in combination with the Polling Publisher pattern, can be explored as another possible option!

Transactional Outbox pattern: https://microservices.io/patterns/data/transactional-outbox.html
Polling Publisher pattern: https://microservices.io/patterns/data/polling-publisher.html
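
For example, a rough sketch of the two patterns combined, assuming a hypothetical outbox table with id, topic, and payload columns (table, column names, scheduling, and JDBC wiring are illustrative only):

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import javax.sql.DataSource;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class OutboxExample {

    // 1) Transactional outbox: the user row and the outbox row are written in the SAME
    //    database transaction, so the event cannot be lost if the registration commits.
    public void registerUser(DataSource dataSource, String login, String userJson) throws Exception {
        try (Connection con = dataSource.getConnection()) {
            con.setAutoCommit(false);
            try (PreparedStatement insertUser =
                         con.prepareStatement("INSERT INTO users (login, payload) VALUES (?, ?)");
                 PreparedStatement insertOutbox =
                         con.prepareStatement("INSERT INTO outbox (topic, payload) VALUES (?, ?)")) {
                insertUser.setString(1, login);
                insertUser.setString(2, userJson);
                insertUser.executeUpdate();

                insertOutbox.setString(1, "new_user_registered");
                insertOutbox.setString(2, userJson);
                insertOutbox.executeUpdate();
            }
            con.commit();
        }
    }

    // 2) Polling publisher: a background job reads outbox rows, sends them to Kafka and
    //    deletes each row only after the broker acknowledges the write.
    public void pollOutboxOnce(DataSource dataSource, KafkaProducer<String, String> producer) throws Exception {
        try (Connection con = dataSource.getConnection();
             PreparedStatement select = con.prepareStatement("SELECT id, topic, payload FROM outbox ORDER BY id");
             ResultSet rs = select.executeQuery()) {
            while (rs.next()) {
                long id = rs.getLong("id");
                ProducerRecord<String, String> record =
                        new ProducerRecord<>(rs.getString("topic"), rs.getString("payload"));
                producer.send(record).get(); // block until acknowledged, then mark as published
                try (PreparedStatement delete = con.prepareStatement("DELETE FROM outbox WHERE id = ?")) {
                    delete.setLong(1, id);
                    delete.executeUpdate();
                }
            }
        }
    }
}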

Upvotes: 0

Saurabh khandelwal

Reputation: 53

I have used Approach #1 as mentioned in JavaTechnical's answer.

I have created a Cosmos DB source connector to fetch user data from Cosmos DB and publish it to the Kafka topic "new_user_registered". Cosmos DB source connector configuration:

{
  "name": "cosmosdb-source-connector",
  "config": {
    "connector.class": "com.azure.cosmos.kafka.connect.source.CosmosDBSourceConnector",
    "tasks.max": "1",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "key.converter.schemas.enable": "false",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "connect.cosmos.task.poll.interval": "1000",
    "connect.cosmos.connection.endpoint": "https://cosmos-instance.documents.azure.com:443/",
    "connect.cosmos.master.key": "C2y6yDjf5/R+ob0N8A7Cgv30VRDJIWEHLM+4QDU5DE2nQ9nDuVTqobD4b8mGGyPMbIZnqyMsEcaGQy67XIw/Jw==",
    "connect.cosmos.databasename": "UserDb",
    "connect.cosmos.containers.topicmap": "new_user_registered#User",
    "connect.cosmos.offset.useLatest": true,
    "topic.creation.enable": "false",
    "topic.creation.default.replication.factor": 1,
    "topic.creation.default.partitions": 1,
    "output.data.format": "JSON",
    "transforms": "replacefield",
    "transforms.replacefield.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
    "transforms.replacefield.exclude": "id,_rid,_self,_etag,_attachements,_ts",
    "transforms.replacefield.include": "Login,Email,Password,Name"
  }
}

Then I created an Azure SQL sink connector that fetches data from the Kafka topic "new_user_registered". Azure SQL sink connector configuration:

{
    "name": "sqlserver-sink-azure-connector",
    "config": {
        "name": "sqlserver-sink-azure-connector",
        "connector.class": "io.confluent.connect.azuresqldw.AzureSqlDwSinkConnector",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": "false",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "key.converter.schemas.enable": "false",
        "transforms": "RenameFields",
        "transforms.RenameFields.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
        "transforms.RenameFields.renames": "Email:vchEmail,Login:vchLogin,Name:vchName,Password:vchPassword",
        "topics": "NEW_USER_REGISTERED_AVRO",
        "azure.sql.dw.url": "jdbc:sqlserver://192.168.154.131:1433;",
        "azure.sql.dw.user": "sa",
        "azure.sql.dw.password": "password123",
        "azure.sql.dw.database.name": "DatabaseName",
        "table.name.format": User"
        "insert.mode": "insert",
        "auto.create": "true",
        "auto.evolve": "true",
        "tasks.max": "1",
        "confluent.topic.bootstrap.servers": "broker:29092"
    }
}

But the sink connector throws the exception: "No fields found using key and value schemas for table: User".

For this, I found the solution in the following post: Kafka Connect JDBC sink connector not working

Solution 1: Send the schema along with the payload in each message (this is not suitable for us).

Solution 2: Use the Confluent Avro serializer. To go with Avro, we found a video (https://www.youtube.com/watch?v=b-3qN_tlYR4&t=1300s) provided by Confluent showing how to use KSQL and streams to convert JSON to Avro, and then fetch data with a connector from the topic created by the stream.

But I was wondering whether I should go through the lengthy sink connector + KSQL/streams setup for my case, where I just want to sync users between two services without any transformation or schema.

Can anybody suggest whether I should go with a traditional consumer or Kafka Connect?
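
For reference, the traditional consumer option I have in mind would look roughly like this sketch (topic, table, and column names are taken from the connector configs above; the DataSource wiring, error handling, and password hashing are omitted, and Jackson is assumed for JSON parsing):

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import javax.sql.DataSource;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class IdentityUserSyncConsumer {

    private static final ObjectMapper MAPPER = new ObjectMapper();

    public static void run(DataSource identityDb) throws Exception {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:29092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "identity-service");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("new_user_registered"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    // Each message is the plain JSON user document produced by the source connector.
                    JsonNode user = MAPPER.readTree(record.value());
                    try (Connection con = identityDb.getConnection();
                         PreparedStatement insert = con.prepareStatement(
                                 "INSERT INTO [User] (vchLogin, vchEmail, vchName, vchPassword) VALUES (?, ?, ?, ?)")) {
                        insert.setString(1, user.get("Login").asText());
                        insert.setString(2, user.get("Email").asText());
                        insert.setString(3, user.get("Name").asText());
                        insert.setString(4, user.get("Password").asText());
                        insert.executeUpdate();
                    }
                }
                consumer.commitSync(); // commit offsets only after the rows are written
            }
        }
    }
}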

Thanks, Saurabh

Upvotes: 0

wengkee

Reputation: 1

I think it depends on how critical the business use case is here. While you can store the message in a separate database and continue the workflow, chances are something is wrong with the Kafka brokers or something else that requires a closer look. IMO, moving the message temporarily to a database is a way to handle backpressure, not a solution, and it creates more loose ends: for example, how do you handle the reprocessing of the messages?

In this kind of situation, I prefer to KISS (keep it simple and stupid). If the situation allows, just raise an alert and fail the process.

Upvotes: 0

JavaTechnical

Reputation: 9357

If retries are exhausted...

The retries are typically set to Integer.MAX_VALUE in KafkaProducer, with a limit set on the time rather than on the number of retries.

Please suggest best practices for handling this case.

Approach #1

When you store the data in the database, you can use a Kafka source connector such as Debezium to stream the database changes to Kafka, so that you can avoid writing a producer in your UserRegistration service. This would be a cleaner approach IMO for several reasons:

  1. You need not make your user wait until the Kafka message is published and acknowledged by the broker.
  2. You need not worry about the retrying logic in your UserRegistration service.
  3. Even if Kafka is down for a while, your user should not be impacted.

Approach #2

An alternative approach would be to delegate the producing task to another thread that runs periodically.

The work of this thread is to check for any updates to the database and push those updates to Kafka. Should anything fail, this thread is responsible for retrying and ensuring that the message lands in Kafka.

One thing to mention about this approach: if you have multiple instances of your UserRegistration service running, you need to ensure that you distribute the records-to-be-sent amongst the different instances so that you don't end up sending duplicates to Kafka. This makes it a stateful service, because you need to coordinate among multiple instances, which is relatively difficult to implement.


Approach #3

If your code has already been written such that your UserRegistration service uses a KafkaProducer producing the records in the request-handling thread itself, you can try increasing the timeout (delivery.timeout.ms) to a larger value, depending on your Kafka cluster, and leave the retries at Integer.MAX_VALUE. Remember that you still need to ensure that your message is delivered somehow. There are two ways (a configuration sketch follows this list):

  1. Async: Trust your timeout and retry values by setting them large enough; at some point your message should be sent to Kafka. Be extra cautious about other errors such as serialization issues, the buffer memory limit, etc.
  2. Sync: Call get() (or get(time, TimeUnit)) on the returned future and block your request thread until the record is sent to Kafka. This delays the response to your client.
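
A rough configuration sketch for this (the broker address and topic reuse values from the other posts; the timeout is a placeholder you should tune for your cluster):

import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class RegistrationProducerConfig {

    public static KafkaProducer<String, String> create() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:29092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        // Leave retries at Integer.MAX_VALUE and bound the total delivery time instead.
        props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
        props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 300_000); // e.g. 5 minutes overall
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);     // avoid duplicates on retry
        return new KafkaProducer<>(props);
    }

    public static void main(String[] args) throws Exception {
        KafkaProducer<String, String> producer = create();
        ProducerRecord<String, String> record =
                new ProducerRecord<>("new_user_registered", "user-123", "{\"Login\":\"saurabh\"}");

        // 1. Async: return to the caller immediately and handle failures in the callback.
        producer.send(record, (metadata, exception) -> {
            if (exception != null) {
                // delivery.timeout.ms elapsed or a non-retriable error (serialization, buffer full, ...)
                System.err.println("Delivery failed: " + exception.getMessage());
            }
        });

        // 2. Sync: block the request thread until the broker acknowledges (delays the response).
        producer.send(record).get(30, TimeUnit.SECONDS);

        producer.close();
    }
}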

If I had to choose, I would go with Approach #1 because it is clean and straightforward.

Upvotes: 1
