Reputation: 397
In a GCP Dataflow pipeline, I am trying to write the data emitted by the Transform component into BigQuery, and I get the error below. First, I would appreciate it if someone could let me know whether there is a standard solution for writing the transformed data emitted by the Transform to BigQuery. If there is none, any advice on what should be corrected in my code would be welcome. Thank you.
File "/usr/local/lib/python3.7/site-packages/apache_beam/io/gcp/bigquery_file_loads.py", line 261, in process
writer.write(row)
File "/usr/local/lib/python3.7/site-packages/apache_beam/io/gcp/bigquery_tools.py", line 1400, in write
return self._file_handle.write(self._coder.encode(row) + b'\n')
File "/usr/local/lib/python3.7/site-packages/apache_beam/io/gcp/bigquery_tools.py", line 1351, in encode
default=default_encoder).encode('utf-8')
File "/usr/local/lib/python3.7/json/__init__.py", line 238, in dumps
**kw).encode(obj)
File "/usr/local/lib/python3.7/json/encoder.py", line 199, in encode
chunks = self.iterencode(o, _one_shot=True)
File "/usr/local/lib/python3.7/json/encoder.py", line 257, in iterencode
return _iterencode(o, 0)
File "/usr/local/lib/python3.7/site-packages/apache_beam/io/gcp/bigquery_tools.py", line 134, in default_encoder
"Object of type '%s' is not JSON serializable" % type(obj).__name__)
TypeError: Object of type 'RecordBatch' is not JSON serializable [while running 'train - Write to BigQuery/BigQueryBatchFileLoads/ParDo(WriteRecordsToFile)/ParDo(WriteRecordsToFile)-ptransform-128']
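From the traceback, the failing step is BigQueryBatchFileLoads, which JSON-encodes each element before staging it to GCS, so WriteToBigQuery apparently needs plain Python dicts of JSON-serializable values rather than pyarrow.RecordBatch objects. I assume the batches would have to be flattened into such dicts first, along these lines (an illustrative sketch only; record_batch_to_rows is a hypothetical helper, and the singleton-list unwrapping assumes TFT's usual list-valued output columns):
import apache_beam as beam

def record_batch_to_rows(element):
    # Hypothetical helper: flatten one pyarrow.RecordBatch into per-row dicts
    # that WriteToBigQuery can JSON-encode.
    # With output_record_batches=True, TFT may emit (RecordBatch, passthrough) tuples.
    record_batch = element[0] if isinstance(element, tuple) else element
    columns = record_batch.to_pydict()  # {column_name: [per-row values]}
    for i in range(record_batch.num_rows):
        row = {}
        for name, values in columns.items():
            value = values[i]
            # TFT columns are usually list-valued; unwrap single-element lists.
            row[name] = value[0] if isinstance(value, list) and len(value) == 1 else value
        yield row

# Usage (sketch): rows = transformed_data | beam.FlatMap(record_batch_to_rows)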
The code is:
def analyze_and_transform(raw_dataset, step):
    transformed_dataset, transform_fn = (
        raw_dataset
        | '{} - Analyze & Transform'.format(step) >> tft_beam.AnalyzeAndTransformDataset(
            preprocess_fn, output_record_batches=True)
    )
    return transformed_dataset, transform_fn

def transform(raw_dataset, transform_fn, step):
    transformed_dataset = (
        (raw_dataset, transform_fn)
        | '{} - Transform'.format(step) >> tft_beam.TransformDataset(output_record_batches=True)
    )
    return transformed_dataset
from tensorflow_metadata.proto.v0 import schema_pb2

def convert_schema_to_string(schema):
    schema_string = ''
    for feature in schema.feature:
        if feature.type == schema_pb2.FLOAT:
            feature_type = 'FLOAT'
        elif feature.type == schema_pb2.INT and feature.int_domain.is_categorical:
            feature_type = 'STRING'
        else:
            feature_type = 'INTEGER'
        schema_string += '{}:{}'.format(feature.name, feature_type)
        schema_string += ','
    schema_string = schema_string.rstrip(',')
    return schema_string
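For reference, this is meant to produce the compact 'name:TYPE,name:TYPE' string that WriteToBigQuery accepts as its schema argument, e.g. (hypothetical feature names):
# convert_schema_to_string(transformed_metadata.schema)
# -> 'age:FLOAT,gender:STRING'  (age: FLOAT feature, gender: categorical INT feature)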
def write_to_bigquery(transformed_dataset, step):
    transformed_data, transformed_metadata = transformed_dataset
    schema_string = convert_schema_to_string(transformed_metadata.schema)
    transformed_data | '{} - Write to BigQuery'.format(step) >> beam.io.WriteToBigQuery(
        table=f'{PROJECT}.{OUT_DATASET_ID}.{OUT_TABLE_NAME}',
        schema=schema_string,
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND
    )
And the pipeline:
with beam.Pipeline(runner, options=pipeline_options) as pipeline:
    logging.info(f'pipeline_options: {pipeline_options}')
    logging.getLogger().setLevel(logging.INFO)
    print(pipeline_options)

    with tft_beam.Context(temporary_dir):
        # Preprocess train data
        step = 'train'
        # Read raw train data from BQ
        raw_train_dataset = read_from_bq(pipeline, step, data_size)
        # Analyze and transform raw_train_dataset
        transformed_train_dataset, transform_fn = analyze_and_transform(raw_train_dataset, step)
        # transformed_train_data, transformed_train_metadata = transformed_train_dataset
        # Write transformed train data to sink as tfrecords
        write_tfrecords(transformed_train_dataset, transformed_data_location, step)
        # Write transformed train data to BigQuery
        write_to_bigquery(transformed_train_dataset, step)
[Update]: As a first test, in order to get instance-dict output instead of RecordBatches, I set tft_beam.TransformDataset(output_record_batches=False), because according to the documentation: "... If True, AnalyzeAndTransformDataset outputs pyarrow.RecordBatches; otherwise, outputs instance dicts." This time the values themselves fail to serialize and I get another error:
File "/usr/local/lib/python3.7/site-packages/apache_beam/io/gcp/bigquery_file_loads.py", line 261, in process
writer.write(row)
File "/usr/local/lib/python3.7/site-packages/apache_beam/io/gcp/bigquery_tools.py", line 1400, in write
return self._file_handle.write(self._coder.encode(row) + b'\n')
File "/usr/local/lib/python3.7/site-packages/apache_beam/io/gcp/bigquery_tools.py", line 1351, in encode
default=default_encoder).encode('utf-8')
File "/usr/local/lib/python3.7/json/__init__.py", line 238, in dumps
**kw).encode(obj)
File "/usr/local/lib/python3.7/json/encoder.py", line 199, in encode
chunks = self.iterencode(o, _one_shot=True)
File "/usr/local/lib/python3.7/json/encoder.py", line 257, in iterencode
return _iterencode(o, 0)
File "/usr/local/lib/python3.7/site-packages/apache_beam/io/gcp/bigquery_tools.py", line 134, in default_encoder
"Object of type '%s' is not JSON serializable" % type(obj).__name__)
TypeError: Object of type 'float32' is not JSON serializable [while running 'train - Write to BigQuery/BigQueryBatchFileLoads/ParDo(WriteRecordsToFile)/ParDo(WriteRecordsToFile)-ptransform-110']
Upvotes: 0
Views: 56
Reputation: 397
I solved the problem by casting the NumPy dtypes to the native Python int and float types that BigQuery expects.
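A minimal sketch of that kind of cast, assuming instance-dict output (output_record_batches=False); the helper name and the exact field handling are illustrative, not my verbatim code:
import numpy as np
import apache_beam as beam

def to_bq_row(instance_dict):
    # Illustrative: convert NumPy scalars/arrays into native Python values
    # that the JSON encoder behind WriteToBigQuery can serialize.
    row = {}
    for name, value in instance_dict.items():
        if isinstance(value, np.ndarray):
            value = value.item() if value.size == 1 else value.tolist()
        if isinstance(value, np.floating):
            value = float(value)
        elif isinstance(value, np.integer):
            value = int(value)
        elif isinstance(value, bytes):
            value = value.decode('utf-8')
        row[name] = value
    return row

# Usage (sketch):
# transformed_data | beam.Map(to_bq_row) | '{} - Write to BigQuery'.format(step) >> beam.io.WriteToBigQuery(...)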
Upvotes: 0