Reputation: 838
This is my first time working with Apache Flink (1.3.1), and I have a question. I am using the flink-core, flink-cep and flink-streaming libraries. My application is an Akka ActorSystem that consumes messages from RabbitMQ, and various actors handle these messages. In some actors I want to instantiate a StreamExecutionEnvironment from Flink and process the incoming messages. Therefore I wrote a custom source class that extends the RichSourceFunction class. Everything works fine, except for one thing: I do not know how to send data to my Flink extension. Here is my setup:
public class FlinkExtension {
    private static StreamExecutionEnvironment environment;
    private DataStream<ValueEvent> input;
    private CustomSourceFunction function;

    public FlinkExtension(){
        environment = StreamExecutionEnvironment.getExecutionEnvironment();
        function = new CustomSourceFunction();
        input = environment.addSource(function);
        PatternStream<ValueEvent> patternStream = CEP.pattern(input, _pattern());
        DataStream<String> warnings = patternStream.select(new PatternSelectFunction<ValueEvent, String>() {
            @Override
            public String select(Map<String, List<ValueEvent>> pattern) throws Exception {
                return null; //TODO
            }
        });
        warnings.print();
        try {
            environment.execute();
        } catch(Exception e){
            e.printStackTrace();
        }
    }

    private Pattern<ValueEvent, ?> _pattern(){
        return Pattern.<ValueEvent>begin("first").where(new SimpleCondition<ValueEvent>() {
            @Override
            public boolean filter(ValueEvent value) throws Exception {
                return value.getValue() > 10;
            }
        });
    }

    public void sendData(ValueEvent value){
        function.sendData(value);
    }
}
And this is my custom source function:
public class CustomSourceFunction extends RichSourceFunction<ValueEvent> {
    private volatile boolean run = false;
    private SourceContext<ValueEvent> context;

    @Override
    public void open(Configuration parameters){
        run = true;
    }

    @Override
    public void run(SourceContext<ValueEvent> ctx) throws Exception {
        this.context = ctx;
        while (run){
        }
    }

    public void sendData(ValueEvent value){
        this.context.collectWithTimestamp(value, Calendar.getInstance().getTimeInMillis());
    }

    @Override
    public void cancel() {
        run = false;
    }
}
So I want to call the sendData method of my FlinkExtension class from outside, to write data continuously to my FlinkExtension. Here is my JUnit test, which should send data to the extension and then write the data to the SourceContext:
@Test
public void testSendData(){
    FlinkExtension extension = new FlinkExtension();
    extension.sendData(new ValueEvent(30));
}
But if I run the test, nothing happens; the application hangs in the run method of the CustomSourceFunction. I also tried to create a new endless thread in the CustomSourceFunction run method.
To summarize: Does anybody know how to write data from an application to a Flink instance in a continuous way?
Upvotes: 2
Views: 7416
Reputation: 1
The problem is that the run method and the sendData method are being invoked on different object instances of CustomSourceFunction. Hence, the context object is not shared between the methods, and adding a new ValueEvent does not work.
To fix this, store the object instance used by the run method as a static member variable of the CustomSourceFunction class. When you need to create a new ValueEvent, invoke the sendData method on this same object instance.
See the sample code below:
package RuleSources;

import Rules.Rule;

public class DynamicRuleSource extends AlertingRuleSource {
    // The instance whose run() method is executing; set in run() so that
    // other code in the same JVM can reach it.
    private static volatile DynamicRuleSource sourceObj;
    private volatile boolean running = true;
    private SourceContext<Rule> ctx;

    public static DynamicRuleSource getSourceObject() {
        return sourceObj;
    }

    public void run(SourceContext<Rule> ctx) throws Exception {
        this.ctx = ctx;
        sourceObj = this;
        // Keep the source alive until cancel() is called.
        while (running) {
            Thread.sleep(100);
        }
    }

    public void addRule(Rule rule) {
        ctx.collect(rule);
    }

    @Override
    public void cancel() {
        running = false;
    }
}
To add a new rule:
public static void addRule(Rule rule) throws Exception {
    // getSourceObject() returns null until run() has started on the source.
    DynamicRuleSource sourceObject = DynamicRuleSource.getSourceObject();
    sourceObject.addRule(rule);
}
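A plain-Java sketch of why the instances can differ: as far as I know, Flink distributes user functions to its tasks using Java serialization, so the object whose run method executes is a deserialized copy of the one created in the driver code. The example below uses no Flink classes; DemoSource and SerializationCopyDemo are hypothetical names, and the round trip through a byte array only imitates what the framework would do.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical stand-in for a source function: the context exists only
// on the instance whose run() method was actually called.
class DemoSource implements Serializable {
    private static final long serialVersionUID = 1L;
    private transient Object context;

    void run() {
        context = new Object();
    }

    boolean hasContext() {
        return context != null;
    }
}

class SerializationCopyDemo {
    // Serializes the object to bytes and reads it back, yielding a separate copy.
    static DemoSource roundTrip(DemoSource source) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(source);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (DemoSource) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        DemoSource original = new DemoSource(); // what the driver code holds
        DemoSource copy = roundTrip(original);  // what the runtime would execute
        copy.run();
        System.out.println("same instance: " + (copy == original));
        System.out.println("copy has context: " + copy.hasContext());
        System.out.println("original has context: " + original.hasContext());
    }
}
```

Calling a method such as sendData on the original object therefore finds no context, which is why the static reference above is set from inside run.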
Upvotes: 0
Reputation: 43717
Flink source connectors emit a continuous stream of data by having their run() methods call collect() (or collectWithTimestamp()) inside of the while(run) loop. If you want to study an example, the Apache NiFi source isn't as complex as most; here's its run method.
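The loop described above can be sketched without any Flink dependency. In this minimal example, QueueBackedSource is a hypothetical stand-in for a source function: run takes a Consumer in place of Flink's SourceContext, and collector.accept(...) plays the role of collect(). Producers hand values over through a BlockingQueue, and the loop polls with a timeout so that it neither busy-waits nor misses cancel().

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical stand-in for a source function; not a Flink class.
class QueueBackedSource<T> {
    private final BlockingQueue<T> queue = new LinkedBlockingQueue<>();
    private volatile boolean running = true;

    // Called by producers (for example an actor) from any thread.
    void offer(T value) {
        queue.add(value);
    }

    // Mirrors the shape of run(): loop until cancelled, emitting what arrives.
    void run(Consumer<T> collector) throws InterruptedException {
        while (running) {
            // The timeout lets the loop re-check the running flag regularly.
            T next = queue.poll(50, TimeUnit.MILLISECONDS);
            if (next != null) {
                collector.accept(next);
            }
        }
    }

    void cancel() {
        running = false;
    }
}

class QueueBackedSourceDemo {
    // Runs the source on a worker thread, feeds it values, then cancels it.
    static List<Integer> runDemo(int... values) {
        QueueBackedSource<Integer> source = new QueueBackedSource<>();
        List<Integer> collected = Collections.synchronizedList(new ArrayList<>());
        Thread worker = new Thread(() -> {
            try {
                source.run(collected::add);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.start();
        try {
            for (int value : values) {
                source.offer(value);
            }
            // Wait (at most two seconds) until everything has been emitted.
            long deadline = System.currentTimeMillis() + 2000;
            while (collected.size() < values.length
                    && System.currentTimeMillis() < deadline) {
                Thread.sleep(10);
            }
            source.cancel();
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new ArrayList<>(collected);
    }

    public static void main(String[] args) {
        System.out.println(runDemo(30, 5, 42));
    }
}
```

Two caveats when carrying this over to a real job. The source runs on its own thread here because run blocks; in the question's setup, environment.execute() sits in the constructor and, as far as I can tell, blocks in the same way, so the test never reaches sendData. Also, an instance field like this queue would not survive Flink's serialization of the source, so the hand-off would need a static reference (local execution only) or an external system. Since the messages already come from RabbitMQ, Flink's RabbitMQ connector may be the simpler route.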
Upvotes: 2