gstackoverflow
gstackoverflow

Reputation: 37034

How to save message into database and send response into topic eventually consistent?

I have the following rabbitMq consumer:

Consumer consumer = new DefaultConsumer(channel) {
    @Override
     public void handleDelivery(String consumerTag, Envelope envelope, MQP.BasicProperties properties, byte[] body) throws IOException {
            String message = new String(body, "UTF-8");
            sendNotificationIntoTopic(message);
            saveIntoDatabase(message);
     }
};

Following situation can occur:

  1. Message was send into topic successfully
  2. Connection to database was lost so database insert was failed.

As a result we have data inconsistency.

Expected result either both action were successfully executed or both were not executed at all.

Any solutions how can I achieve it?

P.S.

Currently I have following idea(please comment upon)

We can suppose that broker doesn't lose any messages.

We have to be subscribed on topic we want to send.

  1. Save entry into database and set field status with value 'pending'
  2. Attempt to send data to topic. If send was successfull - update field status with value 'success'
  3. We have to have a sheduled job which have to check rows with pending status. At the moment 2 cases are possible:
    3.1 Notification wasn't send at all
    3.2 Notification was send but save into database was failed(probability is very low but it is possible)

    So we have to distinquish that 2 cases somehow: we may store messages from topic in the collection and job can check if the message was accepted or not. So if job found a message which corresponds the database row we have to update status to "success". Otherwise we have to remove entry from database.

I think my idea has some weaknesses(for example if we have multinode application we have to store messages in hazelcast(or analogs) but it is additional point of hypothetical failure)

Upvotes: 6

Views: 4463

Answers (4)

Krishna
Krishna

Reputation: 473

If there is enough time to modify the design, it is recommended to use JTA like APIs to manage 2phase commit. Even weblogic and WebSphere support XA resource for 2 phase commit.

If timeline is less, it is suggested perform as below to reduce the failure gap.

  • Send data topic (no commit) (incase topic is down, retry to be performed with an interval)
  • Write data into DB
  • Commit DB
  • Commit Topic

Here failure will happen only when step 4 fails. It will result in same message send again. So receiving system will receive duplicate message. Each message has unique messageID and CorrelationID in JMS2.0 structure. So finding duplicate is bit straight forward (but this is to be handled at receiving system)

Both case will work for clustered environment as well.


Strict to your case, thought below steps might help to overcome your issue

Subscribe a listener listener-1 to your topic.

Process-1

  • Add DB entry with status 'to be sent' for message msg-1
  • Send message msg-1 to topic. Retry sending incase of any topic failure If step 2 failed after certain retry, process-1 has to resend the msg-1 before sending any new messages OR step-1 to be rolled back

Listener-1

  • Using subscribed listener, read reference(meesageID/correlationID) from Topic, and update DB status to SENT, and read/remove message from topic. Incase reference-read success and DB update failed, topic still have message. So next read will update DB. Incase DB update success and message removal failed. Listener will read again and tries to update message which is already done. So can be ignored after validation.

Incase listener itself down, topic will have messages until listener reading the messages. Until then SENT messages will be in status 'to be sent'.

Upvotes: 0

Alexander Petrov
Alexander Petrov

Reputation: 9492

Here is an example of Try Cancel Confirm pattern https://servicecomb.apache.org/docs/distributed_saga_3/ that should be capable of dealing with your problem. You should tolerate some chance of double submission of the data via the queue. Here is an example:

  1. Define abstraction Operation and Assign ID to the operation plus a timestamp.
  2. Write status Pending to the database (you can do this in the same step as 1)
  3. Write a listener that polls the database for all operations with status pending and older than "timeout"
  4. For each pending operation send the data via the queue with the assigned ID.
  5. The recipient side should be aware of the ID and if the ID has been processed nothing should happen.

6A. If you need to be 100% that the operation has completed you need a second queue where the recipient side will post a message ID - DONE. If such consistency is not necessary skip this step. Alternatively it can post ID -Failed reason for failure.

6B. The submitting side either waits for a message from 6A of completes the operation by writing status DONE to the database.

  • Once a sertine timeout has passed or certain retry limit has passed. You write status to operation FAIL.
  • You can potentialy send a message to the recipient side opertaion with ID rollback.

Notice that all this steps do not involve a technical transactions. You can do this with a non transactional database.

What I have written is a variation of the Try Cancel Confirm Pattern where each recipient of message should be aware of how to manage its own data.

Upvotes: 1

Arpan Kanthal
Arpan Kanthal

Reputation: 503

Here's the pseudocode for how i'd do it: (Assuming the dao layer has transactional capability and your messaging layer doesnt)

    //Start a transaction
    try {
                String message = new String(body, "UTF-8");
               // Ordering is important here as I'm assuming the database has commit and rollback capabilities, but the messaging system doesnt. 
                saveIntoDatabase(message);
                sendNotificationIntoTopic(message);

    } catch (MessageDeliveryException) {
        // rollback the transaction
        // Throw a domain specific exception
    }
   //commit the transaction

Scenarios:
1. If the database fails, the message wont be sent as the exception will break the code flow .
2. If the database call succeeds and the messaging system fails to deliver, catch the exception and rollback the database changes

All the actions necessary for logging and replaying the failures can be outside this method

Upvotes: 0

gstackoverflow
gstackoverflow

Reputation: 37034

  1. In the listener save database row with field staus='pending'
  2. Another job(separated thread) will obtain all pending rows from DB and following for each row:
    2.1 send data to topic
    2.2 save into database

If we failured on the step 1 - everything is ok - data in consistent state because job won't know anything about that data

if we failured on the step 2.1 - no problem, next job invocation will attempt to handle it

if we failured on the step 2.2 - If we failured here - it means that next job invocation will handle the same data again. From the first glance you can think that it is a problem. But your consumer has to be idempotent - it means that it has to understand that message was already processed and skip the processing. This requirement is a consequence that all message brokers have guarantees that message will be delivered AT LEAST ONCE. So our consumers have to be ready for duplicated messages anyway. No problem again.

Upvotes: 0

Related Questions