duskandawn
duskandawn

Reputation: 675

BlockingQueue inside Transaction java

I am in process of building a system which has a basic Produce Consumer paradigm flavor to it, but the Producer part needs to be a in a Transaction mode. Here is my exact scenario :

Poller Thread - 
 [Transaction START]
   Polls the DB for say 10 records
     * Sets the status in DB for those records as IN-Progress
     * Puts the above 10 records in a LinkedBlockingQueue - workqueue
 [Transaction END]

Worker Thread Pool of 5
   * Polls the workqueue for the tasks, do some lookups and update the same records in DB with some looked up values.

Now my problem is with the Part 1 of the process because if for lets says some reason my process of extracting and updating from DB is successful but inserting in the queue fails for one record, i can rollback the whole transaction, and all my records in DB will be in state NOT Processed, but there can some elements which will be inserted in this work queue, and my worker threadpool can pickup them and start processing, which should not happen.

Trying to find if there is a way to have the writes to the blockingqueue in transnational manner.

Thinking of adding some writelock() readlock() mechanisms, if i can stop the worker threads from reading if something is being written in the queue.

Any thoughts for a better approach.

Thanks,

Upvotes: 0

Views: 518

Answers (1)

vanOekel
vanOekel

Reputation: 6538

Consider the worst case scenario: the unplug case (database connection lost) and the crash case (program out of memory). How would you recover from that?

Some hints:

  • If you can think of a reason why inserting in the queue would fail (queue full), don't start the transaction. Just skip one poll.
  • First commit the in-progress transaction, then add all records to the workqueue. Or use one transaction per record so you can add the records to the workqueue one by one.
  • Maintain an in-memory HashSet of IDs of all records being processed. If the ID is in the set but the record is not in-progress, or vice versa, something is very wrong (e.g. task for a record did not complete/crashed).
  • Set a timestamp when in-progress was set. Have another background-process check for records that are in-progress for too long. Reset the in-progress state if the ID is not in the in-progress HashSet and your normal process will retry to operation.
  • Make your tasks idempotent: see if you can find a way that tasks can recognize work for the record has already been done. This might be a relative expensive operation, but it will give you the guarantee that work is only done once in case of retries.

Upvotes: 1

Related Questions