Reputation: 792
I have a Beam/Dataflow pipeline that reads from Pub/Sub and writes to BigQuery with WriteToBigQuery. I convert all timestamps to apache_beam.utils.timestamp.Timestamp. I am sure all timestamps are converted, but I still get this error for some rows:
Error message from worker: generic::unknown: org.apache.beam.sdk.util.UserCodeException: java.lang.UnsupportedOperationException: Converting BigQuery type 'class java.lang.String' to 'LOGICAL_TYPE<beam:logical_type:micros_instant:v1>' is not supported
The volume of data is too high to isolate the failed rows, and WriteToBigQuery.failed_rows_with_errors does not contain anything. It works fine in STREAMING_INSERTS mode; only STORAGE_WRITE_API has this issue.
My pipeline is like this:
(
    p
    | "ReadFromPubSub" >> beam.io.ReadFromPubSub(
        subscription=config.get_arg("pub_sub_subscription"))
    | "DecodeMessage" >> beam.ParDo(ParsePubSubMessage())
    | "FilterEvents" >> beam.Filter(lambda element: element[0][1] == "event")
    | "ExtractParsedMessage" >> beam.Map(lambda element: element[1])
    | "LogBeforeSerialize" >> beam.Map(lambda x: log_element(x, "Before serialize"))
    | "SerializeDataField" >> beam.ParDo(SerializeDataFieldDoFn(events_schema))
    | "LogAfterSerialize" >> beam.Map(lambda x: log_element(x, "After serialize"))
    | "FilterValidRows" >> beam.Filter(lambda row: row is not None)
    | "WriteToBigQuery" >> WriteToBigQuery(
        table="xxx",
        dataset="xxx",
        project="xxx",
        schema={"fields": events_schema},
        method=WriteToBigQuery.Method.STORAGE_WRITE_API,
        use_at_least_once=True,
        validate=False,
        ignore_unknown_columns=True,
        write_disposition=BigQueryDisposition.WRITE_APPEND,
        create_disposition=BigQueryDisposition.CREATE_NEVER,
    )
)
Any help will be much appreciated.
Upvotes: 0
Views: 34
Reputation: 155
There's a good chance that some values are still strings when they reach the BigQuery sink. Focus first on identifying the problematic data types rather than specific rows:
import apache_beam as beam
from apache_beam.utils.timestamp import Timestamp

def log_element_types(element, message):
    # Print the Python type of every field so leftover strings stand out.
    print(f"{message}:")
    for key, value in element.items():
        print(f"  {key}: {type(value)}")
    return element

# ... your pipeline ...
| "LogTypesAfterSerialize" >> beam.Map(lambda x: log_element_types(x, "Types After serialize"))
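Since the question says the volume is too high to inspect individual rows, it may be easier to count (field, type) pairs than to print every element. This is only a minimal sketch, assuming each row is a flat dict; field_type_pairs and the sample rows are hypothetical and not part of Beam:

```python
from collections import Counter


def field_type_pairs(row):
    """Yield one (field name, type name) pair per top-level field of a row."""
    for key, value in row.items():
        yield key, type(value).__name__


# Local illustration with made-up rows. In a pipeline the same function could
# feed beam.FlatMap, followed by beam.combiners.Count.PerElement() on a
# windowed collection (the Pub/Sub source is unbounded).
sample_rows = [
    {"id": 1, "ts": 1700000000.0},
    {"id": 2, "ts": "2023-11-14T22:13:20Z"},
]
type_counts = Counter(
    pair for row in sample_rows for pair in field_type_pairs(row)
)
```

Any ("some_timestamp_field", "str") key in the counts points at a field that escaped the conversion. Nested records would need a recursive variant.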
You can also implement a dead-letter queue to capture the failing elements: filter out the elements that still have string timestamps after your conversion attempt and send them to a separate Pub/Sub topic or BigQuery table for analysis.
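The dead-letter routing could be sketched roughly as follows. The field names in TIMESTAMP_FIELDS are placeholders (the real ones would come from the TIMESTAMP columns in events_schema), and both helpers are hypothetical names, not Beam APIs. Only top-level fields are checked:

```python
# Placeholder names: replace with the TIMESTAMP columns of your schema.
TIMESTAMP_FIELDS = ("event_time", "created_at")


def string_timestamp_fields(row, timestamp_fields=TIMESTAMP_FIELDS):
    """Return the names of timestamp fields whose value is still a str."""
    return [
        name for name in timestamp_fields
        if isinstance(row.get(name), str)
    ]


def route_row(row, num_partitions, timestamp_fields=TIMESTAMP_FIELDS):
    """Partition function: 0 for rows that look clean, 1 for dead letters.

    The (element, num_partitions) signature matches what beam.Partition
    passes to its partition function.
    """
    return 1 if string_timestamp_fields(row, timestamp_fields) else 0
```

In the pipeline this would presumably be applied just before the sink, e.g. good, dead = rows | beam.Partition(route_row, 2), with good going to WriteToBigQuery and dead going to the separate topic or table.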
Upvotes: 0