Reputation: 48356
I want to separate event from a bunch of multiple events, given data
{"type": "A", "k1": "v1"}
{"type": "B", "k2": "v2"}
{"type": "C", "k3": "v3"}
And I want to separate type: A
events to table A
in bigquery, type:B
events to table B
, type: C
events to table C
.
Here are my codes implemented through apache beam
python sdk and write data to bigquery
,
A_schema = 'type:string, k1:string'
B_schema = 'type:string, k2:string'
C_schema = 'type:string, k2:string'
class ParseJsonDoFn(beam.DoFn):
A_TYPE = 'tag_A'
B_TYPE = 'tag_B'
C_TYPE = 'tag_C'
def process(self, element):
text_line = element.trip()
data = json.loads(text_line)
if data['type'] == 'A':
yield pvalue.TaggedOutput(self.A_TYPE, data)
elif data['type'] == 'B':
yield pvalue.TaggedOutput(self.B_TYPE, data)
elif data['type'] == 'C':
yield pvalue.TaggedOutput(self.C_TYPE, data)
def run():
parser = argparse.ArgumentParser()
parser.add_argument('--input',
dest='input',
default='data/path/data',
help='Input file to process.')
known_args, pipeline_args = parser.parse_known_args(argv)
pipeline_args.extend([
'--runner=DirectRunner',
'--project=project-id',
'--job_name=seperate-bi-events-job',
])
pipeline_options = PipelineOptions(pipeline_args)
pipeline_options.view_as(SetupOptions).save_main_session = True
with beam.Pipeline(options=pipeline_options) as p:
lines = p | ReadFromText(known_args.input)
multiple_lines = (
lines
| 'ParseJSON' >> (beam.ParDo(ParseJsonDoFn()).with_outputs(
ParseJsonDoFn.A_TYPE,
ParseJsonDoFn.B_TYPE,
ParseJsonDoFn.C_TYPE)))
a_line = multiple_lines.tag_A
b_line = multiple_lines.tag_B
c_line = multiple_lines.tag_C
(a_line
| "output_a" >> beam.io.WriteToBigQuery(
'temp.a',
schema = A_schema,
write_disposition = beam.io.BigQueryDisposition.WRITE_TRUNCATE,
create_disposition = beam.io.BigQueryDisposition.CREATE_IF_NEEDED
))
(b_line
| "output_b" >> beam.io.WriteToBigQuery(
'temp.b',
schema = B_schema,
write_disposition = beam.io.BigQueryDisposition.WRITE_TRUNCATE,
create_disposition = beam.io.BigQueryDisposition.CREATE_IF_NEEDED
))
(c_line
| "output_c" >> beam.io.WriteToBigQuery(
'temp.c',
schema = (C_schema),
write_disposition = beam.io.BigQueryDisposition.WRITE_TRUNCATE,
create_disposition = beam.io.BigQueryDisposition.CREATE_IF_NEEDED
))
p.run().wait_until_finish()
The output:
INFO:root:start <DoOperation output_banner/WriteToBigQuery output_tags=['out']>
INFO:oauth2client.transport:Attempting refresh to obtain initial access_token
INFO:oauth2client.client:Refreshing access_token
WARNING:root:Sleeping for 150 seconds before the write as BigQuery inserts can be routed to deleted table for 2 mins after the delete and create.
INFO:root:start <DoOperation output_banner/WriteToBigQuery output_tags=['out']>
INFO:oauth2client.transport:Attempting refresh to obtain initial access_token
INFO:oauth2client.client:Refreshing access_token
WARNING:root:Sleeping for 150 seconds before the write as BigQuery inserts can be routed to deleted table for 2 mins after the delete and create.
INFO:root:start <DoOperation output_banner/WriteToBigQuery output_tags=['out']>
INFO:oauth2client.transport:Attempting refresh to obtain initial access_token
INFO:oauth2client.client:Refreshing access_token
WARNING:root:Sleeping for 150 seconds before the write as BigQuery inserts can be routed to deleted table for 2 mins after the delete and create.
However, there are two issues here
bigquery
?Is there something wrong in my codes or something am I missing?
Upvotes: 1
Views: 3271
Reputation: 7058
there is no data in bigquery?
Your code seems to be perfectly fine as data is written to BigQuery (C_schema
should be k3
instead of k2
). Keep in mind that you are streaming data so you won't see it if you click on the Preview
table button until data in the streaming buffer is committed. Running a SELECT *
query will display the expected results:
From the logs it seems the codes does NOT run parallel rather than run 3 times sequence?
Ok, this is interesting. By tracing the WARNING
message in the code we can read the following:
# if write_disposition == BigQueryDisposition.WRITE_TRUNCATE we delete
# the table before this point.
if write_disposition == BigQueryDisposition.WRITE_TRUNCATE:
# BigQuery can route data to the old table for 2 mins max so wait
# that much time before creating the table and writing it
logging.warning('Sleeping for 150 seconds before the write as ' +
'BigQuery inserts can be routed to deleted table ' +
'for 2 mins after the delete and create.')
# TODO(BEAM-2673): Remove this sleep by migrating to load api
time.sleep(150)
return created_table
else:
return created_table
After reading BEAM-2673 and BEAM-2801, seems like this is due to an issue with the BigQuery sink using the Streaming API with the DirectRunner
. This will cause the process to sleep for 150s when re-creating the table and this is not done in parallel.
If, instead, we run it on Dataflow (using the DataflowRunner
, providing a staging and temp bucket path as well as loading the input data from GCS too) this will run three import jobs in parallel. See, in the below image, that all three start at 22:19:45
and finish at 22:19:56
:
Upvotes: 4