ImbaBalboa
ImbaBalboa

Reputation: 867

Questions about parallelism with Flink on YARN cluster

Being new with Apache Flink and in a general manner with stream processing frameworks I have several questions about it and in particular with parallelism.

First this is my code :

object KafkaConsuming {

  def main(args: Array[String]) {

    // **** CONFIGURATION & PARAMETERS ****
    val params: ParameterTool = ParameterTool.fromArgs(args)

    val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(8)
        env.getConfig.setGlobalJobParameters(params)

    // **** Kafka CONNECTION ****
    val properties = new Properties();
    properties.setProperty("bootstrap.servers", params.get("server"));

    // **** Get KAFKA source ****
    val stream: DataStream[String] = env.addSource(new FlinkKafkaConsumer010[String](params.get("topic"), new SimpleStringSchema(), properties))

    // **** PROCESSING ****
    val logs: DataStream[MinifiedLog] = stream.map(x => LogParser2.parse(x))

    val sessions = logs.map { x => (x.timestamp, x.bytesSent, 1l, 1)}

    val sessionCnt: DataStream[(Long, Long, Long, Int)] = sessions
      .keyBy(3).window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
      .reduce( (x: (Long, Long, Long, Int), y: (Long, Long, Long, Int)) => (x._1, x._2 + y._2, x._3 + y._3, x._4))
      .map { z => (z._1, z._2 / 10, z._3 / 10, z._4)}


    // **** OUTPUT ****
    val output: DataStream[String] = sessionCnt.map(x => (x.toString() + "\n"))
    output.writeToSocket("X.X.X.X", 3333, new SimpleStringSchema)

    env.execute("Kafka consuming")

  }
}

When I want to run it on my cluster I run this command :

./bin/flink run -m yarn-cluster -yn 8 /directories/myjar.jar --server X.X.X.X --topic mytopic

This is working fine. Now here are my questions :

I got this in the Flink Web UI :

Flink web UI 1

1. Why are the records received always half of the records sent whereas the data volumes are the same ?

Then if I go into the details for the windowing :

Flink web UI 2 Obviously all the process is done on my slave 4 and on only one thread ! The same is happening with the source. Only one thread is used to receive the data.

2. Why Flink is not using all the thread possible for that step ?

I noticed that the source, the windowing and the sink are processed by a different slave but still I wanted that the process was done in parallel on my cluster.

I read on this post : https://stackoverflow.com/a/32329010/5035392 that effectively if the Kafka source has only one partition (which is my case), Flink cannot share the task on the different nodes. However, my window processing should be able to do it ?

I am sorry if these are trivial questions. I am not sure if what I have done wrong is with Flink or my cluster configuration. Thank you.

Upvotes: 0

Views: 274

Answers (1)

Dawid Wysakowicz
Dawid Wysakowicz

Reputation: 3432

Ad. 2 All values for the same key are processed on a single TaskManager. In your case each element of sessions.keyBy(3) stream has the same key -> 1 therefore all calculations are performed in a single task slot.

Upvotes: 2

Related Questions