learning cloud

Reputation: 41

Google Dataflow job failing on WriteToBigQuery step: 'list' object and 'str' object has no attribute 'items'

I have a Beam pipeline running on the Dataflow runner. It takes in XML and outputs JSON, which then has to be stored in a BigQuery table. Earlier I was writing the newline-delimited JSON into a GCS bucket with the pipeline and creating a BQ table from the file without making any changes to it (using the BigQuery console). That job runs successfully and the data gets imported into BQ without any hiccups.
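For context, the earlier GCS step looked roughly like this (an untested sketch; it assumes a PCollection named data holding newline-delimited JSON strings, and the output prefix gs://bucket/output/orders is just a placeholder):

# Sketch of the earlier approach: write newline-delimited JSON to GCS,
# then create the BQ table from the resulting file via the BigQuery console.
data | 'WriteNDJSON' >> beam.io.WriteToText(
    'gs://bucket/output/orders',   # placeholder output prefix
    file_name_suffix='.json')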

Now I have modified the pipeline to write the output JSON rows directly into a BQ table, using Apache Beam's beam.io.WriteToBigQuery transform. The PCollection consists of JSON objects, where each line contains one single object (row) for BQ.

Below is a sample of the input that goes into WriteToBigQuery:

{"order_no": "1111", "order_gross_price": "74.66", "order_tax": "0.00", "order_date": "2015-10-03T23:58:15.000Z", "shipping_net_price": "5.00", "merch_net_price": "69.66", "browser_id": "Mozilla"}
{"order_no": "2222", "order_gross_price": "27.82", "order_tax": "2.12", "order_date": "2015-10-04T00:04:20.000Z", "shipping_net_price": "5.00", "merch_net_price": "20.70", "browser_id": "Mozilla"}

The relevant part of my code is below:

import json

import apache_beam as beam
from apache_beam.options.pipeline_options import (
    GoogleCloudOptions, PipelineOptions, StandardOptions)
from apache_beam.io.gcp import bigquery
from apache_beam.io.gcp.bigquery_tools import parse_table_schema_from_json

def run(argv=None):

    options = PipelineOptions()
    google_cloud_options = options.view_as(GoogleCloudOptions)
    google_cloud_options.project = 'project_name'
    google_cloud_options.job_name = 'jobid'
    google_cloud_options.staging_location = 'gs://bucket/staging'
    google_cloud_options.temp_location = 'gs://bucket/temp'
    options.view_as(StandardOptions).runner = 'DataflowRunner'

    p = beam.Pipeline(options=options)

    table_spec = 'project:dataset.table'

    data = (p
        | 'Create' >> beam.Create([input_file_path])
        | 'GetXML' >> beam.ParDo(ReadGCSfile())
        #| 'Convert2JSON' >> beam.ParDo(converttojson())
        | 'Convert2Json' >> beam.Map(lambda orders: json.dumps(orders))
        #| beam.Map(print_row)
        )

    project_id = 'project1'
    dataset_id = 'dataset'
    table_id = 'table'
    table_schema = ('browser_id:STRING, merch_net_price:FLOAT, '
                    'order_no:INTEGER, order_tax:FLOAT, '
                    'shipping_net_price:FLOAT, order_gross_price:FLOAT, '
                    'order_date:TIMESTAMP')

    data | 'write' >> beam.io.WriteToBigQuery(
        table=table_id,
        dataset=dataset_id,
        project=project_id,
        schema=table_schema,
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)

    p.run()

The error when I run this pipeline is:

AttributeError: 'list' object has no attribute 'items' [while running 'write/StreamInsertRows/ParDo(BigQueryWriteFn)']

I think the error is due to the return type of the previous step, or to something related to streaming versus batch loading into BigQuery. I want to do a batch load in my case. I have tried to work with the example insert pipeline given in the Apache Beam documentation (Writing to a BigQuery table), and that pipeline works. The form of the data there is as below:

quotes = p | beam.Create([
    {'source': 'Mahatma Gandhi', 'quote': 'My life is my message.'},
    {'source': 'Yoda', 'quote': "Do, or do not. There is no 'try'."},
])
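From what I read in the Beam docs, recent SDK versions also let you force a batch load instead of streaming inserts through the method parameter of WriteToBigQuery; something like this (an untested sketch reusing the variables from my code above; the temp location value is just a placeholder):

data | 'write' >> beam.io.WriteToBigQuery(
    table=table_id,
    dataset=dataset_id,
    project=project_id,
    schema=table_schema,
    # Force file loads (batch) instead of streaming inserts.
    method=beam.io.WriteToBigQuery.Method.FILE_LOADS,
    custom_gcs_temp_location='gs://bucket/temp')  # placeholder temp path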

How can I modify my pipeline so that the string-type data in my case gets written to the BigQuery table?

Upvotes: 3

Views: 1193

Answers (1)

learning cloud

Reputation: 41

Just posting it here in case anyone stumbles over the same problem. It was a very small detail that I overlooked: beam.io.WriteToBigQuery() takes dictionaries as input, while my PCollection just before the sink was returning a list with a single element, or a string (depending on the version I tried). I added another step to the pipeline that converts the JSON string into a Python dictionary using json.loads, upon which the rows were loaded into BQ successfully.
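A minimal sketch of the fix, assuming the same pipeline as in the question (the step name ParseJson is just what I called it):

import json

data = (p
    | 'Create' >> beam.Create([input_file_path])
    | 'GetXML' >> beam.ParDo(ReadGCSfile())
    | 'Convert2Json' >> beam.Map(lambda orders: json.dumps(orders))
    # New step: turn each JSON string back into a Python dict,
    # which is what WriteToBigQuery expects for each row.
    | 'ParseJson' >> beam.Map(lambda row: json.loads(row))
    )

Depending on what ReadGCSfile emits, you could probably also drop the json.dumps step entirely and pass the dictionaries straight through to the sink.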

Upvotes: 1
