shadesofdarkred

Reputation: 465

Apache Beam Python SDK: How to access timestamp of an element?

I'm reading messages via ReadFromPubSub with timestamp_attribute=None, which should set timestamps to the publishing time.

This way, I end up with a PCollection of PubsubMessage elements.

How can I access the timestamps of these elements in order to, e.g., save them to a database? The only properties I can see are data and attributes, and attributes only contains the keys coming from Pub/Sub.

Edit: Sample code

with beam.Pipeline(options=pipeline_options) as p:
    items = (p
        | ReadFromPubSub(topic=args.read_topic, with_attributes=True)
        | beam.WindowInto(beam.window.FixedWindows(args.time_window))
        | 'FormatMessage' >> beam.Map(format_message)
        | 'WriteRaw' >> WriteToBigQuery(args.raw_table, args.dataset,
            args.project, write_disposition='WRITE_APPEND')
    )

where format_message would take a PubsubMessage and return a dictionary representing a row to append to the table:

def format_message(message):
    formatted_message = {
        'data': base64.b64encode(message.data),
        'attributes': str(message.attributes)
    }
    return formatted_message

Upvotes: 1

Views: 2469

Answers (4)

dnnshssm

Reputation: 1305

I don't know when this was introduced, but in addition to shadesofdarkred's answer there is another way, which also works inside a lambda, for example.

The PubsubMessage returned by ReadFromPubSub has an attribute publish_time.

Therefore, considering the code in the original question, you could easily access it using message.publish_time.

From the docs:

publish_time

(datetime) Time at which the message was published. Will be reset to None if the Message is being written to pubsub.
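As a minimal sketch of how that could look for the question's format_message: the function below only reads data, attributes, and publish_time from whatever object it is given, so it is tried here on a stand-in object rather than a real PubsubMessage (the SimpleNamespace and its values are made up for illustration). Decoding the base64 result to a string and the None guard are my additions, not part of the original code.

```python
import base64
from datetime import datetime, timezone
from types import SimpleNamespace


def format_message(message):
    """Build a row dict from a PubsubMessage-like object.

    Only the `data`, `attributes`, and `publish_time` attributes are read.
    """
    publish_time = message.publish_time
    return {
        # b64encode returns bytes; decode so the value is a plain string.
        'data': base64.b64encode(message.data).decode('ascii'),
        'attributes': str(message.attributes),
        # Guard against a missing publish time before formatting it.
        'publish_time': (
            publish_time.isoformat() if publish_time is not None else None
        ),
    }


# Hypothetical stand-in for a PubsubMessage, so no pipeline is needed here.
fake_message = SimpleNamespace(
    data=b'hello',
    attributes={'source': 'demo'},
    publish_time=datetime(2020, 1, 2, 3, 4, 5, tzinfo=timezone.utc),
)
row = format_message(fake_message)
```

In the pipeline from the question this would plug in unchanged as beam.Map(format_message), and for a one-off the same attribute can be read inline, e.g. beam.Map(lambda m: m.publish_time).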

Upvotes: 0

Vibhor Jain

Reputation: 1476

There seems to be a (newly released?) timestamp_attribute argument when you call beam.io.gcp.pubsub.ReadFromPubSub(),

but when I tried it on my end, it didn't work as I had expected. I posted a new question on SO if anyone wants to follow up: DataFlow (PY 2.x SDk) ReadFromPubSub :: id_label & timestamp_attribute behaving unexpectedly

Upvotes: 0

shadesofdarkred

Reputation: 465

Turns out the mapped function can be modified to read additional arguments:

def format_message(message, timestamp=beam.DoFn.TimestampParam):
    # Beam fills in `timestamp` with the element's timestamp at runtime.
    formatted_message = {
        'data': base64.b64encode(message.data),
        'attributes': str(message.attributes),
        'timestamp': float(timestamp)  # seconds since the Unix epoch
    }

    return formatted_message

More possible parameters: https://beam.apache.org/releases/pydoc/2.7.0/apache_beam.transforms.core.html#apache_beam.transforms.core.DoFn
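If the database column should hold a readable timestamp rather than a raw float, the value can be converted with the standard library. This is only a sketch, assuming float(timestamp) is seconds since the Unix epoch; the helper name epoch_seconds_to_utc_string and the sample value are made up for illustration.

```python
from datetime import datetime, timezone


def epoch_seconds_to_utc_string(seconds):
    """Convert seconds since the Unix epoch to an ISO 8601 UTC string."""
    return datetime.fromtimestamp(seconds, tz=timezone.utc).isoformat()


# Sample value: half a second past midnight UTC on 2019-01-01.
example = epoch_seconds_to_utc_string(1546300800.5)
```

Inside format_message this would be used as epoch_seconds_to_utc_string(float(timestamp)).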

Upvotes: 5

Tina Iris

Reputation: 581

Have you tried setting with_attributes=True?

Hopefully the Beam docs are helpful. The parameters include:

with_attributes – True - output elements will be PubsubMessage objects. Default to False - output elements will be of type bytes (message data only).

Upvotes: 0
