Sandro

Reputation: 41

ReadFromKafka stuck in beam process with Dataflow

I am trying to read from a Kafka topic using Apache Beam and Dataflow, print the data to the console, and finally write it to a Pub/Sub topic. But the pipeline seems to get stuck in the ReadFromKafka step: plenty of data is being written to the Kafka topic, yet nothing happens in the pipeline when it runs.

import apache_beam as beam
import argparse

from apache_beam.io.kafka import ReadFromKafka
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions

def run(argv=None, save_main_session=True):

    parser = argparse.ArgumentParser()
    known_args, pipeline_args = parser.parse_known_args(argv)
    pipeline_args.extend([
        '--runner=DataflowRunner',
        '--project=sample-project',
        '--region=xxx',
        '--staging_location=gs://xxx',
        '--temp_location=gs://xxx',
        '--job_name=beam-streaming',
        '--worker_machine_type=n1-standard-16',
        '--num_workers=1',
        '--streaming'
    ])

    class PrintValue(beam.DoFn):
        def process(self, element):
            print(element)
            return [element]

    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = save_main_session

    with beam.Pipeline(options=pipeline_options) as pipeline:
        _ = (
            pipeline
            | 'Read from Kafka' >> ReadFromKafka(
                consumer_config={'bootstrap.servers': 'ip:port'},
                topics=['local-events'])
            | 'print' >> beam.ParDo(PrintValue())
            | 'write to pubsub' >> beam.io.WriteToPubSub('projects/sample/topics/test')
        )


if __name__ == '__main__':
    run()

I know there is an issue (https://issues.apache.org/jira/browse/BEAM-11998), but as I understand it, that problem only affects portable runners. Does anybody know whether ReadFromKafka works with unbounded data on Dataflow?

Upvotes: 4

Views: 865

Answers (1)

Yevgeny Khodorkovsky

Reputation: 133

I had a similar issue and switched to a beam.Map transform instead (make sure your printValue function is defined within the run function, or that you have proper dependency management in place):

| beam.Map(printValue)
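For context, a minimal sketch of how that Map fits into the question's pipeline (the broker address and topic are the question's placeholders, and print_value is an illustrative name):

import apache_beam as beam

from apache_beam.io.kafka import ReadFromKafka

def run():
    # Defining print_value inside run() means it is serialized together
    # with the Map transform, rather than looked up as a module-level
    # name on the worker.
    def print_value(element):
        print(element)
        return element

    with beam.Pipeline() as pipeline:
        _ = (
            pipeline
            | 'Read from Kafka' >> ReadFromKafka(
                consumer_config={'bootstrap.servers': 'ip:port'},
                topics=['local-events'])
            | 'print' >> beam.Map(print_value)
        )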

Note that the elements you get from ReadFromKafka are instances of an ad hoc class named BeamSchema_xxxxxxxxx with the following attributes (assuming you configure the reader with with_metadata=True): 'topic', 'value', 'count', 'headers', 'index', 'key', 'offset', 'partition', 'timestamp', 'timestampTypeId', 'timestampTypeName'. It doesn't print nicely, if at all, so you will want to decode your values first, for example:

def decode_kafka_message(record) -> str:
    """
    Record attributes passed from the ReadFromKafka transform: 'topic',
    'value', 'count', 'headers', 'index', 'key', 'offset', 'partition',
    'timestamp', 'timestampTypeId', 'timestampTypeName'.

    :return: Message value as a string
    """
    if hasattr(record, 'value'):
        value = record.value
    elif isinstance(record, tuple):
        value = record[1]
    else:
        raise RuntimeError('unknown record type: %s' % type(record))

    return value.decode("UTF-8") if isinstance(value, bytes) else value
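And a sketch of how the decode step plugs into the pipeline (assuming the reader is configured with with_metadata=True as noted above; broker and topic names are the question's placeholders):

values = (
    pipeline
    | 'Read from Kafka' >> ReadFromKafka(
        consumer_config={'bootstrap.servers': 'ip:port'},
        topics=['local-events'],
        with_metadata=True)  # elements arrive as BeamSchema_... rows
    | 'decode' >> beam.Map(decode_kafka_message)  # -> plain strings
)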

That connector could use some work and better docs.

Upvotes: 1
