Reputation: 23
I have written a Python Dataflow job that reads data from a CSV file and populates a BigQuery table with that data. However, an error keeps popping up whenever I run the job. If I remove the BigQuery write and write to a file instead, the code executes fine and the rows are written to the output file in dict format. The code is the following:
import argparse
import logging
import re
import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.io import WriteToText
from apache_beam.metrics import Metrics
from apache_beam.metrics.metric import MetricsFilter
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
import json
from apache_beam.io.gcp.bigquery import TableRowJsonCoder
class ToTableRowDoFn(beam.DoFn):
    def process(self, x):
        values = x.split(',')
        rows = {}
        rows["Name"] = values[0]
        rows["Place_of_Birth"] = values[1]
        rows["Age"] = values[2]
        return [rows]
parser = argparse.ArgumentParser()
parser.add_argument('--input',
                    dest='input',
                    default='gs://dataflow-samples/shakespeare/kinglear.txt',
                    help='Input file to process.')
parser.add_argument('--output',
                    dest='output',
                    help='Output file to write results to.')
known_args, pipeline_args = parser.parse_known_args(None)
pipeline_options = PipelineOptions(pipeline_args)
pipeline_options.view_as(SetupOptions).save_main_session = True
p = beam.Pipeline(options=pipeline_options)
# Read the text file[pattern] into a PCollection.
lines = p | 'read' >> ReadFromText(known_args.input)
(lines
 | 'ToTableRows' >> beam.ParDo(ToTableRowDoFn())
 | 'write' >> beam.io.Write(beam.io.BigQuerySink(
     'xxxx:ZZZZZZ.YYYYY',
     schema='Name:STRING, Place_of_Birth:STRING, Age:STRING')))
# Actually run the pipeline (all operations above are deferred).
result = p.run()
I am loading the following CSV file:
Name1,Place1,40
Name2,Place2,20
The error I get when I run this code on the CSV file is the following:
AttributeError: 'FieldList' object has no attribute '_FieldList__field'
If I remove the BigQuerySink write and write to a file instead (a sketch of what I mean is below), the code works fine. Please help me resolve this issue.
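For reference, this is roughly the file-write variant that works, assuming the --output argument from the parser above is supplied:

(lines
 | 'ToTableRows' >> beam.ParDo(ToTableRowDoFn())
 # Writing the dict rows to a text file succeeds where the BigQuery write fails.
 | 'write' >> WriteToText(known_args.output))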
Upvotes: 0
Views: 1143
Reputation: 121
I had this same problem; posting for others who happen upon this thread. It has to do with pickling: you have to disable the save_main_session option. I just commented it out in my pipeline options to test. See https://issues.apache.org/jira/browse/BEAM-3134
https://cloud.google.com/dataflow/docs/resources/faq#how_do_i_handle_nameerrors
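Applied to the code in the question, that means dropping (or setting to False) the save_main_session line. A minimal sketch:

pipeline_options = PipelineOptions(pipeline_args)
# Disabling save_main_session avoids pickling the main module's state,
# which is what triggers the FieldList error (see BEAM-3134):
pipeline_options.view_as(SetupOptions).save_main_session = False
p = beam.Pipeline(options=pipeline_options)

Note that if your DoFns depend on global imports or variables from the main module, you may then need to move those imports inside the DoFn or into a separate module, per the NameError FAQ linked above.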
Upvotes: 1