Mostafa Bouzari

Reputation: 10240

AssertionError: group argument must be None for now in a python library

I'm trying to use a library called gdelt (gdeltPyR), which downloads data from the GDELT website or from Google BigQuery; I'm not sure which. It can be installed with pip:

pip install gdeltPyR

You can also install it directly from GitHub:

pip install git+https://github.com/linwoodc3/gdeltPyR

The library hasn't been updated in a long time, and unfortunately I need it for my master's thesis, so it would be a huge help if I could fix it somehow.

Requests for the events or mentions tables work, but unfortunately requests for the gkg table do not always work.

A gkg request for a single day works like a charm:

results = gd.Search('2016 10 19',table='gkg')

But when I set coverage=True or query a date range, it raises AssertionError: group argument must be None for now.

The code that causes the error:

results = gd.Search(['2016 10 19','2023 01 22'],table='gkg')

The full error with traceback:

File c:\Users\\anaconda3\envs\myenv\Lib\site-packages\gdelt\base.py:634, in gdelt.Search(self, date, table, coverage, translation, output, queryTime, normcols)
    630     downloaded_dfs = list(pool.imap_unordered(eventWork,
    631                                               self.download_list))
    632 else:
--> 634     pool = NoDaemonProcessPool(processes=cpu_count())
    635     downloaded_dfs = list(pool.imap_unordered(_mp_worker,
    636                                               self.download_list,
    637                                               ))
    638 pool.close()

File c:\Users\\anaconda3\envs\myenv\Lib\multiprocessing\pool.py:215, in Pool.__init__(self, processes, initializer, initargs, maxtasksperchild, context)
    213 self._processes = processes
    214 try:
--> 215     self._repopulate_pool()
    216 except Exception:
    217     for p in self._pool:

File c:\Users\\anaconda3\envs\myenv\Lib\multiprocessing\pool.py:306, in Pool._repopulate_pool(self)
    305 def _repopulate_pool(self):
--> 306     return self._repopulate_pool_static(self._ctx, self.Process,
    307                                         self._processes,
    308                                         self._pool, self._inqueue,
    309                                         self._outqueue, self._initializer,
    310                                         self._initargs,
    311                                         self._maxtasksperchild,
    312                                         self._wrap_exception)

File c:\Users\\anaconda3\envs\myenv\Lib\multiprocessing\pool.py:322, in Pool._repopulate_pool_static(ctx, Process, processes, pool, inqueue, outqueue, initializer, initargs, maxtasksperchild, wrap_exception)
    318 """Bring the number of pool processes up to the specified number,
    319 for use after reaping workers which have exited.
    320 """
    321 for i in range(processes - len(pool)):
--> 322     w = Process(ctx, target=worker,
    323                 args=(inqueue, outqueue,
    324                       initializer,
    325                       initargs, maxtasksperchild,
    326                       wrap_exception))
    327     w.name = w.name.replace('Process', 'PoolWorker')
    328     w.daemon = True

File c:\Users\\anaconda3\envs\myenv\Lib\multiprocessing\process.py:82, in BaseProcess.__init__(self, group, target, name, args, kwargs, daemon)
...
---> 82     assert group is None, 'group argument must be None for now'
     83     count = next(_process_counter)
     84     self._identity = _current_process._identity + (count,)

AssertionError: group argument must be None for now

This is the part of the code in base.py where the error occurs:

elif self.version == 2:

            if self.table == 'events' or self.table == '':
                columns = self.events_columns
                if self.coverage is True:  # pragma: no cover

                    self.download_list = (urlsv2events(v2RangerCoverage(
                        _dateRanger(self.date))))
                else:

                    self.download_list = (urlsv2events(v2RangerNoCoverage(
                        _dateRanger(self.date))))

            if self.table == 'gkg':
                columns = self.gkg_columns
                if self.coverage is True:  # pragma: no cover

                    self.download_list = (urlsv2gkg(v2RangerCoverage(
                        _dateRanger(self.date))))
                else:
                    self.download_list = (urlsv2gkg(v2RangerNoCoverage(
                        _dateRanger(self.date))))
                    # print ("2 gkg", urlsv2gkg(self.datesString))

            if self.table == 'mentions':
                columns = self.mentions_columns
                if self.coverage is True:  # pragma: no cover

                    self.download_list = (urlsv2mentions(v2RangerCoverage(
                        _dateRanger(self.date))))

                else:

                    self.download_list = (urlsv2mentions(v2RangerNoCoverage(
                        _dateRanger(self.date))))



    if isinstance(self.datesString, str):
        if self.table == 'events':

            results = eventWork(self.download_list)
        else:
            # if self.table =='gkg':
            #     results = eventWork(self.download_list)
            #
            # else:
            results = _mp_worker(self.download_list, proxies=self.proxies)

    else:

        if self.table == 'events':

            pool = Pool(processes=cpu_count())
            downloaded_dfs = list(pool.imap_unordered(eventWork,
                                                      self.download_list))
        else:

            pool = NoDaemonProcessPool(processes=cpu_count())
            downloaded_dfs = list(pool.imap_unordered(_mp_worker,
                                                      self.download_list,
                                                      ))
        pool.close()
        pool.terminate()
        pool.join()
        # print(downloaded_dfs)
        results = pd.concat(downloaded_dfs)
        del downloaded_dfs
        results.reset_index(drop=True, inplace=True)

I found a partial answer here:

https://docs.python.org/2/library/threading.html#threading.Thread

But I don't know what exactly to change or how to change it. This is the first time I've had to modify a library in order to write my own code. Any help would be appreciated.

EDIT:

Here's a Jupyter notebook in which you can easily test it.

Upvotes: 0

Views: 187

Answers (1)

Ahmed AEK

Reputation: 17496

In base.py, patch lines 54 to 57 as follows:

def dummy_process(self, ctx, *args, **kwargs):
    return NoDaemonProcess(*args, **kwargs)

# We sub-class multiprocessing.pool.Pool instead of multiprocessing.Pool
# because the latter is only a wrapper function, not a proper class.
class NoDaemonProcessPool(multiprocessing.pool.Pool):
    Process = dummy_process

Alternatively, you can monkey-patch it yourself when importing the module:

from gdelt.base import NoDaemonProcess, NoDaemonProcessPool

def dummy_process(self, ctx, *args, **kwargs):
    return NoDaemonProcess(*args, **kwargs)

NoDaemonProcessPool.Process = dummy_process

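Basically, the pool calls its Process attribute as Process(ctx, *args, **kwargs), as the traceback shows, while NoDaemonProcess expects to be constructed as NoDaemonProcess(*args, **kwargs). When the class is used directly, ctx lands in its first positional parameter, group, and that is what trips the assertion. The wrapper just peels off ctx, and also self, because as a plain function assigned on the class it is no longer a static method.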
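
To see the argument mismatch in isolation, here is a minimal sketch that does not need gdelt installed and never starts a worker process. The NoDaemonProcess class below is a stand-in written for illustration, assumed to behave like the one in gdelt.base; noop and the None passed for self are likewise only placeholders.

```python
import multiprocessing


class NoDaemonProcess(multiprocessing.Process):
    """Stand-in (assumption) for gdelt.base.NoDaemonProcess: never daemonic."""

    @property
    def daemon(self):
        return False

    @daemon.setter
    def daemon(self, value):
        pass  # ignore attempts to make the process daemonic


def dummy_process(self, ctx, *args, **kwargs):
    # Drop `self` (the pool) and `ctx`; forward the rest to the process class.
    return NoDaemonProcess(*args, **kwargs)


def noop():
    pass


ctx = multiprocessing.get_context()

# What the pool effectively does when its Process attribute is the class
# itself: ctx is taken as the first positional parameter, `group`.
try:
    NoDaemonProcess(ctx, target=noop)
    error = None
except AssertionError as exc:
    error = str(exc)

# With the wrapper, ctx is peeled off before the class is constructed.
# None stands in for the pool instance that would normally arrive as `self`.
proc = dummy_process(None, ctx, target=noop)

print(error)
print(type(proc).__name__, proc.daemon)
```

The first call reproduces the assertion from the traceback, and the second returns an ordinary, non-daemonic process object. In the real patch the pool supplies self and ctx itself, so nothing else in the library has to change.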

Upvotes: 1
