user1254053

Reputation: 765

C#: data not processed correctly with multithreading

I have a product dropdown; selecting a product connects to a websocket and gets the feed messages for that product. Once (1) the feed messages start coming in, I have to (2) get the order book and then (3) process the feed messages. So the first and last tasks would run asynchronously. For this I have written the following code:

void OnReceivingFeedMessage(string message)
{
    concurrentQueue.Enqueue(message);

    if (!messageStreamStarted)   // only the first time: get the order book
    {
        messageStreamStarted = true;
        GetOrderBookData();
    }
}

private void GetOrderBookData()
{
    MarketData m = new MarketData();
    ProductOrderBook p = m.GetProductOrderBook(productId);           
    bidsList = p.bids;
    asksList = p.asks;
    isOrderBookUpdated = true;

    Task task3 = Task.Run(() => KickStartToProcessQueue());         
}

private void KickStartToProcessQueue()
{
    while (threadProcessQueueExist)
    {
        int recordCountNew = concurrentQueue.Count();
        if (recordCountNew != 0)
        {
            if (isOrderBookUpdated)
            {
                ProcessQueueMessages();
            }
        }
    }
}

private void ProcessQueueMessages()
{
    if (!concurrentQueue.IsEmpty)
    {
        string jsonString;
        while (concurrentQueue.TryDequeue(out jsonString))
        {
            // have to insert the record in existing order book
        }
    }
}

This works perfectly the first time. But when I change the product and reconnect, things get messed up and the data is not processed properly. This is the code that runs on the product's selected-index change:

private void CloseAndReconnectToGetWebsocketFeed()
{ 
    w.CloseWebsocketConnection();                 
    messageStreamStarted = false;            
    isOrderBookUpdated = false;
    ConcurrentQueue<string> wssMessagesQueue = new ConcurrentQueue<string>();
    concurrentQueue = wssMessagesQueue;

    ConnectAndGetWebsocketFeedMessages(); // this calls OnReceivingFeedMessage
}

I am new to multithreading, so I'm not sure whether I need to use lock, async/await, or something else. What am I doing wrong in the above code?

It runs fine the first time, but the moment I change the product and repeat the same processing, it starts giving problems. Can someone please advise how I can clear all the resources before doing the same steps again?

Upvotes: 0

Views: 97

Answers (1)

georch

Reputation: 1444

I think you are writing unnecessarily complicated code. I'm not 100% sure what your problem is, but here are some things that might help you.

Use a BlockingCollection<T>

With that class, your consumer thread blocks until new messages come in. Here's a simple example of how it works:

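using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

BlockingCollection<string> collection = new BlockingCollection<string>(new ConcurrentQueue<string>());
// Consumer: TryTake blocks until an item arrives and returns false
// once CompleteAdding() has been called and the collection is empty.
Task t = Task.Run(() =>
{
    while (collection.TryTake(out string item, Timeout.Infinite))
    {
        Console.WriteLine($"Started reading {item}...");
        Thread.Sleep(1000); // simulate intense work
        Console.WriteLine($"Done reading {item}");
    }
});
// Producer: this could be your OnReceivingFeedMessage()
while (true)
{
    string input = Console.ReadLine();
    if (input == "stop")
    {
        Console.WriteLine("Stopping...");
        collection.CompleteAdding(); // no more items; lets the consumer finish
        break;
    }
    else
    {
        collection.Add(input);
    }
}
t.Wait(); // wait until the consumer has drained the collection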

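The task t blocks until there are items in the collection. When items are "received" (here simply via console input), they are added to the collection and the task picks them up one at a time, in order. Calling CompleteAdding() makes TryTake return false once the collection is empty, so the task ends and t.Wait() returns.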
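Applied to the question's scenario, one way to make a product switch clean could be to give each product its own collection and consumer task, and to shut both down before starting the next. The following is only a minimal sketch under that assumption: `FeedSession`, `OnMessage`, and `Stop` are hypothetical names that do not appear in the question, and the `handler` callback stands in for the order-book update.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical helper: create one instance per selected product.
public sealed class FeedSession
{
    private readonly BlockingCollection<string> _messages =
        new BlockingCollection<string>(new ConcurrentQueue<string>());
    private readonly Task _consumer;

    public FeedSession(Action<string> handler)
    {
        // GetConsumingEnumerable blocks while the collection is empty and
        // ends once CompleteAdding has been called and the queue is drained.
        _consumer = Task.Run(() =>
        {
            foreach (string message in _messages.GetConsumingEnumerable())
            {
                handler(message);
            }
        });
    }

    // Call this from the websocket callback.
    // Add throws InvalidOperationException after Stop, so close the
    // websocket before stopping the session.
    public void OnMessage(string message) => _messages.Add(message);

    // Call this before connecting to the next product.
    public void Stop()
    {
        _messages.CompleteAdding(); // no more messages for this product
        _consumer.Wait();           // let the consumer drain what is left
        _messages.Dispose();
    }
}
```

On a product change you would then close the websocket, call `Stop()` on the old session, and create a new `FeedSession` for the new product, so no queue, flag, or background loop is shared between products.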

Dispatch new tasks to work on the input

Pretty simple:

while (true)
{
    string item = Console.ReadLine();
    Task.Run(() =>
    {
        Console.WriteLine($"Started reading {item}...");
        Thread.Sleep(1000); //simulate intense work
        Console.WriteLine($"Done reading {item}");
    });
}

This also has the advantage (or disadvantage) that all the tasks run in parallel. That means you can't rely on the order in which they are processed, but overall throughput will usually be much higher.
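Whether the lost ordering matters depends on what the handler does. The sketch below is only an illustration with hypothetical names (`ParallelDispatchDemo`, `Run`): it dispatches numbered items as separate tasks and records the order in which they finish. Every item is processed, but the completion order is not guaranteed to match the input order.

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class ParallelDispatchDemo
{
    // Dispatches `count` numbered items as independent tasks and returns
    // the order in which they completed.
    public static List<int> Run(int count)
    {
        var completed = new ConcurrentQueue<int>();
        var tasks = new List<Task>();

        for (int i = 0; i < count; i++)
        {
            int item = i; // copy so each lambda captures its own value
            tasks.Add(Task.Run(() => completed.Enqueue(item)));
        }

        Task.WaitAll(tasks.ToArray());
        return completed.ToList();
    }
}
```

If the messages have to be applied to the order book in the order they arrived, a single consumer as in the first approach is the safer choice.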

By the way, both of these approaches avoid busy waiting. From your question:

while (threadProcessQueueExist)
{
    int recordCountNew = concurrentQueue.Count();
    if (recordCountNew != 0)
    {
        if (isOrderBookUpdated)
        {
            ProcessQueueMessages();
        }
    }
}

This loop busy-waits whenever the queue is empty (or the order book has not been loaded yet): one core of your processor spins at very high load without doing any useful work. That is considered bad practice.
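As a sketch of how the loop above could look without spinning: the consumer first blocks until the order book is ready, then blocks on the collection between messages. `QueueProcessor`, `MarkOrderBookReady`, `Complete`, and the `process` callback are hypothetical names chosen for illustration. Using `ManualResetEventSlim` in place of the `isOrderBookUpdated` flag is a substitution, not something taken from the question.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

public sealed class QueueProcessor
{
    private readonly BlockingCollection<string> _queue =
        new BlockingCollection<string>(new ConcurrentQueue<string>());
    private readonly ManualResetEventSlim _orderBookReady =
        new ManualResetEventSlim(false);

    // Messages may be queued before the order book is ready.
    public void Enqueue(string message) => _queue.Add(message);

    // Call once the order book snapshot has been loaded.
    public void MarkOrderBookReady() => _orderBookReady.Set();

    // Call when no more messages will arrive; lets the consumer finish.
    public void Complete() => _queue.CompleteAdding();

    public Task Start(Action<string> process)
    {
        return Task.Run(() =>
        {
            // Blocks (without polling) until MarkOrderBookReady is called.
            _orderBookReady.Wait();

            // The enumeration blocks while the queue is empty and ends
            // after Complete() once all queued messages are consumed.
            foreach (string message in _queue.GetConsumingEnumerable())
            {
                process(message);
            }
        });
    }
}
```

Messages that arrive while the order book is still loading simply wait in the queue and are processed in arrival order once `MarkOrderBookReady()` is called.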

Upvotes: 1
