Reputation: 465
I'm reading messages via ReadFromPubSub with timestamp_attribute=None, which should set the element timestamps to the publish time. This way, I end up with a PCollection of PubsubMessage elements.
How can I access the timestamps of these elements, e.g. in order to save them to a database? The only properties I can see are data and attributes, and attributes only contains keys coming from Pub/Sub.
Edit: Sample code
import apache_beam as beam
from apache_beam.io.gcp.bigquery import WriteToBigQuery
from apache_beam.io.gcp.pubsub import ReadFromPubSub

with beam.Pipeline(options=pipeline_options) as p:
    items = (p
             | ReadFromPubSub(topic=args.read_topic, with_attributes=True)
             | beam.WindowInto(beam.window.FixedWindows(args.time_window))
             | 'FormatMessage' >> beam.Map(format_message)
             | 'WriteRaw' >> WriteToBigQuery(args.raw_table, args.dataset,
                                             args.project,
                                             write_disposition='WRITE_APPEND')
             )
where format_message takes a PubsubMessage and returns a dictionary representing a row to append to the table:
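import base64

def format_message(message):
    formatted_message = {
        # b64encode returns bytes on Python 3; decode so the row is JSON-serializable
        'data': base64.b64encode(message.data).decode('ascii'),
        'attributes': str(message.attributes)
    }
    return formatted_message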
Upvotes: 1
Views: 2469
Reputation: 1305
I don't know when this was introduced, but in addition to shadesofdarkred's answer there is another way, which can also be used in a lambda, for example.
The PubsubMessage returned by ReadFromPubSub (with with_attributes=True, as in the question) has a publish_time attribute. Therefore, given the code in the original question, you can access it directly via message.publish_time.
From the docs:
publish_time
(datetime) Time at which the message was published. Will be reset to None if the Message is being written to pubsub.
Upvotes: 0
Reputation: 1476
There seems to be a (newly released?) timestamp_attribute argument to beam.io.gcp.pubsub.ReadFromPubSub(), but when I tried it on my end, it didn't work as I had expected. I posted a new question on SO if anyone wants to follow up: DataFlow (PY 2.x SDk) ReadFromPubSub :: id_label & timestamp_attribute behaving unexpectedly
Upvotes: 0
Reputation: 465
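It turns out the mapped function can be modified to accept additional arguments, such as the element timestamp:
import base64

import apache_beam as beam

def format_message(message, timestamp=beam.DoFn.TimestampParam):
    # Beam fills in `timestamp` with the element's timestamp (the publish time
    # here); float() converts it to seconds since the Unix epoch.
    formatted_message = {
        'data': base64.b64encode(message.data).decode('ascii'),
        'attributes': str(message.attributes),
        'timestamp': float(timestamp)
    }
    return formatted_message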
More possible parameters: https://beam.apache.org/releases/pydoc/2.7.0/apache_beam.transforms.core.html#apache_beam.transforms.core.DoFn
Upvotes: 5