Piyush Shrivastava

Reputation: 1098

Flink Custom Trigger giving Unexpected Output

I want to create a Trigger that fires for the first time after 20 seconds and then every five seconds after that. I have used GlobalWindows and a custom Trigger:

val windowedStream = valueStream
                          .keyBy(0)
                          .window(GlobalWindows.create())
                          .trigger(TradeTrigger.of())

Here is the code in TradeTrigger:

@PublicEvolving
public class TradeTrigger<W extends Window> extends Trigger<Object, W> {

    private static final long serialVersionUID = 1L;

    static boolean flag=false;
    static long ctime = System.currentTimeMillis();

    private TradeTrigger() {
    }

    @Override
    public TriggerResult onElement(
            Object arg0,
            long arg1,
            W arg2,
            org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext arg3)
            throws Exception {
        // TODO Auto-generated method stub

        if(flag == false){
            if((System.currentTimeMillis()-ctime) >= 20000){
               flag = true;
               ctime = System.currentTimeMillis();
               return TriggerResult.FIRE;
            }
            return TriggerResult.CONTINUE;
        } else {
            if((System.currentTimeMillis()-ctime) >= 5000){
                ctime = System.currentTimeMillis();
                return TriggerResult.FIRE;
            }
            return TriggerResult.CONTINUE;
        }

    }

    @Override
    public TriggerResult onEventTime(
            long arg0,
            W arg1,
            org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext arg2)
            throws Exception {
        // TODO Auto-generated method stub
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(
            long arg0,
            W arg1,
            org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext arg2)
            throws Exception {
        // TODO Auto-generated method stub
        return TriggerResult.CONTINUE;
    }


    public static <W extends Window> TradeTrigger<W> of() {
        return new TradeTrigger<>();
    }

}

So basically, when flag is false, i.e. the first time, the Trigger should get fired in 20 seconds and set the flag to true. From the next time, it should get fired every 5 seconds.

The problem is that I get only one message in the output each time the Trigger fires. That is, I get a single message after 20 seconds and a single message every five seconds after that. I am expecting twenty messages in the output on each firing.

If I use .timeWindow(Time.seconds(5)) and create a time window of five seconds, I get 20 messages in output every 5 seconds. Please help me get this code right. Is there something I am missing?

Upvotes: 0

Views: 1342

Answers (2)

Piyush Shrivastava

Reputation: 1098

Got it working with the help of the answer from Fabian and the Flink mailing lists. I stored the state in a ValueState variable obtained through the TriggerContext. In the onElement() method I check that variable and, if it is the first element, register a processing time timer for 20 seconds after the current time and update the state. In the onProcessingTime() method I register another processing time timer for 5 seconds after the current time, update the state, and fire the window.
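
The schedule described above can be modelled without any Flink dependency. The sketch below is only an illustration of the timing logic: `TimerScheduleModel`, `onElement`, and `advanceTo` are hypothetical names, the `Map` stands in for keyed state plus a registered timer, and, as a simplification, each follow-up timer is set 5 seconds after the previous timer rather than after the current clock time.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Dependency-free model of "first fire after 20 s, then every 5 s" per key. Not Flink API. */
class TimerScheduleModel {
    static final long FIRST_DELAY_MS = 20_000L;
    static final long REPEAT_DELAY_MS = 5_000L;

    // Next pending timer per key; stands in for keyed state plus a registered timer.
    private final Map<String, Long> nextTimer = new HashMap<>();

    /** Called for every element: registers the first timer once and never fires by itself. */
    void onElement(String key, long now) {
        nextTimer.putIfAbsent(key, now + FIRST_DELAY_MS);
    }

    /** Advances the clock to 'now' and returns the times at which the key's timer fired. */
    List<Long> advanceTo(String key, long now) {
        List<Long> fired = new ArrayList<>();
        Long due = nextTimer.get(key);
        while (due != null && due <= now) {
            fired.add(due);                 // the window would be fired here
            due = due + REPEAT_DELAY_MS;    // re-register the follow-up timer
        }
        if (due != null) {
            nextTimer.put(key, due);
        }
        return fired;
    }

    public static void main(String[] args) {
        TimerScheduleModel model = new TimerScheduleModel();
        model.onElement("trade", 0L);
        model.onElement("trade", 3_000L);   // later elements do not move the timer
        System.out.println(model.advanceTo("trade", 19_999L)); // prints []
        System.out.println(model.advanceTo("trade", 31_000L)); // prints [20000, 25000, 30000]
    }
}
```

The point of the model is that firing is driven by the timer callback, not by the arrival of elements, so the output no longer depends on when the next element happens to show up.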

Upvotes: 1

Fabian Hueske

Reputation: 18987

There are a few issues with your Trigger implementation:

  1. You should never store the state of a function in a static variable. Flink does not isolate user processes in JVMs. Instead it uses a single JVM per TaskManager and starts multiple threads. Hence, your static boolean flag is shared across multiple instances of triggers. You should store the flag in Flink's ValueState interface, which is accessible from the TriggerContext. Flink will take care to checkpoint your state and recover it in case of a failure.

  2. Trigger.onElement() is only called when a new event arrives. So it cannot be used to trigger a Window computation at a specific time. Instead you should register an event time timer or processing time timer (again via the TriggerContext). The timer will call Trigger.onEventTime() or Trigger.onProcessingTime() respectively. Whether to use event or processing time depends on your use case.
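
To make point 1 concrete, here is a small plain-Java sketch of how a static flag leaks between objects in the same JVM. The class names are hypothetical and nothing here is Flink API. The instance field is shown only for contrast; in a Flink job the flag belongs in ValueState, as described above.

```java
/** Flag kept in a static field: one copy per JVM, shared by every instance. */
class StaticFlagTrigger {
    static boolean fired = false;

    /** Returns true only for the first call made by ANY instance. */
    boolean firstCall() {
        boolean first = !fired;
        fired = true;
        return first;
    }
}

/** Flag kept in an instance field: one copy per object. */
class InstanceFlagTrigger {
    private boolean fired = false;

    /** Returns true for the first call made on THIS instance. */
    boolean firstCall() {
        boolean first = !fired;
        fired = true;
        return first;
    }
}

class SharedStateDemo {
    public static void main(String[] args) {
        StaticFlagTrigger a = new StaticFlagTrigger();
        StaticFlagTrigger b = new StaticFlagTrigger();
        System.out.println(a.firstCall()); // true
        System.out.println(b.firstCall()); // false: b sees the update made through a

        InstanceFlagTrigger c = new InstanceFlagTrigger();
        InstanceFlagTrigger d = new InstanceFlagTrigger();
        System.out.println(c.firstCall()); // true
        System.out.println(d.firstCall()); // true: d has its own flag
    }
}
```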

Upvotes: 5
