RadRuss

Reputation: 524

Apache Beam To BigQuery

I am building a process in Google Cloud Dataflow that consumes messages from a Pub/Sub subscription and, based on the value of one key, writes them either to BQ or to GCS. I am able to split the messages, but I am not sure how to write the data to BigQuery. I've tried using beam.io.gcp.bigquery.WriteToBigQuery, but with no luck.

My full code is here: https://pastebin.com/4W9Vu4Km

Basically my issue is that I don't know how to specify in WriteBatchesToBQ (line 73) that the variable element should be written into BQ.

I've also tried using beam.io.gcp.bigquery.WriteToBigQuery directly in the pipeline (line 128), but then I got the error AttributeError: 'list' object has no attribute 'items' [while running 'Write to BQ/_StreamToBigQuery/StreamInsertRows/ParDo(BigQueryWriteFn)']. This is probably because I am not feeding it a dictionary, but a list of dictionaries (I would like to use 1-minute windows).
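A quick check (plain Python, no Beam needed; the sample values are made up) confirms the shape mismatch behind that error: the write transform calls .items() on each element it receives, so each element must be a single row dict, not a list of row dicts:

```python
# After windowing/grouping, each element is a list of row dicts:
batch = [{"id": "1", "reqid": "a"}, {"id": "2", "reqid": "b"}]

# A list has no .items(), hence the AttributeError:
assert not hasattr(batch, "items")

# A single row dict is what the write step can actually consume:
row = batch[0]
assert hasattr(row, "items")
print(dict(row.items()))  # {'id': '1', 'reqid': 'a'}
```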

Any ideas, please? (Also, if something in the code looks obviously wrong, let me know; I have only been playing with Apache Beam for a short time and might be overlooking some obvious issues.)

Upvotes: 1

Views: 4029

Answers (2)

RadRuss

Reputation: 524

The second approach is the solution to this issue: you need to use the WriteToBigQuery function directly in the pipeline. However, a beam.FlatMap step needs to be included so that WriteToBigQuery can process the list of dictionaries correctly.

Hence the complete pipeline that splits the data, groups it into time windows, and writes it to BQ is defined like this:

    accepted_messages = (
        tagged_lines_result[Split.OUTPUT_TAG_BQ]
        | "Window into BQ" >> GroupWindowsIntoBatches(window_size)
        | "FlatMap" >> beam.FlatMap(lambda elements: elements)
        | "Write to BQ" >> beam.io.gcp.bigquery.WriteToBigQuery(
            table=output_table_bq,
            schema=output_table_bq_schema,
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED))

The complete working code is here: https://pastebin.com/WFwBvPcU
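The beam.FlatMap(lambda elements: elements) step does the unbatching: FlatMap emits each item of the returned iterable as a separate element, so one list-of-rows element becomes many single-row elements that WriteToBigQuery can handle. A rough pure-Python sketch of that flattening (no Beam dependency; flat_map and the sample data are illustrative stand-ins, not Beam internals):

```python
def flat_map(fn, pcollection):
    # Simplified stand-in for beam.FlatMap: map each element to an
    # iterable, then emit that iterable's items individually.
    out = []
    for element in pcollection:
        out.extend(fn(element))
    return out

# Two windowed batches, each a list of row dicts:
batches = [
    [{"id": "1"}, {"id": "2"}],
    [{"id": "3"}],
]

rows = flat_map(lambda elements: elements, batches)
print(rows)  # [{'id': '1'}, {'id': '2'}, {'id': '3'}]
```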

Upvotes: 4

Vibhor Gupta

Reputation: 699

A sample WriteToBigQuery invocation is given below:

    project_id = 'proj1'
    dataset_id = 'dataset1'
    table_id = 'table1'
    table_schema = 'id:STRING, reqid:STRING'

        | 'Write-CH' >> beam.io.WriteToBigQuery(
            table=table_id,
            dataset=dataset_id,
            project=project_id,
            schema=table_schema,
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)

You can refer to this example; it will give you a brief understanding of a Beam data pipeline.
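With that schema string, each element the pipeline feeds into WriteToBigQuery should be a dict whose keys match the schema's field names. A minimal sketch of that correspondence (the row values are made up for illustration):

```python
table_schema = 'id:STRING, reqid:STRING'

# Pull the field names out of the comma-separated "name:TYPE" pairs.
fields = [part.split(':')[0].strip() for part in table_schema.split(',')]
print(fields)  # ['id', 'reqid']

# Each row written to BigQuery is a dict keyed by those field names:
row = {'id': '42', 'reqid': 'req-001'}
assert set(row) == set(fields)
```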

Upvotes: 2
