senko

Reputation: 157

How to convert a Flink DataSet of tuples to a single column

I have graph data like

1 2
1 4
4 1
4 2
4 3
3 2
2 3

But I couldn't find a way to convert it to a one-column dataset like

1
2
1
4
4
1
...

Here is my code. I used a Scala ListBuffer, but couldn't find a way to do this with a Flink DataSet:

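    import org.apache.flink.api.java.utils.ParameterTool
    import org.apache.flink.api.scala._
    import scala.collection.mutable.ListBuffer

    val params: ParameterTool = ParameterTool.fromArgs(args)
    val env = ExecutionEnvironment.getExecutionEnvironment

    env.getConfig.setGlobalJobParameters(params)
    val text = env.readTextFile(params.get("input"))
    // parse each "src dst" line into a tuple
    val tupleText = text.map { line =>
      val arr = line.split(" ")
      (arr(0), arr(1))
    }

    // workaround: pull everything to the client and flatten it locally
    val x: Seq[(String, String)] = tupleText.collect()
    val tempList = new ListBuffer[String]
    x.foreach(line => {
      tempList += line._1
      tempList += line._2
    })

    tempList.foreach(println)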

Upvotes: 1

Views: 392

Answers (1)

Fabian Hueske

Reputation: 18987

You can do that with flatMap:

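    import org.apache.flink.api.scala._
    import org.apache.flink.util.Collector

    val env = ExecutionEnvironment.getExecutionEnvironment

    // get some input
    val input: DataSet[(Int, Int)] = env.fromElements((1, 2), (2, 3), (3, 4))

    // emit every tuple element as its own record
    // (the Collector type is spelled out to help overload resolution)
    val output: DataSet[Int] = input.flatMap( (t, out: Collector[Int]) => {
      out.collect(t._1)
      out.collect(t._2)
    })

    // print result
    output.print()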
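
Applied to the string tuples from the question, the same idea might look like the sketch below. It assumes the Flink Scala DataSet API (`org.apache.flink.api.scala._`) is on the classpath. The object name `FlattenEdges`, the helper `tupleToSeq`, and the sample lines are made up for illustration and stand in for the file read with `env.readTextFile`. It uses the `flatMap` variant that takes a function returning a collection, so no `Collector` is needed.

```scala
import org.apache.flink.api.scala._

object FlattenEdges {
  // Hypothetical helper: turn one edge into its two endpoints, in order.
  def tupleToSeq(t: (String, String)): Seq[String] = Seq(t._1, t._2)

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Sample lines standing in for env.readTextFile(params.get("input"))
    val lines: DataSet[String] = env.fromElements("1 2", "1 4", "4 1")

    // Parse each line into a (source, target) tuple, as in the question
    val tupleText: DataSet[(String, String)] = lines.map { line =>
      val arr = line.split(" ")
      (arr(0), arr(1))
    }

    // Emit both tuple fields as separate records
    val oneColumn: DataSet[String] = tupleText.flatMap(t => tupleToSeq(t))

    // print() triggers execution of the job
    oneColumn.print()
  }
}
```

Unlike the `collect()` plus `ListBuffer` approach, the flattening here stays inside the DataSet, so it runs in parallel on the cluster. With a parallelism above 1, the order of the printed records is not guaranteed to match the input order.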

Upvotes: 1
