user21195533
user21195533

Reputation: 1

Flink CEP not working with inEventTime() but works with inProcessingTime() when appied on a pattern

I am working on following program and have set WatermarkStrategy however when I run the program using inEventTime() method on pattern it does not give any output.

Note : the same program works when I use inProcessingTime() on pattern.

public class FlinkCEPTest {

    @SuppressWarnings("deprecation")
    public static void main(String[] args) throws Exception {
        ParameterTool parameter = ParameterTool.fromArgs(args);
        
        final String bootstrapServers = parameter.get("kafka.broker", "localhost:9092,broker:29092");
        final String inputTopic_1 = parameter.get("input.topic.1","acctopic");
        final String inputTopic_2 = parameter.get("input.topic.2","txntopic");
        final String outputTopic = parameter.get("output.topic.q","alerttopic");
        final String groupID = parameter.get("group.id","flink-demo-grp-id");

        
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        
        KafkaSource<EventMessage> source_1 = KafkaSource.<EventMessage>builder()
                .setBootstrapServers(bootstrapServers)
                .setTopics(inputTopic_1).setGroupId(groupID)
                .setStartingOffsets(OffsetsInitializer.latest())
                .setDeserializer(new EventSchema())
                .build();    

        DataStream<EventMessage> text_1 = env.fromSource(source_1,
                WatermarkStrategy
                .<EventMessage>forBoundedOutOfOrderness(Duration.ofSeconds(300))
                .withTimestampAssigner((event, trtimestamp)-> {
                    //System.err.println("Kafka ingetstion ts : " + trtimestamp);
                    //System.err.println("Event ts : "+ event.getTxnDate().getTime());
                    return event.getTxnDate().getTime();}) 
                , "Kafka Source 1");


        DataStream<EventMessage> partitionedInput = text_1.keyBy(evt -> evt.getAccountId());


        //partitionedInput.print();

        Pattern<EventMessage, ?> relaxedAlarmPattern = Pattern.<EventMessage>begin("first").subtype(EventMessage.class)
                .where(new SimpleCondition<EventMessage>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public boolean filter(EventMessage value) throws Exception {
                        return value.getEvent().equalsIgnoreCase("PASSWORD_CHANGE_SUCC");
                    }
                }).followedBy("second").subtype(EventMessage.class).where(new IterativeCondition<EventMessage>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public boolean filter(EventMessage value, Context<EventMessage> ctx) throws Exception {
                        Iterable<EventMessage> test = ctx.getEventsForPattern("first");
                        Integer accid = 0;
                        for (EventMessage te : test) {
                            accid = te.getAccountId();
                        }
                        return value.getEvent().equalsIgnoreCase("BENIFICIARY_ADDED")
                                && value.getAccountId().equals(accid);
                    }
                }).followedBy("third").subtype(EventMessage.class).where(new IterativeCondition<EventMessage>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public boolean filter(EventMessage value, Context<EventMessage> ctx) throws Exception {
                        Integer accid = 0;
                        Iterable<EventMessage> test = ctx.getEventsForPattern("first");
                        for (EventMessage te : test) {
                            accid = te.getAccountId();
                        }
                        return value.getEvent().equalsIgnoreCase("TXN_NEW")
                                && value.getAccountId().equals(accid) && value.getAmt() <= 10;
                    }
                }).followedBy("last").subtype(EventMessage.class).where(new IterativeCondition<EventMessage>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public boolean filter(EventMessage value, Context<EventMessage> ctx) throws Exception {
                        Integer accid = 0;
                        Iterable<EventMessage> test = ctx.getEventsForPattern("first");
                        for (EventMessage te : test) {
                            accid = te.getAccountId();
                        }
                        return value.getEvent().equalsIgnoreCase("TXN_NEW")
                                && value.getAccountId().equals(accid) && value.getAmt() >= 100 ;
                    }
                }).within(Time.seconds(300));
        
        


        PatternStream<EventMessage> patternStream = CEP.pattern(partitionedInput, relaxedAlarmPattern)
                .inEventTime();
                //.inProcessingTime();

        DataStream<String> alarms = patternStream.select(new PatternSelectFunction<EventMessage, String>() {
            private static final long serialVersionUID = 1L;

            @Override
            public String select(Map<String, List<EventMessage>> pattern) throws Exception {
                EventMessage first = (EventMessage) pattern.get("first").get(0);
                EventMessage middle = (EventMessage) pattern.get("second").get(0);
                EventMessage third = (EventMessage) pattern.get("third").get(0);
                EventMessage last = (EventMessage) pattern.get("last").get(0);
                return "WARNING : Possible fraud scenario [ Party ID " + first.getPartyId()
                        + " recently changed his password and added a beneficiary and later made transcations of "
                        + third.getAmt() + " and " + last.getAmt()+" ]";
            }

        });

        alarms.print();
    
        env.execute(" CEP ");
    }

}

If I change the following line

PatternStream<EventMessage> patternStream = CEP.pattern(partitionedInput, relaxedAlarmPattern).inEventTime();

To


PatternStream<EventMessage> patternStream = CEP.pattern(partitionedInput, relaxedAlarmPattern).inProcessingTime();

The code works,any suggestions how can I make it work with inEventTime() method.

Upvotes: 0

Views: 107

Answers (1)

xjmdoo
xjmdoo

Reputation: 1736

Usually with Kafka sources the issue is that the parallelism is higher than the number of partitions or not all partitions receive data which doesn't let the watermarks advance forward. You can solve this by adjusting the parallelism or use withIdleness with your watermark strategy.

See more info in the Kafka connector docs.

Upvotes: 1

Related Questions