Reputation: 41
I am trying to read from a kafka topic using Apache Beam and Dataflow, print the data to the console and finally write them to a pubsub topic. But it seems to get stuck in the ReadFromKafka function. There are many data written into the kafka topic, but nothing happen in this pipeline when it runs.
import apache_beam as beam
import argparse
from apache_beam.io.kafka import ReadFromKafka
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
def run(argv=None, save_main_session=True):
parser = argparse.ArgumentParser()
known_args, pipeline_args = parser.parse_known_args(argv)
pipeline_args.extend([
'--runner=DataflowRunner',
'--project=sample-project',
'--region=xxx',
'--staging_location=gs://xxx',
'--temp_location=gs://xxx',
'--job_name=beam-streaming',
'--worker_machine_type=n1-standard-16',
'--num_workers=1',
'--streaming'
])
class PrintValue(beam.DoFn):
def process(self, element):
print(element)
return [element]
pipeline_options = PipelineOptions(pipeline_args)
pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
with beam.Pipeline(options=pipeline_options) as pipeline:
_ = (
pipeline
| 'Read from Kafka' >> ReadFromKafka(
consumer_config={'bootstrap.servers': 'ip:port' },
topics=['local-events'])
| 'print' >> beam.ParDo(PrintValue())
| 'write to pubsub' >> beam.io.WriteToPubSub('projects/sample/topics/test')
)
if __name__ == '__main__':
run()
I know there is an Issue https://issues.apache.org/jira/browse/BEAM-11998 but as i understand it, this problem only belongs to portable runners. Does anybody know if ReadFromKafka is working with unbounded data in Dataflow?
Upvotes: 4
Views: 865
Reputation: 133
I had a similar issue, and switched to using a beam.Map
transform instead (make sure your printValue
function is defined within the run function, or you have a proper dependency management method):
| Map(lambda value: printValue(value))
Note that the type of elements you get from ReadFromKafka
is an ad hoc class named BeamSchema_xxxxxxxxx, having the following attributes (assuming you configure reader with_metadata=True
): 'topic', 'value', 'count', 'headers', 'index', 'key', 'offset', 'partition', 'timestamp', 'timestampTypeId', 'timestampTypeName'. It does't print
nice if at all.
So you want to decode your values first, for example:
def decode_kafka_message(record) -> str:
"""
Record attributes passed from ReadFromKafka transform: 'topic', 'value'
'count', 'headers', 'index', 'key', 'offset', 'partition',
'timestamp', 'timestampTypeId', 'timestampTypeName'.
:return: Message value as string
"""
if hasattr(record, 'value'):
value = record.value
elif isinstance(record, tuple):
value = record[1]
else:
raise RuntimeError('unknown record type: %s' % type(record))
return value.decode("UTF-8") if isinstance(value, bytes) else value
That connector could use some work and better docs.
Upvotes: 1