Reputation: 385
I am struggling to correctly parallelise a function using Cython. Basically, the problem is to bin some data. The actual code is a bit long, but in the end it does something like this:
def bin_var(double[:] dist,
double[:] values,
double[:] bin_def,
double[:] varg, long[:] count):
dbin = (bin_def[1] - bin_def[0]) / bin_def[2]
for n1 in range(values.size):
if (dist[n1] < bin_def[0]) or (dist[n1] >= bin_def[1]):
continue
else:
ni = int((dist - bin_def[0]) / dbin)
count[ni] += 1
varg[ni] += calc_something(values[ni])
# Compute the mean
for n1 in range(int(bin_def[2])):
varg[ni] /= count[ni]
This code lends itself to some simple parallelisation (values
and dist
are very large): one needs to split the first for
loop over separate processes, each working on its own version of the count
and varg
arrays. When that is done, one has to combine everything together by summing the different versions of count
and varg
before the second for loop (much shorter).
That said, it's two days I'm trying to understand how to implement efficiently this in Cython, and I am starting to suspect it is not possible with the current version of the language. Note that just using prange
from cython.parallel
for the first loop does not provide correct results, because (I assume) of the simultaneous access of ni
, count
and varg
from different threads.
Is Cython parallel support really so limited? I was getting such a nice speedup single-threaded, I just hoped I could continue...
Upvotes: 3
Views: 902
Reputation: 520
For everybody who is still struggling, here is the solution that is much faster than with gil: . Nowadays (7 years later), it is possible to import atomic/openmp directly. It took me a while to figure that out since there is nothing about in the official documentation.
from libcpp.atomic cimport atomic
from cython.parallel cimport prange
cpdef parallel_loop():
cdef:
Py_ssize_t a = 100000
Py_ssize_t loopvar
atomic[int] *atticounter1= new atomic[int](0)
int finalvar = 0
try:
for loopvar in prange(a,nogil=True):
if loopvar % 1000 == 0:
atticounter1.fetch_add(1)
finalvar = atticounter1.load()
finally:
del atticounter1
return finalvar
f=parallel_loop()
print(f)
Most of the time, you want to do more than count positive results. In this case you can use openmp.
Here is a working example of a code that locates RGB colors in an image
The pyx file:
from cython.parallel cimport prange
cimport openmp
import numpy as np
cimport numpy as np
import cython
cimport cython
cpdef void searchforcolor(
unsigned char[:] pic,
unsigned char[::1] colors,
Py_ssize_t[:,::1] results,
Py_ssize_t[::1] countervar,
Py_ssize_t width,
Py_ssize_t totallengthpic,
Py_ssize_t totallengthcolor,
int cpus,
):
cdef:
Py_ssize_t i, j
unsigned char r,g,b
openmp.omp_lock_t locker
if cpus < 1:
cpus=openmp.omp_get_max_threads()
if cpus > 1:
openmp.omp_set_num_threads(cpus)
openmp.omp_init_lock(&locker)
for i in prange(0, totallengthcolor, 3, nogil=True): # should be possible, but not work not working: use_threads_if=cpus>1
r = colors[i]
g = colors[i + 1]
b = colors[i + 2]
for j in range(0, totallengthpic, 3):
if (r == pic[j]) and (g == pic[j+1]) and (b == pic[j+2]):
openmp.omp_set_lock(&locker)
results[countervar[0]][1] = ((j / 3) // width) #x
results[countervar[0]][0] = ((j / 3) % width) #y
results[countervar[0]][2] = b
results[countervar[0]][3] = g
results[countervar[0]][4] = r
countervar[0]+=1
openmp.omp_unset_lock(&locker)
openmp.omp_destroy_lock(&locker)
else:
for i in range(0, totallengthcolor, 3):
r = colors[i]
g = colors[i + 1]
b = colors[i + 2]
for j in range(0, totallengthpic, 3):
if (r == pic[j]) and (g == pic[j+1]) and (b == pic[j+2]):
results[countervar[0]][1] = ((j / 3) // width) #x
results[countervar[0]][0] = ((j / 3) % width) #y
countervar[0]+=1
The python code:
def search_colors(pic, colors, cpus=-1):
if not isinstance(colors, np.ndarray):
colors = np.array(colors, dtype=np.uint8)
rav_colors = np.ascontiguousarray(colors.ravel())
totallengthcolor = rav_colors.shape[0] - 1
totallenghtpic = np.prod(pic.shape) - 1
width = pic.shape[1]
results = np.zeros((totallenghtpic, 5), dtype=np.int64)
countervar = np.zeros(1, dtype=np.int64) # this is going to be our atomic counter
searchforcolor(
pic.ravel(),
rav_colors,
results,
countervar,
width,
totallenghtpic,
totallengthcolor,
cpus,
)
return results[: countervar[0]]
picpath = r"C:\Users\hansc\Downloads\pexels-alex-andrews-2295744.jpg"
pic = cv2.imread(picpath)
colors0 = np.array([[255, 255, 255]], dtype=np.uint8)
resus0 = search_colors(pic=pic, colors=colors0)
colors1 = np.array(
[
(66, 71, 69),
(62, 67, 65),
(144, 155, 153),
(52, 57, 55),
(127, 138, 136),
(53, 58, 56),
(51, 56, 54),
(32, 27, 18),
(24, 17, 8),
],
dtype=np.uint8,
)
resus1 = search_colors(pic=pic, colors=colors1)
print(resus1)
Important: Compile both examples it with the OpenMP flag!
Upvotes: 1
Reputation: 30901
I can think of three options here:
Use the GIL to ensure that the +=
is done single threaded:
varg_ni = calc_something(values[ni]) # keep this out
# of the single threaded block...
with gil:
count[ni] += 1
varg[ni] += varg_ni
This is easy and won't be too bad provided the work done in calc_something
is reasonably large
Make count
and varg
2D arrays with each thread writing to a different column. Sum along the second dimension afterwards:
# rough, untested outline....
# might need to go in a `with parallel()` block
num_threads = openmp.omp_get_num_threads()
cdef double[:,:] count_tmp = np.zeros((count.shape[0],num_threads))
cdef double[:,:] varg_tmp = np.zeros((varg.shape[0],num_threads))
# then in the loop:
count_tmp[ni,cython.parallel.threadid()] += 1
varg_tmp[ni,cython.parallel.threadid()] += calc_something(values[ni])
# after the loop:
count[:] = np.sum(count_tmp,axis=1)
varg[:] = np.sum(varg_tmp,axis=1)
You could also do something similar using the ideas in the local_buf
example in the documentation.
(NOTE - GCC is currently giving me an "internal compiler error" for this - I feel it should work, but for the moment it doesn't seem to be working, so try option 3 at your own risk...) Use the openmp atomic
directive to do the addition atomically. This requires a bit of work to circumvent Cython but shouldn't be too difficult. Create a short C header file with an add_inplace
macro:
#define add_inplace(x,y) _Pragma("omp atomic") x+=y
The _Pragma
is a C99 feature that should allow you to put pragmas in preprocessor statements. Then tell Cython about that header file (as if it's a function):
cdef extern from "header.h":
void add_inplace(...) nogil # just use varargs to make Cython think it accepts anything
Then in the loop do:
add_inplace(count[ni], 1)
add_inplace(varg[ni], calc_something(values[ni]))
Because this uses macro trickery it may be a bit fragile (i.e. definitely won't work with PyObject*
s, but it should generate the correct C code when using standard C numeric types. (Inspect the code to be sure)
Upvotes: 5