crbl

Reputation: 397

Dataflow Tensorflow Transform write transformed data to BigQuery

In a GCP Dataflow pipeline, I am trying to write the transformed data emitted by the Transform component into BigQuery, and I get the error below. First, I would appreciate it if someone could let me know whether there is a standard solution for writing the transformed data emitted by the Transform component to BigQuery. If there is none, any advice on what should be corrected in my code would be welcome. Thank you.

 File "/usr/local/lib/python3.7/site-packages/apache_beam/io/gcp/bigquery_file_loads.py", line 261, in process
    writer.write(row)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/io/gcp/bigquery_tools.py", line 1400, in write
    return self._file_handle.write(self._coder.encode(row) + b'\n')
  File "/usr/local/lib/python3.7/site-packages/apache_beam/io/gcp/bigquery_tools.py", line 1351, in encode
    default=default_encoder).encode('utf-8')
  File "/usr/local/lib/python3.7/json/__init__.py", line 238, in dumps
    **kw).encode(obj)
  File "/usr/local/lib/python3.7/json/encoder.py", line 199, in encode
    chunks = self.iterencode(o, _one_shot=True)
  File "/usr/local/lib/python3.7/json/encoder.py", line 257, in iterencode
    return _iterencode(o, 0)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/io/gcp/bigquery_tools.py", line 134, in default_encoder
    "Object of type '%s' is not JSON serializable" % type(obj).__name__)
TypeError: Object of type 'RecordBatch' is not JSON serializable [while running 'train - Write to BigQuery/BigQueryBatchFileLoads/ParDo(WriteRecordsToFile)/ParDo(WriteRecordsToFile)-ptransform-128']

The code is:

def analyze_and_transform(raw_dataset, step):
    
    transformed_dataset, transform_fn = (
        raw_dataset 
        | '{} - Analyze & Transform'.format(step) >> tft_beam.AnalyzeAndTransformDataset(
            preprocess_fn, output_record_batches=True)
    )
    
    return transformed_dataset, transform_fn

def transform(raw_dataset, transform_fn, step):
    
    transformed_dataset = (
        (raw_dataset, transform_fn) 
        | '{} - Transform'.format(step) >> tft_beam.TransformDataset(output_record_batches=True)
    )
    
    return transformed_dataset

from tensorflow_metadata.proto.v0 import schema_pb2

def convert_schema_to_string(schema):
    schema_string = ''
    for feature in schema.feature:
        if feature.type==schema_pb2.FLOAT:
            feature_type = 'FLOAT'
        elif feature.type == schema_pb2.INT and feature.int_domain.is_categorical == True:
            feature_type = 'STRING'
        else:
            feature_type = 'INTEGER'
            
        schema_string += '{}:{}'.format(feature.name, feature_type)
        schema_string += ','
    schema_string = schema_string.rstrip(',')
    return schema_string


def write_to_bigquery(transformed_dataset, step):
    
    transformed_data, transformed_metadata = transformed_dataset
    schema_string = convert_schema_to_string(transformed_metadata.schema)
    
    transformed_data | '{} - Write to BigQuery'.format(step) >> beam.io.WriteToBigQuery(
        table=f'{PROJECT}.{OUT_DATASET_ID}.{OUT_TABLE_NAME}',
        schema=schema_string,
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND
    )

And the pipeline:

with beam.Pipeline(runner, options=pipeline_options) as pipeline:
    logging.info(f'pipeline_options: {pipeline_options}')
    logging.getLogger().setLevel(logging.INFO)
    
    print(pipeline_options)
    
    with tft_beam.Context(temporary_dir):
        
        # Preprocess train data
        step = 'train'
        # Read raw train data from BQ
        raw_train_dataset = read_from_bq(pipeline, step, data_size)
        # Analyze and transform raw_train_dataset 
        transformed_train_dataset, transform_fn = analyze_and_transform(raw_train_dataset, step)
        # transformed_train_data, transformed_train_metadata = transformed_train_dataset
        
        # Write transformed train data to sink as tfrecords
        write_tfrecords(transformed_train_dataset, transformed_data_location, step)
        # Write transformed train data to BigQuery

        write_to_bigquery(transformed_train_dataset, step)

[Update]: As a first test, in order to get instance-dict (row dict) output, I set tft_beam.TransformDataset(output_record_batches=False), since according to the documentation: "... If True, AnalyzeAndTransformDataset outputs pyarrow.RecordBatches; otherwise, outputs instance dicts." I now get another error:

File "/usr/local/lib/python3.7/site-packages/apache_beam/io/gcp/bigquery_file_loads.py", line 261, in process
    writer.write(row)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/io/gcp/bigquery_tools.py", line 1400, in write
    return self._file_handle.write(self._coder.encode(row) + b'\n')
  File "/usr/local/lib/python3.7/site-packages/apache_beam/io/gcp/bigquery_tools.py", line 1351, in encode
    default=default_encoder).encode('utf-8')
  File "/usr/local/lib/python3.7/json/__init__.py", line 238, in dumps
    **kw).encode(obj)
  File "/usr/local/lib/python3.7/json/encoder.py", line 199, in encode
    chunks = self.iterencode(o, _one_shot=True)
  File "/usr/local/lib/python3.7/json/encoder.py", line 257, in iterencode
    return _iterencode(o, 0)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/io/gcp/bigquery_tools.py", line 134, in default_encoder
    "Object of type '%s' is not JSON serializable" % type(obj).__name__)
TypeError: Object of type 'float32' is not JSON serializable [while running 'train - Write to BigQuery/BigQueryBatchFileLoads/ParDo(WriteRecordsToFile)/ParDo(WriteRecordsToFile)-ptransform-110']
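
For reference, one possible way to keep output_record_batches=True would be to flatten each RecordBatch into plain Python row dicts before handing them to WriteToBigQuery. A rough sketch (record_batches_to_rows is just an illustrative name; depending on the tensorflow_transform version, the emitted elements may be bare RecordBatches or (RecordBatch, pass-through features) tuples):

import apache_beam as beam

def record_batches_to_rows(element):
    # Depending on the tensorflow_transform version, the transformed data may be
    # bare pyarrow.RecordBatches or (RecordBatch, pass-through features) tuples.
    batch = element[0] if isinstance(element, tuple) else element
    columns = batch.to_pydict()  # {column_name: [python values, ...]}
    for i in range(batch.num_rows):
        yield {name: values[i] for name, values in columns.items()}

# transformed_data | 'To row dicts' >> beam.FlatMap(record_batches_to_rows)
#                  | 'Write to BigQuery' >> beam.io.WriteToBigQuery(...)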

Upvotes: 0

Views: 56

Answers (1)

crbl

Reputation: 397

I solved the problem by casting the dtypes to the native Python int and float types that BigQuery's JSON encoding expects.
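
A minimal sketch of what that casting can look like, assuming output_record_batches=False so the transformed data arrives as instance dicts of numpy values (the helper name to_bq_row is illustrative, not part of any API):

import numpy as np
import apache_beam as beam

def to_bq_row(instance_dict):
    """Cast numpy scalars/arrays in a TFT instance dict to native Python types."""
    row = {}
    for name, value in instance_dict.items():
        if isinstance(value, np.ndarray):
            # Single-element arrays become plain scalars, longer ones become lists.
            value = value.item() if value.size == 1 else value.tolist()
        elif isinstance(value, np.generic):  # np.float32, np.int64, ...
            value = value.item()
        row[name] = value
    return row

# Applied just before the sink, e.g.:
# transformed_data | 'Cast for BQ' >> beam.Map(to_bq_row)
#                  | 'Write to BigQuery' >> beam.io.WriteToBigQuery(...)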

Upvotes: 0
