Reputation: 259
I tried to achieve a parallelized image processing technique using Spark. Different from conventional Spark work with millions of tasks. I only want to separate the image into the number of worker (machine) I have and let one worker process one image patch. So one image patch is one task, if I have 12 image patches, I have 12 tasks. The question is how to explicitly control the schedule of task to each worker. The current situation happens that if I parallelize the image patches, they often send several patches to one or two worker and leave the others not working. I tried to set the system property of spark to control the spark.cores.max and spark.default.parallelism. But it seems not helpful. The only way to make the task send to different workers as separate as possible is to enlarge the second parameter of SparkContext.parallelize - numSlices. Here is the code:
img = misc.imread('test_.bmp')
height, width = img.shape
divisions, patch_width, patch_height = partitionParameters(width, height, 2, 2, border=100)
spark = SparkContext(appName="Miner")
# spark.setSystemProperty('spark.cores.max','1')
spark.setSystemProperty('spark.default.parallelism','24')
broadcast_img = spark.broadcast(img)
start = datetime.now()
print "--------------------", divisions
# run spark
run = spark.parallelize(divisions, 24).cache()
print "--------------- RDD size: ", run._jrdd.splits().size()
result = run.map(lambda (x, y): crop_sub_img(broadcast_img.value, x, y, patch_width, patch_height, width, height)) \
.map(lambda ((x, y), subimg): fastSeg.process(subimg, x, y)) \
.collect()
img = cat_sub_img(result, width, height)
end = datetime.now()
print "time cost:", (end-start)
As you can see, I only have four patches set in divisions. divisions is a list of tuple with x and y-axis of the image patch. Only I set the numSlices to a high value 24 which far exceeds the actual tasks I have in divisions, most of workers are used now. But it seems not reasonable. If I set to 4, it will sent all tasks to only one worker! There must be someway to control how many task one worker accept. I am not familiar with the core of Spark. Can anyone help me, Thanks?
One thought it happens is that the image size is too small for one worker. So spark will assume one worker could handle that and send all to one.
Upvotes: 0
Views: 211
Reputation: 27455
One machine has multiple cores. Spark distributes the work between your worker cores, because they can perform work in parallel. If you have 12 machines with 4 cores each you have 48 cores in total. You should split the image into 48 patches, so that each core has something to do. If you split the imagine into just 12 patches, only 12 cores will have something to do and the other 36 cores will be wasted.
An exception is if your image processing algorithm does its own multi-threading. In this case you should start 1-core workers on the machines. The workers will only pick up 1 task each then and you can do the multi-threading as you like. If you run a standalone Spark cluster, you can set --cores 1
on the workers for this. (See the documentation.)
Upvotes: 1