Reputation: 1329
We have a custom combine function (on Beam SDK 2.0) in which millions of objects get accumulated but do NOT necessarily get reduced. That is, they sometimes get added to a List, so that eventually the List might get quite large (hundreds of megabytes, even gigabytes).
To minimize the problem of having to "pass around" these objects (during merging of accumulators) between nodes, we've created a SINGLE giant node (of 64 cores, tonnes of RAM).
So, in "theory", Dataflow does not need to serialize the List object (or any of the big objects in the List) even during "merge accumulator" operations, since all the objects are on the same node. But does Dataflow still serialize when all the objects of interest are on the same node, or is it smart enough to know whether an object is on the same node or on a separate one?
Ideally, when objects are on the same node, we can just pass around references to the objects (rather than serializing/deserializing the contents of these objects, which can be very, very large). (I understand, of course, that when dealing with multiple nodes, there's no choice but to serialize/deserialize, since the data has to be passed around somehow; but within a node, is Beam SDK 2.0 smart enough not to serialize/deserialize during these combine functions, group-bys, etc.?)
Upvotes: 2
Views: 598
Reputation: 6023
The Dataflow service aggressively optimizes your pipeline to avoid needless serialization. The optimization you are interested in is fusion, described in the Dataflow documentation. When data moves through a fused "stage" (a sequence of low-level instructions roughly corresponding to steps in your input pipeline), it is not serialized and deserialized.
However, if your CombineFn builds a list, and that list grows large, you should try to rephrase your pipeline to use a raw GroupByKey. Another important optimization is "combiner lifting" or "mapper-side combine", where your CombineFn is applied per key locally prior to shuffling your data between machines, based on the assumption that the accumulator will be smaller than just a list of elements. In your case the accumulator is the list itself, so the whole list will be serialized, shuffled, and deserialized prior to completing the Combine transform. If, instead, you use a GroupByKey directly, your elements would be streamed much more efficiently, without serializing an entire list.
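To make the "accumulator smaller than a list of elements" assumption concrete, here is a minimal plain-Java sketch. It does not use the Beam API: the class names SumAccumulator and ListAccumulator are hypothetical, and Java serialization merely stands in for whatever coder a runner would apply before a shuffle. It only illustrates how the serialized size behaves for a reducing accumulator versus a list-building one.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;

// Plain-Java model of combiner accumulators (hypothetical names, not Beam classes).
class AccumulatorSizeDemo {

    // Reducing accumulator: one running total, however many inputs arrive.
    static final class SumAccumulator implements Serializable {
        long total;
        void addInput(long value) { total += value; }
        void merge(SumAccumulator other) { total += other.total; }
    }

    // Non-reducing accumulator: keeps every input in a list.
    static final class ListAccumulator implements Serializable {
        final ArrayList<Long> elements = new ArrayList<>();
        void addInput(long value) { elements.add(value); }
        void merge(ListAccumulator other) { elements.addAll(other.elements); }
    }

    // Bytes needed by Java serialization; an assumed stand-in for a runner's coder.
    static int serializedSize(Serializable value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            return bytes.size();
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    // Feeds the values 0..n-1 into a fresh sum accumulator.
    static SumAccumulator sumOf(int n) {
        SumAccumulator acc = new SumAccumulator();
        for (long i = 0; i < n; i++) {
            acc.addInput(i);
        }
        return acc;
    }

    // Feeds the values 0..n-1 into a fresh list accumulator.
    static ListAccumulator listOf(int n) {
        ListAccumulator acc = new ListAccumulator();
        for (long i = 0; i < n; i++) {
            acc.addInput(i);
        }
        return acc;
    }

    public static void main(String[] args) {
        for (int n : new int[] {1_000, 100_000}) {
            System.out.printf("n=%d: sum accumulator %d bytes, list accumulator %d bytes%n",
                    n, serializedSize(sumOf(n)), serializedSize(listOf(n)));
        }
    }
}
```

The sum accumulator serializes to the same size regardless of input count, which is the case combiner lifting is designed for. The list accumulator grows with every element, so applying it before the shuffle saves nothing and forces the whole list through serialization at once.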
I should note that Beam's other runners also perform standard fusion and other optimizations. These generally come from functional programming work in the late 80s / early 90s and were applied to distributed data processing in FlumeJava, circa 2010, so they are a baseline expectation now.
Upvotes: 0