monstereo

Reputation: 940

Scheduled Task with Apache Flink

I have a Flink job with parallelism 5 (for now). One of its operators, a RichFlatMapFunction, opens a file in its open(Configuration parameters) method. The flatMap operation does not open anything; it only reads the file to search for something through a utility class method such as utilityClass.searchText("abc"). Here is the boilerplate code:

public class MyFlatMap extends RichFlatMapFunction<...> {

    private MyUtilityFile myFile;

    @Override
    public void open(Configuration parameters) throws Exception {
        myFile.Open("fileLocation");
    }

    @Override
    public void flatMap(...) throws Exception {
        String text = myFile.searchText("abc");
        if (text != null) {
            // take an action
        } else {
            // another action
        }
    }

}

A Python script replaces this file every day at a specific time, so the flatMap operator also has to open the newly created file.

I thought this could be done with a ScheduledExecutorService backed by a single-thread pool.

I cannot open the file on every flatMap call because it is big.

Here is the boilerplate code I am trying to write:

public class MyFlatMap extends RichFlatMapFunction<...> implements Runnable {
    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);

    private MyUtilityFile myFile;

    @Override
    public void run() {
       myFile.Open("fileLocation");     
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        scheduler.scheduleAtFixedRate(this, 1, 1, TimeUnit.HOURS);

        myFile.Open("fileLocation");
    }

    @Override
    public void flatMap(...) throws Exception {
        String text = myFile.searchText("abc");
        if (text != null) {
            // take an action
        } else {
            // another action
        }
    }

}

Is this boilerplate okay in a Flink environment? If not, how can I reopen the file on a schedule? (Notifying Flink through Kafka after the file is updated, and having Flink consume that event, is not an option.)
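For comparison, here is a minimal plain-Java sketch of the single-thread ScheduledExecutorService idea with three changes that matter inside Flink. First, the executor is created when the operator starts rather than in a field initializer: a Flink function instance is serialized when the job is submitted, and an executor is not serializable. Second, it is shut down when the operator closes. Third, each newly loaded file is published through a volatile reference, so a concurrent flatMap call sees either the old file or the new one, never a half-opened one. ReloadingLookup and its loader are hypothetical names; the loader stands in for whatever MyUtilityFile.Open does. With parallelism 5, each parallel instance would run its own executor and hold its own copy of the file.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical helper (not a Flink API): loads a snapshot once, reloads it on a
// schedule, and publishes each new snapshot by swapping a volatile reference.
// Assumes a loaded snapshot is never mutated after it is published.
final class ReloadingLookup<T> implements AutoCloseable {
    private final Supplier<T> loader;           // e.g. opens and parses "fileLocation"
    private volatile T current;                 // snapshot that readers see
    private ScheduledExecutorService scheduler; // created in start(), never serialized

    ReloadingLookup(Supplier<T> loader) {
        this.loader = loader;
    }

    // Call from open(): load synchronously, then schedule periodic background reloads.
    void start(long period, TimeUnit unit) {
        current = loader.get();
        scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "file-reloader");
            t.setDaemon(true); // a forgotten executor will not keep the JVM alive
            return t;
        });
        scheduler.scheduleAtFixedRate(this::reload, period, period, unit);
    }

    // Build the new snapshot first, then swap the reference in a single write.
    void reload() {
        try {
            current = loader.get();
        } catch (RuntimeException e) {
            // An exception escaping a scheduleAtFixedRate task cancels all later runs,
            // so catch it (log it in real code) and keep serving the previous snapshot.
        }
    }

    T get() {
        return current;
    }

    // Call from close() so the reloader thread does not outlive the task.
    @Override
    public void close() {
        if (scheduler != null) {
            scheduler.shutdownNow();
        }
    }
}
```

Inside MyFlatMap, the field would be declared `private transient ReloadingLookup<MyUtilityFile> lookup;`, started in open() and closed in close(), and flatMap would call `lookup.get().searchText("abc")`, assuming a fresh MyUtilityFile can be built on each reload.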

Upvotes: 1

Views: 946

Answers (1)

ChangLi

Reputation: 813

Perhaps you can implement the ProcessingTimeCallback interface directly, which lets the function register processing-time timers and reopen the file each time one fires:

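import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
// Package used by older Flink 1.x releases; newer releases relocate these types.
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;

public class MyFlatMap extends RichFlatMapFunction<...> implements ProcessingTimeCallback {
    private static final long ONE_HOUR_MS = 3_600_000L;

    private MyUtilityFile myFile;

    @Override
    public void open(Configuration parameters) throws Exception {
        // Register the first reload one hour from now.
        final long now = processingTimeService().getCurrentProcessingTime();
        processingTimeService().registerTimer(now + ONE_HOUR_MS, this);

        myFile.Open("fileLocation");
    }

    @Override
    public void flatMap(...) throws Exception {
        String text = myFile.searchText("abc");
        if (text != null) {
            // take an action
        } else {
            // another action
        }
    }

    @Override
    public void onProcessingTime(long timestamp) throws Exception {
        myFile.Open("fileLocation");

        // A registered timer fires once, so schedule the next reload from the callback.
        final long now = processingTimeService().getCurrentProcessingTime();
        processingTimeService().registerTimer(now + ONE_HOUR_MS, this);
    }

    // A rich function has no getProcessingTimeService() of its own;
    // the service is reached through the streaming runtime context.
    private ProcessingTimeService processingTimeService() {
        return ((StreamingRuntimeContext) getRuntimeContext()).getProcessingTimeService();
    }
}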
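The timer above fires every hour, but the question says the file is replaced once a day at a specific time. One option, sketched here as an assumption rather than as part of the answer, is to compute the next timer timestamp from the wall clock so the reload happens shortly after the Python script has run. DailySchedule, nextDailyTimestamp, and the example reload time are hypothetical. The sketch also assumes the processing-time service reports wall-clock epoch milliseconds, as Flink's default system-time implementation does.

```java
import java.time.Instant;
import java.time.LocalTime;
import java.time.ZoneId;
import java.time.ZonedDateTime;

// Hypothetical helper: returns the epoch millis of the next occurrence of a fixed
// wall-clock time strictly after nowMillis, suitable as a registerTimer timestamp.
final class DailySchedule {
    private DailySchedule() {}

    static long nextDailyTimestamp(long nowMillis, LocalTime at, ZoneId zone) {
        ZonedDateTime now = Instant.ofEpochMilli(nowMillis).atZone(zone);
        ZonedDateTime next = now.with(at); // same date, target time of day
        if (!next.isAfter(now)) {
            next = next.plusDays(1);       // already reached today: use tomorrow
        }
        return next.toInstant().toEpochMilli();
    }
}
```

In open() and onProcessingTime(), the hourly `now + ...` computation would become `DailySchedule.nextDailyTimestamp(now, LocalTime.of(3, 30), ZoneId.systemDefault())`, with 03:30 standing in for a time safely after the script finishes. Each of the five parallel instances registers its own timer, so the file is reopened once per subtask.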

Upvotes: 2
