Theo

Reputation: 84

Combine aiohttp with multiprocessing

I am making a script that gets the HTML of almost 20 000 pages and parses it to get just a portion of it.

I managed to get the content of the 20 000 pages into a dataframe with asynchronous requests using asyncio and aiohttp, but this script still waits for all the pages to be fetched before parsing them.

async def get_request(session, url, params=None):
    async with session.get(url, headers=HEADERS, params=params) as response:
        return await response.text()


async def get_html_from_url(urls):
    tasks = []
    async with aiohttp.ClientSession() as session:
        for url in urls:
            tasks.append(get_request(session, url))
        html_page_response = await asyncio.gather(*tasks)
    return html_page_response


html_pages_list = asyncio_loop.run_until_complete(get_html_from_url(urls))

Once I have the content of each page, I use multiprocessing's Pool to parallelize the parsing.

def get_whatiwant_from_html(html_content):
    parsed_html = BeautifulSoup(html_content, "html.parser")
    clean = parsed_html.find("div", class_="class").get_text()

    # Some re.subs
    clean = re.sub("", "", clean)
    clean = re.sub("", "", clean)
    clean = re.sub("", "", clean)  

    return clean


pool = Pool(4)
what_i_want = pool.map(get_whatiwant_from_html, html_pages_list)

This code mixes the fetching and the parsing asynchronously, but I would like to integrate multiprocessing into it:

async def process(url, session):
    html = await get_request(session, url)
    return get_whatiwant_from_html(html)

async def dispatch(urls):
    async with aiohttp.ClientSession() as session:
        coros = (process(url, session) for url in urls)
        return await asyncio.gather(*coros)

result = asyncio.get_event_loop().run_until_complete(dispatch(urls))

Is there any obvious way to do this? I thought about creating 4 processes that each run the asynchronous calls but the implementation looks a bit complex and I'm wondering if there is another way.
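
For concreteness, something like this is what I had in mind (an untested sketch that reuses get_html_from_url and get_whatiwant_from_html from above; chunks is just a helper to split the URL list):

import asyncio
from multiprocessing import Pool

def fetch_and_parse_chunk(url_chunk):
    # Each worker process runs its own event loop over its slice of URLs,
    # then parses the pages it fetched.
    html_pages = asyncio.run(get_html_from_url(url_chunk))
    return [get_whatiwant_from_html(html) for html in html_pages]

def chunks(seq, n):
    # Split seq into n roughly equal slices.
    size = (len(seq) + n - 1) // n
    return [seq[i:i + size] for i in range(0, len(seq), size)]

with Pool(4) as pool:
    nested = pool.map(fetch_and_parse_chunk, chunks(urls, 4))
what_i_want = [item for chunk in nested for item in chunk]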

I am very new to asyncio and aiohttp, so if you can recommend anything to read to get a better understanding, I would be very happy.

Upvotes: 2

Views: 4984

Answers (2)

Danyla Hulchuk

Reputation: 451

You can use ProcessPoolExecutor. With run_in_executor you can do the IO in your main asyncio process while running your heavy CPU calculations in separate processes.

import asyncio
import concurrent.futures
from functools import partial

import aiohttp

async def get_data(session, url, params=None):
    loop = asyncio.get_event_loop()
    async with session.get(url, headers=HEADERS, params=params) as response:
        html = await response.text()
        # Offload the CPU-bound parsing to the default executor set below
        data = await loop.run_in_executor(None, partial(get_whatiwant_from_html, html))
        return data

async def get_data_from_urls(urls):
    tasks = []
    async with aiohttp.ClientSession() as session:
        for url in urls:
            tasks.append(get_data(session, url))
        result_data = await asyncio.gather(*tasks)
    return result_data

executor = concurrent.futures.ProcessPoolExecutor(max_workers=10)
asyncio_loop = asyncio.get_event_loop()
asyncio_loop.set_default_executor(executor)
results = asyncio_loop.run_until_complete(get_data_from_urls(urls))
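
If you would rather not change the loop's default executor, you can also pass the executor explicitly as the first argument of run_in_executor instead of None.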

Upvotes: 2

Prayson W. Daniel
Prayson W. Daniel

Reputation: 15568

You can increase your parsing speed by changing your BeautifulSoup parser from html.parser to lxml, which is by far the fastest; html5lib is the slowest of the three.
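
For example, the switch is a one-argument change (assuming the lxml package is installed, e.g. with pip install lxml):

from bs4 import BeautifulSoup

# lxml has to be installed separately: pip install lxml
parsed_html = BeautifulSoup(html_content, 'lxml')  # instead of 'html.parser'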

Your bottleneck is not processing but IO. You might want multiple threads and not processes:

For example, here is a template program that scrapes random quotes; because it runs in multiple threads, it completes the task faster than a sequential version would:

from concurrent.futures import ThreadPoolExecutor
import random
import time
from bs4 import BeautifulSoup as bs
import requests

URL = 'http://quotesondesign.com/wp-json/posts'

def quote_stream():
    '''
    Quote streamer
    '''
    param = dict(page=random.randint(1, 1000))
    quo = requests.get(URL, params=param)

    if quo.ok:
        data = quo.json()
        author = data[0]['title'].strip()

        content = bs(data[0]['content'], 'html5lib').text.strip()

        print(f'{content}\n-{author}\n')

    else:
        print('Connection Issues :(')

def multi_quoter(workers=4):
    with ThreadPoolExecutor(max_workers=workers) as executor:
        _ = [executor.submit(quote_stream) for i in range(workers)]

if __name__ == '__main__':
    now = time.time()

    multi_quoter(workers=4)

    print(f'Time taken {time.time()-now:.2f} seconds')

In your case, create a function that performs the task you want from start to finish. This function would accept a url and any necessary parameters as arguments. After that, create another function that calls the previous function in different threads, each thread having its own url: so instead of for i in range(..), use for url in urls. You could run 2000 threads at once, but I would prefer chunks of, say, 200 running in parallel, as sketched below.
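
A rough sketch of that pattern, with illustrative names (fetch_and_parse, run_in_chunks) and the selector taken from your question:

from concurrent.futures import ThreadPoolExecutor

import requests
from bs4 import BeautifulSoup

def fetch_and_parse(url):
    # End-to-end task for one url: fetch the page and extract the wanted part.
    response = requests.get(url)
    soup = BeautifulSoup(response.text, 'lxml')
    return soup.find('div', class_='class').get_text()

def run_in_chunks(urls, chunk_size=200):
    # Work through the url list in chunks so only chunk_size threads run at once.
    results = []
    for i in range(0, len(urls), chunk_size):
        chunk = urls[i:i + chunk_size]
        with ThreadPoolExecutor(max_workers=len(chunk)) as executor:
            results.extend(executor.map(fetch_and_parse, chunk))
    return results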

Upvotes: 0
