micBighne.98

Reputation: 31

How to use an asyncio/aiohttp web scraper with FastAPI?

I am learning web scraping using asyncio and aiohttp with BeautifulSoup. I want to create a RESTful API that takes user input, scrapes the data, and then returns the response in JSON format. This is what my scraper code looks like:

import asyncio
import aiohttp
from bs4 import BeautifulSoup, SoupStrainer

class TestScraper:
    def __init__(self, query):
        self.query = query

    async def main(self):
        urls = [
            f"https://books.toscrape.com/catalogue/page-{self.query}.html",
            f"https://quotes.toscrape.com/page/{self.query}/",
        ]

        def get_urls(session):
            tasks = []
            for url in urls:
                tasks.append(session.get(url))
            return tasks

        async with aiohttp.ClientSession() as session:
            tasks = get_urls(session)
            responses = await asyncio.gather(*tasks)
            for r in responses:
                # str(r.url).split(".")[0][8:] strips the "https://" scheme
                # and keeps the subdomain, e.g. "books" or "quotes"
                if (str(r.url).split(".")[0][8:]) == "books":
                    soup = BeautifulSoup(
                        await r.read(), "lxml", parse_only=SoupStrainer("article")
                    )
                    books_list = []
                    for books in soup.find_all("article"):
                        book_name = books.find("h3").find("a").get("title")
                        book_price = books.find("p", class_="price_color").text
                        books_item = {
                            "book_name": book_name,
                            "book_price": book_price,
                        }
                        books_list.append(books_item)
                    yield books_list

                elif (str(r.url).split(".")[0][8:]) == "quotes":
                    soup = BeautifulSoup(
                        await r.read(),
                        "lxml",
                        parse_only=SoupStrainer("div", {"class": "quote"}),
                    )
                    quotes_list = []
                    for quotes in soup.find_all("div", class_="quote"):
                        quote_text = quotes.find("span", class_="text").get_text()
                        quote_author = quotes.find("small", class_="author").get_text()
                        quotes_item = {
                            "quote_text": quote_text,
                            "quote_author": quote_author,
                        }
                        quotes_list.append(quotes_item)
                    yield quotes_list

                else:
                    yield "No results found"

# main() is an async generator, so it has to be iterated from a coroutine
# rather than passed to asyncio.run() directly.
async def run(query):
    async for result in TestScraper(query).main():
        print(result)

asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
asyncio.run(run(6))
# asyncio.run(run({query}))

It works fine on its own, but when I try to use it with FastAPI it returns errors. Even after making some changes I found on the web, the errors still appear. Here is my FastAPI code:

import asyncio
from fastapi import FastAPI

from scrapers.books_quotes import TestScraper

app = FastAPI()

@app.get("/")
def root():
    return {"message": "Hello World"}

@app.get("/test/{test_query}")
async def read_test_items(test_query: str):
    return asyncio.run(TestScraper(test_query).main())

And the error I get:

asyncio.run() cannot be called from a running event loop

How do I solve it?

Upvotes: 0

Views: 4980

Answers (2)

micBighne.98

Reputation: 31

Instead of creating a separate list for each URL in the TestScraper code, I created a single list for all URLs.

#same code as before
        async with aiohttp.ClientSession() as session:
            tasks = get_urls(session)
            responses = await asyncio.gather(*tasks)
            results = []
            for r in responses:
                if (str(r.url).split(".")[0][8:]) == "books":
                    soup = BeautifulSoup(
                        await r.read(), "lxml", parse_only=SoupStrainer("article")
                    )
                    for books in soup.find_all("article"):
                        book_name = books.find("h3").find("a").get("title")
                        book_price = books.find("p", class_="price_color").text
                        books_item = {
                            "book_name": book_name,
                            "book_price": book_price,
                        }
                        results.append(books_item)
                elif (str(r.url).split(".")[0][8:]) == "quotes":
                    soup = BeautifulSoup(
                        await r.read(),
                        "lxml",
                        parse_only=SoupStrainer("div", {"class": "quote"}),
                    )
                    for quotes in soup.find_all("div", class_="quote"):
                        quote_text = quotes.find("span", class_="text").get_text()
                        quote_author = quotes.find("small", class_="author").get_text()
                        quotes_item = {
                            "quote_text": quote_text,
                            "quote_author": quote_author,
                        }
                        results.append(quotes_item)
                else:
                    results.append({"error": f"No results found for {r.url}"})
            yield results
#same code as before
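
As a side note, since the rewritten generator yields exactly once, main could instead be a plain coroutine that returns the list, which the FastAPI route can then simply await. A minimal sketch, assuming the same scraping logic fills results:

    # Sketch: return instead of yield, turning main() into a normal coroutine.
    async def main(self):
        results = []
        # ... same scraping logic as above, appending into results ...
        return results

# The route can then await it directly:
@app.get("/test/{test_query}")
async def read_test_items(test_query: str):
    return await TestScraper(test_query).main()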

And, thanks to @mkrieger1, I changed the FastAPI file (main.py) as shown below:

#same code as before
@app.get("/test/{test_query}")
async def read_test_items(test_query: str):
    async for results in TestScraper(test_query).main():
        return results
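
For reference, one way to exercise the endpoint without a browser is FastAPI's TestClient (a minimal sketch; it assumes the FastAPI file above is named main.py and that httpx, which TestClient builds on, is installed):

from fastapi.testclient import TestClient

from main import app  # assumes the FastAPI app above lives in main.py

client = TestClient(app)

response = client.get("/test/1")
print(response.status_code)  # expect 200
print(response.json())       # the combined books/quotes list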

And now everything works fine. Thanks for reading and have a nice day.

Upvotes: 0

mkrieger1

Reputation: 23310

asyncio.run is meant to be the top-level entry point for async code, and the FastAPI app (or whatever framework you use to run it) should already call it for you.

Normally, to run an async def function (i.e. a coroutine) from within async code, you simply await it.

@app.get("/test/{test_query}")
async def read_test_items(test_query: str):
    return await TestScraper(test_query).main()

In your case, TestScraper.main is not a normal coroutine but an asynchronous generator (because it uses yield). You run it by iterating it in an async for loop.

@app.get("/test/{test_query}")
async def read_test_items(test_query: str):
    async for result in TestScraper(test_query).main():
        ...  # do something with result
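
For example, to return everything the generator yields as one JSON response, the loop can be replaced with an async comprehension (a minimal sketch using the names from the question):

@app.get("/test/{test_query}")
async def read_test_items(test_query: str):
    # Drain the async generator; each iteration receives one yielded item.
    return [result async for result in TestScraper(test_query).main()]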

Upvotes: 2
