Reputation: 2592
I have the following code (Python 2.7):
from concurrent.futures import ThreadPoolExecutor, as_completed
from multiprocessing import cpu_count

from pandas import DataFrame

# bro, engine, sql_table_stage and sql_schema are defined elsewhere in the script.
NUM_WORKERS = cpu_count()
c = 0
while True:
    results = []
    pages = list(range(c, c + NUM_WORKERS))
    # Fetch one batch of pages concurrently, one thread per page.
    with ThreadPoolExecutor(NUM_WORKERS) as executor:
        futures = [executor.submit(bro.get_content, page) for page in pages]
        for future in as_completed(futures):
            results.extend(future.result())
    # Stop once a whole batch comes back empty.
    if len(results) < 1:
        break
    print("Get batch {0} with {1} results".format(c, len(results)))
    df = DataFrame(results)
    df.to_sql(sql_table_stage, engine, sql_schema, if_exists='append', index=False)
    print("Pages {0} to {1} was insert".format(c, c + NUM_WORKERS))
    c += NUM_WORKERS
The code runs and does what is expected (though incredibly slowly!). The thing is that when I look through my logs I see:
[2018-08-21 01:06:54,513] {base_task_runner.py:98} INFO - Subtask: [2018-08-21 01:06:54,513] {bash_operator.py:70} INFO - Tmp dir root location:
[2018-08-21 01:06:54,513] {base_task_runner.py:98} INFO - Subtask: /tmp
[2018-08-21 01:06:54,514] {base_task_runner.py:98} INFO - Subtask: [2018-08-21 01:06:54,514] {bash_operator.py:80} INFO - Temporary script location: /tmp/airflowtmpGyRCX2//tmp/airflowtmpGyRCX2/importwQaRgB
[2018-08-21 01:06:54,514] {base_task_runner.py:98} INFO - Subtask: [2018-08-21 01:06:54,514] {bash_operator.py:88} INFO - Running command: python /home/ubuntu/airflow/scripts/import.py
[2018-08-21 01:06:54,519] {base_task_runner.py:98} INFO - Subtask: [2018-08-21 01:06:54,518] {bash_operator.py:97} INFO - Output:
[2018-08-21 05:45:48,758] {base_task_runner.py:98} INFO - Subtask: [2018-08-21 05:45:48,758] {bash_operator.py:101} INFO - Get batch 0 with 20000 results
[2018-08-21 05:45:48,759] {base_task_runner.py:98} INFO - Subtask: [2018-08-21 05:45:48,758] {bash_operator.py:101} INFO - Pages 0 to 4 was insert
[2018-08-21 05:45:48,759] {base_task_runner.py:98} INFO - Subtask: [2018-08-21 05:45:48,758] {bash_operator.py:101} INFO - Get batch 4 with 19996 results
[2018-08-21 05:45:48,760] {base_task_runner.py:98} INFO - Subtask: [2018-08-21 05:45:48,758] {bash_operator.py:101} INFO - Pages 4 to 8 was insert
[2018-08-21 05:45:48,760] {base_task_runner.py:98} INFO - Subtask: [2018-08-21 05:45:48,758] {bash_operator.py:101} INFO - Get batch 8 with 20000 results
[2018-08-21 05:45:48,760] {base_task_runner.py:98} INFO - Subtask: [2018-08-21 05:45:48,759] {bash_operator.py:101} INFO - Pages 8 to 12 was insert
[2018-08-21 05:45:48,760] {base_task_runner.py:98} INFO - Subtask: [2018-08-21 05:45:48,759] {bash_operator.py:101} INFO - Get batch 12 with 20000 results
[2018-08-21 05:45:48,760] {base_task_runner.py:98} INFO - Subtask: [2018-08-21 05:45:48,759] {bash_operator.py:101} INFO - Pages 12 to 16 was insert
[2018-08-21 05:45:48,760] {base_task_runner.py:98} INFO - Subtask: [2018-08-21 05:45:48,759] {bash_operator.py:101} INFO - Get batch 16 with 20000 results
[2018-08-21 05:45:48,761] {base_task_runner.py:98} INFO - Subtask: [2018-08-21 05:45:48,759] {bash_operator.py:101} INFO - Pages 16 to 20 was insert
[2018-08-21 05:45:48,761] {base_task_runner.py:98} INFO - Subtask: [2018-08-21 05:45:48,759] {bash_operator.py:101} INFO - Get batch 20 with 20000 results
[2018-08-21 05:45:48,761] {base_task_runner.py:98} INFO - Subtask: [2018-08-21 05:45:48,759] {bash_operator.py:101} INFO - Pages 20 to 24 was insert
[2018-08-21 05:45:48,761] {base_task_runner.py:98} INFO - Subtask: [2018-08-21 05:45:48,759] {bash_operator.py:101} INFO - Get batch 24 with 20000 results
[2018-08-21 05:45:48,761] {base_task_runner.py:98} INFO - Subtask: [2018-08-21 05:45:48,759] {bash_operator.py:101} INFO - Pages 24 to 28 was insert
[2018-08-21 05:45:48,761] {base_task_runner.py:98} INFO - Subtask: [2018-08-21 05:45:48,759] {bash_operator.py:101} INFO - Get batch 28 with 20000 results
[2018-08-21 05:45:48,762] {base_task_runner.py:98} INFO - Subtask: [2018-08-21 05:45:48,759] {bash_operator.py:101} INFO - Pages 28 to 32 was insert
[2018-08-21 05:45:48,762] {base_task_runner.py:98} INFO - Subtask: [2018-08-21 05:45:48,759] {bash_operator.py:101} INFO - Get batch 32 with 20000 results
[2018-08-21 05:45:48,762] {base_task_runner.py:98} INFO - Subtask: [2018-08-21 05:45:48,759] {bash_operator.py:101} INFO - Pages 32 to 36 was insert
[2018-08-21 05:45:48,762] {base_task_runner.py:98} INFO - Subtask: [2018-08-21 05:45:48,760] {bash_operator.py:101} INFO - Get batch 36 with 20000 results
[2018-08-21 05:45:48,762] {base_task_runner.py:98} INFO - Subtask: [2018-08-21 05:45:48,760] {bash_operator.py:101} INFO - Pages 36 to 40 was insert
[2018-08-21 05:45:48,762] {base_task_runner.py:98} INFO - Subtask: [2018-08-21 05:45:48,760] {bash_operator.py:101} INFO - Get batch 40 with 20000 results
[2018-08-21 05:45:48,762] {base_task_runner.py:98} INFO - Subtask: [2018-08-21 05:45:48,760] {bash_operator.py:101} INFO - Pages 40 to 44 was insert
[2018-08-21 05:45:48,763] {base_task_runner.py:98} INFO - Subtask: [2018-08-21 05:45:48,760] {bash_operator.py:101} INFO - Get batch 44 with 20000 results
[2018-08-21 05:45:48,763] {base_task_runner.py:98} INFO - Subtask: [2018-08-21 05:45:48,760] {bash_operator.py:101} INFO - Pages 44 to 48 was insert
[2018-08-21 05:45:48,763] {base_task_runner.py:98} INFO - Subtask: [2018-08-21 05:45:48,760] {bash_operator.py:101} INFO - Get batch 48 with 19997 results
[2018-08-21 05:45:48,763] {base_task_runner.py:98} INFO - Subtask: [2018-08-21 05:45:48,760] {bash_operator.py:101} INFO - Pages 48 to 52 was insert
[2018-08-21 05:45:48,763] {base_task_runner.py:98} INFO - Subtask: [2018-08-21 05:45:48,760] {bash_operator.py:101} INFO - Get batch 52 with 20000 results
[2018-08-21 05:45:48,763] {base_task_runner.py:98} INFO - Subtask: [2018-08-21 05:45:48,760] {bash_operator.py:101} INFO - Pages 52 to 56 was insert
[2018-08-21 05:45:48,763] {base_task_runner.py:98} INFO - Subtask: [2018-08-21 05:45:48,760] {bash_operator.py:101} INFO - Get batch 56 with 20000 results
[2018-08-21 05:45:48,764] {base_task_runner.py:98} INFO - Subtask: [2018-08-21 05:45:48,760] {bash_operator.py:101} INFO - Pages 56 to 60 was insert
NUM_WORKERS is set to cpu_count(), which is 4. This is code I inherited, and I'm trying to make sense of it before converting the script to Python 3.
Upvotes: 1
Views: 873
Reputation: 2910
When you create a thread pool in Python, all of its threads run inside a single Python process, and because of the Global Interpreter Lock (GIL) only one thread can execute Python bytecode at a time. So for CPU-bound (data-intensive) work, Python threads give no real parallelism.
How to solve this? Easy. Spawn multiple Python processes that can run on different processors, each with its own interpreter. This is what the multiprocessing (mp) module is for: it spawns multiple processes from the parent Python process in which it is called.
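A minimal sketch of that approach, using the standard multiprocessing.Pool. The function cpu_bound is a hypothetical stand-in for CPU-heavy work and run_parallel is an illustrative helper; neither comes from the question's code.

```python
from multiprocessing import Pool, cpu_count


def cpu_bound(n):
    """Hypothetical stand-in for CPU-heavy work: sum of squares below n."""
    return sum(i * i for i in range(n))


def run_parallel(inputs, workers=None):
    """Map cpu_bound over inputs using a pool of worker processes."""
    pool = Pool(processes=workers or cpu_count())
    try:
        # pool.map returns results in the same order as the inputs.
        return pool.map(cpu_bound, inputs)
    finally:
        pool.close()
        pool.join()


if __name__ == "__main__":
    # The guard keeps worker processes from re-running this block on import.
    print(run_parallel([10, 100, 1000]))
```

Each call to cpu_bound runs in a separate worker process with its own interpreter and its own GIL, so the calls can run on different cores at the same time.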
You can verify this by running htop (on Linux or macOS) and counting the Python processes. With the multiprocessing module, they will all have the same name as the parent script in which pool.map is called.
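The same check can be done from inside Python by comparing process IDs. This is only a sketch: worker_pid, thread_pids and process_pids are illustrative names, not part of any library.

```python
import os
from concurrent.futures import ThreadPoolExecutor
from multiprocessing import Pool


def worker_pid(_):
    """Return the ID of the process that executed this call."""
    return os.getpid()


def thread_pids(n=4):
    """Collect the process IDs seen by tasks run in a thread pool."""
    with ThreadPoolExecutor(max_workers=n) as executor:
        return set(executor.map(worker_pid, range(n)))


def process_pids(n=4):
    """Collect the process IDs seen by tasks run in a process pool."""
    pool = Pool(processes=n)
    try:
        return set(pool.map(worker_pid, range(n)))
    finally:
        pool.close()
        pool.join()


if __name__ == "__main__":
    print("parent pid:  ", os.getpid())
    print("thread pids: ", thread_pids())   # only the parent's pid
    print("process pids:", process_pids())  # pids of child processes
```

Every thread reports the parent's process ID, while the pool workers report the IDs of separate child processes, which are the extra entries that show up in htop.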
Upvotes: 0