I have a Flink job with parallelism 5 (for now). One of its operators, a RichFlatMapFunction, opens a file in its open(Configuration parameters) method. The flatMap operation never opens anything; it only reads the already-opened file to search for something (a utility class provides a method like utilityClass.searchText("abc")). Here is the boilerplate code:
public class MyFlatMap extends RichFlatMapFunction<...> {
    private MyUtilityFile myFile;

    @Override
    public void open(Configuration parameters) throws Exception {
        // Called once per parallel instance: load the (large) file here.
        myFile.Open("fileLocation");
    }

    @Override
    public void flatMap(...) throws Exception {
        // Called per record: only search, never reopen the file.
        String text = myFile.searchText("abc");
        if (text != null) {
            // take an action
        } else {
            // another action
        }
    }
}
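To make the load-once, search-per-record split concrete, here is a minimal sketch of what a class like MyUtilityFile might look like. The real utility class is not shown in the question, so FileIndex, load and the exact-line matching are all hypothetical choices for illustration.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for MyUtilityFile: reads the whole file once and
// answers lookups from memory, so flatMap never touches the disk.
final class FileIndex {
    private final Set<String> lines;

    private FileIndex(Set<String> lines) {
        this.lines = lines;
    }

    // Expensive step: meant to be called once per task, e.g. from open().
    static FileIndex load(Path path) throws IOException {
        List<String> all = Files.readAllLines(path, StandardCharsets.UTF_8);
        return new FileIndex(new HashSet<>(all));
    }

    // Cheap step: meant to be called per record. Returns the matching line
    // or null, mirroring the null check in flatMap.
    String searchText(String text) {
        return lines.contains(text) ? text : null;
    }
}
```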
A Python script regenerates this file every day at a specific time, so the flatMap operator also needs to reopen the newly created file. I thought this could be done with a ScheduledExecutorService backed by a single-thread pool. I cannot reopen the file on every flatMap call because it is large.
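Because the file is large but is rewritten only once a day, a periodic (for example hourly) check could skip the reload when nothing has changed. A minimal sketch, assuming the file's last-modified time is a reliable change signal; FileChangeDetector and hasChanged are hypothetical names:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.FileTime;

// Hypothetical helper: remembers the file's last-modified time and reports
// whether it differs from the previous check, so a periodic job can skip
// reloading a large file that has not been rewritten yet.
final class FileChangeDetector {
    private final Path path;
    private FileTime lastSeen; // null until the first check

    FileChangeDetector(Path path) {
        this.path = path;
    }

    // True on the first call, and afterwards whenever the modification time
    // is different from the one seen on the previous call.
    boolean hasChanged() throws IOException {
        FileTime current = Files.getLastModifiedTime(path);
        if (current.equals(lastSeen)) {
            return false;
        }
        lastSeen = current;
        return true;
    }
}
```

If a check can fire while the Python script is still writing, having the script write to a temporary name and rename it when finished avoids picking up a half-written file.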
Here is the boilerplate code I am trying to write:
public class MyFlatMap extends RichFlatMapFunction<...> implements Runnable {
    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
    private MyUtilityFile myFile;

    @Override
    public void run() {
        // Executed every hour on the scheduler thread: reopen the updated file.
        myFile.Open("fileLocation");
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        scheduler.scheduleAtFixedRate(this, 1, 1, TimeUnit.HOURS);
        myFile.Open("fileLocation");
    }

    @Override
    public void flatMap(...) throws Exception {
        String text = myFile.searchText("abc");
        if (text != null) {
            // take an action
        } else {
            // another action
        }
    }
}
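If the executor approach is kept, two details are worth handling. First, the reload runs on the scheduler thread while flatMap runs on the task thread, so a safer pattern is to build the new data in a fresh object and then swap a single reference. Second, the executor is better created at runtime than in a field initializer: Flink serializes the function when the job is submitted, and the executors returned by Executors are not Serializable. A minimal stdlib-only sketch of that pattern; ReloadingHolder, start, reload and stop are hypothetical names, and in the Flink function start() would be called from open() and stop() from close():

```java
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical helper: holds the current snapshot of an expensive resource and
// replaces it on a background thread. Readers see either the old or the new
// snapshot, never a half-loaded one, because the new object is fully built
// before the reference is swapped.
final class ReloadingHolder<T> {
    private final Callable<T> loader;
    private final AtomicReference<T> current = new AtomicReference<>();
    private ScheduledExecutorService scheduler; // created in start(), not at construction

    ReloadingHolder(Callable<T> loader) {
        this.loader = loader;
    }

    // Loads once synchronously, then reloads every `period` on a single thread.
    void start(long period, TimeUnit unit) throws Exception {
        reload();
        scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleAtFixedRate(this::reloadQuietly, period, period, unit);
    }

    T get() {
        return current.get();
    }

    void reload() throws Exception {
        T fresh = loader.call(); // build the new snapshot off to the side
        current.set(fresh);      // then publish it with one atomic write
    }

    private void reloadQuietly() {
        try {
            reload();
        } catch (Exception e) {
            // Keep serving the previous snapshot. Letting the exception escape
            // would suppress all later runs of scheduleAtFixedRate.
            System.err.println("reload failed, keeping old snapshot: " + e);
        }
    }

    void stop() {
        if (scheduler != null) {
            scheduler.shutdownNow();
        }
    }
}
```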
Is this boilerplate okay for a Flink environment? If not, how can I reopen the file on a schedule? (An approach like "after updating the file, send an event to Kafka and have Flink read it" is not an option.)
Perhaps you can implement the ProcessingTimeCallback interface directly; it lets the function register processing-time timers and be called back when they fire:
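public class MyFlatMap extends RichFlatMapFunction<...> implements ProcessingTimeCallback {
    private static final long RELOAD_INTERVAL_MS = 60 * 60 * 1000L; // one hour
    private MyUtilityFile myFile;

    @Override
    public void open(Configuration parameters) throws Exception {
        // Register the first reload one hour from now.
        final long now = getProcessingTimeService().getCurrentProcessingTime();
        getProcessingTimeService().registerTimer(now + RELOAD_INTERVAL_MS, this);
        myFile.Open("fileLocation");
    }

    @Override
    public void flatMap(...) throws Exception {
        String text = myFile.searchText("abc");
        if (text != null) {
            // take an action
        } else {
            // another action
        }
    }

    @Override
    public void onProcessingTime(long timestamp) throws Exception {
        myFile.Open("fileLocation");
        // A timer fires only once, so register the next reload here.
        final long now = getProcessingTimeService().getCurrentProcessingTime();
        getProcessingTimeService().registerTimer(now + RELOAD_INTERVAL_MS, this);
    }

    // getProcessingTimeService() is not inherited from RichFlatMapFunction.
    // Assumption: on Flink 1.x the internal StreamingRuntimeContext exposes it.
    private ProcessingTimeService getProcessingTimeService() {
        return ((StreamingRuntimeContext) getRuntimeContext()).getProcessingTimeService();
    }
}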
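Since the Python script runs once a day at a known time, the timer could also be aligned to that time instead of firing every hour. A minimal sketch, assuming Flink's processing time is wall-clock epoch milliseconds; DailySchedule and nextOccurrenceMillis are hypothetical names, and the reload time and zone would be whatever fits the script's schedule:

```java
import java.time.Instant;
import java.time.LocalTime;
import java.time.ZoneId;
import java.time.ZonedDateTime;

// Hypothetical helper: epoch-millis timestamp of the next occurrence of
// `reloadAt` (a local time in `zone`) strictly after `nowMillis`.
final class DailySchedule {
    private DailySchedule() {}

    static long nextOccurrenceMillis(long nowMillis, LocalTime reloadAt, ZoneId zone) {
        ZonedDateTime now = Instant.ofEpochMilli(nowMillis).atZone(zone);
        ZonedDateTime candidate = now.with(reloadAt); // today at reloadAt
        if (!candidate.isAfter(now)) {
            candidate = candidate.plusDays(1);         // already passed: tomorrow
        }
        return candidate.toInstant().toEpochMilli();
    }
}
```

In onProcessingTime, the value returned for the current processing time would then be passed to registerTimer in place of now + RELOAD_INTERVAL_MS, ideally with a reload time set a little after the script usually finishes.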