padul

Reputation: 174

parallelize a scraper and nothing happens

I am trying to parallelize a scraper. Unfortunately, when I execute this code it runs for an unusually long time, until I stop it, and no output is generated either. Is there something I missed here? Is the problem that I use os.system?

First I define the function, then I load the data pool, and then I pass it into the multiprocessing pool.

All in all, what I want to do looks like this:

import multiprocessing as mp

def cube(x):
    return x**3

pool = mp.Pool(processes=2)
results = pool.map(cube, range(1,7))
print(results)

But this small calculation has now been running for more than 5 minutes. So I think the error is not in the code itself, but rather in how I understand multiprocessing.

    from multiprocessing import Pool
    import os
    import json
    import datetime
    from dateutil.relativedelta import relativedelta
    import re
    import time

    os.chdir(r'C:\Users\final_tweets_de')

    p = Pool(5)


    def get_id(data_tweets):
        for i in range(len(data_tweets)):
            account = data_tweets[i]['user_screen_name']
            created = datetime.datetime.strptime(data_tweets[i]['date'], '%Y-%m-%d').date()
            until = created + relativedelta(days=10)
            id = data_tweets[i]['id']
            filename = re.search(r'(.*).json', file).group(1) + '_' + 'tweet_id_' + str(id) + '_' + 'user_id_' + str(data_tweets[i]['user_id'])

            os.system('snscrape twitter-search "(to:'+account+') since:'+created.strftime("%Y-%m-%d")+' until:'+until.strftime("%Y-%m-%d")+' filter:replies" >C:\\Users\\test_'+filename)


    directory = r'C:\Users\final_tweets_de'
    path = r'C:\Users\final_tweets_de'

    for file in os.listdir(directory):
        fh = open(os.path.join(path, file), 'r')
        print(file)

        with open(file, 'r', encoding='utf-8') as json_file:
            data_tweets = json.load(json_file)

        data_tweets = data_tweets[0:5]
        start = time.time()
        print("start")

        p.map(get_id, data_tweets)
        p.terminate()
        p.join()
        end = time.time()
        print(end - start)

Update

The reason why the code did not run is, firstly, the problem addressed by @Booboo. The other one is that, on Windows, the script has to be started from the command line (cmd) when using multiprocessing.

Like here: Python multiprocessing example not working

Now I get the KeyError: 0 when I run the code.

import multiprocessing as mp
import os
import json
import datetime
from dateutil.relativedelta import relativedelta
import re
import time

os.chdir(r'C:\Users\Paul\Documents\Uni\Masterarbeit\Datengewinnung\final_tweets_de')


def get_id(data_tweets):
    for i in range(len(data_tweets)):
        print(i)

        account = data_tweets[i]['user_screen_name']
        created = datetime.datetime.strptime(data_tweets[i]['date'], '%Y-%m-%d').date()
        until = created + relativedelta(days=10)
        id = data_tweets[i]['id']
        filename = re.search(r'(.*).json', file).group(1) + '_' + 'tweet_id_' + str(id) + '_' + 'user_id_' + str(data_tweets[i]['user_id'])

        try:
            os.system('snscrape twitter-search "(to:'+account+') since:'+created.strftime("%Y-%m-%d")+' until:'+until.strftime("%Y-%m-%d")+' filter:replies" >C:\\Users\\Paul\\Documents\\Uni\\Masterarbeit\\Datengewinnung\\Tweets_antworten\\Antworten\\test_'+filename)
        except:
            continue


directory = r'C:\Users\Paul\Documents\Uni\Masterarbeit\Datengewinnung\final_tweets_de'
path = r'C:\Users\Paul\Documents\Uni\Masterarbeit\Datengewinnung\final_tweets_de'

for file in os.listdir(directory):
    fh = open(os.path.join(path, file), 'r')
    print(file)

    with open(file, 'r', encoding='utf-8') as json_file:
        data_tweets = json.load(json_file)

    data_tweets = data_tweets[0:2]
    start = time.time()
    print("start")
    if __name__ == '__main__':
        pool = mp.Pool(processes=2)
        pool.map(get_id, data_tweets)

    end = time.time()
    print(end - start)
    del(data_tweets)

Output:

(NLP 2) C:\Users\Paul\Documents\Uni\Masterarbeit\Datengewinnung\Tweets_antworten>python scrape_id_antworten_parallel.py
corona.json
start
corona.json
corona.json
start
0.0009980201721191406
coronavirus.json
start
0.0
coronavirus.json
start
0.0
covid.json
start
0.0
SARS_CoV.json
start
0.0
0
0
multiprocessing.pool.RemoteTraceback:
"""
Traceback (most recent call last):
  File "C:\Users\Paul\Anaconda3\envs\NLP 2\lib\multiprocessing\pool.py", line 121, in worker
    result = (True, func(*args, **kwds))
  File "C:\Users\Paul\Anaconda3\envs\NLP 2\lib\multiprocessing\pool.py", line 44, in mapstar
    return list(map(*args))
  File "C:\Users\Paul\Documents\Uni\Masterarbeit\Datengewinnung\Tweets_antworten\scrape_id_antworten_parallel.py", line 25, in get_id
    account = data_tweets[i]['user_screen_name']
KeyError: 0
"""

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "scrape_id_antworten_parallel.py", line 60, in <module>
    pool.map(get_id, data_tweets)
  File "C:\Users\Paul\Anaconda3\envs\NLP 2\lib\multiprocessing\pool.py", line 268, in map
    return self._map_async(func, iterable, mapstar, chunksize).get()
  File "C:\Users\Paul\Anaconda3\envs\NLP 2\lib\multiprocessing\pool.py", line 657, in get
    raise self._value
KeyError: 0

Upvotes: 0

Views: 344

Answers (2)

Maciej Skorski

Reputation: 3354

Let me share a working example of retrieving futurists' tweets. Note that we want multithreading rather than multiprocessing to parallelize I/O operations.

# install social media scraper: !pip3 install snscrape
import snscrape.modules.twitter as sntwitter
import itertools
import multiprocessing as mp
import datetime
import pandas as pd

start_date = datetime.datetime(2023,2,1,tzinfo=datetime.timezone.utc) # from when
attributes = ('date','url','rawContent') # what attributes to keep


def get_tweets(username,n_tweets=None,attributes=attributes):
    tweets = sntwitter.TwitterSearchScraper(f'from:{username}').get_items() # invoke the scraper
    tweets = itertools.islice(tweets,n_tweets) # stop when the count is reached
    tweets = itertools.takewhile(lambda t:t.date>=start_date, tweets) # stop when the date is passed
    tweets = map(lambda t: (username,)+tuple(getattr(t,a) for a in attributes),tweets) # keep only the attributes needed
    tweets = list(tweets) # the result has to be picklable
    return tweets

# prepare a list of accounts to scrape
user_names = ['kevin2kelly','briansolis','PeterDiamandis','Richard_Florida']

# parallelise the queries for speed!
with mp.Pool(4) as p:
    results = p.map(get_tweets, user_names)
    # combine results
    results = list(itertools.chain(*results))
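Since the answer's text recommends multithreading for I/O-bound scraping, a roughly equivalent sketch using a thread pool from the standard-library concurrent.futures module (assuming get_tweets and user_names are defined as above) could look like this:

# thread-pool variant of the same fan-out (a sketch, not part of the original answer)
from concurrent.futures import ThreadPoolExecutor
import itertools

# assumes get_tweets and user_names from the snippet above
with ThreadPoolExecutor(max_workers=4) as executor:
    results = list(executor.map(get_tweets, user_names))

# combine the per-account lists into one flat list
results = list(itertools.chain(*results))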

Upvotes: 0

Booboo

Reputation: 44043

I can see from path = r'C:\Users\final_tweets_de' that your platform is Windows. When you do multiprocessing under Windows, the code that creates the sub-processes must be executed inside an if __name__ == '__main__': block, as follows:

import multiprocessing as mp

def cube(x):
    return x**3

if __name__ == '__main__':
    pool = mp.Pool(processes=2)
    results = pool.map(cube, range(1,7))
    print(results)

Otherwise you get into a recursive loop where each sub-process attempts to create a new pool and new sub-processes ad infinitum. Fix this problem and re-test. The easiest way is to wrap your code in a function (call it main, for example) and then add:

if __name__ == '__main__':
    main()
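
Applied to the question's toy example, a minimal sketch of that wrapping (using nothing beyond the code already shown) would be:

import multiprocessing as mp

def cube(x):
    return x**3

def main():
    # all pool-creating code lives inside main(), so a child process
    # that imports this module does not re-create the pool
    pool = mp.Pool(processes=2)
    results = pool.map(cube, range(1, 7))
    print(results)
    pool.close()
    pool.join()

if __name__ == '__main__':
    main()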

Also, why are you only using 2 processes (or 5 in your actual example)? If you do not specify an argument to the Pool constructor, you will create a pool whose size equals the number of processors available on your computer, which is not a bad default.
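
For reference, a small sketch (not from the original answer) that shows that default on your own machine:

import os
import multiprocessing as mp

def square(x):
    return x * x

if __name__ == '__main__':
    # Pool() with no argument sizes itself to the machine's CPU count
    print("workers:", os.cpu_count())
    with mp.Pool() as pool:
        print(pool.map(square, range(8)))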

Upvotes: 1
