Minhaj Shakeel

Reputation: 21

Dataflow Python pickling issue

I have written a simple Dataflow program that takes an integer from a Pub/Sub topic and calculates the Fibonacci number for it. However, my DoFn is not able to pickle the custom fibonacci function, which gives me errors when running on DataflowRunner. Can someone tell me what I am doing wrong? Below is my pipeline code.

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions, SetupOptions

class Fibonacci(beam.DoFn):
    def fibonacci(self, n):
        if n < 2:
            return n
        else:
            return self.fibonacci(n-1) + self.fibonacci(n-2)
    
    def process(self, element, fib):
        import json
        # do some processing
        n = int(json.loads(element.data))
        # call fibonacci
        return [fib(n)]

def Print(n):
    print(n)

if __name__ == "__main__":
    
    input_subscription = 'projects/consumerresearch/subscriptions/test-user-sub'
    
    options = PipelineOptions()
    options.view_as(StandardOptions).streaming=True
    options.view_as(SetupOptions).save_main_session = True
    
    p = beam.Pipeline(options=options)

    raw_pubsub_data = (
        p | 'Read from topic' >> beam.io.ReadFromPubSub(subscription=input_subscription, with_attributes=True)
    )
    
    output = raw_pubsub_data | beam.ParDo(Fibonacci()) | beam.Map(Print)
    
    result = p.run()
    result.wait_until_finish()

Upvotes: 2

Views: 121

Answers (1)

Alec

Reputation: 509

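The signature of process should be this:

def process(self, element):

Your implementation declares a third parameter, fib, but Beam does not know what to pass for it: extra arguments to process have to be supplied through beam.ParDo, and your pipeline supplies none. Drop the fib parameter and call self.fibonacci(n) inside process instead.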
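
A minimal sketch of the corrected DoFn, assuming each message body is a JSON-encoded integer as in the question. The try/except fallback around the import and the SimpleNamespace stand-in for a Pub/Sub message are illustrative additions so the class can be tried locally; they are not part of the original pipeline.

```python
import json
from types import SimpleNamespace

try:
    import apache_beam as beam
    _DoFnBase = beam.DoFn
except ImportError:
    # Assumption for local experimentation only: fall back to a plain
    # class so the logic can be exercised without Beam installed.
    _DoFnBase = object


class Fibonacci(_DoFnBase):
    def fibonacci(self, n):
        # Naive recursion, kept as in the question.
        if n < 2:
            return n
        return self.fibonacci(n - 1) + self.fibonacci(n - 2)

    def process(self, element):
        # Only `element` is declared, so Beam has nothing extra to supply.
        n = int(json.loads(element.data))
        # Call the helper through self instead of a separate parameter.
        yield self.fibonacci(n)


# Hypothetical stand-in for a Pub/Sub message: any object with a bytes
# `.data` attribute is enough to call process() directly.
fake_message = SimpleNamespace(data=b"10")
print(list(Fibonacci().process(fake_message)))
```

In the pipeline itself the step stays beam.ParDo(Fibonacci()), with no extra arguments.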

https://beam.apache.org/documentation/programming-guide/#pardo

Upvotes: 1
