K.E.

Reputation: 838

Write data from custom source to flink in continuous way

This is my first time working with Apache Flink (1.3.1), and I have a question. More specifically, I am using the flink-core, flink-cep and flink-streaming libraries. My application is an Akka ActorSystem that consumes messages from RabbitMQ, and various actors handle these messages. In some actors I want to instantiate a Flink StreamExecutionEnvironment and process the incoming messages, so I wrote a custom source class that extends RichSourceFunction. Everything works fine except one thing: I do not know how to send data to my Flink extension. Here is my setup:

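import java.util.List;
import java.util.Map;

import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FlinkExtension {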

    private static StreamExecutionEnvironment environment;
    private DataStream<ValueEvent> input;
    private CustomSourceFunction function;

    public FlinkExtension(){

        environment = StreamExecutionEnvironment.getExecutionEnvironment();

        function = new CustomSourceFunction();
        input = environment.addSource(function);

        PatternStream<ValueEvent> patternStream = CEP.pattern(input, _pattern());

        DataStream<String> warnings = patternStream.select(new PatternSelectFunction<ValueEvent, String>() {
            @Override
            public String select(Map<String, List<ValueEvent>> pattern) throws Exception {
                return null; //TODO
            }
        });

        warnings.print();

        try {
            environment.execute();
        } catch(Exception e){
            e.printStackTrace();
        }

    }

    private Pattern<ValueEvent, ?> _pattern(){

        return Pattern.<ValueEvent>begin("first").where(new SimpleCondition<ValueEvent>() {
            @Override
            public boolean filter(ValueEvent value) throws Exception {
                return value.getValue() > 10;
            }
        });
    }

    public void sendData(ValueEvent value){
        function.sendData(value);
    }
}

And this is my custom source function:

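import java.util.Calendar;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

public class CustomSourceFunction extends RichSourceFunction<ValueEvent> {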

    private volatile boolean run = false;
    private SourceContext<ValueEvent> context;

    @Override
    public void open(Configuration parameters){
        run = true;
    }

    @Override
    public void run(SourceContext<ValueEvent> ctx) throws Exception {
        this.context = ctx;

        while (run){

        }
    }

    public void sendData(ValueEvent value){
        this.context.collectWithTimestamp(value, Calendar.getInstance().getTimeInMillis());
    }

    @Override
    public void cancel() {
        run = false;
    }
}

So I want to call the sendData method of my FlinkExtension class from outside, to write data to my FlinkExtension continuously. Here is my JUnit test, which should send data to the extension, which in turn should write it to the SourceContext:

@Test
public void testSendData(){
    FlinkExtension extension = new FlinkExtension();
    extension.sendData(new ValueEvent(30));
}

But when I run the test, nothing happens: the application hangs in the run method of the CustomSourceFunction. I also tried creating a new endless thread in the CustomSourceFunction's run method.

To summarize: does anybody know how to write data from an application to a Flink instance continuously?

Upvotes: 2

Views: 7416

Answers (2)

Aarti Gupta

Reputation: 1

The problem is that the run method and the sendData method are called on different instances of CustomSourceFunction. Hence, the context object is not shared between the two methods, and sending a new ValueEvent does not work.

To fix this, store the object instance used by the run method as a static member variable of the CustomSourceFunction class. When you need to create a new ValueEvent, invoke the sendData method on this same object instance.

See the sample code below

package RuleSources;

import Rules.Rule;

public class DynamicRuleSource extends AlertingRuleSource {
    // The instance whose run() is executing; volatile so other threads see it once set
    private static volatile DynamicRuleSource sourceObj;

    private volatile boolean running = true;
    private volatile SourceContext<Rule> ctx;

    public static DynamicRuleSource getSourceObject() {
        return sourceObj;
    }

    @Override
    public void run(SourceContext<Rule> ctx) throws Exception {
        this.ctx = ctx;
        sourceObj = this;
        // Keep the source alive until the job is cancelled
        while (running) {
            Thread.sleep(100);
        }
    }

    public void addRule(Rule rule) {
        // Called from a thread other than run(), so emit under the checkpoint lock
        synchronized (ctx.getCheckpointLock()) {
            ctx.collect(rule);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}

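To add a new rule:

public static void addRule(Rule rule) throws Exception {
    DynamicRuleSource sourceObject = DynamicRuleSource.getSourceObject();
    if (sourceObject == null) {
        // run() has not been called yet, so there is no context to emit to
        throw new IllegalStateException("DynamicRuleSource has not started yet");
    }
    sourceObject.addRule(rule);
}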
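The "different instances" diagnosis can be reproduced without Flink. Flink ships user functions to the runtime by serializing them, so run() is invoked on a deserialized copy, not on the object that FlinkExtension keeps a reference to. The plain-Java sketch below mimics that round trip with Java serialization; FakeSource and roundTrip are hypothetical names for illustration, not Flink API.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SerializedCopyDemo {

    // Hypothetical stand-in for CustomSourceFunction: serializable, with a field
    // that is only assigned when run() is called.
    static class FakeSource implements Serializable {
        private static final long serialVersionUID = 1L;
        Object context;

        void run(Object ctx) {
            this.context = ctx;
        }
    }

    // Round-trips an object through Java serialization, much like a user
    // function is copied on its way from the client program to the running task.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T obj) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (T) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        FakeSource original = new FakeSource();    // the instance the caller keeps
        FakeSource deployed = roundTrip(original); // the copy whose run() is actually invoked
        deployed.run("context");

        System.out.println(original == deployed);     // false
        System.out.println(original.context == null); // true: the caller's instance never gets the context
    }
}
```

The same copying is why the static-field workaround only helps when the job runs in the same JVM as the caller, as with the local environment in the JUnit test. On a cluster, the static field is assigned inside the TaskManager's JVM, not in the application's.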

Upvotes: 0

David Anderson

Reputation: 43717

Flink source connectors emit a continuous stream of data by having their run() methods call collect() (or collectWithTimestamp()) inside of the while(run) loop. If you want to study an example, the Apache NiFi source isn't as complex as most; here's its run method.
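As a rough, Flink-free illustration of that pattern, the sketch below models a source whose run() loop drains a BlockingQueue filled by other threads and emits each element from inside the while (running) loop. Consumer<Integer> stands in for SourceContext.collect(), and checkpointLock stands in for ctx.getCheckpointLock(); QueueDrainingSource, send() and demo() are hypothetical names, not Flink API. In a real job the queue would have to be reachable from the deployed (deserialized) copy of the source, which in practice means a static field and therefore local, single-JVM execution only.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

public class QueueDrainingSource {

    // Hypothetical hand-off queue filled by producer threads (e.g. Akka actors).
    private final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
    private volatile boolean running = true;
    // Stands in for ctx.getCheckpointLock() in a real Flink source.
    private final Object checkpointLock = new Object();

    // Producer side: safe to call from any thread.
    public void send(int value) {
        queue.add(value);
    }

    // Mirrors SourceFunction.run(): elements are emitted from inside the while (running) loop.
    public void run(Consumer<Integer> collect) throws InterruptedException {
        while (running) {
            // Wait up to 100 ms for data instead of busy-spinning on an empty loop.
            Integer next = queue.poll(100, TimeUnit.MILLISECONDS);
            if (next != null) {
                synchronized (checkpointLock) {
                    collect.accept(next); // stands in for ctx.collect(next)
                }
            }
        }
    }

    // Mirrors SourceFunction.cancel(): makes run() return.
    public void cancel() {
        running = false;
    }

    // Runs the source on its own thread, sends two values, waits for both, then cancels.
    static List<Integer> demo() throws InterruptedException {
        QueueDrainingSource source = new QueueDrainingSource();
        List<Integer> emitted = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch received = new CountDownLatch(2);

        Thread runner = new Thread(() -> {
            try {
                source.run(v -> {
                    emitted.add(v);
                    received.countDown();
                });
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        runner.start();

        source.send(5);
        source.send(30);
        received.await(5, TimeUnit.SECONDS); // wait until both values were emitted
        source.cancel();
        runner.join();
        return emitted;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(demo()); // [5, 30]
    }
}
```

Because run() blocks until the source is cancelled (just as environment.execute() blocks until the job finishes), the producer calls have to come from a different thread, as they do in demo().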

Upvotes: 2
