Reputation: 524
I am building a process in Google Cloud Dataflow that will consume messages from a Pub/Sub subscription and, based on the value of one key, will either write them to BQ or to GCS. I am able to split the messages, but I am not sure how to write the data to BigQuery. I've tried using beam.io.gcp.bigquery.WriteToBigQuery, but no luck.
My full code is here: https://pastebin.com/4W9Vu4Km
Basically my issue is that I don't know how to specify in WriteBatchesToBQ (line 73) that the variable element should be written into BQ.
I've also tried using beam.io.gcp.bigquery.WriteToBigQuery directly in the pipeline (line 128), but then I got the error AttributeError: 'list' object has no attribute 'items' [while running 'Write to BQ/_StreamToBigQuery/StreamInsertRows/ParDo(BigQueryWriteFn)']. This is probably because I am not feeding it a dictionary, but a list of dictionaries (I would like to use 1-minute windows).
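To illustrate the shape mismatch I mean (the keys here are just placeholders, not my real schema):

# What WriteToBigQuery expects: one dict of column values per element.
row = {'id': '1', 'value': 'a'}

# What my 1-minute windowed batches actually emit: a list of dicts per element.
batch = [{'id': '1', 'value': 'a'},
         {'id': '2', 'value': 'b'}]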
Any ideas please? (Also, if there is something too stupid in the code, let me know - I've only been playing with Apache Beam for a short time and I might be overlooking some obvious issues.)
Upvotes: 1
Views: 4029
Reputation: 524
The second approach is the solution to this issue: you need to use the WriteToBigQuery transform directly in the pipeline. However, a beam.FlatMap step needs to be included before WriteToBigQuery so that it can process the list of dictionaries correctly (FlatMap with an identity function flattens each windowed batch back into individual row dicts).
Hence the complete pipeline that splits the data, groups it by time, and writes it into BQ is defined like this:
accepted_messages = (
    tagged_lines_result[Split.OUTPUT_TAG_BQ]
    | "Window into BQ" >> GroupWindowsIntoBatches(window_size)
    | "FlatMap" >> beam.FlatMap(lambda elements: elements)
    | "Write to BQ" >> beam.io.gcp.bigquery.WriteToBigQuery(
        table=output_table_bq,
        schema=output_table_bq_schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED))
The complete working code is here: https://pastebin.com/WFwBvPcU
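For reference, GroupWindowsIntoBatches above is a custom composite transform; the exact version is in the pastebin, but a minimal sketch following the common Pub/Sub windowed-batching pattern (assuming window_size is given in minutes) looks like this:

import apache_beam as beam
from apache_beam.transforms import window

class GroupWindowsIntoBatches(beam.PTransform):
    """Groups an unbounded stream into fixed-window batches (lists of elements)."""

    def __init__(self, window_size):
        # window_size is assumed to be in minutes here.
        self.window_size = int(window_size * 60)

    def expand(self, pcoll):
        return (
            pcoll
            | "Fixed windows" >> beam.WindowInto(window.FixedWindows(self.window_size))
            # Key every element identically so GroupByKey collects a whole window.
            | "Add dummy key" >> beam.WithKeys(lambda _: None)
            | "Group by key" >> beam.GroupByKey()
            # Drop the dummy key, keeping one list of elements per window.
            | "Drop key" >> beam.MapTuple(lambda _, elements: list(elements)))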
Upvotes: 4
Reputation: 699
A sample WriteToBigQuery format is given below:
project_id = 'proj1'
dataset_id = 'dataset1'
table_id = 'table1'
table_schema = 'id:STRING, reqid:STRING'

| 'Write-CH' >> beam.io.WriteToBigQuery(
    table=table_id,
    dataset=dataset_id,
    project=project_id,
    schema=table_schema,
    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
    write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
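For context, a minimal self-contained sketch of how this step could sit in a complete pipeline (the beam.Create input and its element values are illustrative assumptions, not part of the snippet above):

import apache_beam as beam

project_id = 'proj1'
dataset_id = 'dataset1'
table_id = 'table1'
table_schema = 'id:STRING, reqid:STRING'

with beam.Pipeline() as p:
    (
        p
        # Illustrative in-memory input; in the question this would come from Pub/Sub.
        | 'Create' >> beam.Create([{'id': '1', 'reqid': 'r-100'},
                                   {'id': '2', 'reqid': 'r-101'}])
        | 'Write-CH' >> beam.io.WriteToBigQuery(
            table=table_id,
            dataset=dataset_id,
            project=project_id,
            schema=table_schema,
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))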
You can refer to this example; it will give you a brief understanding of a Beam data pipeline.
Upvotes: 2