Phoebe
Phoebe

Reputation: 3

Is there any way for .Net Worker Service to Recover After Restart?

I have a scenario wants to use .Net Worker service to handle some queue messages. Ideally, there maybe several service instances deployed, each instance will have a Worker service. And each Worker service will handle several messages. I will store all this message handle states into Mongo DB.

For example, if current there are 5 messages under handle, and Worker instances are 3 as below. Worker 1 handle 1.4. Worker 2 handle 3.5 Worker 3 handle 2. The Mongo DB will store those 5 messages' state, like "active", percentage **.

While Suddenly, Worker 2 crashed. And later Worker 2 instance recovered.

For now, is there any way to find which messages are previous handled by the crashed Worker? So that I can filtered them and continue handle?

Is something like Pod id or instance id will still be the same before and after this Worker restart so that I can also store into MongoDB as the relation between Worker and Message?

Upvotes: 0

Views: 88

Answers (1)

Suresh Chikkam
Suresh Chikkam

Reputation: 3242

As Arko said Generating GUID, for each worker instance checks that you can track and identify messages processed by that specific instance. This identifier can be associated with messages in MongoDB, enabling you to resume processing of messages that were being handled by the crashed worker.

  • When your worker service starts, retrieve the unique instance identifier. Use this identifier to query MongoDB and identify messages that were previously being processed by this instance before a crash.

handle worker restart and message recovery:

using MongoDB.Driver;
using System;
using System.Linq;

public class WorkerService
{
    private readonly IMongoCollection<MessageState> _messageCollection;
    private readonly string _workerInstanceId;

    public WorkerService(IMongoDatabase database)
    {
        _messageCollection = database.GetCollection<MessageState>("messages");
        _workerInstanceId = GetWorkerInstanceId(); // Retrieve or generate a unique instance identifier
    }

    public void Start()
    {
        // On service start, recover messages previously handled by this instance
        var recoveredMessages = _messageCollection
            .Find(msg => msg.WorkerInstanceId == _workerInstanceId && msg.Status == "processing")
            .ToList();

        foreach (var message in recoveredMessages)
        {
            // Process recovered messages
            ProcessMessage(message);
        }

        // Start processing new messages
        ListenForMessages();
    }

    private void ListenForMessages()
    {
        // Logic to listen for new messages and process them
    }

    private void ProcessMessage(MessageState message)
    {
        // Logic to process a message
    }

    private string GetWorkerInstanceId()
    {
        // Logic to retrieve or generate a unique instance identifier
        return "unique_instance_id";
    }
}

public class MessageState
{
    public string MessageId { get; set; }
    public string Status { get; set; }
    public string WorkerInstanceId { get; set; }
}
  • By this worker service starts and the first retrieves messages from MongoDB that were being processed by the current instance before it crashed.

Reference:

Upvotes: 0

Related Questions