I have written a simple Dataflow pipeline that takes integers from a Pub/Sub topic and calculates the Fibonacci
number for each one. However, my DoFn is not able to pickle the custom function fibonacci,
which gives me errors when running on the DataflowRunner. Can someone tell me what I am doing wrong?
Below is my pipeline code.
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions, SetupOptions

class Fibonacci(beam.DoFn):
    def fibonacci(self, n):
        if n < 2:
            return n
        else:
            return self.fibonacci(n-1) + self.fibonacci(n-2)

    def process(self, element, fib):
        import json
        # do some processing
        n = int(json.loads(element.data))
        # call fibonacci
        return [fib(n)]

def Print(n):
    print(n)

if __name__ == "__main__":
    input_subscription = 'projects/consumerresearch/subscriptions/test-user-sub'
    options = PipelineOptions()
    options.view_as(StandardOptions).streaming = True
    options.view_as(SetupOptions).save_main_session = True
    p = beam.Pipeline(options=options)
    raw_pubsub_data = (
        p | 'Read from topic' >> beam.io.ReadFromPubSub(subscription=input_subscription, with_attributes=True)
    )
    output = raw_pubsub_data | beam.ParDo(Fibonacci()) | beam.Map(Print)
    result = p.run()
    result.wait_until_finish()
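The signature of process should be this:
process(self, element):
Your implementation has a third parameter, fib, and Beam has nothing to pass for it: extra arguments to process are only supplied when they are given to ParDo itself (e.g. beam.ParDo(Fibonacci(), extra_arg)). Change your implementation to call self.fibonacci instead.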
https://beam.apache.org/documentation/programming-guide/#pardo