Mathieu

Reputation: 31

parallel python: Communication pipe read error

I'm using Parallel Python (pp) in one of my programs. When I run the program from the command line it runs fine, but when I run it through my debugger it raises the following error:

  File "D:/Unief/Thesis/deepmedic-master\deepmedic\trainValidateTestVisualiseParallel.py", line 1063, in do_training
    job_server = pp.Server(ncpus=1, ppservers=ppservers) # Creates jobserver with automatically detected number of workers
  File "D:\Anaconda2\lib\site-packages\pp-1.6.4-py2.7.egg\pp.py", line 339, in __init__
    self.set_ncpus(ncpus)
  File "D:\Anaconda2\lib\site-packages\pp-1.6.4-py2.7.egg\pp.py", line 503, in set_ncpus
    range(ncpus - len(self.__workers))])
  File "D:\Anaconda2\lib\site-packages\pp-1.6.4-py2.7.egg\pp.py", line 138, in __init__
    self.start()
  File "D:\Anaconda2\lib\site-packages\pp-1.6.4-py2.7.egg\pp.py", line 149, in start
    self.pid = int(self.t.receive())
  File "D:\Anaconda2\lib\site-packages\pp-1.6.4-py2.7.egg\pptransport.py", line 140, in receive
    raise RuntimeError("Communication pipe read error")
RuntimeError: Communication pipe read error

The same error occurs when I run the following pp example through the debugger:

import math, sys, time
import pp
import os

def isprime(n):
    """Returns True if n is prime and False otherwise"""
    if not isinstance(n, int):
        raise TypeError("argument passed to is_prime is not of 'int' type")
    if n < 2:
        return False
    if n == 2:
        return True
    max = int(math.ceil(math.sqrt(n)))
    i = 2
    while i <= max:
        if n % i == 0:
            return False
        i += 1
    return True

def sum_primes(n):
    """Calculates sum of all primes below given integer n"""
    return sum([x for x in xrange(2,n) if isprime(x)])

print """Usage: python sum_primes.py [ncpus]
    [ncpus] - the number of workers to run in parallel,
    if omitted it will be set to the number of processors in the system
"""

print sys.argv
print sys.stdin
print sys.stdout
print sys.stderr

print 'read method'

#print 'str:', sys.stdin.read()
#print 'str:', sys.stdout.read()


print 'pptest print done'

# tuple of all parallel python servers to connect with
ppservers = ()
#ppservers = ("10.0.0.1",)

if len(sys.argv) > 1:
    ncpus = int(sys.argv[1])
    # Creates jobserver with ncpus workers
    job_server = pp.Server(ncpus, ppservers=ppservers)
else:
    # Creates jobserver with automatically detected number of workers
    job_server = pp.Server(ppservers=ppservers)

print "Starting pp with", job_server.get_ncpus(), "workers"



# Submit a job of calculating sum_primes(100) for execution.
# sum_primes - the function
# (100,) - tuple with arguments for sum_primes
# (isprime,) - tuple with functions on which function sum_primes depends
# ("math",) - tuple with module names which must be imported before sum_primes execution
# Execution starts as soon as one of the workers will become available
job1 = job_server.submit(sum_primes, (100,), (isprime,), ("math",))

# Retrieves the result calculated by job1
# The value of job1() is the same as sum_primes(100)
# If the job has not been finished yet, execution will wait here until result is available
result = job1()

print "Sum of primes below 100 is", result

start_time = time.time()

# The following submits 8 jobs and then retrieves the results
inputs = (100000, 100100, 100200, 100300, 100400, 100500, 100600, 100700)
jobs = [(input, job_server.submit(sum_primes,(input,), (isprime,), ("math",))) for input in inputs]
for input, job in jobs:
    print "Sum of primes below", input, "is", job()

print "Time elapsed: ", time.time() - start_time, "s"
job_server.print_stats()


# Parallel Python Software: http://www.parallelpython.com

The error seems to be related to the absence of a CLI when the program runs under the debugger.

Does anyone have a solution?

Upvotes: 3

Views: 670

Answers (2)

Fabio Zadrozny

Reputation: 25332

The real problem is that parallel-python does:

class PipeTransport(Transport):
    def __init__(self, r, w):
        if isinstance(r, types.FileType) and isinstance(w, types.FileType):
            self.r = r
            self.w = w
        else:
            raise TypeError("Both arguments of PipeTransport constructor " \
                    "must be file objects")

and sys.stdin is monkey-patched by the debugger, so it is no longer the original file object.

A local fix is to change parallel-python to use sys.__stdin__ (it already uses sys.__stdout__ because of those checks, so I guess this would be natural).

That is, in ppworker.py (in the python2 or python3 folder, depending on which Python version you use), change

self.t = pptransport.CPipeTransport(sys.stdin, sys.__stdout__)

to

self.t = pptransport.CPipeTransport(sys.__stdin__, sys.__stdout__)

Also see: https://github.com/microsoft/debugpy/issues/296 for the report on the debugger side.
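To check whether the debugger has actually replaced sys.stdin in your environment, a small diagnostic along these lines may help. It is only a sketch: the function name stdin_is_replaced and its two optional parameters are made up for illustration, and all it does is compare sys.stdin with sys.__stdin__, the original stream that Python keeps around.

```python
import sys


def stdin_is_replaced(current=None, original=None):
    """Return True if `current` is a different object from `original`.

    Both parameters are hypothetical conveniences for testing; by default
    the function compares sys.stdin with sys.__stdin__.
    """
    if current is None:
        current = sys.stdin
    if original is None:
        original = sys.__stdin__
    return current is not original


if __name__ == "__main__":
    # Run this once from the command line and once under the debugger
    # and compare the output.
    print("sys.stdin:     %r" % (sys.stdin,))
    print("sys.__stdin__: %r" % (sys.__stdin__,))
    print("replaced:      %s" % stdin_is_replaced())
```

If the last line reports a replacement only under the debugger, that is consistent with the explanation above, although it does not by itself prove that this is what breaks the pp worker.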

Upvotes: 1

Dave

Reputation: 680

I encountered the same problem but couldn't fix the underlying cause. As a workaround, when debugging, I add the following code after 'import pp':

class FakeServer(object):
    """Stand-in for pp.Server that runs every job synchronously."""
    def __init__(self, ncpus=1, ppservers=(), **kwargs):
        pass

    def submit(self, fn, params=(), localfunctions=None, externalmodules=None):
        # Run the job immediately in the current process.
        result = fn(*params)
        # Like a pp job, the returned object is called to fetch the result.
        return lambda: result

pp.Server = FakeServer

Now your code is single-threaded and easy to debug, without having to change your usage of pp.
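If you would rather not edit the script each time you switch between debugging and normal runs, the replacement can be made conditional. The sketch below is an assumption-laden variant, not part of pp: SynchronousServer, debugger_attached and choose_server are hypothetical names, the submit parameter names only mirror the positional usage in the question, and sys.gettrace() is a heuristic that detects debuggers relying on a trace function (it may miss others, and it also fires under coverage tools).

```python
import sys


class SynchronousServer(object):
    """Hypothetical stand-in for the few pp.Server methods the question uses."""

    def __init__(self, ncpus=1, ppservers=(), **kwargs):
        # Everything runs in the calling process, so report one worker.
        self._ncpus = 1

    def get_ncpus(self):
        return self._ncpus

    def submit(self, func, args=(), depfuncs=(), modules=(), **kwargs):
        # Run the job immediately instead of sending it to a worker.
        result = func(*args)
        # Like a pp job, the returned object is called to fetch the result.
        return lambda: result

    def print_stats(self):
        print("SynchronousServer: jobs ran in-process, no statistics")


def debugger_attached():
    """Heuristic: True if a trace function is currently installed."""
    return sys.gettrace() is not None


def choose_server(real_server, force_fake=None):
    """Return the synchronous stand-in while debugging, else real_server."""
    use_fake = debugger_attached() if force_fake is None else force_fake
    return SynchronousServer if use_fake else real_server


# Intended usage, placed right after 'import pp':
#     pp.Server = choose_server(pp.Server)
```

The stand-in also provides get_ncpus and print_stats because the example in the question calls both; any other pp.Server method your program uses would need a similar stub.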

Upvotes: 0
