Reputation: 3665
I have a workflow constructed in Flink that consists of a custom source, a series of maps/flatmaps and a sink.
The run() method of my custom source iterates through the files stored in a folder and, via the collect() method of the context, emits the name and the contents of each file (I have a custom object that stores this info in two fields).
I then have a series of maps/flatmaps transforming these objects, which are then written to files by a custom sink. The execution graph, as produced by Flink's Web UI, is the following:
I have a cluster of 2 workers, each set up with 6 slots (each machine also has 6 cores). I set the parallelism to 12. From the execution graph I see that the parallelism of the source is 1, while the rest of the workflow has parallelism 12.
When I run the workflow (I have around 15K files in the dedicated folder) I monitor the resources of my workers using htop. All the cores reach up to 100% utilisation most of the time, but roughly every 30 minutes, 8-10 of the cores become idle for about 2-3 minutes.
My questions are the following:
I understand that the source runs with parallelism 1, which I believe is normal when reading from local storage (my files are located in the same directory on each worker, as I don't know which worker will be selected to execute the source). Is it indeed normal? Could you please explain why this is the case?
The rest of my workflow is executed with parallelism 12, which looks correct: checking the task managers' logs, I see output from all the slots (e.g., .... [Flat Map -> Map -> Map -> Sink: Unnamed (**3/12**)] INFO ...., .... [Flat Map -> Map -> Map -> Sink: Unnamed (**5/12**)] INFO ...., etc.). What I don't understand, though, is this: if one slot is executing the source and I have 12 slots in my cluster, how is the rest of the workflow executed by 12 slots? Is one slot acting as both the source and one instance of the rest of the workflow? If so, how are the resources for this specific slot allocated? Could someone explain the steps this workflow goes through? For example (this might be wrong):
I believe what I describe above is wrong, but I give it as an example to better explain my question.
Upvotes: 0
Views: 1975
Reputation: 9245
To answer the specific question about parallelizing your read, I would do the following:

- Extend `RichSourceFunction`.
- In the `open()` method, call `getRuntimeContext().getNumberOfParallelSubtasks()` to get the total parallelism, and call `getRuntimeContext().getIndexOfThisSubtask()` to get the index of the sub-task being initialized.
- In the `run()` method, as you iterate over files, take the `hashCode()` of each file name, modulo the total parallelism. If this equals your sub-task's index, then you process that file.

In this way you can spread the work out over 12 sub-tasks, without having sub-tasks try to process the same file.
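A minimal sketch of the partitioning logic described above, as plain Java so it runs without a Flink dependency (the class and method names are illustrative, not from the original post). In a real Flink job, the sub-task index and parallelism would come from `getRuntimeContext().getIndexOfThisSubtask()` and `getRuntimeContext().getNumberOfParallelSubtasks()` inside your `RichSourceFunction`:

```java
public class FilePartitioning {

    /**
     * Returns true if the sub-task with the given index should process
     * this file, assigning each file to exactly one of the parallel
     * sub-tasks by hashing its name.
     */
    static boolean shouldProcess(String fileName, int subtaskIndex, int parallelism) {
        // Math.floorMod rather than %: hashCode() may be negative,
        // and % would then yield a negative remainder that never
        // matches any sub-task index.
        return Math.floorMod(fileName.hashCode(), parallelism) == subtaskIndex;
    }

    public static void main(String[] args) {
        String[] files = {"a.txt", "b.txt", "c.txt", "d.txt"};
        int parallelism = 3;
        // Simulate each sub-task scanning the same directory listing
        // and keeping only its own share of the files.
        for (int subtask = 0; subtask < parallelism; subtask++) {
            for (String f : files) {
                if (shouldProcess(f, subtask, parallelism)) {
                    System.out.println("subtask " + subtask + " -> " + f);
                }
            }
        }
    }
}
```

Note the `Math.floorMod` detail: a plain `hashCode() % parallelism` can be negative in Java, in which case the file would be silently skipped by every sub-task.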
Upvotes: 1
Reputation: 1641
Upvotes: 1