Reputation: 6342
I am reading at https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/datastream/operators/overview/#keyby
It says:
Logically partitions a stream into disjoint partitions. All records with the same key are assigned to the same partition. Internally, keyBy() is implemented with hash partitioning.
What does "logically" mean here? I think they are physically partitioned.
Take Source->keyBy->Process as an example:
SourceFunction.setParallelism(2).keyBy(_.id).process(ProcessFunction).setParallelism(3)
The process operator has 3 subtasks, so will each record from the source be routed to the corresponding subtask of the process operator based on its id (a data shuffle)?
Is there a difference between keyBy and partitionCustom (which is said to be physically partitioned)? Both of them involve a shuffle.
Upvotes: 2
Views: 519
Reputation: 9255
You are correct that ultimately a .keyBy() has to determine which slot in your TMs will receive a given record, based on its key. The difference is that .keyBy() first goes through a "logical partitioning", where each key is assigned to a key group index. That index depends only on the max parallelism you've set for your job, not on the specific parallelism of the downstream operator.
Then the key group index is mapped to an "operator index" (aka subtask index), which then gets mapped to a slot on a Task Manager.
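To make the two steps concrete, here is a minimal, self-contained sketch with no Flink dependency. The class and method names (KeyGroupSketch, keyGroupFor, subtaskFor) and the scramble function are made up for illustration; as far as I know Flink applies a murmur hash to key.hashCode() in the first step, so the exact key group numbers here will not match what Flink computes. The second step uses the proportional mapping keyGroup * parallelism / maxParallelism, which I believe is what Flink's KeyGroupRangeAssignment does, but treat that as an assumption rather than a reference.

```java
// Illustrative sketch only; names are hypothetical and not part of Flink's API.
final class KeyGroupSketch {

    // Stand-in for the hash Flink applies to key.hashCode() (assumption: any
    // well-mixing, non-negative hash is enough to show the idea).
    static int scramble(int h) {
        h ^= (h >>> 16);
        h *= 0x85ebca6b;
        h ^= (h >>> 13);
        return h & 0x7fffffff; // clear the sign bit so the modulo below is non-negative
    }

    // Step 1 (logical): key -> key group. Depends only on maxParallelism.
    static int keyGroupFor(Object key, int maxParallelism) {
        return scramble(key.hashCode()) % maxParallelism;
    }

    // Step 2 (physical): key group -> subtask index of the downstream operator.
    static int subtaskFor(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 128;
        for (String key : new String[] {"a", "b", "c"}) {
            int keyGroup = keyGroupFor(key, maxParallelism);
            // The key group stays the same when the operator parallelism changes;
            // only the subtask that owns that key group changes.
            System.out.println(key + " -> key group " + keyGroup
                    + " -> subtask " + subtaskFor(keyGroup, maxParallelism, 3) + " (parallelism 3)"
                    + ", subtask " + subtaskFor(keyGroup, maxParallelism, 4) + " (parallelism 4)");
        }
    }
}
```

Because the key group is independent of the operator parallelism, keyed state can be moved between subtasks one key group at a time when the job is rescaled.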
When you do a .partitionCustom(), you are in control of the operator index that is computed from the key. But because you aren't basing the index on key groups, you don't wind up with a KeyedStream, and thus you don't have keyed state/timers.
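For contrast, here is a rough sketch of what "being in control of the operator index" looks like. To keep it runnable without Flink on the classpath, it declares a local Partitioner interface with the same shape as Flink's Partitioner<K> (a single method, int partition(K key, int numPartitions)); the class name CustomPartitionSketch and the MOD_BY_ID partitioner are hypothetical. In a real job you would pass such a partitioner together with a key selector to partitionCustom.

```java
// Illustrative sketch only; the interface below is a local stand-in, not Flink's class.
final class CustomPartitionSketch {

    // Same shape as Flink's Partitioner<K>: the return value is used directly
    // as the index of the downstream subtask, with no key group in between.
    interface Partitioner<K> {
        int partition(K key, int numPartitions);
    }

    // Hypothetical partitioner: route a numeric id by modulo.
    // floorMod keeps the result in [0, numPartitions) even for negative ids.
    static final Partitioner<Integer> MOD_BY_ID =
            (id, numPartitions) -> Math.floorMod(id, numPartitions);

    public static void main(String[] args) {
        int parallelism = 3; // parallelism of the downstream operator
        for (int id : new int[] {7, 8, 9}) {
            System.out.println("id " + id + " -> subtask "
                    + MOD_BY_ID.partition(id, parallelism));
        }
    }
}
```

Here the target subtask depends directly on the downstream parallelism, so nothing ties a key to a stable unit like a key group that Flink could use to redistribute keyed state.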
Upvotes: 2