Viktor Stolbin

Reputation: 2939

Hazelcast Jet stream processing end window emission

I stumbled across an interesting observation while cross-checking the aggregation results of my stream processing job. I created a test case in which a predefined data set was fed into a journaled map. Given the window size, the sliding step, and the amount of data with predetermined timestamps, the aggregation should have produced exactly one result. However, the result was never published: the window was not emitted, even though a few accumulate/combine operations were executed. With real data it behaves differently, but the aggregation result is always 'behind' the amount of data drawn from the source. I guess this has something to do with watermarks? How can I make sure that my test case doesn't wait for more data to arrive? Will allowed lateness help?

Upvotes: 0

Views: 268

Answers (2)

Oliv

Reputation: 10812

The points in Can Gencer's answer are valid. For testing, however, you can also use a batch source, such as Sources.list. By adding timestamps to a BatchStage you convert it to a StreamStage, on which you can do window aggregation. The aggregate transform will emit pending windows at the end of the batch.

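    // Assumes: import static com.hazelcast.jet.pipeline.WindowDefinition.tumbling;
    JetInstance inst = Jet.newJetInstance();
    IListJet<TimestampedEntry<String, Integer>> list = inst.getList("data");
    // TimestampedEntry(timestamp, key, value): all four entries share timestamp 1
    list.add(new TimestampedEntry<>(1, "a", 1));
    list.add(new TimestampedEntry<>(1, "b", 2));
    list.add(new TimestampedEntry<>(1, "a", 3));
    list.add(new TimestampedEntry<>(1, "b", 4));

    Pipeline p = Pipeline.create();
    p.drawFrom(Sources.<TimestampedEntry<String, Integer>>list("data"))
     // Second argument is the allowed lag; 0 means the input must be ordered by time
     .addTimestamps(TimestampedEntry::getTimestamp, 0)
     .groupingKey(TimestampedEntry::getKey)
     .window(tumbling(1))
     .aggregate(AggregateOperations.summingLong(TimestampedEntry::getValue))
     .drainTo(Sinks.logger());

    // The batch source is finite, so the job completes and pending windows are emitted
    inst.newJob(p).join();
    inst.shutdown();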

The above code prints:

    TimestampedEntry{ts=01:00:00.002, key='a', value='4'}
    TimestampedEntry{ts=01:00:00.002, key='b', value='6'}

Remember to keep your data in the list ordered by time as we use allowedLag=0.
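One way to guarantee that ordering in a test is to sort the events before adding them to the list. The following is a minimal sketch in plain Java with no Jet dependency; the `Event` class and the `sortedByTimestamp` helper are hypothetical stand-ins for illustration, not part of the Jet API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public class SortedTestData {
    // Hypothetical stand-in for a timestamped test event (not a Jet class).
    static final class Event {
        final long timestamp;
        final String key;
        final int value;

        Event(long timestamp, String key, int value) {
            this.timestamp = timestamp;
            this.key = key;
            this.value = value;
        }
    }

    // Returns a copy ordered by timestamp. List.sort is stable, so events
    // with equal timestamps keep their original relative order.
    static List<Event> sortedByTimestamp(List<Event> events) {
        List<Event> copy = new ArrayList<>(events);
        copy.sort(Comparator.comparingLong((Event e) -> e.timestamp));
        return copy;
    }

    public static void main(String[] args) {
        List<Event> unordered = Arrays.asList(
                new Event(3, "a", 1),
                new Event(1, "b", 2),
                new Event(2, "a", 3));
        for (Event e : sortedByTimestamp(unordered)) {
            System.out.println(e.timestamp + " " + e.key + " " + e.value);
        }
    }
}
```

The sorted events can then be added to the source list in iteration order.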

This answer is valid for Jet 0.6.1.

Upvotes: 1

Can Gencer

Reputation: 8885

First, I'll refer you to the two sections in the manual which describe how watermarks work and also talk about the concept of stream skew:

  1. http://docs.hazelcast.org/docs/jet/0.6.1/manual/#unbounded-stream-processing
  2. http://docs.hazelcast.org/docs/jet/0.6.1/manual/#stream-skew

The concept of "current time" in Jet only advances as long as there are events with advancing timestamps. There are typically several factors at play here:

Allowed lateness: This defines the lag per partition, assuming you are using a partitioned source like Kafka. It describes the tolerable degree of out-of-orderness of timestamps within a single partition. If the allowed lateness is 2 seconds, a window ending at time N will only close once an event with a timestamp of at least N + 2 seconds has been received across all input partitions.
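The arithmetic behind that rule can be sketched in plain Java. This is a simplified model rather than Jet's actual implementation: it assumes the watermark of a partition is the highest timestamp seen minus the allowed lateness, and that a window is emitted once the watermark reaches the window's end. The class and method names are made up for illustration.

```java
public class AllowedLatenessSketch {
    // Assumed model: watermark = highest timestamp seen so far - allowed lateness.
    static long watermark(long maxTimestampSeen, long allowedLateness) {
        return maxTimestampSeen - allowedLateness;
    }

    // Assumed model: a window [start, end) can be emitted once the watermark reaches its end.
    static boolean canEmit(long windowEnd, long watermark) {
        return watermark >= windowEnd;
    }

    public static void main(String[] args) {
        long windowEnd = 10_000;       // N = 10 s, in milliseconds
        long allowedLateness = 2_000;  // 2 s

        // An event at exactly N leaves the watermark at N - 2 s: window stays open.
        System.out.println(canEmit(windowEnd, watermark(10_000, allowedLateness))); // false
        // An event at N + 2 s moves the watermark to N: window can close.
        System.out.println(canEmit(windowEnd, watermark(12_000, allowedLateness))); // true
    }
}
```

Under this model, a finite test data set whose last timestamp falls inside the final window never pushes the watermark past that window's end, which matches the behavior described in the question.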

Stream skew: This can happen when, for example, you have 10 Kafka partitions but only 3 are producing any events. As Jet coalesces watermarks from all partitions, the stream will wait until the other 7 partitions have some data. There is a timeout after which these partitions are considered idle, but it defaults to 60 seconds and is currently not configurable in the pipeline API. So in this case you won't get any output until these partitions are marked as idle.
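The effect of coalescing can be illustrated with a small plain-Java model. It is a sketch under assumptions, not Jet's code: it takes the coalesced watermark to be the minimum over all partitions that are not marked idle, and treats a partition that has produced nothing yet as blocking the output. All names here are hypothetical.

```java
import java.util.Arrays;
import java.util.OptionalLong;

public class WatermarkCoalescingSketch {
    // Marker for a partition that has not produced any event yet.
    static final long NO_EVENTS_YET = Long.MIN_VALUE;

    // Assumed model: the coalesced watermark is the minimum watermark over
    // non-idle partitions. Returns empty while any non-idle partition is
    // still silent, or when every partition is idle.
    static OptionalLong coalesce(long[] partitionWatermarks, boolean[] idle) {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < partitionWatermarks.length; i++) {
            if (idle[i]) {
                continue; // idle partitions no longer hold the watermark back
            }
            if (partitionWatermarks[i] == NO_EVENTS_YET) {
                return OptionalLong.empty(); // still waiting for this partition
            }
            anyActive = true;
            min = Math.min(min, partitionWatermarks[i]);
        }
        return anyActive ? OptionalLong.of(min) : OptionalLong.empty();
    }

    public static void main(String[] args) {
        // 10 partitions, only the first 3 produce events.
        long[] watermarks = new long[10];
        Arrays.fill(watermarks, NO_EVENTS_YET);
        watermarks[0] = 5_000;
        watermarks[1] = 7_000;
        watermarks[2] = 6_000;
        boolean[] idle = new boolean[10];

        // Before the idle timeout: the 7 silent partitions block progress.
        System.out.println(coalesce(watermarks, idle).isPresent()); // false

        // After the idle timeout marks partitions 3..9 as idle.
        Arrays.fill(idle, 3, 10, true);
        System.out.println(coalesce(watermarks, idle).getAsLong()); // 5000
    }
}
```

In this model the slowest active partition determines the output time, which is why a few silent partitions are enough to stall every window.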

When using test data, it's quite common to have a very low volume of events spread over many partitions, which can make it a challenge to advance the time correctly.

Upvotes: 2
