Reputation: 387
For example, I want to combine a stream of 1, 2, 3 and a stream of 4, 5 into a single one, so the result should be: 1, 2, 3, 4, 5. In other words: once the first source is exhausted, take elements from the second one. My closest attempt, which unfortunately does not preserve the order of the items, is:
val a = streamEnvironment.fromElements(1, 2, 3)
val b = streamEnvironment.fromElements(4, 5)
val c = a.union(b)
c.map(x => println(s"X=$x")) // X=4, 5, 1, 2, 3 or something like that
I also tried a similar approach with timestamps included, but got the same result.
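For comparison, the ordering asked for here is what plain `java.util.stream.Stream.concat` gives for two bounded, in-memory streams: the first stream is consumed completely before the second one starts. This is only an illustration of the wanted semantics in ordinary Java, not Flink code; Flink's `union` gives no such guarantee and forwards elements from its inputs as they arrive. `ConcatDemo` is a hypothetical name used only for this sketch.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Plain Java, no Flink: Stream.concat yields all elements of the first
// stream followed by all elements of the second one.
class ConcatDemo {
    static List<Integer> concatInOrder() {
        Stream<Integer> first = Stream.of(1, 2, 3);
        Stream<Integer> second = Stream.of(4, 5);
        return Stream.concat(first, second).collect(Collectors.toList());
    }
}
```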
Upvotes: 1
Views: 2650
Reputation: 9280
If you have N sources (not streams) that you want to order consecutively, then you could wrap them with an outer source. Something like:
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

// Runs the wrapped sources one after another: every element of sources[0]
// is emitted before any element of sources[1], and so on.
@SuppressWarnings("serial")
public class SequentialSources<T> implements SourceFunction<T>, ResultTypeQueryable<T> {

    private final TypeInformation<T> type;
    private final SourceFunction<T>[] sources;
    private volatile boolean isRunning = true;

    public SequentialSources(TypeInformation<T> type, SourceFunction<T>... sources) {
        this.type = type;
        this.sources = sources;
    }

    @Override
    public void run(SourceContext<T> context) throws Exception {
        // Check isRunning before each source, so a cancel() is never
        // overwritten and an empty source list does not throw.
        // Note: the inner sources are only run(); this wrapper does not call
        // lifecycle methods such as open() on them.
        for (int i = 0; i < sources.length && isRunning; i++) {
            sources[i].run(context);
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
        for (SourceFunction<T> source : sources) {
            source.cancel();
        }
    }

    @Override
    public TypeInformation<T> getProducedType() {
        return type;
    }
}
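To check the control flow of `SequentialSources` without a Flink cluster, here is a plain-Java sketch of the same idea. `SimpleSource`, `ListSource`, `SequentialSimpleSource` and `SequentialDemo` are hypothetical stand-ins invented for this illustration, not Flink classes: each inner source runs to completion before the next one starts, and the running flag is checked before every source so that a cancellation stops the whole sequence.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for Flink's SourceFunction, reduced to the two
// methods the sequencing logic needs. This is NOT the Flink API.
interface SimpleSource<T> {
    void run(Consumer<T> out);
    void cancel();
}

// Emits a fixed list of elements, stopping early if cancelled.
class ListSource<T> implements SimpleSource<T> {
    private final List<T> items;
    private volatile boolean running = true;

    ListSource(List<T> items) { this.items = items; }

    public void run(Consumer<T> out) {
        for (T item : items) {
            if (!running) return;
            out.accept(item);
        }
    }

    public void cancel() { running = false; }
}

// Runs each inner source to completion before starting the next one.
class SequentialSimpleSource<T> implements SimpleSource<T> {
    private final List<SimpleSource<T>> sources;
    private volatile boolean running = true;

    SequentialSimpleSource(List<SimpleSource<T>> sources) { this.sources = sources; }

    public void run(Consumer<T> out) {
        // The flag is checked before each source, so cancel() is respected.
        for (int i = 0; i < sources.size() && running; i++) {
            sources.get(i).run(out);
        }
    }

    public void cancel() {
        running = false;
        sources.forEach(SimpleSource::cancel);
    }
}

class SequentialDemo {
    static List<SimpleSource<Integer>> parts() {
        List<SimpleSource<Integer>> parts = new ArrayList<>();
        parts.add(new ListSource<>(Arrays.asList(1, 2, 3)));
        parts.add(new ListSource<>(Arrays.asList(4, 5)));
        return parts;
    }

    static List<Integer> runAll() {
        List<Integer> out = new ArrayList<>();
        new SequentialSimpleSource<>(parts()).run(out::add);
        return out;
    }

    // Cancels the whole sequence as soon as stopValue has been emitted.
    static List<Integer> runAndCancelAt(int stopValue) {
        List<Integer> out = new ArrayList<>();
        SequentialSimpleSource<Integer> seq = new SequentialSimpleSource<>(parts());
        seq.run(x -> {
            out.add(x);
            if (x == stopValue) {
                seq.cancel(); // cancel mid-run
            }
        });
        return out;
    }
}
```

Because each inner `run` call returns only when that source is exhausted, this pattern only makes sense for bounded inner sources: an unbounded first source would keep the later ones from ever starting.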
Upvotes: 3
Reputation: 18987
This is not possible right now, at least not with the high level DataStream API.
It might be possible to implement a low-level operator that first reads one input and then the other input. However, this would completely block one input, which does not work well with the way Flink handles watermarks and performs checkpoints.
In the future, this will be possible using so-called side inputs.
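A variation on the low-level operator idea is to buffer instead of block: pass the first input straight through and hold back the elements of the second input until the first one signals that it has ended. The sketch below models only that logic in plain Java; `ConcatOperator`, its `onFirst`/`onSecond`/`endOfFirst` methods and `ConcatOperatorDemo` are hypothetical names, not a Flink API, and how a real operator would learn that an input has ended is left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical two-input "concat" operator in plain Java (NOT a Flink API).
// The second input is buffered, not blocked, until the first input ends.
class ConcatOperator<T> {
    private final Consumer<T> out;
    private final List<T> pendingSecond = new ArrayList<>();
    private boolean firstFinished = false;

    ConcatOperator(Consumer<T> out) { this.out = out; }

    void onFirst(T value) {
        out.accept(value); // first input is forwarded immediately
    }

    void onSecond(T value) {
        if (firstFinished) {
            out.accept(value);
        } else {
            pendingSecond.add(value); // hold back until the first input ends
        }
    }

    void endOfFirst() {
        firstFinished = true;
        pendingSecond.forEach(out); // flush in arrival order
        pendingSecond.clear();
    }
}

class ConcatOperatorDemo {
    static List<Integer> interleaved() {
        List<Integer> out = new ArrayList<>();
        ConcatOperator<Integer> op = new ConcatOperator<>(out::add);
        op.onSecond(4);  // arrives early, buffered
        op.onFirst(1);
        op.onFirst(2);
        op.onSecond(5);  // still buffered
        op.onFirst(3);
        op.endOfFirst(); // releases 4, 5
        return out;
    }
}
```

The price is memory: everything the second input produces before the first one finishes sits in the buffer, and in a real job that buffer would also have to be part of checkpointed state to survive failures.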
Upvotes: 4
Reputation: 1294
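You can approximate this with a flatMap that keeps a heap buffer (a priority queue ordered by value). It depends on a few conditions, though: each input has to end with an end marker (Int.MaxValue below), the flatMap has to run with parallelism 1, and since it sorts by value it only reproduces "first stream, then second stream" when the values of the first stream are smaller than those of the second. Also, if elements from one input stream are delayed by more than the buffer can absorb, the output will not be strictly ordered.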
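import org.apache.flink.api.common.functions.FlatMapFunction
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.util.Collector
import scala.collection.mutable

// Builds the pipeline; the caller still has to call env.execute().
def process(): StreamExecutionEnvironment = {
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  // Explicit TypeInformation instances for the Scala API
  // (instead of importing org.apache.flink.streaming.api.scala._)
  implicit val typeInfo: TypeInformation[Int] = TypeInformation.of(classOf[Int])
  implicit val typeInfo2: TypeInformation[Unit] = TypeInformation.of(classOf[Unit])
  val BUF_SIZE = 3   // how many elements are held back for reordering
  val STREAM_NUM = 2 // number of inputs; each one ends with Int.MaxValue
  val a = env.fromElements(1, 2, 3, 3, 4, 5, 6, 7, Int.MaxValue)
  val b = env.fromElements(4, 5, 9, 10, 11, 13, Int.MaxValue)
  val c = a.union(b).flatMap(new FlatMapFunction[Int, Int] {
    // Min-heap: dequeue() returns the smallest buffered value
    val heap = mutable.PriorityQueue.empty[Int](Ordering.Int.reverse)
    var endCount = 0
    override def flatMap(value: Int, out: Collector[Int]): Unit = {
      if (value == Int.MaxValue) {
        endCount += 1
        // All inputs have ended: drain the rest in order. Iterating with
        // foreach would not visit the elements in priority order.
        if (endCount == STREAM_NUM) {
          while (heap.nonEmpty) out.collect(heap.dequeue())
        }
      } else {
        heap += value
        while (heap.size > BUF_SIZE) {
          out.collect(heap.dequeue())
        }
      }
    }
  }).setParallelism(1) // one instance must see both inputs and both end markers
  c.map(x => println(s"X=$x")).setParallelism(1)
  env
}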
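The reordering logic of the flatMap can be checked in isolation. The following plain-Java model (no Flink; `HeapReorderBuffer` is a hypothetical name, and the end of the arrival list stands in for the Int.MaxValue markers) keeps at most `bufSize` values in a min-heap, releases the smallest one whenever the buffer overflows, and drains the heap with `poll()` at the end. It also shows the limitation mentioned above: with the arrival order 4, 5, 1, 2, 3, a buffer of 3 restores 1, 2, 3, 4, 5, while a buffer of 1 emits 4, 1, 2, 3, 5.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Plain-Java model of the flatMap's state (no Flink): a bounded min-heap
// that always releases the smallest buffered value first.
class HeapReorderBuffer {
    private final PriorityQueue<Integer> heap = new PriorityQueue<>(); // min-heap
    private final int bufSize;
    private final List<Integer> out = new ArrayList<>();

    HeapReorderBuffer(int bufSize) { this.bufSize = bufSize; }

    void add(int value) {
        heap.add(value);
        while (heap.size() > bufSize) {
            out.add(heap.poll()); // emit the smallest buffered value
        }
    }

    // Called once all inputs have ended. Drain with poll(), not iteration:
    // a PriorityQueue iterator does not visit elements in priority order.
    List<Integer> finish() {
        while (!heap.isEmpty()) {
            out.add(heap.poll());
        }
        return out;
    }

    static List<Integer> reorder(List<Integer> arrivals, int bufSize) {
        HeapReorderBuffer buffer = new HeapReorderBuffer(bufSize);
        for (int v : arrivals) {
            buffer.add(v);
        }
        return buffer.finish();
    }
}
```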
Upvotes: 1