Jonathan Sylvester

Reputation: 1329

How does Hazelcast Jet assign task-to-CPU priority?

If I have the following code and let's say I'm running on 10 nodes of 32 cores each:

IList<...> ds = ....; //large collection, eg 1e6 elements

ds
 .map() // expensive computation
 .flatMap() // generates 10,000x more elements for every 1 incoming element
 .rebalance()
 .map() // expensive computation
 .... // other transformations (i.e. can be a sink, keyby, flatmap, map etc.)

What will Hazelcast do with respect to task-to-CPU assignment priority when the SECOND map operation wants to process the 10,000 elements that were generated from the 1st original element? Will it devote all 320 CPU cores (from 10 nodes) to processing the 1st original element's 10,000 generated elements? If so, will it "boot off" already running tasks? Or will it wait for already running tasks to complete, and then give priority to the 10,000 elements resulting from the output of the flatmap-rebalance operations? Or would the 10,000 elements be forced to run on a single core, since the remaining 319 cores are already being consumed by the output of the ds operation (i.e. the input of the 1st map)? Or is there some random competition for who gets access to the CPU cores?

What I would ideally like to happen is that Hazelcast does NOT boot off running tasks (it lets them complete), but when deciding which tasks get priority to run on a core, it chooses the path that leads to the lowest latency, i.e. it processes all 10,000 elements that result from the output of the flatmap-rebalance operation on all 320 cores.

Note: I asked a virtually identical question about Flink a few weeks ago, but have since switched to trying out Hazelcast: How does Flink (in streaming mode) assign task-to-CPU priority?

Upvotes: 0

Views: 61

Answers (1)

Oliv

Reputation: 10812

First, IList is a non-distributed data structure: all of its data is stored on a single member. The IList source therefore produces all data on that member, so the 1st expensive map will run entirely there. However, map is backed by default by as many workers as there are cores, so 32 workers in your case.

The rebalance stage causes the 2nd map to run on all members. Each of the 10,000 elements produced by the flatMap is handled separately, so even if you have only 1 element in your IList, the 10k elements produced from it will be processed concurrently by 320 workers.
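To make the numbers concrete, here is a minimal plain-Java sketch (it does not use the Jet API) that spreads 10,000 items over 10 × 32 = 320 workers. It assumes the items are dealt out round-robin, which is a simplification for illustration only; the class and method names are hypothetical.

```java
public class RebalanceSketch {
    // Deals `items` to `workers` round-robin (an assumed, simplified policy)
    // and returns how many items each worker ends up with.
    static int[] distribute(int items, int workers) {
        int[] counts = new int[workers];
        for (int i = 0; i < items; i++) {
            counts[i % workers]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        int members = 10;
        int coresPerMember = 32;
        int workers = members * coresPerMember; // 320 workers for the 2nd map

        int[] counts = distribute(10_000, workers);
        int min = Integer.MAX_VALUE;
        int max = 0;
        for (int c : counts) {
            min = Math.min(min, c);
            max = Math.max(max, c);
        }
        // prints "320 workers, 31 to 32 items each"
        System.out.println(workers + " workers, " + min + " to " + max + " items each");
    }
}
```

Under that assumption every worker gets about 31 items, so the output of a single original element keeps the whole cluster busy rather than being pinned to one core.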

The workers backing different stages of the pipeline compete for cores in the usual way. On each member there will be 96 workers in total for the 1st map, the flatMap and the 2nd map together (3 stages × 32 workers). Jet uses cooperative scheduling for these workers, which means it cannot preempt a computation that is taking too long. As a result, one item that takes a long time to process will block the other workers sharing that thread.
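The effect of cooperative scheduling can be sketched in plain Java. This is only an illustration of the idea, not Jet's actual implementation: the `Tasklet` interface and the helper names are hypothetical. One thread loops over its workers, and each worker gives control back only by returning from `call()`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class CooperativeSketch {
    /** A unit of work that yields by returning from call(); true means finished. */
    interface Tasklet {
        boolean call();
    }

    // Creates a tasklet that needs `slices` calls to finish and logs each call.
    static Tasklet tasklet(String name, int slices, List<String> log) {
        int[] remaining = {slices};
        return () -> {
            log.add(name); // stands in for processing one item
            remaining[0]--;
            return remaining[0] == 0;
        };
    }

    // A single thread round-robins over its tasklets. Nothing can interrupt a
    // call(): if one call is slow, every other tasklet in the list waits.
    static void runOnOneThread(List<Tasklet> tasklets) {
        List<Tasklet> active = new ArrayList<>(tasklets);
        while (!active.isEmpty()) {
            Iterator<Tasklet> it = active.iterator();
            while (it.hasNext()) {
                if (it.next().call()) {
                    it.remove();
                }
            }
        }
    }

    static List<String> demo() {
        List<String> log = new ArrayList<>();
        runOnOneThread(Arrays.asList(
                tasklet("map1", 2, log),
                tasklet("flatMap", 1, log),
                tasklet("map2", 2, log)));
        return log;
    }

    public static void main(String[] args) {
        // prints [map1, flatMap, map2, map1, map2]
        System.out.println(demo());
    }
}
```

The interleaving in the output holds only because every `call()` returns quickly. If the body of one call were an expensive computation on a single item, the loop could not move on to the other workers until it returned.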

Also keep in mind that the map and flatMap functions must be cooperative, meaning they must not block (by waiting on IO, sleeping, or waiting for monitors). If they block, you'll see less than 100% CPU utilization. Check out the documentation for more information.

Upvotes: 0
