Reputation: 940
Could you explain the differences between task slots and parallelism in Apache Flink v1.9?
Here is my understanding so far:
numberOfSlots can be >= numberOfCpuCores
If a task slot corresponds to a thread, could this lead to shared data access problems, race conditions, etc.? That is my first question.
My second question: could a race condition occur in the apply() method below, since it runs in a multi-threaded environment?
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
// SensorReading, SensorSource, and SensorTimeAssigner come from the example's own project

public class AverageSensorReadings {

  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    int parallelism = env.getParallelism();
    int maxParallelism = env.getMaxParallelism();

    // ingest sensor stream
    DataStream<SensorReading> sensorData = env
      // SensorSource generates random temperature readings
      .addSource(new SensorSource())
      // assign timestamps and watermarks, which are required for event time
      .assignTimestampsAndWatermarks(new SensorTimeAssigner());

    DataStream<SensorReading> avgTemp = sensorData
      // convert Fahrenheit to Celsius using an inlined map function
      .map(r -> new SensorReading(r.id, r.timestamp, (r.temperature - 32) * (5.0 / 9.0)))
      // organize stream by sensor
      .keyBy(r -> r.id)
      // group readings in 4 second windows
      .timeWindow(Time.seconds(4))
      // compute average temperature using a user-defined function
      .apply(new TemperatureAverager());

    // print result stream to standard out
    //avgTemp.print();
    System.out.println("paral: " + parallelism + " max paral: " + maxParallelism);

    // execute application
    env.execute("Compute average sensor temperature");
  }

  public static class TemperatureAverager extends RichWindowFunction<SensorReading, SensorReading, String, TimeWindow> {

    /**
     * apply() is invoked once for each window.
     *
     * @param sensorId the key (sensorId) of the window
     * @param window meta data for the window
     * @param input an iterable over the collected sensor readings that were assigned to the window
     * @param out a collector to emit results from the function
     */
    @Override
    public void apply(String sensorId, TimeWindow window, Iterable<SensorReading> input, Collector<SensorReading> out) {
      System.out.println("APPLY FUNCTION START POINT");
      System.out.println("sensorId: " + sensorId + "\n");

      // compute the average temperature
      int cnt = 0;
      double sum = 0.0;
      for (SensorReading r : input) {
        System.out.println("collected item: " + r);
        cnt++;
        sum += r.temperature;
      }
      double avgTemp = sum / cnt;
      System.out.println("APPLY FUNCTION END POINT");
      System.out.println("----------------------------\n\n");

      // emit a SensorReading with the average temperature
      out.collect(new SensorReading(sensorId, window.getEnd(), avgTemp));
    }
  }
}
Upvotes: 7
Views: 9485
Reputation: 43499
Typically each slot will run one parallel instance of your pipeline. The parallelism of the job is therefore the same as the number of slots required to run it. (By using slot sharing groups you can force specific tasks into their own slots, which would then increase the number of slots required.)
Each task (which comprises one or more operators chained together) runs in one Java thread.
A task manager can be configured with as many slots as you want. Typical configurations use 1 CPU core per slot, but for pipelines with heavy processing requirements you might want to have 2 or more cores per slot, and for pipelines that are mostly idle you might go in the other direction and configure several slots per core.
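For example, on a standalone cluster this is controlled by `taskmanager.numberOfTaskSlots` in flink-conf.yaml (the values below are just illustrative):

```yaml
# flink-conf.yaml
# number of slots each TaskManager offers (illustrative value)
taskmanager.numberOfTaskSlots: 4
# default parallelism for jobs that don't set one explicitly
parallelism.default: 4
```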
All of the tasks/threads running within a task manager will simply compete for the CPU resources that the task manager can get from the machine or container that hosts it.
All state is local to the one operator instance (task) that uses it, so all access occurs within that one thread. The one place where there hypothetically could be a race condition is between the onTimer and processElement callbacks in a ProcessFunction, but these methods are synchronized, so you don't have to worry about this. Because all state access is local, this leads to high throughput, low latency, and high scalability.
In your example, if the parallelism is two, then you will have two slots independently executing the same logic on different slices of your data. If they are using state, then this will be key-partitioned state that is managed by Flink, which you can think of as a sharded key/value store.
In the case of the sensor data in time windows, you don't have to be concerned at all about the multi-threading. The keyBy will partition the data so that one instance will handle all of the events and windows for some of the sensors, and the other instance (assuming there are two) will handle the rest.
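To make that partitioning concrete, here is a plain-Java sketch of the idea behind keyBy. It is not Flink's actual algorithm (Flink hashes keys into key groups bounded by maxParallelism); it only illustrates that each key maps deterministically to exactly one parallel instance, so all events for a given sensor are always handled by the same single thread:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class KeyPartitioningSketch {

    // Each key maps deterministically to one parallel instance.
    // (Flink really hashes keys into key groups bounded by maxParallelism;
    // this modulo scheme is a simplification.)
    static int instanceFor(String key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    public static void main(String[] args) {
        int parallelism = 2;
        Map<Integer, List<String>> assignment = new HashMap<>();
        for (String sensorId : List.of("sensor_1", "sensor_2", "sensor_3", "sensor_4")) {
            assignment
                .computeIfAbsent(instanceFor(sensorId, parallelism), i -> new ArrayList<>())
                .add(sensorId);
        }
        // every sensor ends up on exactly one of the two instances
        System.out.println(assignment);
    }
}
```

Because the assignment is deterministic, the state for a given sensor lives on exactly one instance, which is why no locking is needed.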
Upvotes: 19