Reputation: 21563
Earlier I asked about a simple hello world example for Flink. This gave me some good examples!
However I would like to ask for a more ‘streaming’ example where we generate an input value every second. This would ideally be random, but even just the same value each time would be fine.
The objective is to get a stream that ‘moves’ with no/minimal external touch.
Hence my question:
I found how to show this with generating data externally and writing to Kafka, or listening to a public source, however I am trying to solve it with minimal dependence (like starting with GenerateFlowFile in Nifi).
Upvotes: 0
Views: 2446
Reputation: 43499
Here's an example. This was constructed as an example of how to make your sources and sinks pluggable. The idea being that in development you might use a random source and print the results, for tests you might use a hardwired list of input events and collect the results in a list, and in production you'd use the real sources and sinks.
Here's the job:
/*
* Example showing how to make sources and sinks pluggable in your application code so
* you can inject special test sources and test sinks in your tests.
*/
public class TestableStreamingJob {
private SourceFunction<Long> source;
private SinkFunction<Long> sink;
public TestableStreamingJob(SourceFunction<Long> source, SinkFunction<Long> sink) {
this.source = source;
this.sink = sink;
}
public void execute() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Long> LongStream =
env.addSource(source)
.returns(TypeInformation.of(Long.class));
LongStream
.map(new IncrementMapFunction())
.addSink(sink);
env.execute();
}
public static void main(String[] args) throws Exception {
TestableStreamingJob job = new TestableStreamingJob(new RandomLongSource(), new PrintSinkFunction<>());
job.execute();
}
// While it's tempting for something this simple, avoid using anonymous classes or lambdas
// for any business logic you might want to unit test.
public class IncrementMapFunction implements MapFunction<Long, Long> {
@Override
public Long map(Long record) throws Exception {
return record + 1 ;
}
}
}
Here's the RandomLongSource
:
public class RandomLongSource extends RichParallelSourceFunction<Long> {
private volatile boolean cancelled = false;
private Random random;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
random = new Random();
}
@Override
public void run(SourceContext<Long> ctx) throws Exception {
while (!cancelled) {
Long nextLong = random.nextLong();
synchronized (ctx.getCheckpointLock()) {
ctx.collect(nextLong);
}
}
}
@Override
public void cancel() {
cancelled = true;
}
}
Upvotes: 5