Jonathan

Reputation: 792

Beam/Dataflow pipeline writing to BigQuery fails to convert timestamps (sometimes)

I have a Beam/Dataflow pipeline that reads from Pub/Sub and writes to BigQuery with WriteToBigQuery. I convert all timestamps to apache_beam.utils.timestamp.Timestamp. I am sure all timestamps are converted, but I still get this error for some rows:

Error message from worker: generic::unknown: org.apache.beam.sdk.util.UserCodeException: java.lang.UnsupportedOperationException: Converting BigQuery type 'class java.lang.String' to 'LOGICAL_TYPE<beam:logical_type:micros_instant:v1>' is not supported

The volume of data is too high to isolate the failed rows, and WriteToBigQuery.failed_rows_with_errors does not contain anything. The pipeline works fine with the STREAMING_INSERTS method; only STORAGE_WRITE_API has this issue.

My pipeline is like this:

(
    p
    | "ReadFromPubSub" >> beam.io.ReadFromPubSub(
        subscription=config.get_arg("pub_sub_subscription"))
    | "DecodeMessage" >> beam.ParDo(ParsePubSubMessage())
    | "FilterEvents" >> beam.Filter(lambda element: element[0][1] == "event")
    | "ExtractParsedMessage" >> beam.Map(lambda element: element[1])
    | "LogBeforeSerialize" >> beam.Map(lambda x: log_element(x, "Before serialize"))
    | "SerializeDataField" >> beam.ParDo(SerializeDataFieldDoFn(events_schema))
    | "LogAfterSerialize" >> beam.Map(lambda x: log_element(x, "After serialize"))
    | "FilterValidRows" >> beam.Filter(lambda row: row is not None)
    | "WriteToBigQuery" >> WriteToBigQuery(
        table="xxx",
        dataset="xxx",
        project="xxx",
        schema={"fields": events_schema},
        method=WriteToBigQuery.Method.STORAGE_WRITE_API,
        use_at_least_once=True,
        validate=False,
        ignore_unknown_columns=True,
        write_disposition=BigQueryDisposition.WRITE_APPEND,
        create_disposition=BigQueryDisposition.CREATE_NEVER,
    )
)

Any help will be much appreciated.

Upvotes: 0

Views: 34

Answers (1)

jggp1094

Reputation: 155

There’s a good chance that some timestamp values are still strings when they reach the BigQuery sink, which would match the 'class java.lang.String' in the error message. Focus first on identifying the problematic data types rather than specific rows.

import apache_beam as beam

def log_element_types(element, message):
    """Print the Python type of every field, then pass the element through."""
    print(f"{message}:")
    for key, value in element.items():
        print(f"  {key}: {type(value)}")
    return element

# ... your pipeline ...
| "LogTypesAfterSerialize" >> beam.Map(lambda x: log_element_types(x, "Types After serialize"))

You can also try to implement a dead-letter queue to capture the failing elements. Filter the elements that have String timestamps after your conversion attempt and send them to a separate Pub/Sub topic or BigQuery table for analysis.
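
A minimal sketch of that dead-letter split, under some assumptions: rows are plain dicts, and timestamp_fields is a list you supply naming the top-level TIMESTAMP columns of your schema. The helper names (string_timestamp_fields, partition_index, split_rows) are made up for illustration and are not part of Beam. Only top-level fields are checked, so nested or repeated records would need a recursive version.

```python
# Hypothetical helpers: the names and the `timestamp_fields` argument are
# assumptions for illustration, not part of the Beam API.

def string_timestamp_fields(row, timestamp_fields):
    """Return the names of timestamp fields whose value is still a str."""
    return [name for name in timestamp_fields if isinstance(row.get(name), str)]


def partition_index(row, timestamp_fields):
    """Return 0 for rows that look safe to write, 1 for dead-letter rows."""
    return 1 if string_timestamp_fields(row, timestamp_fields) else 0


def split_rows(rows, timestamp_fields):
    """Split a PCollection of dict rows into (valid, dead_letter)."""
    # Imported here so the two helpers above can be tested without Beam.
    import apache_beam as beam

    return rows | "SplitByTimestampType" >> beam.Partition(
        lambda row, _num_partitions: partition_index(row, timestamp_fields), 2
    )
```

You would call split_rows on the output of your serialize step, unpack the result as valid, dead_letter, and then send valid to WriteToBigQuery and dead_letter to a separate table or topic. If the dead-letter output stays empty while the error persists, the string is probably coming from somewhere this check does not look, such as a nested field.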

Upvotes: 0
