enzom83
enzom83

Reputation: 8320

A producer consumer queue with an additional thread for a periodic backup of data

I'm trying to implement a concurrent producer/consumer queue with multiple producers and one consumer: the producers add some data to the Queue, and the consumer dequeues these data from the queue in order to update a collection. This collection must be periodically backed up to a new file. For this purpose I created a custom serializable collection: serialization could be performed by using the DataContractSerializer.

The dequeuing of an item at a time may not be efficient, so I thought of using two queues: when the first queue becomes full, the producers notify the consumer by invoking Monitor.Pulse. As soon as the consumer receives the notification, the queues are swapped, so while producers enqueue new items, the consumer can process the previous ones.

The sample that I wrote seems to work properly. I think it is also thread-safe, but I'm not sure about that. The following code, for simplicity I used a Queue<int>. I also used (again for simplicity) an ArrayList instead of collection serializable.

public class QueueManager
{
    private readonly int m_QueueMaxSize;
    private readonly TimeSpan m_BackupPeriod;

    private readonly object m_SyncRoot_1 = new object();
    private Queue<int> m_InputQueue = new Queue<int>();
    private bool m_Shutdown;
    private bool m_Pulsed;

    private readonly object m_SyncRoot_2 = new object();
    private ArrayList m_CustomCollection = new ArrayList();

    private Thread m_ConsumerThread;
    private Timer m_BackupThread;
    private WaitHandle m_Disposed;

    public QueueManager()
    {
        m_ConsumerThread = new Thread(Work) { IsBackground = true };

        m_QueueMaxSize = 7;
        m_BackupPeriod = TimeSpan.FromSeconds(30);
    }

    public void Run()
    {
        m_Shutdown = m_Pulsed = false;

        m_BackupThread = new Timer(DoBackup);
        m_Disposed = new AutoResetEvent(false);

        m_ConsumerThread.Start();
    }

    public void Shutdown()
    {
        lock (m_SyncRoot_1)
        {
            m_Shutdown = true;
            Console.WriteLine("Worker shutdown...");

            Monitor.Pulse(m_SyncRoot_1);
        }

        m_ConsumerThread.Join();
        WaitHandle.WaitAll(new WaitHandle[] { m_Disposed });

        if (m_InputQueue != null) { m_InputQueue.Clear(); }
        if (m_CustomCollection != null) { m_CustomCollection.Clear(); }

        Console.WriteLine("Worker stopped!");
    }

    public void Enqueue(int item)
    {
        lock (m_SyncRoot_1)
        {
            if (m_InputQueue.Count == m_QueueMaxSize)
            {
                if (!m_Pulsed)
                {
                    Monitor.Pulse(m_SyncRoot_1); // it notifies the consumer...
                    m_Pulsed = true;
                }
                Monitor.Wait(m_SyncRoot_1);  // ... and waits for Pulse
            }

            m_InputQueue.Enqueue(item);
            Console.WriteLine("{0}    \t {1} >", Thread.CurrentThread.Name, item.ToString("+000;-000;"));
        }
    }

    private void Work()
    {
        m_BackupThread.Change(m_BackupPeriod, TimeSpan.FromMilliseconds(-1));

        Queue<int> m_SwapQueueRef, m_WorkerQueue = new Queue<int>();

        Console.WriteLine("Worker started!");
        while (true)
        {
            lock (m_SyncRoot_1)
            {
                if (m_InputQueue.Count < m_QueueMaxSize && !m_Shutdown) Monitor.Wait(m_SyncRoot_1);

                Console.WriteLine("\nswapping...");
                m_SwapQueueRef = m_InputQueue;
                m_InputQueue = m_WorkerQueue;
                m_WorkerQueue = m_SwapQueueRef;

                m_Pulsed = false;
                Monitor.PulseAll(m_SyncRoot_1); // all producers are notified
            }

            Console.WriteLine("Worker\t      < {0}", String.Join(",", m_WorkerQueue.ToArray()));

            lock (m_SyncRoot_2)
            {
                Console.WriteLine("Updating custom dictionary...");
                foreach (int item in m_WorkerQueue)
                {
                    m_CustomCollection.Add(item);
                }
                Thread.Sleep(1000);
                Console.WriteLine("Custom dictionary updated successfully!");
            }

            if (m_Shutdown)
            {
                // schedule last backup
                m_BackupThread.Change(0, Timeout.Infinite);
                return;
            }
            m_WorkerQueue.Clear();
        }
    }

    private void DoBackup(object state)
    {
        try
        {
            lock (m_SyncRoot_2)
            {
                Console.WriteLine("Backup...");
                Thread.Sleep(2000);
                Console.WriteLine("Backup completed at {0}", DateTime.Now);
            }
        }
        finally
        {
            if (m_Shutdown) { m_BackupThread.Dispose(m_Disposed); }
            else { m_BackupThread.Change(m_BackupPeriod, TimeSpan.FromMilliseconds(-1)); }
        }
    }
}

Some objects are initialized in the Run method to allow you to restart this QueueManager after it is stopped, as shown in the code below.

public static void Main(string[] args)
{
    QueueManager queue = new QueueManager();

    var t1 = new Thread(() =>
    {
        for (int i = 0; i < 50; i++)
        {
            queue.Enqueue(i);
            Thread.Sleep(1500);
        }
    }) { Name = "t1" };

    var t2 = new Thread(() =>
    {
        for (int i = 0; i > -30; i--)
        {
            queue.Enqueue(i);
            Thread.Sleep(3000);
        }
    }) { Name = "t2" };

    t1.Start(); t2.Start(); queue.Run();
    t1.Join(); t2.Join(); queue.Shutdown();
    Console.ReadLine();

    var t3 = new Thread(() =>
    {
        for (int i = 0; i < 50; i++)
        {
            queue.Enqueue(i);
            Thread.Sleep(1000);
        }
    }) { Name = "t3" };

    var t4 = new Thread(() =>
    {
        for (int i = 0; i > -30; i--)
        {
            queue.Enqueue(i);
            Thread.Sleep(2000);
        }
    }) { Name = "t4" };

    t3.Start(); t4.Start(); queue.Run();
    t3.Join(); t4.Join(); queue.Shutdown();
    Console.ReadLine();
}

Upvotes: 0

Views: 688

Answers (1)

Servy
Servy

Reputation: 203820

I would suggest using the BlockingCollection for a producer/consumer queue. It was designed specifically for that purpose. The producers add items using Add and the consumers use Take. If there are no items to take then it will block until one is added. It is already designed to be used in a multithreaded environment, so if you're just using those methods there's no need to explicitly use any locks or other synchronization code.

Upvotes: 2

Related Questions