Albert Bikeev

Reputation: 387

How to concatenate two streams in Apache Flink

E.g. I want to combine a stream of 1, 2, 3 and a stream of 4, 5 into a single one, so the result should be: 1, 2, 3, 4, 5. In other words: once the first source is exhausted, take elements from the second one. My closest attempt, which unfortunately does not preserve the order of items, is:

val a = streamEnvironment.fromElements(1, 2, 3)

val b = streamEnvironment.fromElements(4, 5)

val c = a.union(b)

c.map(x => println(s"X=$x")) // X=4, 5, 1, 2, 3 or something like that

I also made a similar attempt with a datetime included, but got the same result.

Upvotes: 1

Views: 2650

Answers (3)

kkrugler

Reputation: 9280

If you have N sources (not streams) that you want to order consecutively, then you could wrap them with an outer source. Something like:

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

@SuppressWarnings("serial")
public class SequentialSources<T> implements SourceFunction<T>, ResultTypeQueryable<T> {

    private TypeInformation<T> type;
    private SourceFunction<T>[] sources;
    private volatile boolean isRunning = true;

    public SequentialSources(TypeInformation<T> type, SourceFunction<T>...sources) {
        this.type = type;
        this.sources = sources;
    }

    @Override
    public void run(SourceContext<T> context) throws Exception {
        int index = 0;

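        // Check the flag in the loop condition so that cancel() is not undone
        // when the currently running source returns.
        while (isRunning && index < sources.length) {
            sources[index++].run(context);
        }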
    }

    @Override
    public void cancel() {
        isRunning = false;

        for (SourceFunction<T> source : sources) {
            source.cancel();
        }
    }

    @Override
    public TypeInformation<T> getProducedType() {
        return type;
    }
}

Upvotes: 3

Fabian Hueske

Reputation: 18987

This is not possible right now, at least not with the high level DataStream API.

It might be possible to implement a low-level operator that first reads one input and then the other. However, this would completely block one input, which does not work well with the way Flink handles watermarks and performs checkpoints.

In the future, this will be possible using so-called side inputs.

Upvotes: 4

BrightFlow

Reputation: 1294

You could achieve this approximately through a flatMap with a heap buffer in it, but how well it works depends on a few things. If, e.g., elements from one of the input streams are delayed, the output will not be strictly ordered.

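import org.apache.flink.api.common.functions.FlatMapFunction
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.util.Collector

def process(): StreamExecutionEnvironment = {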
val env = StreamExecutionEnvironment.getExecutionEnvironment

implicit val typeInfo = TypeInformation.of(classOf[Int])
implicit val typeInfo2 = TypeInformation.of(classOf[Unit])

val BUF_SIZE = 3
val STREAM_NUM = 2

val a = env.fromElements(1, 2, 3, 3, 4, 5, 6, 7, Int.MaxValue)
val b = env.fromElements(4, 5, 9, 10 , 11, 13, Int.MaxValue)

val c = a.union(b).flatMap(new FlatMapFunction[Int, Int] {
  val heap = collection.mutable.PriorityQueue[Int]().reverse
  var endCount = 0

  override def flatMap(value: Int, out: Collector[Int]): Unit = {
    if (value == Int.MaxValue) {
      endCount += 1

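      if (endCount == STREAM_NUM) {
        // Drain in ascending order; foreach on a PriorityQueue does not follow heap order.
        while (heap.nonEmpty) out.collect(heap.dequeue())
      }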
    }
    else {
      heap += value

      while (heap.size > BUF_SIZE) {
        val v = heap.dequeue()
        out.collect(v)
      }
    }
  }
}).setParallelism(1)

c.map(x => println(s"X=$x")).setParallelism(1)

env
}
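To see what the buffer does and does not guarantee, here is a minimal plain-Java sketch of the same idea with no Flink dependency. The class name BoundedReorderSketch and the method reorder are made up for illustration, and the arrival orders in main are assumed, not taken from a real job. Note that the buffer sorts by value rather than by source, so it only approximates concatenation when every element of the first stream is smaller than every element of the second.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical helper, not part of Flink: mimics the heap-buffered flatMap above.
final class BoundedReorderSketch {

    /** Buffers up to bufSize values and emits the smallest whenever the buffer overflows. */
    public static List<Integer> reorder(List<Integer> arrivals, int bufSize) {
        PriorityQueue<Integer> heap = new PriorityQueue<>(); // min-heap by natural order
        List<Integer> out = new ArrayList<>();
        for (int value : arrivals) {
            heap.add(value);
            while (heap.size() > bufSize) {
                out.add(heap.poll());
            }
        }
        // End of input: drain the remaining values in ascending order.
        while (!heap.isEmpty()) {
            out.add(heap.poll());
        }
        return out;
    }

    public static void main(String[] args) {
        // Mild interleaving (assumed arrival order): the buffer is large enough to fix it.
        System.out.println(reorder(List.of(4, 1, 5, 2, 3), 3)); // [1, 2, 3, 4, 5]

        // A late element: 1 arrives after 4 has already been emitted.
        System.out.println(reorder(List.of(4, 5, 9, 10, 1), 3)); // [4, 1, 5, 9, 10]
    }
}
```

The second call shows the caveat mentioned above: an element that arrives more than the buffer size too late is emitted out of order, so a larger buffer trades latency and memory for a better chance of sorted output.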

Upvotes: 1
