Reputation: 8144
I am new to Spark and have started writing some scripts in Python. My understanding is that Spark executes transformations in parallel (map).
import time
from datetime import datetime
from pyspark import SparkConf, SparkContext

def some_function(name, content):
    print(name, datetime.now())
    time.sleep(30)
    return content
config = SparkConf().setAppName("sample2").setMaster("local[*]")
filesRDD = SparkContext(conf=config).binaryFiles("F:\\usr\\temp\\*.zip")
inputfileRDD = filesRDD.map(lambda job_bundle: (job_bundle[0], some_function(job_bundle[0], job_bundle[1])))
print(inputfileRDD.collect())
The above code collects a list of .zip files from a folder and processes them. When I execute it, I can see that this happens sequentially.
Output
file:/F:/usr/temp/sample.zip 2020-10-22 10:42:37.089085
file:/F:/usr/temp/sample1.zip 2020-10-22 10:43:07.103317
You can see that it started processing the 2nd file 30 seconds later, i.e. after completing the first file. What went wrong in my code? Why is it not executing the RDD in parallel? Can you please help me?
Upvotes: 1
Views: 747
Reputation: 10406
I don't know exactly how the method binaryFiles partitions the files across Spark partitions. It seems that, contrary to textFile, it tends to create only one partition. Let's see that with an example directory called dir containing 5 files.
> ls dir
test1 test2 test3 test4 test5
If I use textFile, things are run in parallel. I don't provide the output because it is not very pretty, but you can check for yourself. We can verify that things are run in parallel with getNumPartitions.
>>> sc.textFile("dir").foreach(lambda x: some_function(x, None))
# ugly output, but everything starts at the same time,
# except maybe the last one since you have 4 cores.
>>> sc.textFile("dir").getNumPartitions()
5
With binaryFiles
things are different and for some reason everything goes to the same partition.
>>> sc.binaryFiles("dir").getNumPartitions()
1
I even tried with 10k files and everything still goes to the same partition. I believe the reason is that in Scala, binaryFiles returns an RDD of file names paired with an object that allows reading the files (but no reading is performed). It is therefore fast, and the resulting RDD is small, so having it on one partition is OK.
In Scala, we can thus call repartition after binaryFiles and things will work great.
scala> sc.binaryFiles("dir").getNumPartitions
1
scala> sc.binaryFiles("dir").repartition(4).getNumPartitions
4
scala> sc.binaryFiles("dir").repartition(4)
.foreach{ case (name, ds) => {
println(System.currentTimeMillis+": "+name)
Thread.sleep(2000)
// do some reading on the DataStream ds
}}
1603352918396: file:/home/oanicol/sandbox/dir/test1
1603352918396: file:/home/oanicol/sandbox/dir/test3
1603352918396: file:/home/oanicol/sandbox/dir/test4
1603352918396: file:/home/oanicol/sandbox/dir/test5
1603352920397: file:/home/oanicol/sandbox/dir/test2
The problem in Python is that binaryFiles actually reads the files onto one single partition. Also, it is extremely mysterious to me, but the following lines of code in PySpark 2.4 yield the same behaviour you notice, which does not make sense.
# this should work but does not
sc.binaryFiles("dir", minPartitions=4).foreach(lambda x: some_function(x, ''))
# this does not work either, which is strange but it would not be advised anyway
# since all the data would be read on one partition
sc.binaryFiles("dir").repartition(4).foreach(lambda x: some_function(x, ''))
Yet, since binaryFiles actually reads the file anyway, you can use wholeTextFiles, which reads the file as a text file and behaves as expected:
# this works
sc.wholeTextFiles("dir", minPartitions=4).foreach(lambda x: some_function(x, ''))
Upvotes: 2