Philip M

Reputation: 51

Dataflow fails when combining multiple side inputs in streaming pipeline

I've built a windowed streaming Dataflow pipeline with the Python SDK (Apache Beam Python 3.7 SDK 2.19.0). A representation of the initial data is:

| Phone Number | Call length |
|--------------|-------------|
| 1234         | 6           |
| 1234         | 2           |
| 5678         | 5           |

The idea is to find, within each window, the mean call length for the phone number in each row. The data is read from Pub/Sub as lines of CSV, and I add a column to every row containing the mean call length for that row's number:

| Phone Number | Call length | mean call length |
|--------------|-------------|------------------|
| 1234         | 6           | 4                |
| 1234         | 2           | 4                |
| 5678         | 5           | 5                |
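
For reference, this is the per-key mean computed in plain Python on the sample rows above (a Beam-free sketch of the target result, not part of the pipeline):

```python
from collections import defaultdict

# Sample rows matching the tables above: (phone_number, call_length)
rows = [("1234", 6), ("1234", 2), ("5678", 5)]

# Group call lengths by phone number
lengths_by_number = defaultdict(list)
for number, length in rows:
    lengths_by_number[number].append(length)

# Mean call length per number
means = {number: sum(ls) / len(ls) for number, ls in lengths_by_number.items()}

# Annotate each row with its number's mean, as in the second table
annotated = [(number, length, means[number]) for number, length in rows]
```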

I use the following pipeline:

    with beam.Pipeline(options=pipeline_options) as pipeline:
        calls = (pipeline
             | 'Read PubSub Messages' >> beam.io.ReadFromPubSub(subscription=input_sub)
             | 'byte_to_string' >> beam.Map(lambda x: x.decode("utf-8"))
             | 'windows' >> beam.WindowInto(window.FixedWindows(10))   
            )

        mean_call_length = (calls
             | 'call_length_for_number' >> beam.ParDo(get_list_of_pairs_of_tuples(),'number','call_length')
             |  'mean_call_length_per_number' >> beam.combiners.Mean.PerKey()
            )

        recombine = (calls
              | 'Create dictionary from raw string' >> beam.ParDo(SplitToDict())
              |  'Add mean' >> beam.FlatMap(combine_calcs,pvalue.AsList(mean_call_length))
              | 'encode to bytes' >> beam.Map(lambda x: str(x).encode())
              | 'write to output topic' >> beam.io.WriteToPubSub(topic=output_topic)
            )
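
The helper transforms (`get_list_of_pairs_of_tuples`, `SplitToDict`, `combine_calcs`) aren't shown; for context, here are plain-Python sketches of the logic they presumably implement. The CSV layout (`<number>,<call_length>`) and all names here are assumptions:

```python
def split_to_dict(line):
    """Hypothetical: parse a '<number>,<call_length>' CSV line into a dict."""
    number, call_length = line.split(",")
    return {"number": number, "call_length": float(call_length)}


def call_length_pair(line):
    """Hypothetical: emit a (number, call_length) key/value pair for Mean.PerKey."""
    number, call_length = line.split(",")
    return (number, float(call_length))


def combine_calcs(row, means):
    """Hypothetical: attach the per-number mean from the AsList side input.

    `means` is the materialised side input: a list of (number, mean) pairs.
    """
    lookup = dict(means)
    row["mean_call_length"] = lookup.get(row["number"])
    yield row
```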

This works fine locally (with the DirectRunner) but fails when run on GCP (with the DataflowRunner). It also works fine when I compute only one of the two aggregations (the number frequency or the mean call length).

I can see a Java exception in the Dataflow logs that contains:

Caused by: org.apache.beam.sdk.coders.CoderException: java.io.EOFException 

This looks like an end-of-file error hit while decoding a streamed element.

The pipeline is visualised in Dataflow here:

(screenshot of the Dataflow job graph)

Any ideas?

Upvotes: 0

Views: 656

Answers (1)

Philip M

Reputation: 51

I worked around this issue by casting the results of the mean calculations into integers by changing the pipeline:

    ...
    mean_call_length = (calls
         | 'call_length_for_number' >> beam.ParDo(get_list_of_pairs_of_tuples(),'number','call_length')
         | 'mean_call_length_per_number' >> beam.combiners.Mean.PerKey()
         | 'convert_mean_to_int' >> beam.Map(lambda elem: (elem[0], int(elem[1])))
        )
    ...

It seems there was some typing problem between the Python SDK and the underlying Java streaming code: the Java side appears to expect `elem[1]` to encode within a certain number of bytes, which is exceeded when a float is submitted from the Python SDK.
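
To illustrate the workaround in isolation: `Mean.PerKey()` emits `(key, float)` pairs, and the added `convert_mean_to_int` step applies the cast below to each pair before the PCollection is used as a side input (note `int()` truncates, so non-integral means lose their fractional part):

```python
# (key, mean) pairs as emitted by beam.combiners.Mean.PerKey()
means = [("1234", 4.0), ("5678", 5.0)]

# The same cast the 'convert_mean_to_int' step applies per element
to_int = lambda elem: (elem[0], int(elem[1]))
converted = [to_int(m) for m in means]
```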

Upvotes: 1
