Reputation: 1204
Here I am pasting my Python code, which I am running on Spark in order to perform some analysis on data. I am able to run the following program on a small data-set, but on a large data-set it says: "Stage 1 contains a task of very large size (17693 KB). The maximum recommended task size is 100 KB".
import os
import sys
import unicodedata
from operator import add

try:
    from pyspark import SparkConf
    from pyspark import SparkContext
except ImportError as e:
    print("Error importing Spark Modules", e)
    sys.exit(1)

def tokenize(text):
    resultDict = {}
    # Normalize the line to plain ASCII before splitting it apart.
    text = unicodedata.normalize('NFKD', text).encode('ascii', 'ignore')
    str1 = text[1]
    str2 = text[0]
    arrText = text.split(str1)
    ss1 = arrText[0].split("/")
    docID = ss1[0].strip()
    docName = ss1[1].strip()  # was ss[1], which is undefined
    resultDict[docID + "_" + docName] = 1
    return resultDict.iteritems()
sc = SparkContext('local')
textfile = sc.textFile("path to my data")
fileContent = textfile.flatMap(tokenize)
rdd = sc.parallelize(fileContent.collect())
rdd = rdd.map(lambda x: (x[0], x[1])).reduceByKey(add)
print rdd.collect()
#reduceByKey(lambda a,b: a+b)
rdd.coalesce(1).saveAsTextFile("path to result")
Here is a bit more of the log. The job does not progress after this point. Can someone help me with this?
16/06/10 19:19:58 WARN TaskSetManager: Stage 1 contains a task of very large size (17693 KB). The maximum recommended task size is 100 KB.
16/06/10 19:19:58 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 5314, localhost, partition 0,PROCESS_LOCAL, 18118332 bytes)
16/06/10 19:19:58 INFO Executor: Running task 0.0 in stage 1.0 (TID 5314)
16/06/10 19:43:00 INFO BlockManagerInfo: Removed broadcast_1_piece0 on localhost:43480 in memory (size: 3.9 KB, free: 511.1 MB)
16/06/10 19:43:00 INFO ContextCleaner: Cleaned accumulator 2
Upvotes: 7
Views: 8375
Reputation: 13538
When Spark serializes tasks, it recursively serializes the full closure context. In this case, the logical culprit seems to be unicodedata, which you use in tokenize. I may be wrong, but I don't see any other heavy data structures in the code. (Caveat: I typically use Spark with Scala and my Python is rusty.) I wonder whether the library is backed by heavy data structures that are not available on the executor nodes.
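To make the closure-size issue concrete, here is a minimal, hypothetical sketch (the big_lookup dictionary and score function are invented for illustration, not taken from the question) that reproduces the same "task of very large size" warning by capturing a large driver-side object in a mapped function:

import random

from pyspark import SparkContext

sc = SparkContext('local')

# A large object that lives on the driver.
big_lookup = {i: random.random() for i in range(2000000)}

def score(x):
    # Referencing big_lookup here pulls it into the task closure,
    # so Spark ships it with every task and warns about the task size.
    return big_lookup.get(x, 0.0)

print(sc.parallelize(range(1000)).map(score).count())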
The typical patterns for dealing with these types of problems are:

- Make sure all libraries are available on the executor nodes.
- Use broadcast variables to distribute heavy data structures to executors (see the sketch after this list).
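A minimal sketch of the broadcast pattern, assuming a made-up lookup table and score function rather than anything from the question, could look like this:

from pyspark import SparkContext

sc = SparkContext('local')

# Hypothetical heavy driver-side structure that the executors need.
lookup = {str(i): i * i for i in range(100000)}

# Broadcast it once: each executor fetches and caches a single copy
# instead of receiving it inside every serialized task closure.
lookup_bc = sc.broadcast(lookup)

def score(key):
    # On the executors, read the broadcast data through .value.
    return lookup_bc.value.get(key, 0)

print(sc.parallelize(['1', '2', '42']).map(score).collect())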
Unrelated, but unless you are using this as a debugging tool, you are doing a lot of unnecessary collection of all the data back to the driver with collect. Transformations can be chained:
sc.textFile(...).flatMap(...).map(...).reduceByKey(add).coalesce(1).saveAsTextFile(...)
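Applied to the code in the question, the whole job could plausibly be rewritten as one chained pipeline (reusing the tokenize function and the same path placeholders; the identity-style map from the original is dropped because flatMap already emits (key, value) pairs):

from operator import add

result = (sc.textFile("path to my data")
            .flatMap(tokenize)    # emits (docID_docName, 1) pairs
            .reduceByKey(add)     # sum the counts per key
            .coalesce(1))         # single output partition / file
result.saveAsTextFile("path to result")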
Upvotes: 3