Reputation: 3241
I have 10 JPEG images in a directory. I want to read all of them simultaneously using PySpark. I tried the following:
import glob
import numpy as np
from PIL import Image
from pyspark import SparkContext, SparkConf

conf = SparkConf()
spark = SparkContext(conf=conf)

files = glob.glob("E:\\tests\\*.jpg")
files_ = spark.parallelize(files)

arrs = []
for fi in files_.toLocalIterator():
    im = Image.open(fi)
    data = np.asarray(im)
    arrs.append(data)

img = np.array(arrs)
print(img.shape)
The code ended without error and printed out img.shape; however, it did not run in parallel. Could you help me?
Upvotes: 5
Views: 724
Reputation: 4059
My solution follows the same idea as werner's, but uses only Spark libraries:
from pyspark.ml.image import ImageSchema
import numpy as np

# `spark` is a SparkSession here (spark.read is part of the SparkSession API)
df = (spark
      .read
      .format("image")
      .option("pathGlobFilter", "*.jpg")
      .load("your_data_path"))
df = df.select('image.*')

# Pre-caching the required schema. If you remove this line, an error will be raised.
ImageSchema.imageFields

# Transforming the images to np.array
arrays = df.rdd.map(ImageSchema.toNDArray).collect()
img = np.array(arrays)
print(img.shape)
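If you would rather not depend on ImageSchema, the same conversion can be done by hand from the image source's struct fields (height, width, nChannels, data). A minimal sketch, assuming all images decode to the same dimensions; note that the data bytes are stored in OpenCV-style row-wise BGR order in most cases:

# Hedged alternative, not part of the original answer:
# rebuild each array directly from the raw struct fields.
arrays = (df.rdd
            .map(lambda row: np.frombuffer(row.data, dtype=np.uint8)
                               .reshape(row.height, row.width, row.nChannels))
            .collect())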
Upvotes: 3
Reputation: 14845
You can use rdd.map to load and transform the pictures in parallel and then collect the RDD into a Python list:
files = glob.glob("E:\\tests\\*.jpg")
file_rdd = spark.parallelize(files)

def image_to_array(path):
    im = Image.open(path)
    data = np.asarray(im)
    return data

array_rdd = file_rdd.map(image_to_array)
result_list = array_rdd.collect()
result_list is now a list with 10 elements; each element is a numpy.ndarray.
The function image_to_array will be executed on different Spark executors in parallel. If you have a multi-node Spark cluster, you have to make sure that all nodes can access E:\\tests\\.
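If the images live on shared storage such as HDFS or S3, you can also let Spark read the raw bytes itself with binaryFiles and decode them on the executors. A minimal sketch, assuming spark is the SparkContext from the question:

import io

# binaryFiles yields (path, bytes) pairs; each executor decodes
# its images from memory instead of opening a local file path.
bytes_rdd = spark.binaryFiles("E:\\tests\\*.jpg")
array_rdd = bytes_rdd.map(lambda pair: np.asarray(Image.open(io.BytesIO(pair[1]))))
result_list = array_rdd.collect()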
After collecting the arrays, processing can continue on the driver with img = np.array(result_list, dtype=object). The dtype=object keeps each image as its own array, so this also works when the images do not all share the same dimensions.
Upvotes: 4