Reputation: 7013
Problem I'm tasked to resolve is (from my understanding) a typical producer/consumer problem. We have data incoming 24/7/365. The incoming data (call it raw data) is stored in a table and is unusable for the end user. We then select all raw data that has not been processed and start processing one by one. After each unit of data is processed, its stored in another table and is now ready to be consumed by the client application. The process from loading the raw data till persisting processed data takes 2 - 5 seconds on average. But its highly dependent on the third party web services that we use to process the data. If the web services are slow, we are no longer processing data as fast as we're getting it in and accumulate backlog, hence causing our customers to loose live feed. We want to make this process a multithreaded one. From my research I can see that the process can be divided into three discreet parts:
LOADING - A loader task (producer) that runs indefinitely and loads unprocessed data from DB to BlockingCollection<T>
(or some other variation of a concurrent collection). My choice of BlockingCollection
is due to the fact that it is designed with Producer/Consumer pattern in mind and offers GetConsumingEnumerable()
method.
PROCESSING - Multiple consumers that consume data from the above BlockingCollection<T>
. In its current implementation I have a Parallel.ForEach
loop through GetConsumingEnumerable()
that on each iteration starts a task with two task continuations: First step of the task is to call a third party web service, wait for the result and output the result for the second task to consume. Second task does calculations based on the first task's output and outputs the result for the third task, which basically just stores that result into the second BlockingCollection<T>
(this one being an output collection). So my consumers are effectively producers too. Ideally each unit of data that has been loaded by the task 1 would be queued for processing in parallel.
PERSISTING - A single consumer runs against the second BlockingCollection
mentioned above and persists processed data into database.
Problem I'm facing is the item number 2 from the list above. It does not seem to be fast enough (just by using Parallel.ForEach
). I tried inside Parallel.ForEach
instead of directly starting a task with continuation, start a wrapping thread that will in turn start the processing task. But this caused OutOfMemory exception, because thread count went out of control and reached 1200 very soon. I also tried scheduling work using ThreadPool with no avail.
Could you please advise if my approach is good enough for what we need done, or is there a better way of doing it?
Upvotes: 0
Views: 1274
Reputation: 1823
I recently faced a problem which was very much similar to yours, Here's what i did, hope it might help:
this solved my problem, i hope it helps you too, if any doubts ask me, i'll explain it here...
Upvotes: 2
Reputation: 52280
If the bottleneck is some 3rd party service and this will not handle parallel execution but will queue your request then you cannot do a thing.
But first you can try this:
That's all I can think of without further info right now.
Upvotes: 3