George 2.0 Hope

Reputation: 648

Thread-safe buffer for .NET

(Note: Though I would like ideas for the future for .NET 4.0, I'm limited to .NET 3.5 for this project.)

I have a thread that reads data asynchronously from an external device (simulated in the code example by the ever-so-creative strSomeData :-) and stores it in a StringBuilder 'buffer' (strBuilderBuffer :-)

In the 'main code' I want to 'nibble' at this 'buffer'. However, I am unsure how to do this in a thread-safe manner from an 'operational' perspective. I understand it is safe from a 'data' perspective, because according to MSDN, "Any public static members of this (StringBuilder) type are thread safe. Any instance members are not guaranteed to be thread safe." However, my code below illustrates that it is possibly not thread-safe from an 'operational' perspective.

The key is that I'm worried about two lines of the code:

string strCurrentBuffer = ThreadWorker_TestThreadSafety_v1a.strBuilderBuffer.ToString();
// Thread 'randomly' slept due to 'inconvenient' comp resource scheduling...
ThreadWorker_TestThreadSafety_v1a.strBuilderBuffer.Length = 0;

If the OS suspends my thread between the 'reading' of the buffer and the 'clearing' of the buffer, anything the other thread appends in between is wiped out, so I can lose data (which is bad :-(

Is there any way to guarantee the atomicity of those two lines and force the computer not to interrupt them?

With respect to Vlad's suggestion below regarding the use of lock, I tried it but it didn't work (at all really):

    public void BufferAnalyze()
    {
        String strCurrentBuffer;
        lock (ThreadWorker_TestThreadSafety_v1a.strBuilderBuffer)
        {
            strCurrentBuffer = ThreadWorker_TestThreadSafety_v1a.strBuilderBuffer.ToString();
            Console.WriteLine("[BufferAnalyze()]  ||<<  Thread 'Randomly' Slept due to comp resource scheduling");
            Thread.Sleep(1000);  //  Simulate poor timing of thread resourcing...
            ThreadWorker_TestThreadSafety_v1a.strBuilderBuffer.Length = 0;
        }
        Console.WriteLine("[BufferAnalyze()]\r\nstrCurrentBuffer[{0}] == {1}", strCurrentBuffer.Length.ToString(), strCurrentBuffer);
    }

Is there a better way of implementing a thread safe buffer?

Here's the full code:

namespace ExploringThreads
{
    /// <summary>
    /// Description of BasicThreads_TestThreadSafety_v1a
    /// </summary>
    class ThreadWorker_TestThreadSafety_v1a
    {
        private Thread thread;
        public static StringBuilder strBuilderBuffer = new StringBuilder("", 7500);
        public static StringBuilder strBuilderLog = new StringBuilder("", 7500);

        public bool IsAlive
        {
            get { return thread.IsAlive; }
        }

        public ThreadWorker_TestThreadSafety_v1a(string strThreadName)
        {
            // It is possible to have a thread begin execution as soon as it is created.
            // In the case of MyThread this is done by instantiating a Thread object inside MyThread's constructor.
            thread = new Thread(new ThreadStart(this.threadRunMethod));
            thread.Name = strThreadName;
            thread.Start();
        }

        public ThreadWorker_TestThreadSafety_v1a() : this("")
        {
            //   NOTE:  constructor overloading ^|^
        }

        //Entry point of thread.
        public void threadRunMethod()
        {
            Console.WriteLine("[ThreadWorker_TestThreadSafety_v1a threadRunMethod()]");
            Console.WriteLine(thread.Name + " starting.");
            int intSomeCounter = 0;
            string strSomeData = "";
            do
            {
                Console.WriteLine("[ThreadWorker_TestThreadSafety_v1a threadRunMethod()] running.");
                intSomeCounter++;
                strSomeData = "abcdef" + intSomeCounter.ToString() + "|||";
                strBuilderBuffer.Append(strSomeData);
                strBuilderLog.Append(strSomeData);
                Thread.Sleep(200);
            } while(intSomeCounter < 15);

            Console.WriteLine(thread.Name + " terminating.");
        }
    }
    /// <summary>
    /// Description of BasicThreads_TestThreadSafety_v1a.
    /// </summary>
    public class BasicThreads_TestThreadSafety_v1a
    {
        public BasicThreads_TestThreadSafety_v1a()
        {
        }

        public void BufferAnalyze()
        {
            string strCurrentBuffer = ThreadWorker_TestThreadSafety_v1a.strBuilderBuffer.ToString();
            Console.WriteLine("[BufferAnalyze()]  ||<<  Thread 'Randomly' Slept due to comp resource scheduling");
            Thread.Sleep(1000);  //  Simulate poor timing of thread resourcing...
            ThreadWorker_TestThreadSafety_v1a.strBuilderBuffer.Length = 0;
            Console.WriteLine("[BufferAnalyze()]\r\nstrCurrentBuffer[{0}] == {1}", strCurrentBuffer.Length.ToString(), strCurrentBuffer);
        }

        public void TestBasicThreads_TestThreadSafety_v1a()
        {
            Console.Write("Starting TestBasicThreads_TestThreadSafety_v1a  >>>  Press any key to continue . . . ");
            Console.Read();

            // First, construct a MyThread object.
            ThreadWorker_TestThreadSafety_v1a threadWorker_TestThreadSafety_v1a = new ThreadWorker_TestThreadSafety_v1a("threadWorker_TestThreadSafety_v1a Child");

            do
            {
                Console.WriteLine("[TestBasicThreads_TestThreadSafety_v1a()]");
                Thread.Sleep(750);
                BufferAnalyze();
                //} while (ThreadWorker_TestThreadSafety_v1a.thread.IsAlive);
            } while (threadWorker_TestThreadSafety_v1a.IsAlive);
            BufferAnalyze();
            Thread.Sleep(1250);
            Console.WriteLine("[TestBasicThreads_TestThreadSafety_v1a()]");
            Console.WriteLine("ThreadWorker_TestThreadSafety_v1a.strBuilderLog[{0}] == {1}", ThreadWorker_TestThreadSafety_v1a.strBuilderLog.Length.ToString(), ThreadWorker_TestThreadSafety_v1a.strBuilderLog);

            Console.Write("Completed TestBasicThreads_TestThreadSafety_v1a  >>>  Press any key to continue . . . ");
            Console.Read();
        }
    }
}

Upvotes: 4

Views: 8024

Answers (3)

Brian Gideon

Reputation: 48949

Download the Reactive Extensions backport for .NET 3.5; there is also a NuGet package for it. Once you have it, reference System.Threading.dll in your project.

Now you can use all of the new concurrent collections standard in .NET 4.0 within .NET 3.5 as well. The best one for your situation is BlockingCollection. It is basically a buffer that allows threads to enqueue and dequeue items like a normal queue, except that the dequeue operation blocks until an item is available.

There is no need to use the StringBuilder class at all now. Here is how I would refactor your code. I tried to keep my example short so that it is easier to understand.

public class Example
{
  private BlockingCollection<string> buffer = new BlockingCollection<string>();

  public Example()
  {
    new Thread(ReadFromExternalDevice).Start();
    new Thread(BufferAnalyze).Start();
  }

  private void ReadFromExternalDevice()
  {
    while (true)
    {
      string data = GetFromExternalDevice();
      buffer.Add(data);
      Thread.Sleep(200);
    }
  }

  private void BufferAnalyze()
  {
    while (true)
    {
      string data = buffer.Take(); // This blocks if nothing is in the queue.
      Console.WriteLine(data);
    }
  } 
}
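
If adding the backport is not an option, the same blocking behavior can be approximated on plain .NET 3.5 with Monitor.Wait and Monitor.Pulse. The following is only a minimal sketch: SimpleBlockingQueue<T> and SimpleBlockingQueueDemo are hypothetical names invented for this illustration, not part of any library, and the sketch leaves out the bounding, cancellation, and completion features that BlockingCollection provides.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical stand-in for BlockingCollection<T> on plain .NET 3.5.
public sealed class SimpleBlockingQueue<T>
{
    private readonly object gate = new object();
    private readonly Queue<T> items = new Queue<T>();

    public void Add(T item)
    {
        lock (gate)
        {
            items.Enqueue(item);
            Monitor.Pulse(gate); // wake one thread waiting in Take()
        }
    }

    public T Take()
    {
        lock (gate)
        {
            // Re-check in a loop: another consumer may have taken the item first.
            while (items.Count == 0)
            {
                Monitor.Wait(gate); // releases the lock while waiting
            }
            return items.Dequeue();
        }
    }
}

public static class SimpleBlockingQueueDemo
{
    // Runs one producer thread and consumes its five items on the calling thread.
    public static List<string> Run()
    {
        SimpleBlockingQueue<string> queue = new SimpleBlockingQueue<string>();
        List<string> seen = new List<string>();

        Thread producer = new Thread(delegate()
        {
            for (int i = 1; i <= 5; i++)
            {
                queue.Add("abcdef" + i);
                Thread.Sleep(20); // simulate the device delivering data slowly
            }
        });
        producer.Start();

        for (int i = 0; i < 5; i++)
        {
            seen.Add(queue.Take()); // blocks until the producer adds an item
        }
        producer.Join();
        return seen;
    }
}
```

Calling SimpleBlockingQueueDemo.Run() returns the five items in the order they were added, because a single producer feeds a single consumer through a FIFO queue.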

For future reference, the BufferBlock<T> class from the TPL Dataflow library does basically the same thing as BlockingCollection. It will be available for .NET 4.5.

Upvotes: 6

Vlad

Reputation: 35594

Using StringBuilder is not thread safe, but you can switch to ConcurrentQueue<char>.

If you need another data structure, there are other thread-safe collections in .NET 4; see http://msdn.microsoft.com/en-us/library/dd997305.aspx.

Edit: .NET 3.5 has fewer synchronization primitives. You can build a simple solution by adding a lock around Queue<char>, though it will be less efficient than .NET 4's ConcurrentQueue. Or keep the same StringBuilder, again locking around the read and write operations:

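public static StringBuilder strBuilderBuffer = new StringBuilder("", 7500);
// Static, so the writer thread and the reader lock on the same object.
public static readonly object BufferLock = new object();

...

lock (BufferLock)
    strBuilderBuffer.Append(strSomeData);

...

string strCurrentBuffer;
lock (ThreadWorker_TestThreadSafety_v1a.BufferLock)
{
    strCurrentBuffer = ThreadWorker_TestThreadSafety_v1a.strBuilderBuffer.ToString();
    // StringBuilder.Clear() was only added in .NET 4.0; on 3.5 reset Length instead.
    ThreadWorker_TestThreadSafety_v1a.strBuilderBuffer.Length = 0;
}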
Console.WriteLine("[BufferAnalyze()]  ||<<  Thread 'Randomly' Slept ...");
Thread.Sleep(1000);  //  Simulate poor timing of thread resourcing...

Edit:

You cannot guarantee that the OS won't suspend your worker thread while it is holding the lock. However, the lock guarantees that other threads cannot interfere and change the buffer while one thread is processing it, provided every thread that touches the buffer takes the same lock.

That's why you should hold the lock for as short a time as possible:

  • take the lock, add data, release the lock, -or-
  • take the lock, copy the data, empty the buffer, release the lock, then start processing the copied data.
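
The two patterns above can be sketched as a small wrapper class. This is only an illustration under assumptions: LockedTextBuffer and LockedTextBufferDemo are hypothetical names that do not appear in the question's code, and the sleep intervals are shortened so the demo finishes quickly.

```csharp
using System;
using System.Text;
using System.Threading;

// Hypothetical helper: a StringBuilder guarded by one private lock object.
public sealed class LockedTextBuffer
{
    private readonly object gate = new object();
    private readonly StringBuilder builder = new StringBuilder();

    // Producer side: take the lock, add data, release the lock.
    public void Append(string data)
    {
        lock (gate)
        {
            builder.Append(data);
        }
    }

    // Consumer side: copy and empty the buffer as one step under the lock.
    public string Drain()
    {
        lock (gate)
        {
            string snapshot = builder.ToString();
            builder.Length = 0; // works on .NET 3.5, where Clear() is unavailable
            return snapshot;
        }
    }
}

public static class LockedTextBufferDemo
{
    // Runs a producer thread and drains the buffer from the calling thread.
    public static string Run()
    {
        LockedTextBuffer buffer = new LockedTextBuffer();
        StringBuilder received = new StringBuilder();

        Thread producer = new Thread(delegate()
        {
            for (int i = 1; i <= 15; i++)
            {
                buffer.Append("abcdef" + i + "|||");
                Thread.Sleep(20);
            }
        });
        producer.Start();

        while (producer.IsAlive)
        {
            Thread.Sleep(75);
            // Any slow processing of the drained text happens outside the lock.
            received.Append(buffer.Drain());
        }
        producer.Join();
        received.Append(buffer.Drain()); // pick up anything appended after the last drain

        return received.ToString();
    }
}
```

Because the copy and the reset happen inside the same lock that Append takes, nothing can be appended between them, so LockedTextBufferDemo.Run() returns all fifteen chunks in order regardless of how the threads are scheduled.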

Upvotes: 3

jjxtra

Reputation: 21140

If you are doing a lot of reads out of the buffer, perhaps this will help:

http://msdn.microsoft.com/en-us/library/system.threading.readerwriterlock.aspx

Multiple readers are possible, but only one writer.

It is available in .NET 1.X and up...

Upvotes: 1
