Reputation: 516
The Apache Beam programming guide contains the following rule:
3.2.2. Immutability
A PCollection is immutable. Once created, you cannot add, remove, or change individual elements. A Beam Transform might process each element of a PCollection and generate new pipeline data (as a new PCollection), but it does not consume or modify the original input collection.
Does this mean I cannot, must not, or should not modify individual elements in a custom transform?
Specifically, I am using the Python SDK and considering the case of a transform that takes a dict {key: "data"} as input, does some processing, and adds further fields {other_key: "some more data"}.
My interpretation of rule 3.2.2 above is that I should do something like
def process(self, element):
    import copy
    output = copy.deepcopy(element)
    output[other_key] = some_data
    yield output
but I am wondering if this may be a bit overkill.
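For illustration, here is a minimal, Beam-free sketch of what that copy-then-modify pattern does to the input dict (the field names and values are hypothetical stand-ins for the ones above):

```python
import copy

def process(element):
    # Copy first, then modify only the copy; the caller's dict is untouched.
    output = copy.deepcopy(element)
    output["other_key"] = "some more data"
    return output

original = {"key": "data"}
result = process(original)
print(original)  # {'key': 'data'} (unchanged)
print(result)    # {'key': 'data', 'other_key': 'some more data'}
```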
Using a TestPipeline, I found that the elements of the input collection are also modified if I act on them in the process() method (unless the elements are immutable basic types such as int, float, or bool).
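That observation is plain Python reference semantics rather than anything Beam-specific; a quick sketch without Beam shows the same aliasing:

```python
def mutating_process(element):
    # Mutates the element in place instead of copying it first.
    element["other_key"] = "some more data"
    return element

inp = {"key": "data"}
out = mutating_process(inp)
print(inp)         # the "input" dict now contains 'other_key' too
print(out is inp)  # True: both names refer to the same object
```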
Is mutating elements considered an absolute no-go, or just a practice one has to be careful with?
Upvotes: 4
Views: 3214
Reputation: 17913
Mutating elements is an absolute no-go, and it can and will lead to violations of the Beam model semantics, i.e. to incorrect and unpredictable results. The Beam Java direct runner intentionally detects mutations and fails pipelines that perform them; this is not yet implemented in the Python runner, but it should be.
The reason for that is, primarily, fusion. E.g. imagine that two DoFns are applied to the same PCollection "C" (f(C) and g(C), not f(g(C))), and a runner schedules them to run in the same shard. If the first DoFn modifies the element, then by the time the second DoFn runs, the element has been changed, i.e. the second DoFn is not really being applied to "C". There are a number of other scenarios where mutations will lead to incorrect results.
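If a full deepcopy feels like overkill, a common middle ground is to build a fresh dict instead of editing the input, which is safe as long as you do not mutate the nested values themselves. This is a plain-Python sketch of that pattern (field names are hypothetical), not an official Beam recipe:

```python
def process(element):
    # Build a new dict: shallow-copies the existing entries and adds a field.
    # The input dict object is never mutated.
    yield {**element, "other_key": "some more data"}

inp = {"key": "data"}
out = next(process(inp))
print(inp)  # still {'key': 'data'}
print(out)  # {'key': 'data', 'other_key': 'some more data'}
```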
Upvotes: 15