Mich Talebzadeh

Reputation: 125

Beam Python Dataflow, writing to BigQuery table with schema provided throws AttributeError: May not assign arbitrary value tpe to message

I am trying this code on Dataflow. It reads a CSV file from a gs:// storage bucket, creates a BigQuery table, and appends the data to it. The code is as follows:

from __future__ import absolute_import

import argparse
import logging
import os

import apache_beam as beam
from apache_beam.io import ReadFromText, ReadAllFromText
from apache_beam.io import WriteToText
from apache_beam.metrics import Metrics
from apache_beam.metrics.metric import MetricsFilter
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from google.cloud import storage

import regex as re

# storage_client = storage.Client()
# bucket = storage_client.get_bucket('mybucket')
#
# blobs = bucket.list_blobs()
# l=list(blobs)
# x=[y.name for y in l]
# c=x[1:]
# print(len(c))
# details and location of source csv file
source_file = ['gs://xxx/DUMMY.csv']

class DataIngestion:
    """A helper class which contains the logic to translate the source csv file into a format BigQuery will accept."""


    def parse_method(self, string_input):

        values = re.split(",",re.sub('\r\n', '', re.sub(u'"', '', string_input)))
        row = dict(
            zip(('ID', 'CLUSTERED', 'SCATTERED', 'RANDOMISED', 'RANDOM_STRING', 'SMALL_VC', 'PADDING'),
                values))
        return row

def run(argv=None):
    """Main entry point; defines and runs the pipeline."""

    data_ingestion = DataIngestion()
    p = beam.Pipeline(options=PipelineOptions())


    (p
     | 'Create PCollection' >> beam.Create(source_file)
     | 'Read from a File' >> beam.io.ReadAllFromText(skip_header_lines=1)  # skip the csv header
     | 'String To BigQuery Row' >> beam.Map(lambda s: data_ingestion.parse_method(s))  # s is each string element read by ReadAllFromText
     | 'Write to BigQuery' >> beam.io.Write(
         beam.io.WriteToBigQuery(
             'DUMMY',
             dataset='test',
             schema={
                 "fields": [
                     {"name": "ID",            "type": "FLOAT",   "mode": "REQUIRED"},
                     {"name": "CLUSTERED",     "tpe":  "FLOAT",   "mode": "NULLABLE"},
                     {"name": "SCATTERED",     "tpe":  "FLOAT",   "mode": "NULLABLE"},
                     {"name": "RANDOMISED",    "tpe":  "FLOAT",   "mode": "NULLABLE"},
                     {"name": "RANDOM_STRING", "tpe":  "STRING",  "mode": "NULLABLE"},
                     {"name": "SMALL_VC",      "tpe":  "INTEGER", "mode": "NULLABLE"},
                     {"name": "PADDING",       "tpe":  "STRING",  "mode": "NULLABLE"}
                 ]
             },
             create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
             write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)))

    result = p.run()
    result.wait_until_finish()


if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  run()

The code throws this error:

  File "/usr/local/lib/python3.7/site-packages/apitools/base/protorpclite/messages.py", line 976, in __setattr__
    "to message %s" % (name, type(self).__name__))
AttributeError: May not assign arbitrary value tpe to message TableFieldSchema [while running 'Write to BigQuery/WriteToBigQuery/BigQueryBatchFileLoads/TriggerLoadJobsWithoutTempTables/TriggerLoadJobsWithoutTempTables']

If I instead use

schema='SCHEMA_AUTODETECT'

rather than the JSON schema above, it works fine. Moreover, if I try to reference a JSON file in schema=, it also fails. What is the root cause of this? This is my first Dataflow/Beam program, and I would appreciate any advice.
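
For reference, this is a minimal sketch of the autodetect variant that works for me (same table, dataset, and dispositions as above):

beam.io.WriteToBigQuery(
    'DUMMY',
    dataset='test',
    schema='SCHEMA_AUTODETECT',  # let BigQuery infer the column types during the load job
    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
    write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)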

Upvotes: 1

Views: 3662

Answers (1)

Anjela B

Reputation: 1201

This is because schema accepts either a string of the form 'field1:type1,field2:type2,field3:type3' (a comma-separated list of fields) or a TableSchema object. You may refer to this documentation: https://beam.apache.org/releases/pydoc/2.9.0/apache_beam.io.gcp.bigquery.html#apache_beam.io.gcp.bigquery.WriteToBigQuery

Note also that the traceback pinpoints the immediate trigger: when Beam converts each entry of your dict schema into a TableFieldSchema message, it rejects the misspelled key "tpe", which must be spelled "type". Correcting that spelling should clear the AttributeError; the string form below sidesteps the issue entirely.
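
For completeness, a sketch of the TableSchema alternative, following the pattern in the Beam BigQuery documentation (only two of the seven fields are shown; the remaining ones follow the same pattern):

from apache_beam.io.gcp.internal.clients import bigquery

table_schema = bigquery.TableSchema()

id_field = bigquery.TableFieldSchema()
id_field.name = 'ID'
id_field.type = 'FLOAT'  # the attribute must be spelled 'type'; a misspelled name such as 'tpe' raises the AttributeError from the question
id_field.mode = 'REQUIRED'
table_schema.fields.append(id_field)

random_string_field = bigquery.TableFieldSchema()
random_string_field.name = 'RANDOM_STRING'
random_string_field.type = 'STRING'
random_string_field.mode = 'NULLABLE'
table_schema.fields.append(random_string_field)

# then pass the object directly: schema=table_schema

The full working pipeline with the string-form schema follows: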

from __future__ import absolute_import

import logging
import re

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# details and location of source csv file
source_file = ['gs://<your_bucket>/1.csv']

class DataIngestion:
    """A helper class which contains the logic to translate the source csv file into a format BigQuery will accept."""


    def parse_method(self, string_input):

        values = re.split(",",re.sub('\r\n', '', re.sub(u'"', '', string_input)))
        row = dict(
            zip(('ID', 'CLUSTERED', 'SCATTERED', 'RANDOMISED', 'RANDOM_STRING', 'SMALL_VC', 'PADDING'),
                values))
        return row

def run(argv=None):
    """Main entry point; defines and runs the pipeline."""

    data_ingestion = DataIngestion()
    
    pipeline_options = PipelineOptions(
        runner='DirectRunner',
        project='your-project',
        temp_location='gs://<your_bucket>/temp',
        region='us-central1')

    p = beam.Pipeline(options=pipeline_options)


    (p
     | 'Create PCollection' >> beam.Create(source_file)
     | 'Read from a File' >> beam.io.ReadAllFromText(skip_header_lines=1)  # skip the csv header
     | 'String To BigQuery Row' >> beam.Map(lambda s: data_ingestion.parse_method(s))  # s is each string element read by ReadAllFromText
     | 'Write to BigQuery' >> beam.io.WriteToBigQuery(
         'your_table',
         dataset='your_dataset',
         schema='ID:FLOAT,CLUSTERED:FLOAT,SCATTERED:FLOAT,RANDOMISED:FLOAT,RANDOM_STRING:STRING,SMALL_VC:INTEGER,PADDING:STRING',
         create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
         write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))

    result = p.run()
    result.wait_until_finish()


if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  run()

Input CSV file:

ID,CLUSTERED,SCATTERED,RANDOMISED,RANDOM_STRING,SMALL_VC,PADDING
12,5,23,55,team,5,bigdata

Output: (screenshot of the resulting BigQuery table with the loaded row)
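
To verify the load outside the pipeline, you may query the table with the bq CLI (dataset and table names are the placeholders used above):

bq query --use_legacy_sql=false 'SELECT * FROM your_dataset.your_table'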

Upvotes: 3
