Reputation: 2922
I have a Spring application that has to process and store data arriving on a socket. Because of bottleneck problems, this has to be done with multiple threads.
The incoming data belongs to many entities, and each entity's tasks must be processed serially. I don't think assigning a dedicated thread to each entity is a good solution, though (that would mean thousands of threads, one per entity queue).
So how can I define a shared thread pool that processes all entities' queues with a fair algorithm?
Upvotes: 2
Views: 1061
Reputation: 3072
You can use Project Reactor or RxJava to split the incoming message stream into groups and process the events in each group serially.
With Project Reactor, your code could look like this:
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

// shared pool of 16 workers; each group is pinned to one worker
Scheduler groupScheduler = Schedulers.newParallel("groupByPool", 16);
Flux.fromStream(incomingMessages()) // stream of new data from the socket
    .groupBy(Message::getEntityId) // split incoming messages into per-entity groups that must be processed serially
    .map(g -> g.publishOn(groupScheduler)) // move each group's processing onto a worker of the shared scheduler
    .subscribe( // consumer for the main stream of groups
        stream ->
            stream.subscribe(this::processMessage) // consumer for one group; its messages are processed in order
    );
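For comparison, the same idea (serial processing per entity on a shared pool) can be sketched without a reactive library. This is only a minimal illustration under stated assumptions: `KeyedSerialExecutor` is a hypothetical helper, not part of Spring or Reactor, and the entity key and pool size are placeholders. Each entity keeps a "tail" future, and a new task is chained behind it, so an entity never has more than one task waiting in the pool at a time.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical helper (an assumption for illustration, not a library class):
// runs tasks that share a key one after another, on a shared fixed-size pool.
class KeyedSerialExecutor<K> {
    private final ExecutorService pool;
    // Last submitted task per key; new tasks for that key are chained behind it.
    private final Map<K, CompletableFuture<Void>> tails = new ConcurrentHashMap<>();

    KeyedSerialExecutor(int threads) {
        this.pool = Executors.newFixedThreadPool(threads);
    }

    CompletableFuture<Void> submit(K key, Runnable task) {
        CompletableFuture<Void> next = tails.compute(key, (k, tail) -> {
            CompletableFuture<Void> prev =
                    tail == null ? CompletableFuture.<Void>completedFuture(null) : tail;
            // exceptionally(...) keeps one failed task from blocking later tasks of the same key.
            return prev.exceptionally(t -> null).thenRunAsync(task, pool);
        });
        // Drop the map entry once this task is done, unless a newer task was chained behind it.
        next.whenComplete((r, t) -> tails.remove(key, next));
        return next;
    }

    // Waits for the tasks submitted so far, then stops the pool.
    // Assumes no other thread is still calling submit(...).
    void shutdown() {
        CompletableFuture.allOf(tails.values().toArray(new CompletableFuture<?>[0]))
                .exceptionally(t -> null)
                .join();
        pool.shutdown();
    }
}
```

A call such as `executor.submit(message.getEntityId(), () -> processMessage(message))` would then keep the order within one entity while different entities run in parallel. Because the next task of an entity is handed to the pool only after the previous one finishes, a very busy entity queues behind the others instead of flooding the pool, which gives a rough form of fairness.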
Upvotes: 0
Reputation: 2092
You've described a problem that is a perfect fit for a message-driven architecture.
Spring Integration is the Spring module that provides this.
You can build your task services, annotate them with @ServiceActivator, and wire them into a chain with channels.
Channels can be configured to execute on a separate thread pool, and bottlenecks caused by load spikes can be absorbed with queue settings on the channel.
It is definitely worth checking out the Spring Integration documentation.
Upvotes: 2