user1564381

Apache Flink: How to create two datasets from one dataset using Flink DataSet API

I'm writing an application using the DataSet API of Flink 0.10.1. Can I get multiple collectors from a single operator in Flink?

What I want to do is something like the following:

val lines = env.readTextFile(...)
val (out_small, out_large) = lines someOp {
  (iterator, collector1, collector2) => {
    for (line <- iterator) {
      val (elem1, elem2) = doParsing(line)
      collector1.collect(elem1)
      collector2.collect(elem2)
    }
  } 
} 

Currently I'm calling mapPartition twice to make two datasets from one source dataset.

val lines = env.readTextFile(...)
val out_small = lines mapPartition {
  (iterator, collector) => {
    for (line <- iterator) {
      val (elem1, elem2) = doParsing(line)
      collector.collect(elem1)
    }
  } 
}
val out_large = lines mapPartition {
  (iterator, collector) => {
    for (line <- iterator) {
      val (elem1, elem2) = doParsing(line)
      collector.collect(elem2)
    }
  } 
}

Since the doParsing function is quite expensive, I want to call it only once per line.

P.S. I would appreciate it if you could point me to other, simpler approaches for this kind of task.

Upvotes: 2

Views: 677

Answers (2)

drojas

Reputation: 373

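I'm not sure whether this was possible when the question was posted 7 years ago, but today you can achieve this with Side Outputs. Note that side outputs are a feature of the DataStream API, not of the DataSet API used in the question. See https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/datastream/side_output/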
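
For illustration, here is a minimal sketch of what side outputs could look like for this use case. It assumes a Flink 1.x release that still ships the Scala DataStream API; doParsing is a hypothetical stand-in for the expensive function from the question, and the tag name "large" and the sample input are made up.

```scala
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

object SideOutputSketch {
  // Hypothetical stand-in for the expensive doParsing from the question:
  // returns (first word, full line).
  def doParsing(line: String): (String, String) =
    (line.takeWhile(_ != ' '), line)

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val lines: DataStream[String] = env.fromElements("a first line", "b second line")

    // The tag identifies the side output and carries its element type.
    val largeTag = OutputTag[String]("large")

    // Main output receives elem1, the side output receives elem2.
    val small: DataStream[String] = lines.process(
      new ProcessFunction[String, String] {
        override def processElement(
            line: String,
            ctx: ProcessFunction[String, String]#Context,
            out: Collector[String]): Unit = {
          val (elem1, elem2) = doParsing(line) // called once per line
          out.collect(elem1)
          ctx.output(largeTag, elem2)
        }
      })

    val large: DataStream[String] = small.getSideOutput(largeTag)

    small.print()
    large.print()
    env.execute("side output sketch")
  }
}
```

The main output and the side output may have different element types, which avoids the extra tag field needed with the filter-based approach.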

Upvotes: 0

Matthias J. Sax

Reputation: 62285

Flink does not support multiple collectors. However, you can change the output of your parsing step by adding an additional field that indicates the output type:

val lines = env.readTextFile(...)
val intermediate = lines mapPartition {
  (iterator, collector) => {
    for (line <- iterator) {
      val (elem1, elem2) = doParsing(line)
      // collect takes a single record, so emit (tag, element) tuples
      collector.collect((0, elem1)) // 0 indicates small
      collector.collect((1, elem2)) // 1 indicates large
    }
  }
}

Next, consume the output intermediate twice and filter each copy on the first attribute: the first filter keeps the records tagged 0, the second keeps the records tagged 1. You can also add a projection afterwards to get rid of the first attribute.

               +---> filter("0") --->
               | 
intermediate --+
               | 
               +---> filter("1") --->
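
Putting the pieces together, a rough end-to-end sketch of this tag-and-filter approach with the Scala DataSet API might look like the following. It assumes both parsed parts are Strings so they fit into one tuple type; doParsing, the sample input, and the /tmp output paths are hypothetical placeholders.

```scala
import org.apache.flink.api.scala._
import org.apache.flink.util.Collector

object SplitDataSet {
  // Hypothetical stand-in for the expensive doParsing from the question:
  // returns (first word, full line).
  def doParsing(line: String): (String, String) =
    (line.takeWhile(_ != ' '), line)

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val lines: DataSet[String] = env.fromElements("a first line", "b second line")

    // Parse each line once and tag every record: 0 = small, 1 = large.
    val intermediate: DataSet[(Int, String)] = lines.mapPartition {
      (iterator: Iterator[String], collector: Collector[(Int, String)]) =>
        for (line <- iterator) {
          val (elem1, elem2) = doParsing(line)
          collector.collect((0, elem1))
          collector.collect((1, elem2))
        }
    }

    // Consume intermediate twice: filter on the tag, then project it away.
    val outSmall: DataSet[String] = intermediate.filter(_._1 == 0).map(_._2)
    val outLarge: DataSet[String] = intermediate.filter(_._1 == 1).map(_._2)

    // Hypothetical output paths; both sinks are part of the same job.
    outSmall.writeAsText("/tmp/out_small")
    outLarge.writeAsText("/tmp/out_large")
    env.execute("split one DataSet into two")
  }
}
```

Because both sinks belong to one job, the parsing step should run only once per line. Calling print() or collect() on each output separately would, as far as I know, trigger separate jobs and repeat the parsing. If the two parts have different types, wrapping them in Either instead of an Int tag is one possible alternative.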

Upvotes: 7
