Reputation: 300
I have one broadcast function in Flink that takes two Kinesis streams, one for Element A and one for the broadcast Element B. I noticed that all Element A records go into one task slot, even though I have already set the env parallelism to 4.
Here is the main process function:
env.setParallelism(4);

BroadcastStream<ElementBroadcast> elementBroadcastStream =
    env.addSource(elementBroadcastSource)
        .uid("element-broadcast")
        .name("broadcast")
        .setParallelism(4)
        .returns(ElementB.class)
        .broadcast(Descriptors.ELEMENT_B_DESCRIPTORS);

DataStream<ElementA> elementAStream =
    elementASourceStream
        .connect(elementBroadcastStream)
        .process(injector.getInstance(
            ElementAElementBProcessFunction.class))
        .uid("");
The strange thing is that when I check the Flink job, or read the metrics I added inside ElementAElementBProcessFunction, only the metrics in processBroadcastElement() confirm that all 4 task slots receive Element B. processElement() behaves like a single-threaded function: as the attached screenshots show, all Element A records are received on slot 3. The other three slots each receive the 2 broadcast elements (Element B) from my application, but no Element A at all.
Does anyone know why the parallelism across multiple slots shows up only in processBroadcastElement() and not in processElement()?
Thank you!
Upvotes: 0
Views: 225
Reputation: 40
This might be because source A has only one partition (a single shard in the Kinesis stream). You can check this in the AWS Management Console, or call rebalance() or rescale() on the Element A stream before process(). As for Element B, you broadcast it, which guarantees that every element goes to all downstream tasks.
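To illustrate the reasoning, here is a minimal plain-Java sketch (no Flink dependency) that models how records reach 4 downstream subtasks. It is only a model under stated assumptions: the class and method names are hypothetical, the scenario assumes a single shard read by source subtask 3 (matching the slot in the question), and the round-robin here starts at channel 0, which Flink's real rebalance partitioner may not do.

```java
import java.util.Arrays;

/** Hypothetical stand-alone model of forward vs. round-robin routing; this is not Flink code. */
public class PartitioningSketch {

    /** Forward routing: upstream subtask i hands every record to downstream subtask i. */
    public static int[] forward(int[] recordsPerUpstreamSubtask) {
        return recordsPerUpstreamSubtask.clone();
    }

    /** Round-robin routing: each upstream subtask cycles through all downstream subtasks. */
    public static int[] rebalance(int[] recordsPerUpstreamSubtask, int downstreamParallelism) {
        int[] received = new int[downstreamParallelism];
        for (int count : recordsPerUpstreamSubtask) {
            int next = 0; // assumption: each sender starts at channel 0
            for (int r = 0; r < count; r++) {
                received[next]++;
                next = (next + 1) % downstreamParallelism;
            }
        }
        return received;
    }

    public static void main(String[] args) {
        // Assumed scenario: one shard, so only source subtask 3 emits Element A records.
        int[] emitted = {0, 0, 0, 100};
        System.out.println(Arrays.toString(forward(emitted)));      // [0, 0, 0, 100]
        System.out.println(Arrays.toString(rebalance(emitted, 4))); // [25, 25, 25, 25]
    }
}
```

In the job from the question, the equivalent change would be inserting .rebalance() between elementASourceStream and .connect(elementBroadcastStream). Note that, as far as I understand it, rescale() only spreads records when the upstream and downstream parallelism differ, so with both at 4 it would likely behave like the forward case above.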
Upvotes: 1