Dimitri

Reputation: 7013

Indefinitely running Producer/Consumer application

The problem I'm tasked with resolving is (from my understanding) a typical producer/consumer problem. We have data coming in 24/7/365. The incoming data (call it raw data) is stored in a table and is unusable for the end user. We then select all raw data that has not been processed and process it one unit at a time. After each unit of data is processed, it's stored in another table and is ready to be consumed by the client application. The process from loading the raw data until persisting the processed data takes 2 - 5 seconds on average, but it is highly dependent on the third-party web services we use to process the data. If the web services are slow, we are no longer processing data as fast as we receive it and accumulate a backlog, causing our customers to lose the live feed. We want to make this process multithreaded. From my research I can see that the process can be divided into three discrete parts:

  1. LOADING - A loader task (producer) that runs indefinitely and loads unprocessed data from the DB into a BlockingCollection<T> (or some other variation of a concurrent collection). I chose BlockingCollection because it is designed with the producer/consumer pattern in mind and offers the GetConsumingEnumerable() method.

  2. PROCESSING - Multiple consumers that consume data from the above BlockingCollection<T>. In the current implementation I have a Parallel.ForEach loop over GetConsumingEnumerable() that on each iteration starts a task with two continuations: the first step calls a third-party web service, waits for the result and hands it to the second task; the second task does calculations based on that output and hands its result to the third task, which simply stores it in a second BlockingCollection<T> (the output collection). So my consumers are effectively producers too. Ideally, each unit of data loaded in step 1 would be queued for processing in parallel (a rough sketch of this pipeline follows the list).

  3. PERSISTING - A single consumer runs against the second BlockingCollection mentioned above and persists the processed data into the database.
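To make the intended structure concrete, here is a minimal sketch of the three stages; the item types, the bounded capacities and the Load/Call/Calculate/Save helpers are placeholders rather than my real code:

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Placeholder item types standing in for the real raw/processed records.
class RawItem { }
class ServiceResult { }
class ProcessedItem { }

class Pipeline
{
    // Bounded capacities are illustrative; they stop one stage from running away from the others.
    private readonly BlockingCollection<RawItem> _raw = new BlockingCollection<RawItem>(1000);
    private readonly BlockingCollection<ProcessedItem> _processed = new BlockingCollection<ProcessedItem>(1000);

    public void Run()
    {
        // 1. LOADING - single producer that keeps pulling unprocessed rows from the DB.
        var loader = Task.Run(() =>
        {
            while (true)
                foreach (var item in LoadUnprocessedFromDb())
                    _raw.Add(item);
        });

        // 2. PROCESSING - multiple consumers; each one is also a producer for the output collection.
        var processor = Task.Run(() =>
            Parallel.ForEach(_raw.GetConsumingEnumerable(), item =>
            {
                var serviceResult = CallThirdPartyService(item);   // slow external call
                var calculated = Calculate(serviceResult);
                _processed.Add(calculated);
            }));

        // 3. PERSISTING - single consumer writing processed items back to the DB.
        var persister = Task.Run(() =>
        {
            foreach (var result in _processed.GetConsumingEnumerable())
                SaveToDb(result);
        });

        Task.WaitAll(loader, processor, persister);
    }

    // Placeholders for brevity; the real versions talk to the DB and the web service.
    private RawItem[] LoadUnprocessedFromDb() => Array.Empty<RawItem>();
    private ServiceResult CallThirdPartyService(RawItem item) => new ServiceResult();
    private ProcessedItem Calculate(ServiceResult result) => new ProcessedItem();
    private void SaveToDb(ProcessedItem item) { }
}
```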

The problem I'm facing is item 2 from the list above. It does not seem to be fast enough (just using Parallel.ForEach). I tried, inside the Parallel.ForEach, starting a wrapping thread that in turn starts the processing task, instead of starting the task with continuations directly. But this caused an OutOfMemoryException, because the thread count went out of control and quickly reached 1200. I also tried scheduling the work via the ThreadPool, to no avail.
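For illustration, the failed attempt looked roughly like this (a reconstruction reusing the placeholder names from the sketch above, not my actual code); because Start() returns immediately, nothing bounds how many threads are alive at once while the web-service calls are slow:

```csharp
// Reconstruction of the "wrapping thread" variant (illustrative only).
Parallel.ForEach(_raw.GetConsumingEnumerable(), item =>
{
    // The loop body returns as soon as the thread is started, so items keep
    // being dequeued and new threads keep being created faster than the slow
    // service calls can finish, eventually exhausting memory.
    var worker = new Thread(() =>
    {
        var serviceResult = CallThirdPartyService(item);
        _processed.Add(Calculate(serviceResult));
    });
    worker.Start();
});
```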

Could you please advise if my approach is good enough for what we need done, or is there a better way of doing it?

Upvotes: 0

Views: 1274

Answers (2)

Samy S.Rathore

Reputation: 1823

I recently faced a problem very similar to yours. Here's what I did; I hope it helps:

  1. Your 1st and 3rd parts seem rather simple and can be managed on their own threads without any problem.
  2. The 2nd part should first be started on a new thread. Then use a System.Threading.Timer to make your web-service calls; the method that calls the web service passes the response (result) to the processing method by invoking it asynchronously, letting it process the data at its own pace (a rough sketch is below).

This solved my problem, and I hope it helps you too. If you have any doubts, ask me and I'll explain it here.
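A minimal sketch of what I mean, assuming a 500 ms polling interval and using Task.Run to stand in for the asynchronous invoke (all names here are illustrative):

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

class TimerDrivenCaller
{
    private Timer _timer;

    public void Start()
    {
        // Fire the web-service call on a timer; the interval is illustrative.
        _timer = new Timer(_ => CallWebService(), null,
                           TimeSpan.Zero, TimeSpan.FromMilliseconds(500));
    }

    private void CallWebService()
    {
        var response = FetchFromWebService();   // placeholder for the real service call

        // Hand the response off asynchronously so the timer callback is not
        // blocked and the processing method works at its own pace.
        Task.Run(() => Process(response));
    }

    private string FetchFromWebService() => "raw response";  // placeholder
    private void Process(string response) { /* calculations here */ }
}
```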

Upvotes: 2

Random Dev

Reputation: 52280

If the bottleneck is the third-party service and it does not handle parallel execution but queues your requests, then there is not much you can do.

But first you can try this:

  • use the ThreadPool or Tasks (those use the ThreadPool too) - don't fire up threads yourself
  • try to make your requests async instead of tying up a thread for each one (see the sketch after this list)
  • run your service/app through a performance profiler and check where you are "wasting" your time
  • make a spike/check against the 3rd party service and see how it handles parallel requests
  • think about caching the answers from this service (if possible)
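For the async point, a minimal sketch of the kind of call I mean, assuming HttpClient; the URL, the ProcessResponse method and the concurrency limit of 16 are illustrative, not your actual service:

```csharp
using System;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;

class AsyncServiceCaller
{
    private static readonly HttpClient Client = new HttpClient();

    // Caps how many requests are in flight at once; the limit of 16 is arbitrary.
    private static readonly SemaphoreSlim Throttle = new SemaphoreSlim(16);

    public static async Task CallAndProcessAsync(string payload)
    {
        await Throttle.WaitAsync();
        try
        {
            // await frees the thread back to the pool while the request is pending,
            // instead of a thread sitting blocked on the web-service response.
            var response = await Client.PostAsync("https://thirdparty.example/process",
                                                  new StringContent(payload));
            var body = await response.Content.ReadAsStringAsync();
            ProcessResponse(body);
        }
        finally
        {
            Throttle.Release();
        }
    }

    private static void ProcessResponse(string body)
    {
        // calculations / hand-off to the persisting stage go here
    }
}
```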

That's all I can think of without further info right now.

Upvotes: 3
