user9441243


PubSub watermark not advancing

I've written an Apache Beam job using Scio that generates session IDs for incoming data records and then enriches them, before writing them to BigQuery. Here's the code:

val measurements = sc.customInput("ReadFromPubsub",
  PubsubIO
    .readMessagesWithAttributes()
    .withTimestampAttribute("ts")
    .fromSubscription(subscription)
)

measurements
    .map(extractMeasurement).flatMap {
      case Success(event) =>
        Some(event)
      case Failure(ex) =>
        None
    }
    .timestampBy(_.timestamp)
    .withSessionWindows(sessionGap, WindowOptions(
      trigger = Repeatedly.forever(AfterWatermark.pastEndOfWindow()),
      accumulationMode = AccumulationMode.DISCARDING_FIRED_PANES,
      allowedLateness = Duration.standardDays(1),
      timestampCombiner = TimestampCombiner.END_OF_WINDOW
    ))
    .keyBy(_.clientID)
    .groupByKey
    .toWindowed
    .map(assignSessionID)
    .toSCollection.flatMap(_.results)
    .map(enrich)
    .saveAsTypedBigQuery(output, bigquery.WRITE_APPEND, bigquery.CREATE_NEVER)

I'm using the event timestamp, which is the value of the attribute key ts in the PubsubMessage, as my timestamp attribute. It's the exact same timestamp that I'm using in .timestampBy before windowing the data. What I expect is the output trigger firing as soon as the watermark advances past the session gap (30 minutes by default).

With both the Dataflow runner and the DirectRunner, the trigger never fires, even though I simulate data with timestamps more than 30 minutes apart. In the Dataflow UI I can see that the watermark never advances based on the event timestamps; it only ticks forward every other minute, as if no data were received.

I've verified that the data is actually received, since the transformation prior to the windowing is executed. I've also tested with around 10 records per second, but maybe that's still insufficient for the watermark to be updated? I've also set up a JobTest in which I get the expected output, which also signals to me that the issue is timestamp/watermark related.

I'm sure that I've missed something important in the documentation or made a silly mistake somewhere and was hoping that someone could point me in the right direction.

Upvotes: 2

Views: 853

Answers (2)

Slava Chernyak

Reputation: 819

When publishing your messages to pubsub, how are you generating the timestamps that you write to the "ts" attribute of your message, and how are you encoding them?

If I recall correctly, the timestamps must be encoded per the RFC 3339 spec, e.g. something like "2020-10-02T10:00:00-05:00".

Another thing you can try is to temporarily remove the line .withTimestampAttribute("ts") so that the timestamps are generated automatically, then verify whether your watermark is advancing. If that's the case, it points to a problem with the timestamp values (e.g. maybe the values are not what you expected) or their encoding.
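As a minimal sketch of producing such an RFC 3339 string with java.time (the helper name toRfc3339 is hypothetical; note that Beam's timestamp attribute also accepts milliseconds since the Unix epoch as a numeric string, so either encoding should work):

```scala
import java.time.Instant
import java.time.ZoneOffset
import java.time.format.DateTimeFormatter

object TsAttribute {
  // RFC 3339 timestamp with an explicit offset; "XXX" renders UTC as "Z".
  private val Rfc3339: DateTimeFormatter =
    DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ssXXX").withZone(ZoneOffset.UTC)

  // Value to publish under the "ts" attribute of the PubsubMessage.
  def toRfc3339(epochMillis: Long): String =
    Rfc3339.format(Instant.ofEpochMilli(epochMillis))
}
```

Publishing the attribute in this form lets PubsubIO parse it and derive the watermark from your event times rather than from publish times.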

Finally, if you're using the Cloud Dataflow runner, take a look at the job status page. It should show you the current value of the data watermark, which you can inspect to see whether it matches your expectation.

Upvotes: 1

Amruth Bahadursha

Reputation: 74

You could try adding early and late firings to AfterWatermark.pastEndOfWindow() to see if the watermark gets updated, and also to surface any late-arriving data. You can find documentation on triggers in the Beam programming guide.
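As a sketch, the question's WindowOptions could be adjusted like this for debugging (the one-minute early-firing interval is an arbitrary choice; with early and late firings attached, the AfterWatermark trigger already re-fires on its own, so the outer Repeatedly.forever can be dropped):

```scala
WindowOptions(
  trigger = AfterWatermark.pastEndOfWindow()
    // Emit a speculative pane one minute after the first element arrives,
    // even if the watermark is stuck -- useful to confirm data is flowing.
    .withEarlyFirings(
      AfterProcessingTime.pastFirstElementInPane()
        .plusDelayOf(Duration.standardMinutes(1)))
    // Emit a pane for every late element within allowedLateness.
    .withLateFirings(AfterPane.elementCountAtLeast(1)),
  accumulationMode = AccumulationMode.DISCARDING_FIRED_PANES,
  allowedLateness = Duration.standardDays(1),
  timestampCombiner = TimestampCombiner.END_OF_WINDOW
)
```

If the early firings produce output but the on-time pane never arrives, that again points at the watermark (i.e. the timestamp attribute) rather than at the data itself.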

Upvotes: 0
