monstereo

Reputation: 940

Apache Flink, number of Task Slots vs env.setParallelism

Could you explain the difference between task slots and parallelism in Apache Flink v1.9?


import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

// SensorReading, SensorSource, and SensorTimeAssigner are user-defined helper classes (not shown here).
public class AverageSensorReadings {
 public static void main(String[] args) throws Exception {
  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  int parallelism = env.getParallelism();
  int maxParal = env.getMaxParallelism();

  // ingest sensor stream
  DataStream<SensorReading> sensorData = env
   // SensorSource generates random temperature readings
   .addSource(new SensorSource())
   // assign timestamps and watermarks which are required for event time
   .assignTimestampsAndWatermarks(new SensorTimeAssigner());

  DataStream<SensorReading> avgTemp = sensorData
   // convert Fahrenheit to Celsius using an inlined map function
   .map(r -> new SensorReading(r.id, r.timestamp, (r.temperature - 32) * (5.0 / 9.0)))
   // organize stream by sensor
   .keyBy(r -> r.id)
   // group readings in 4 second windows
   .timeWindow(Time.seconds(4))
   // compute average temperature using a user-defined function
   .apply(new TemperatureAverager());

  // print result stream to standard out
  //avgTemp.print();
  System.out.println("paral: " + paralellism + " max paral: " + maxParal);
  // execute application
  env.execute("Compute average sensor temperature");
 }

 public static class TemperatureAverager extends RichWindowFunction<SensorReading, SensorReading, String, TimeWindow> {

  /**
   * apply() is invoked once for each window.
   *
   * @param sensorId the key (sensorId) of the window
   * @param window meta data for the window
   * @param input an iterable over the collected sensor readings that were assigned to the window
   * @param out a collector to emit results from the function
   */
  @Override
  public void apply(String sensorId, TimeWindow window, Iterable<SensorReading> input, Collector<SensorReading> out) {
   System.out.println("APPLY FUNCTION START POINT");
   System.out.println("sensorId: " + sensorId + "\n");

   // compute the average temperature
   int cnt = 0;
   double sum = 0.0;
   for (SensorReading r: input) {
    System.out.println("collected item: " + r);
    cnt++;
    sum += r.temperature;
   }
   double avgTemp = sum / cnt;
   System.out.println("APPLY FUNCTION END POINT");
   System.out.println("----------------------------\n\n");
   // emit a SensorReading with the average temperature
   out.collect(new SensorReading(sensorId, window.getEnd(), avgTemp));
  }
 }
}

Upvotes: 7

Views: 9485

Answers (1)

David Anderson

Reputation: 43499

Typically each slot will run one parallel instance of your pipeline. The parallelism of the job is therefore the same as the number of slots required to run it. (By using slot sharing groups you can force specific tasks into their own slots, which would then increase the number of slots required.)
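For example, here is a hedged sketch (building on the question's pipeline; the group name is made up) of pinning the window operator into its own slot sharing group, which increases the number of slots the job needs:

 // Hypothetical: isolate the window operator in its own slot sharing group.
 // With parallelism 2 this job would then need 4 slots instead of 2.
 DataStream<SensorReading> avgTemp = sensorData
  .keyBy(r -> r.id)
  .timeWindow(Time.seconds(4))
  .apply(new TemperatureAverager())
  .slotSharingGroup("windowing"); // downstream operators inherit this group by default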

Each task (which comprises one or more operators chained together) runs in one Java thread.
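If you want to influence that chaining in the question's job, a small hedged example (standard DataStream API calls):

 // Hypothetical: start a new chain at the map, so it runs as its own task (and thread)
 // instead of being chained to the source and watermark assigner.
 DataStream<SensorReading> celsius = sensorData
  .map(r -> new SensorReading(r.id, r.timestamp, (r.temperature - 32) * (5.0 / 9.0)))
  .startNewChain(); // or .disableChaining() to forbid chaining for this operator entirely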

A task manager can create as many slots as you want. Typical configurations use 1 CPU core per slot, but for pipelines with heavy processing requirements you might want to have 2 or more cores per slot, and for pipelines that are mostly idle you might go in the other direction and configure several slots per core.
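For instance (the values here are only placeholders): the slot count per task manager is a cluster setting, while the parallelism is chosen per job:

 // flink-conf.yaml (cluster side), e.g.:
 //   taskmanager.numberOfTaskSlots: 2
 // Job side: the parallelism determines how many of those slots the job consumes.
 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 env.setParallelism(2);      // two parallel instances of each operator
 env.setMaxParallelism(128); // upper bound for key-group assignment / future rescaling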

All of the tasks/threads running within a task manager will simply compete for the CPU resources that the task manager can get from the machine or container that hosts it.

All state is local to the one operator instance (task) that uses it, so all access occurs within that one thread. The one place where there hypothetically could be a race condition is between the onTimer and processElement callbacks in a ProcessFunction, but these methods are synchronized, so you don't have to worry about it. This locality of state access is what enables high throughput, low latency, and high scalability.
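To make that concrete, a minimal sketch (a KeyedProcessFunction over the question's sensor stream; the class and state names are illustrative) where processElement and onTimer touch the same keyed state. Both callbacks run on the task's single thread, so no extra locking is needed:

 public static class LatestReading extends KeyedProcessFunction<String, SensorReading, SensorReading> {

  private transient ValueState<Double> lastTemp; // keyed state, local to this task

  @Override
  public void open(Configuration parameters) {
   lastTemp = getRuntimeContext().getState(
    new ValueStateDescriptor<>("lastTemp", Double.class));
  }

  @Override
  public void processElement(SensorReading r, Context ctx, Collector<SensorReading> out) throws Exception {
   lastTemp.update(r.temperature); // state access on the task's thread
   ctx.timerService().registerEventTimeTimer(r.timestamp + 1000);
  }

  @Override
  public void onTimer(long timestamp, OnTimerContext ctx, Collector<SensorReading> out) throws Exception {
   Double t = lastTemp.value(); // same thread, so no race with processElement
   if (t != null) {
    out.collect(new SensorReading(ctx.getCurrentKey(), timestamp, t));
   }
  }
 }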

In your example, if the parallelism is two, then you will have two slots independently executing the same logic on different slices of your data. If they are using state, then this will be key-partitioned state that is managed by Flink, which you can think of as a sharded key/value store.

In the case of the sensor data in time windows, you don't have to be concerned at all about the multi-threading. The keyBy will partition the data so that one instance will handle all of the events and windows for some of the sensors, and the other instance (assuming there are two) will handle the rest.
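You can observe that partitioning with a small hedged addition inside TemperatureAverager.apply() (RichWindowFunction exposes the subtask index through its runtime context):

 // Hypothetical: each sensorId always lands on the same parallel instance,
 // so the printed subtask index is stable per sensor.
 int subtask = getRuntimeContext().getIndexOfThisSubtask();
 System.out.println("sensor " + sensorId + " handled by subtask " + subtask);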

Upvotes: 19
