koren maliniak
koren maliniak

Reputation: 153

Flink - behaviour of timesOrMore

I want to find pattern of events that follow

Inner pattern is:

  1. Have the same value for key "sensorArea".
  2. Have different value for key "customerId".
  3. Are within 5 seconds from each other.

And this pattern needs to

  1. Emit "alert" only if previous happens 3 or more times.

I wrote something but I know for sure it is not complete.

Two Questions

  1. I need to access the previous event fields when I'm in the "next" pattern, how can I do that without using the ctx command because it is heavy..

  2. My code brings weird result - this is my input

enter image description here

and my output is

3> {first=[Customer[timestamp=50,customerId=111,toAdd=2,sensorData=33]], second=[Customer[timestamp=100,customerId=222,toAdd=2,sensorData=33], Customer[timestamp=600,customerId=333,toAdd=2,sensorData=33]]}

even though my desired output should be all first six events (users 111/222 and sensor are 33 and then 44 and then 55

Pattern<Customer, ?> sameUserDifferentSensor = Pattern.<Customer>begin("first", skipStrategy)
            .followedBy("second").where(new IterativeCondition<Customer>() {
                @Override
                public boolean filter(Customer currCustomerEvent, Context<Customer> ctx) throws Exception {
                    List<Customer> firstPatternEvents = Lists.newArrayList(ctx.getEventsForPattern("first"));
                    int i = firstPatternEvents.size();
                    int currSensorData = currCustomerEvent.getSensorData();
                    int prevSensorData = firstPatternEvents.get(i-1).getSensorData();
                    int currCustomerId = currCustomerEvent.getCustomerId();
                    int prevCustomerId = firstPatternEvents.get(i-1).getCustomerId();
                    return currSensorData==prevSensorData && currCustomerId!=prevCustomerId;
                }
            })
            .within(Time.seconds(5))
            .timesOrMore(3);



    PatternStream<Customer> sameUserDifferentSensorPatternStream = CEP.pattern(customerStream, sameUserDifferentSensor);
    DataStream<String> alerts1 = sameUserDifferentSensorPatternStream.select((PatternSelectFunction<Customer, String>) Object::toString);

Upvotes: 0

Views: 167

Answers (1)

David Anderson
David Anderson

Reputation: 43524

You will have an easier time if you first key the stream by the sensorArea. They you will be pattern matching on streams where all of the events are for a single sensorArea, which will make the pattern easier to express, and the matching more efficient.

You can't avoid using an iterative condition and the ctx, but it should be less expensive after keying the stream.

Also, your code example doesn't match the text description. The text says "within 5 seconds" and "3 or more times", while the code has within(Time.seconds(2)) and timesOrMore(2).

Upvotes: 1

Related Questions