davideanastasia

Reputation: 366

Trigger windows in order in Apache Beam

Using Apache Beam, I am trying to publish metrics into StackDriver from Dataflow. However, StackDriver doesn't allow writing a value for an earlier time t0 if a value for a later time t1 has already been written, and unfortunately I haven't yet found a way in Apache Beam to force windows to be emitted in temporal order (as far as I understand, an EARLY pane for t1 could still be emitted before an ON_TIME pane for t0).

So I decided to disallow any lateness in 1-minute fixed windows, as follows:

input
  .apply("IntoOneMinFixedWindow", Window.<T>into(FixedWindows.of(Duration.standardMinutes(1)))
    .withAllowedLateness(Duration.ZERO)
    .discardingFiredPanes())
  .apply("GloballyCount", Combine.globally(Count.<T>combineFn()).withoutDefaults())
  .apply("StackDriverWriterFn", ParDo.of(new StackDriverWriterFn(metricName)));
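To make the windowing step concrete, here is a minimal plain-Java sketch (not Beam code) of what assigning elements to one-minute fixed windows and counting them per window amounts to. It assumes epoch-millisecond timestamps and a zero window offset; `OneMinuteWindowCounts` and its methods are hypothetical names used only for illustration.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration of FixedWindows.of(1 minute) followed by a count,
// using plain epoch-millisecond timestamps instead of Beam types.
class OneMinuteWindowCounts {
    static final long WINDOW_MILLIS = 60_000L;

    // Start of the fixed window containing tsMillis (assumes a zero window offset).
    static long windowStart(long tsMillis) {
        return tsMillis - Math.floorMod(tsMillis, WINDOW_MILLIS);
    }

    // Counts elements per window, keyed by window start, in ascending key order.
    static TreeMap<Long, Long> countPerWindow(long[] timestamps) {
        TreeMap<Long, Long> counts = new TreeMap<>();
        for (long ts : timestamps) {
            counts.merge(windowStart(ts), 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        long[] ts = {5_000L, 59_999L, 60_000L, 125_000L};
        // prints [0, 60000) -> 2, [60000, 120000) -> 1, [120000, 180000) -> 1
        for (Map.Entry<Long, Long> e : countPerWindow(ts).entrySet()) {
            long start = e.getKey();
            System.out.println("[" + start + ", " + (start + WINDOW_MILLIS) + ") -> " + e.getValue());
        }
    }
}
```

Each one-minute window yields a single count, so the downstream writer receives one point per window; the ordering problem is about the order in which those per-window points reach it.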

The data in "input" comes from Pub/Sub, while the code in the StackDriverWriterFn is pretty much verbatim from: https://cloud.google.com/monitoring/custom-metrics/creating-metrics

When the pipeline is in a steady state, this works as expected. However, if for some reason the pipeline is down for a few minutes and then gets restarted, the watermark advances very quickly while it catches up on the unprocessed data, and multiple windows get emitted at more or less the same time, in an unpredictable order, which causes the following error:

com.google.api.gax.rpc.InvalidArgumentException: io.grpc.StatusRuntimeException: INVALID_ARGUMENT: One or more TimeSeries could not be written: Points must be written in order. One or more of the points specified had an older end time than the most recent point.: timeSeries[0]
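The failure mode can be reproduced in miniature with a plain-Java simulation. This is a sketch under simplifying assumptions, not the Cloud Monitoring client: `OrderedSeries` is a hypothetical stand-in that rejects any point whose end time is not later than the newest accepted point, and a window is treated as closed once the watermark reaches its end. After a restart the watermark jumps several minutes at once, several windows close together, and delivering them in arbitrary order triggers rejections.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;

// Hypothetical, simplified model of the ordering rule; NOT the Cloud Monitoring API.
class CatchUpSimulation {
    static final long WINDOW_MILLIS = 60_000L;

    // Mock time series that rejects points not strictly later than the newest one.
    static class OrderedSeries {
        private long lastEndMillis = Long.MIN_VALUE;

        boolean write(long endMillis) {
            if (endMillis <= lastEndMillis) {
                return false; // stands in for INVALID_ARGUMENT "Points must be written in order"
            }
            lastEndMillis = endMillis;
            return true;
        }
    }

    // Ends of all one-minute windows that close when the watermark moves from oldWm to newWm.
    static List<Long> windowsClosedBy(long oldWm, long newWm) {
        List<Long> ends = new ArrayList<>();
        long firstEnd = (Math.floorDiv(oldWm, WINDOW_MILLIS) + 1) * WINDOW_MILLIS;
        for (long end = firstEnd; end <= newWm; end += WINDOW_MILLIS) {
            ends.add(end);
        }
        return ends;
    }

    // Writes window ends in the given delivery order and returns how many were rejected.
    static int countRejected(List<Long> deliveryOrder) {
        OrderedSeries series = new OrderedSeries();
        int rejected = 0;
        for (long end : deliveryOrder) {
            if (!series.write(end)) {
                rejected++;
            }
        }
        return rejected;
    }

    public static void main(String[] args) {
        // The watermark jumps 5 minutes at once, closing 5 windows together.
        List<Long> closed = windowsClosedBy(0L, 5 * WINDOW_MILLIS);
        List<Long> delivered = new ArrayList<>(closed);
        Collections.shuffle(delivered, new Random(42)); // arbitrary delivery order
        System.out.println("closed:    " + closed);
        System.out.println("delivered: " + delivered);
        System.out.println("rejected:  " + countRejected(delivered));
    }
}
```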

I wonder whether I am missing something, or whether I really have to buffer the samples somehow and sort them before writing them into StackDriver.
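The buffer-and-sort idea could look roughly like the following plain-Java sketch. `SortedPointBuffer` is a hypothetical name; it assumes each window result is identified by its window end in epoch milliseconds, and that multiple panes for the same window are deltas (as with `discardingFiredPanes()`), so their counts are summed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical buffer that sorts per-window counts by window end before writing.
class SortedPointBuffer {
    // TreeMap keeps keys (window end, epoch millis) in ascending order.
    private final TreeMap<Long, Long> countsByEnd = new TreeMap<>();

    // Adds a pane's count; assumes panes are deltas (discardingFiredPanes), so counts are summed.
    void add(long windowEndMillis, long count) {
        countsByEnd.merge(windowEndMillis, count, Long::sum);
    }

    // Returns {windowEnd, count} pairs in ascending window-end order and clears the buffer.
    List<long[]> drainInOrder() {
        List<long[]> out = new ArrayList<>();
        for (Map.Entry<Long, Long> e : countsByEnd.entrySet()) {
            out.add(new long[] {e.getKey(), e.getValue()});
        }
        countsByEnd.clear();
        return out;
    }

    public static void main(String[] args) {
        SortedPointBuffer buffer = new SortedPointBuffer();
        buffer.add(180_000L, 4);
        buffer.add(60_000L, 7);
        buffer.add(120_000L, 2);
        // prints end=60000 count=7, then end=120000 count=2, then end=180000 count=4
        for (long[] point : buffer.drainInOrder()) {
            System.out.println("end=" + point[0] + " count=" + point[1]);
        }
    }
}
```

Sorting alone only helps within one drained batch; deciding when it is safe to drain is the harder part.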

Upvotes: 3

Views: 868

Answers (2)

shamma

Reputation: 151

The error “INVALID_ARGUMENT: One or more TimeSeries could not be written” most commonly occurs when multiple concurrent writers add points to a single time series, where there should really be separate time series for the different writers, distinguished by the monitored resource or a metric label. The documentation also notes that “The point's time interval must be later than any point already in the time series.”
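The point about separate series can be illustrated with a small plain-Java model in which ordering is enforced per series, and a series is identified here by metric type plus one label value. This is a hypothetical sketch, not the Monitoring API: `PerSeriesOrdering`, the `writer` label, and the metric name are assumptions for illustration. It only helps when the out-of-order points genuinely come from different writers.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model: ordering is enforced per time series, identified here by
// (metric type, writer label). Illustration only, not the Cloud Monitoring API.
class PerSeriesOrdering {
    private final Map<String, Long> lastEndBySeries = new HashMap<>();

    // Returns false if the point is not strictly later than the newest point in its series.
    boolean write(String metricType, String writerLabel, long endMillis) {
        String seriesKey = metricType + "{writer=" + writerLabel + "}";
        Long last = lastEndBySeries.get(seriesKey);
        if (last != null && endMillis <= last) {
            return false;
        }
        lastEndBySeries.put(seriesKey, endMillis);
        return true;
    }

    public static void main(String[] args) {
        PerSeriesOrdering monitoring = new PerSeriesOrdering();
        String metric = "custom.googleapis.com/my_count"; // hypothetical metric type
        // Distinct labels: each series is ordered independently.
        System.out.println(monitoring.write(metric, "worker-a", 120_000L)); // true
        System.out.println(monitoring.write(metric, "worker-b", 60_000L));  // true
        // Same series as worker-a with an older end time: rejected.
        System.out.println(monitoring.write(metric, "worker-a", 60_000L));  // false
    }
}
```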

Upvotes: 0

Kenn Knowles

Reputation: 6023

You are correct that an early pane for a window ending at time t1 may be output before any output for a window ending at an earlier time t0. Note also that PCollections have no inherent order, and transport is not required to preserve order.

You are also correct that if you want to send event-timestamped data to a system that requires it to be in order, the only option is to wait until the event-time watermark guarantees that no more data will arrive.
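Waiting for the watermark and then writing in order could be sketched in plain Java as below. This is a hypothetical illustration, not Beam API code: `WatermarkGatedWriter` buffers window results, and `onWatermark()` writes, oldest first, every buffered window whose end the watermark has reached. In a Beam pipeline this logic would typically live in a stateful `DoFn` with all points routed to a single key, the buffer held in state and an event-time timer playing the role of `onWatermark()`; that mapping is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical plain-Java sketch of "wait for the watermark, then write in order".
class WatermarkGatedWriter {
    private final TreeMap<Long, Long> pending = new TreeMap<>(); // window end -> count
    private final List<Long> writtenEnds = new ArrayList<>();     // stand-in for the sink
    private long lastWrittenEnd = Long.MIN_VALUE;

    // Buffers a window result; drops it if a later point was already written,
    // because the time series could no longer accept it.
    void buffer(long windowEndMillis, long count) {
        if (windowEndMillis <= lastWrittenEnd) {
            return;
        }
        pending.merge(windowEndMillis, count, Long::sum);
    }

    // Writes every buffered window whose end is at or before the watermark, oldest first.
    void onWatermark(long watermarkMillis) {
        while (!pending.isEmpty() && pending.firstKey() <= watermarkMillis) {
            Map.Entry<Long, Long> next = pending.pollFirstEntry();
            writtenEnds.add(next.getKey()); // a real writer would also send next.getValue()
            lastWrittenEnd = next.getKey();
        }
    }

    List<Long> getWrittenEnds() {
        return writtenEnds;
    }

    public static void main(String[] args) {
        WatermarkGatedWriter writer = new WatermarkGatedWriter();
        // Results arrive out of order during catch-up.
        writer.buffer(180_000L, 4);
        writer.buffer(60_000L, 7);
        writer.buffer(300_000L, 1);
        writer.buffer(120_000L, 2);
        writer.onWatermark(200_000L); // writes 60000, 120000, 180000
        writer.buffer(90_000L, 3);    // older than the last write: dropped
        writer.onWatermark(300_000L); // writes 300000
        System.out.println(writer.getWrittenEnds()); // [60000, 120000, 180000, 300000]
    }
}
```

Routing everything through one key serializes the writes, which is what makes the ordering hold, at the cost of funnelling all points through a single worker. With zero allowed lateness, the drop branch in `buffer()` should rarely fire; it is there so a stray late point is discarded instead of failing the write.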

If you provide some more details about exactly how you are using StackDriver, I might have more to add about how to use it most effectively.

Upvotes: 0
