Reputation: 2522
I have a Python 3.7.4 script that uses multiprocessing and threading to scrape an API. It issues a GET request to fetch the stats of a hashtag and then, from that response, gathers more hashtags to add to the list to be scraped from the API. It works for several minutes and then starts printing many copies of this message:
Traceback (most recent call last):
  File "/home/user/.pyenv/versions/3.7.4/lib/python3.7/multiprocessing/queues.py", line 242, in _feed
    send_bytes(obj)
  File "/home/user/.pyenv/versions/3.7.4/lib/python3.7/multiprocessing/connection.py", line 200, in send_bytes
    self._send_bytes(m[offset:offset + size])
  File "/home/user/.pyenv/versions/3.7.4/lib/python3.7/multiprocessing/connection.py", line 404, in _send_bytes
    self._send(header + buf)
  File "/home/user/.pyenv/versions/3.7.4/lib/python3.7/multiprocessing/connection.py", line 368, in _send
    n = write(self._handle, buf)
BrokenPipeError: [Errno 32] Broken pipe
The code is here:
#!/usr/bin/env python
import sys
import requests
from queue import Queue, Empty
from concurrent.futures import ThreadPoolExecutor
from urllib.parse import urljoin, urlparse
from multiprocessing import Process, Queue, current_process, Manager
from threading import Lock
import multiprocessing
import numpy as np
import json
from pprint import pprint

NUM_WORKERS = multiprocessing.cpu_count()

# global variables
process_queue = Queue()
manager = Manager()
master_dict = manager.dict()
seed_hashes = [
    'amwriting', 'lovewriting', 'poems', 'poemsporn', 'poet', 'poetry', 'quotes', 'word', 'wordgasm'
]
endpoint = 'https://example.com/api'


class MultiThreadScraper:  # class does all processing
    def __init__(self, base_url, masterdict):
        self.base_url = base_url
        self.root_url = '{}://{}'.format(urlparse(self.base_url).scheme, urlparse(self.base_url).netloc)
        self.pool = ThreadPoolExecutor(max_workers=500)  # was 50
        self.scraped_pages = set([])
        self.to_crawl = Queue()
        self.to_crawl.put(base_url)
        self.dict = masterdict
        self.isascii = lambda s: len(s) == len(s.encode())
        self.file_lock = Lock()

    def parse_tags(self, source):
        j = json.loads(source)
        self.file_lock.acquire()
        try:
            with open("graph.json", "a") as fi:
                fi.write(json.dumps(j))
        finally:
            self.file_lock.release()
        if j['exists']:
            for node in j['nodes']:
                tag = node['id']
                uri = f"{endpoint}{tag}"
                if self.isascii(tag) and tag.isalnum() and uri not in self.scraped_pages:
                    print(f"Found: {tag}")
                    self.to_crawl.put(uri)
                    self.dict.append(uri)
            for edge in j['edges']:
                a = edge['a']
                uri = f"{endpoint}{a}"
                if self.isascii(a) and a.isalnum() and uri not in self.scraped_pages:
                    print(f"Found: {a}")
                    self.to_crawl.put(uri)
                    self.dict.append(uri)
                b = edge['b']
                uri = f"{endpoint}{b}"
                if self.isascii(b) and b.isalnum() and uri not in self.scraped_pages:
                    print(f"Found: {b}")
                    self.to_crawl.put(uri)
                    self.dict.append(uri)

    def scrape_info(self, html):
        return

    def post_scrape_callback(self, res):
        result = res.result()
        if result and result.status_code == 200:
            self.parse_tags(result.text)
            #self.scrape_info(result.text)
        elif result and result.status_code != 200:
            pprint('http_error')

    def scrape_page(self, url):
        try:
            res = requests.get(url, timeout=(3, 10))  # was 30
            return res
        except requests.RequestException:
            return

    def run_scraper(self):
        while True:
            try:
                target_url = self.to_crawl.get(timeout=3)  # was 60
                if target_url not in self.scraped_pages:
                    print("Scraping URL: {}".format(target_url))
                    self.scraped_pages.add(target_url)
                    job = self.pool.submit(self.scrape_page, target_url)
                    job.add_done_callback(self.post_scrape_callback)
            except Empty:
                return
            except Exception as e:
                print(e)
                continue


def chunks(n, page_list):
    """Splits the list into n chunks"""
    return np.array_split(page_list, n)


def threader(urls, master):
    for url in urls:
        s = MultiThreadScraper(url, master)
        s.run_scraper()


if __name__ == '__main__':
    # set up first dict
    master_dict[0] = manager.list()
    url_list = []
    for seed in seed_hashes:
        url_list.append(f"{endpoint}{seed}")
    url_list = list(set(url_list))
    clean_url_list = url_list  # = parse(url_list, master_dict[0])
    # split urls up into chunks if more than one
    chunk = chunks(NUM_WORKERS, clean_url_list)
    procs = []
    # adjust actual size of processes if smaller to conserve cpu for threading
    size = NUM_WORKERS
    if len(clean_url_list) < NUM_WORKERS:
        size = len(clean_url_list)
    # create all processes
    for i in range(size):
        master_dict[i] = manager.list()
        print(chunk[i])
        p = Process(target=threader, args=(chunk[i], master_dict[i]))
        procs.append(p)
        p.start()
    # join all created processes
    for p in procs:
        p.join()
    # collect and print results
    for i in range(size):
        master_dict[i] = list(set(master_dict[i]))
        print(len(master_dict[i]))
        for domain in master_dict[i]:
            print('Found: ' + domain)
Here is sample API data:
[{
    "query": "inspiration",
    "exists": true,
    "nodes": [{
        "id": "dedication",
        "relevance": 0.5637108009450206,
        "weight": 0.45454545454545453,
        "x": 0.21844181188954195,
        "y": 0.05872978594377127
    },
    {
        "id": "ifbb",
        "relevance": 0.41447057552244726,
        "weight": 0.18181818181818182,
        "x": 0.04110103005950671,
        "y": 0.031126457980437047
    }],
    "edges": [{
        "a": "dedication",
        "b": "ifbb",
        "id": "dedication#ifbb",
        "weight": 0.13807764918076493
    },
    {
        "a": "dedication",
        "b": "cleaneating",
        "id": "dedication#cleaneating",
        "weight": 0.05179527283184793
    }]
}]
How can I fix this error?
Edit: I have also tried (with no success):
Upvotes: 1
Views: 3943
Reputation: 570
You may be getting rate limited because you are sending so many requests so quickly. Try adding a delay between requests; see https://stackoverflow.com/a/28040423/9872244. A broken pipe error usually occurs when a request is blocked or takes too long: after the requesting side times out, it closes the connection, and when the responding side (the server) then tries to write to the socket, it raises a broken pipe error.
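As a rough sketch of what that could look like, here is a standalone version of your scrape_page with a fixed pause added before each request; the 1-second RATE_LIMIT_DELAY is an assumed value, not something taken from the API's documentation:

import time
import requests

RATE_LIMIT_DELAY = 1.0  # assumed pause between requests; tune to the API's actual limits

def scrape_page(url):
    """Fetch one URL, pausing first so requests are not fired in rapid bursts."""
    time.sleep(RATE_LIMIT_DELAY)
    try:
        return requests.get(url, timeout=(3, 10))
    except requests.RequestException:
        return None

Note that with ThreadPoolExecutor(max_workers=500), a per-request sleep only slows each individual thread, so lowering max_workers (or sharing a single rate limiter across all threads) may also be needed before the delay has much effect.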
Upvotes: 2