Reputation: 31
I am learning web scraping using asyncio and aiohttp with BeautifulSoup. I want to create a RESTful API that takes user input, scrapes the data, and then returns the response in JSON format. This is what my scraper code looks like:
    import asyncio

    import aiohttp
    from bs4 import BeautifulSoup, SoupStrainer


    class TestScraper:
        def __init__(self, query):
            self.query = query

        async def main(self):
            urls = [
                f"https://books.toscrape.com/catalogue/page-{self.query}.html",
                f"https://quotes.toscrape.com/page/{self.query}/",
            ]

            def get_urls(session):
                tasks = []
                for url in urls:
                    tasks.append(session.get(url))
                return tasks

            async with aiohttp.ClientSession() as session:
                tasks = get_urls(session)
                responses = await asyncio.gather(*tasks)
                for r in responses:
                    if (str(r.url).split(".")[0][8:]) == "books":
                        soup = BeautifulSoup(
                            await r.read(), "lxml", parse_only=SoupStrainer("article")
                        )
                        books_list = []
                        for books in soup.find_all("article"):
                            book_name = books.find("h3").find("a").get("title")
                            book_price = books.find("p", class_="price_color").text
                            books_item = {
                                "book_name": book_name,
                                "book_price": book_price,
                            }
                            books_list.append(books_item)
                        yield books_list
                    elif (str(r.url).split(".")[0][8:]) == "quotes":
                        soup = BeautifulSoup(
                            await r.read(),
                            "lxml",
                            parse_only=SoupStrainer("div", {"class": "quote"}),
                        )
                        quotes_list = []
                        for quotes in soup.find_all("div", class_="quote"):
                            quote_text = quotes.find("span", class_="text").get_text()
                            quote_author = quotes.find("small", class_="author").get_text()
                            quotes_item = {
                                "quote_text": quote_text,
                                "quote_author": quote_author,
                            }
                            quotes_list.append(quotes_item)
                        yield quotes_list
                    else:
                        yield "No results found"


    asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
    asyncio.run(TestScraper(6).main())
    # asyncio.run(TestScraper({query}).main())
It works fine on its own, but when I try to use it with FastAPI it returns errors. Even after making some changes I found on the web, the errors still appear. Here is my FastAPI code:
    import asyncio

    from fastapi import FastAPI

    from scrapers.books_quotes import TestScraper

    app = FastAPI()


    @app.get("/")
    def root():
        return {"message": "Hello World"}


    @app.get("/test/{test_query}")
    async def read_test_items(test_query: str):
        return asyncio.run(TestScraper(test_query).main())
And the error I get:

    asyncio.run() cannot be called from a running event loop
How do I solve it?
Upvotes: 0
Views: 4980
Reputation: 31
Instead of yielding a separate list for each URL in the TestScraper code, I now build a single list for all URLs, so the generator yields one combined result:
    # same code as before
            async with aiohttp.ClientSession() as session:
                tasks = get_urls(session)
                responses = await asyncio.gather(*tasks)
                results = []
                for r in responses:
                    if (str(r.url).split(".")[0][8:]) == "books":
                        soup = BeautifulSoup(
                            await r.read(), "lxml", parse_only=SoupStrainer("article")
                        )
                        for books in soup.find_all("article"):
                            book_name = books.find("h3").find("a").get("title")
                            book_price = books.find("p", class_="price_color").text
                            books_item = {
                                "book_name": book_name,
                                "book_price": book_price,
                            }
                            results.append(books_item)
                    elif (str(r.url).split(".")[0][8:]) == "quotes":
                        soup = BeautifulSoup(
                            await r.read(),
                            "lxml",
                            parse_only=SoupStrainer("div", {"class": "quote"}),
                        )
                        for quotes in soup.find_all("div", class_="quote"):
                            quote_text = quotes.find("span", class_="text").get_text()
                            quote_author = quotes.find("small", class_="author").get_text()
                            quotes_item = {
                                "quote_text": quote_text,
                                "quote_author": quote_author,
                            }
                            results.append(quotes_item)
                    else:
                        results.append({"error": f"No results found for {r.url}"})
                yield results
                # print(results)
    # same code as before
And thanks to @mkrieger1, I changed the FastAPI file (main.py) as shown below:
    # same code as before
    @app.get("/test/{test_query}")
    async def read_test_items(test_query: str):
        async for results in TestScraper(test_query).main():
            return results
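Since main() is now an async generator, the scraper can also still be run from a plain script. Here is a minimal sketch (the demo wrapper coroutine is my addition, not part of the project): asyncio.run() only accepts a coroutine, so the async for loop has to live inside one.

    import asyncio

    from scrapers.books_quotes import TestScraper


    async def demo(query):
        # Async generators must be consumed with "async for";
        # wrapping the loop in a coroutine makes it runnable via asyncio.run().
        async for results in TestScraper(query).main():
            print(results)


    asyncio.run(demo("2"))  # page 2 is an arbitrary example query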
And now everything works fine. Thanks for reading and have a nice day.
Upvotes: 0
Reputation: 23310
asyncio.run is meant as the top-level entry point for the async code, which the FastAPI app (or whichever framework you use to run it) should already call for you.
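You can reproduce the error in isolation, without FastAPI. A minimal sketch: calling asyncio.run from inside a coroutine that is already running on an event loop always fails this way.

    import asyncio


    async def inner():
        return 42


    async def outer():
        # The event loop is already running at this point, so this raises
        # "RuntimeError: asyncio.run() cannot be called from a running event loop".
        return asyncio.run(inner())


    asyncio.run(outer())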
Normally, to run an async def function (= coroutine) from within async code, simply await it:
    @app.get("/test/{test_query}")
    async def read_test_items(test_query: str):
        return await TestScraper(test_query).main()
In your case, TestScraper.main is not a normal coroutine but an asynchronous generator (because it uses yield statements). You run it by using it in an async for loop:
    @app.get("/test/{test_query}")
    async def read_test_items(test_query: str):
        async for result in TestScraper(test_query).main():
            ...  # do something with result
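If the endpoint should simply return everything the generator yields, one variant (my sketch, not the only way) is to drain it with an async list comprehension:

    @app.get("/test/{test_query}")
    async def read_test_items(test_query: str):
        # Collect every yielded item into a list that FastAPI can serialize.
        return [result async for result in TestScraper(test_query).main()]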
Upvotes: 2