xendi

Reputation: 2522

Python 3 multiprocessing scraper: BrokenPipeError (Broken pipe)

I have a Python 3.7.4 script that uses multiprocessing and threading to scrape an API. It makes a GET request to fetch a hashtag's stats, then gathers more hashtags from that response to add to the list to be scraped. It works for several minutes and then starts printing many copies of this message:

Traceback (most recent call last):
  File "/home/user/.pyenv/versions/3.7.4/lib/python3.7/multiprocessing/queues.py", line 242, in _feed
    send_bytes(obj)
  File "/home/user/.pyenv/versions/3.7.4/lib/python3.7/multiprocessing/connection.py", line 200, in send_bytes
    self._send_bytes(m[offset:offset + size])
  File "/home/user/.pyenv/versions/3.7.4/lib/python3.7/multiprocessing/connection.py", line 404, in _send_bytes
    self._send(header + buf)
  File "/home/user/.pyenv/versions/3.7.4/lib/python3.7/multiprocessing/connection.py", line 368, in _send
    n = write(self._handle, buf)
BrokenPipeError: [Errno 32] Broken pipe

The code is here:

#!/usr/bin/env python

import sys
import requests
from queue import Queue, Empty
from concurrent.futures import ThreadPoolExecutor
from urllib.parse import urljoin, urlparse
from multiprocessing import Process, Queue, current_process, Manager
from threading import Lock
import multiprocessing
import numpy as np
import json
from pprint import pprint


NUM_WORKERS = multiprocessing.cpu_count()

# global variables
process_queue = Queue()
manager = Manager()
master_dict = manager.dict()

seed_hashes = [
    'amwriting', 'lovewriting', 'poems', 'poemsporn', 'poet', 'poetry', 'quotes', 'word', 'wordgasm'
]

endpoint = 'https://example.com/api'


class MultiThreadScraper:  # class does all processing

    def __init__(self, base_url, masterdict):
        self.base_url = base_url
        self.root_url = '{}://{}'.format(urlparse(self.base_url).scheme, urlparse(self.base_url).netloc)
        self.pool = ThreadPoolExecutor(max_workers=500)  # was 50
        self.scraped_pages = set([])
        self.to_crawl = Queue()
        self.to_crawl.put(base_url)
        self.dict = masterdict
        self.isascii = lambda s: len(s) == len(s.encode())
        self.file_lock = Lock()

    def parse_tags(self, source):
        j = json.loads(source)
        self.file_lock.acquire()
        try:
            with open("graph.json", "a") as fi:
                fi.write(json.dumps(j))
                fi.close()
        finally:
            self.file_lock.release()
        if j['exists']:
            for node in j['nodes']:
                tag = node['id']
                uri = f"{endpoint}{tag}"
                if self.isascii(tag) and tag.isalnum() and uri not in self.scraped_pages:
                    print(f"Found: {tag}")
                    self.to_crawl.put(uri)
                    self.dict.append(uri)
            for edge in j['edges']:
                a = edge['a']
                uri = f"{endpoint}{a}"
                if self.isascii(a) and a.isalnum() and uri not in self.scraped_pages:
                    print(f"Found: {a}")
                    self.to_crawl.put(uri)
                    self.dict.append(uri)
                b = edge['b']
                uri = f"{endpoint}{b}"
                if self.isascii(b) and b.isalnum() and uri not in self.scraped_pages:
                    print(f"Found: {b}")
                    self.to_crawl.put(uri)
                    self.dict.append(uri)

    def scrape_info(self, html):
        return

    def post_scrape_callback(self, res):
        result = res.result()
        if result and result.status_code == 200:
            self.parse_tags(result.text)
            #self.scrape_info(result.text)
        elif result and result.status_code != 200:
            pprint('http_error')

    def scrape_page(self, url):
        try:
            res = requests.get(url, timeout=(3, 10))  # was 30
            return res
        except requests.RequestException:
            return

    def run_scraper(self):
        while True:
            try:
                target_url = self.to_crawl.get(timeout=3)  # was 60
                if target_url not in self.scraped_pages:
                    print("Scraping URL: {}".format(target_url))
                    self.scraped_pages.add(target_url)
                    job = self.pool.submit(self.scrape_page, target_url)
                    job.add_done_callback(self.post_scrape_callback)
            except Empty:
                return
            except Exception as e:
                print(e)
                continue


def chunks(n, page_list):
    """Splits the list into n chunks"""
    return np.array_split(page_list, n)


def threader(urls, master):
    for url in urls:
        s = MultiThreadScraper(url, master)
        s.run_scraper()


if __name__ == '__main__':
    # set up first dict
    master_dict[0] = manager.list()

    url_list = []
    for seed in seed_hashes:
        url_list.append(f"{endpoint}{seed}")
    url_list = list(set(url_list))
    clean_url_list = url_list  # = parse(url_list, master_dict[0])

    # split urls up into chunks if more than one
    chunk = chunks(NUM_WORKERS, clean_url_list)
    procs = []

    # adjust actual size of processes if smaller to conserve cpu for threading
    size = NUM_WORKERS
    if len(clean_url_list) < NUM_WORKERS:
        size = len(clean_url_list)

    # create all processes
    for i in range(size):
        master_dict[i] = manager.list()
        print(chunk[i])
        p = Process(target=threader, args=(chunk[i], master_dict[i]))
        procs.append(p)
        p.start()

    # join all created processes
    for p in procs:
        p.join()

    # deduplicate and print the collected URIs
    for i in range(size):
        master_dict[i] = list(set(master_dict[i]))
        print(len(master_dict[i]))
        for domain in master_dict[i]:
            print('Found: ' + domain)

Here is sample API data:

[{
    "query": "inspiration",
    "exists": true,
    "nodes": [{
            "id": "dedication", 
            "relevance": 0.5637108009450206, 
            "weight": 0.45454545454545453, 
            "x": 0.21844181188954195, 
            "y": 0.05872978594377127
        }, 
        {
            "id": "ifbb", 
            "relevance": 0.41447057552244726, 
            "weight": 0.18181818181818182, 
            "x": 0.04110103005950671, 
            "y": 0.031126457980437047
    }],
    "edges": [{
            "a": "dedication", 
            "b": "ifbb", 
            "id": "dedication#ifbb", 
            "weight": 0.13807764918076493
        }, 
        {
            "a": "dedication", 
            "b": "cleaneating", 
            "id": "dedication#cleaneating", 
            "weight": 0.05179527283184793
    }]
}]

How can I fix this error?

Edit: I have also tried (with no success):

Upvotes: 1

Views: 3943

Answers (1)

Dalton Pearson

Reputation: 570

You may be getting rate limited because you are sending so many requests so quickly. Try adding a delay between requests. See: https://stackoverflow.com/a/28040423/9872244 - A broken pipe error usually occurs when a request is blocked or takes too long: after the requesting side times out, it closes the connection, and when the responding side (the server) then tries to write to the socket, it raises a broken pipe error.
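A minimal sketch of how a delay could be added inside the question's scrape_page; it assumes a fixed per-request pause with a simple linear backoff, and REQUEST_DELAY and MAX_RETRIES are placeholder values to tune against the API's actual limits:

import time
import requests

REQUEST_DELAY = 0.5   # placeholder: tune to the API's actual rate limit
MAX_RETRIES = 3       # placeholder: attempts per URL before giving up


def scrape_page(url):
    """Fetch a URL, pausing before each attempt so the API is not hammered."""
    for attempt in range(MAX_RETRIES):
        time.sleep(REQUEST_DELAY * (attempt + 1))  # simple linear backoff
        try:
            res = requests.get(url, timeout=(3, 10))
            if res.status_code == 429:  # server says "too many requests"; back off and retry
                continue
            return res
        except requests.RequestException:
            pass  # network error; retry after a longer pause
    return None

Because scrape_page runs inside the ThreadPoolExecutor, the sleep only blocks the worker thread handling that job, not the main crawl loop.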

Upvotes: 2
