Reputation: 51
I've built a windowed streaming Dataflow pipeline with the Python SDK (Apache Beam Python 3.7 SDK 2.19.0). A representation of the initial data is:
| Phone Number | Call length |
|--------------|-------------|
| 1234 | 6 |
| 1234 | 2 |
| 5678 | 5 |
The idea is to find the mean phone-call length for the number in each row within the given window. The data is read in from Pub/Sub as lines of CSV, and I add a value to each row corresponding to the mean call length for that number:
| Phone Number | Call length | mean call length |
|--------------|-------------|------------------|
| 1234 | 6 | 4 |
| 1234 | 2 | 4 |
| 5678 | 5 | 5 |
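The expected values in that table can be sanity-checked outside Beam. This is a plain-Python sketch of what `Mean.PerKey` computes, assuming all three rows land in the same window:

```python
from collections import defaultdict

rows = [("1234", 6), ("1234", 2), ("5678", 5)]

# Group call lengths by number, mirroring beam.combiners.Mean.PerKey:
# accumulate a (sum, count) pair per key, then divide.
totals = defaultdict(lambda: [0, 0])  # number -> [sum, count]
for number, length in rows:
    totals[number][0] += length
    totals[number][1] += 1

means = {number: total / count for number, (total, count) in totals.items()}
# means == {"1234": 4.0, "5678": 5.0}
```

Note the results are floats (4.0, 5.0), which turns out to matter for the error below.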
I use the following pipeline:
```python
with beam.Pipeline(options=pipeline_options) as pipeline:
    calls = (pipeline
             | 'Read PubSub Messages' >> beam.io.ReadFromPubSub(subscription=input_sub)
             | 'byte_to_string' >> beam.Map(lambda x: x.decode("utf-8"))
             | 'windows' >> beam.WindowInto(window.FixedWindows(10))
             )

    mean_call_length = (calls
                        | 'call_length_for_number' >> beam.ParDo(get_list_of_pairs_of_tuples(), 'number', 'call_length')
                        | 'mean_call_length_per_number' >> beam.combiners.Mean.PerKey()
                        )

    recombine = (calls
                 | 'Create dictionary from raw string' >> beam.ParDo(SplitToDict())
                 | 'Add mean' >> beam.FlatMap(combine_calcs, pvalue.AsList(mean_call_length))
                 | 'encode to bytes' >> beam.Map(lambda x: str(x).encode())
                 | 'write to output topic' >> beam.io.WriteToPubSub(topic=output_topic)
                 )
```
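The helper transforms `get_list_of_pairs_of_tuples` and `SplitToDict` aren't shown in the question, so their exact implementations are unknown. A minimal sketch of the per-element logic they presumably wrap, assuming the CSV lines have the layout `number,call_length`:

```python
# Hypothetical sketches of the helpers referenced in the pipeline; the
# names, CSV layout ("number,call_length"), and field names are assumptions.

def split_to_dict(line):
    # Core logic a SplitToDict DoFn might apply: CSV line -> dict.
    number, call_length = line.split(",")
    return {"number": number, "call_length": float(call_length)}

def to_key_value(row, key_field, value_field):
    # Core logic get_list_of_pairs_of_tuples might apply per element:
    # emit a (key, value) pair suitable for Mean.PerKey.
    return (row[key_field], row[value_field])

row = split_to_dict("1234,6")
pair = to_key_value(row, "number", "call_length")
# pair == ("1234", 6.0)
```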
This works fine locally (with DirectRunner) but fails when run on GCP (with DataflowRunner). It also seems to work fine when I calculate only one of the two aggregations (number frequency or mean call length).
I can see a Java exception in the Dataflow logs that contains:

```
Caused by: org.apache.beam.sdk.coders.CoderException: java.io.EOFException
```

which looks like an end-of-file exception related to streaming.
The pipeline is visualised in Dataflow here:
Any ideas?
Upvotes: 0
Views: 656
Reputation: 51
I worked around this issue by casting the result of the mean calculation to an integer, changing the pipeline to:

```python
...
mean_call_length = (calls
                    | 'call_length_for_number' >> beam.ParDo(get_list_of_pairs_of_tuples(), 'number', 'call_length')
                    | 'mean_call_length_per_number' >> beam.combiners.Mean.PerKey()
                    | 'convert_mean_to_int' >> beam.Map(lambda elem: (elem[0], int(elem[1])))
                    )
...
```
It seems there was some typing problem between the Python SDK and the underlying Java code: the Java side appears to expect `element[1]` to fit within a certain number of bytes, which is exceeded when a float is submitted via the Python SDK.
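The element-wise effect of the added `convert_mean_to_int` step, shown outside Beam with sample (number, mean) pairs:

```python
# The lambda from the fixed pipeline: truncate the float mean to an int
# so the element stays within what the runner's coder expects.
convert = lambda elem: (elem[0], int(elem[1]))

means = [("1234", 4.0), ("5678", 5.0)]
converted = [convert(m) for m in means]
# converted == [("1234", 4), ("5678", 5)]
```

Note that `int()` truncates toward zero, so some precision in the mean is lost by this workaround.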
Upvotes: 1