Reputation: 593
I am a beginner with PySpark. Recently I tried to submit a simple Python application (batch-resizing pictures) to my Spark cluster. I can run the application through PyCharm successfully, but when I submitted my application to Spark, the images were not resized.
This is my original Python code:

import os
from PIL import Image

size_64 = (64, 64)
for f in os.listdir('.'):
    if f.endswith('.jpg'):
        i = Image.open(f)
        fn, fext = os.path.splitext(f)
        i.thumbnail(size_64)
        i.save('resize/{}_64{}'.format(fn, fext))
Then I transformed it into what I thought was a proper way to submit a Spark application:

import os
from PIL import Image
from pyspark import SparkContext, SparkConf

APP_NAME = "ImageResizer"

def main(sc):
    size_64 = (64, 64)
    for f in os.listdir('.'):
        if f.endswith('.jpg'):
            i = Image.open(f)
            fn, fext = os.path.splitext(f)
            i.thumbnail(size_64)
            i.save('resize/{}_64{}'.format(fn, fext))
    print('done')

if __name__ == "__main__":
    conf = SparkConf().setAppName(APP_NAME)
    conf = conf.setMaster("spark://10.233.70.48:7077")
    sc = SparkContext(conf=conf)
    main(sc)
However, I was told that I did not actually use Spark at all (I suspect so as well, but I don't know why). I am wondering how I can properly transform my original code into the PySpark way.

Can anyone familiar with PySpark help me out? And any suggestions on where I can properly and systematically learn how to write PySpark applications? Thank you.
Upvotes: 3
Views: 2480
Reputation: 1369
Right now you are not using Spark at all. You are simply using the SparkContext as a variable you pass to your main function (and then do nothing with). In order to use PySpark, you need to rethink your application. Commands like os.listdir('.') work fine on a single machine, but if you run on a cluster of computers, which directory does the . refer to? The machine where the job was submitted? The local directory on every machine? A shared network drive? If you are just running on one machine (which is plenty for tests), you can start to use Spark by simply parallelizing the list (turning it into an RDD). You can then apply operations on the RDD such as map, filter, and reduce:
s_list = sc.parallelize(os.listdir('.'))
s_jpg_list = s_list.filter(lambda f: f.endswith('.jpg'))

def resize_image(f):
    i = Image.open(f)
    size_64 = (64, 64)
    fn, fext = os.path.splitext(f)
    i.thumbnail(size_64)
    out_path = 'resize/{}_64{}'.format(fn, fext)
    i.save(out_path)
    return out_path

s_jpg_files = s_jpg_list.map(resize_image)
print('Converted Images:', s_jpg_files.collect())
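For completeness, a standalone script like this is launched with spark-submit; a minimal sketch, reusing the master URL from the question (the script filename here is hypothetical):

```shell
# Submit the application to the standalone cluster master
spark-submit --master spark://10.233.70.48:7077 resize_images.py
```

Note that the workers must be able to import PIL and see the image directory for this to work on a real cluster.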
Upvotes: 4
Reputation:
but the images are not resized - this is not the same as an application failure. When the app is submitted, it uses an application-specific working directory. There won't be any files to process there, so it exits without doing any work.
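One way to sidestep the working-directory ambiguity the comment describes is to resolve the input directory to an absolute path on the driver before building the file list; a minimal sketch, assuming the directory is visible to every machine (e.g. a shared filesystem):

```python
import os

# Resolve the directory once, on the driver, so the file list does not
# depend on whatever working directory spark-submit happens to use.
input_dir = os.path.abspath('.')
jpg_paths = [os.path.join(input_dir, f)
             for f in os.listdir(input_dir)
             if f.endswith('.jpg')]

# Every path is now absolute, so workers would open exactly these files
# rather than resolving '.' relative to their own working directories.
assert all(os.path.isabs(p) for p in jpg_paths)
```

You would then pass jpg_paths to sc.parallelize instead of the bare os.listdir('.') result.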
Upvotes: 0