Himansu

Reputation: 21

Priority Producer Consumer

I am working on a priority workflow use case where I have to implement producer-consumer logic. Different sets of jobs, classified into 3 types, go into a blocking queue, and 3 threads (or a thread group) consume them.

Jobs in the Queue

a1, a2, a3...an, b1, b2, b3...bn, c1, c2, c3...cn, d1..., e1...

Consumer Threads

CT1, CT2, CT3

My problem is how to coordinate these consumer threads (or thread groups) so that:

CT1 processes the a1-an jobs

CT2 processes the b1-bn jobs

CT3 processes the c1-cn jobs

... and more threads are added for each new set of jobs.

Please provide any pointers for the approach.

Upvotes: 0

Views: 901

Answers (2)

Soana

Reputation: 711

You could also distribute the jobs across separate queues, one per job type, and let each thread or thread pool take jobs only from its own queue.

Jobs in queue a: a1, a2, ..., an

Jobs in queue b: b1, b2, ..., bn

Jobs in queue c: c1, c2, ..., cn

...

Threads executing jobs: CT1, CT2, CT3

CT1 executes jobs from queue a, CT2 executes jobs from queue b, and CT3 executes jobs from queue c.
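A minimal Java sketch of this one-queue-per-type layout might look like the following. The class name PerTypeQueues, the STOP sentinel, and the use of plain strings as jobs are all assumptions made for illustration; a real job would carry its own payload and processing logic.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch: one BlockingQueue and one dedicated consumer thread per job type.
final class PerTypeQueues {
    // Sentinel ("poison pill") telling a consumer that no more jobs will arrive.
    private static final String STOP = "__STOP__";

    // Starts one consumer per type, feeds each queue, and returns the jobs
    // each consumer processed, keyed by job type.
    static Map<String, List<String>> run(Map<String, List<String>> jobsByType) {
        Map<String, BlockingQueue<String>> queues = new LinkedHashMap<>();
        Map<String, List<String>> processed = new LinkedHashMap<>();
        List<Thread> consumers = new ArrayList<>();

        for (String type : jobsByType.keySet()) {
            BlockingQueue<String> queue = new LinkedBlockingQueue<>();
            List<String> done = new ArrayList<>();
            queues.put(type, queue);
            processed.put(type, done);
            // The consumer only ever sees its own queue, so it only gets its own job type.
            Thread consumer = new Thread(() -> {
                try {
                    for (String job = queue.take(); !STOP.equals(job); job = queue.take()) {
                        done.add(job); // stand-in for the real job processing
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }, "CT-" + type);
            consumer.start();
            consumers.add(consumer);
        }

        try {
            // Producer side: route each job to the queue for its type, then signal the end.
            for (Map.Entry<String, List<String>> entry : jobsByType.entrySet()) {
                BlockingQueue<String> queue = queues.get(entry.getKey());
                for (String job : entry.getValue()) {
                    queue.put(job);
                }
                queue.put(STOP);
            }
            // join() also makes each consumer's results visible to the calling thread.
            for (Thread consumer : consumers) {
                consumer.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while running the demo", e);
        }
        return processed;
    }
}
```

Because each queue has exactly one consumer here, jobs of one type are processed in the order they were queued; replacing the single thread with a pool would give up that ordering in exchange for throughput.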

Upvotes: 1

tariksbl

Reputation: 1119

If I understand the problem, you want a worker thread or pool bound to a BlockingQueue, with each thread-queue pair created on demand as new work types are added.

If that is the case, you could:

  • Define a WorkHandler class that pairs a BlockingQueue with a thread pool, configuring the thread pool as the consumer of the queue.
  • Use a Guava LoadingCache or a Java 8 ConcurrentHashMap (using .computeIfAbsent()) to get or lazily create the WorkHandler for a given work type.
  • To process each new work item, look up its work type in the cache and add the item to the WorkHandler it returns.
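The steps above could be sketched roughly as follows, using the ConcurrentHashMap variant. WorkHandler is the name from the list; WorkDispatcher, its methods, the single-thread pool size, and the string work items are assumptions for illustration only. A ThreadPoolExecutor is used because it already consumes from the BlockingQueue it is constructed with.

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical WorkHandler: pairs a BlockingQueue with a pool that consumes from it.
final class WorkHandler {
    // One worker thread (an assumption) keeps items of a type in submission order.
    private final ExecutorService pool =
            new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
    private final List<String> processed = new CopyOnWriteArrayList<>();

    void submit(String item) {
        pool.execute(() -> processed.add(item)); // stand-in for the real work
    }

    // Stops accepting work, waits for queued items to finish, and returns the results.
    List<String> shutdownAndGetProcessed() {
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("worker did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return processed;
    }
}

// Hypothetical dispatcher: lazily creates one WorkHandler per work type.
final class WorkDispatcher {
    private final ConcurrentHashMap<String, WorkHandler> handlers = new ConcurrentHashMap<>();

    void dispatch(String workType, String item) {
        // computeIfAbsent creates the handler at most once per type, even under contention.
        handlers.computeIfAbsent(workType, type -> new WorkHandler()).submit(item);
    }

    int handlerCount() {
        return handlers.size();
    }

    List<String> finish(String workType) {
        WorkHandler handler = handlers.get(workType);
        return handler == null ? Collections.emptyList() : handler.shutdownAndGetProcessed();
    }
}
```

A producer would then call dispatch("a", "a1"), dispatch("b", "b1"), and so on; the first item of a previously unseen type creates that type's handler, so no thread has to be configured in advance.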

Upvotes: 0
