Reputation: 53
I am trying to run a Spark program for semantic processing, but it gets stuck on Stage 2. What could the problem be here?
# imports needed by the snippet below
import time
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, explode
from pyspark.sql.types import StructType, StructField, ArrayType, StringType

# create Spark session ("Semantic Processing" is the application name;
# the master is set through the spark.master config)
spark = SparkSession.builder.appName("Semantic Processing")\
    .config('spark.master', 'local')\
    .getOrCreate()
spark.conf.set("spark.sql.debug.maxToStringFields", 1000)

# read in csv
df = spark.read.format('csv')\
    .options(header='true', inferSchema='true', sep='|')\
    .load('data/VzW_PSSData_07012015.csv')
# udf for extracting triples
triples_schema = StructType([
    StructField('subjects', ArrayType(StringType(), True), True),
    StructField('actions', ArrayType(StringType(), True), True),
    StructField('objects', ArrayType(StringType(), True), True)
])
triples_udf = udf(lambda x: get_triples(x), triples_schema)
start = time.time()
triples = df.select('InteractionID', triples_udf('Notes').alias('Triples'))
triples = triples.withColumn('Subject', explode('Triples.subjects'))\
    .withColumn('Action', explode('Triples.actions'))\
    .withColumn('Object', explode('Triples.objects'))\
    .select('InteractionID', 'Subject', 'Action', 'Object')
triples.explain()
# write to csv
triples.coalesce(1).write.format('csv').save('SPARK_triples_output.csv', header='true', sep='|')
This is what get_triples() does:
import spacy
from spacy.symbols import VERB

def get_triples(note):
    """extract subject, verb, and object"""
    nlp = spacy.load("en_core_web_sm")
    if note is None:
        return None
    doc = nlp(str(note))
    subj_list = []
    verb_list = []
    obj_list = []
    for sent in doc.sents:
        # clean_text is a helper defined elsewhere; re-parse the cleaned sentence
        sent_doc = nlp(clean_text(str(sent)))
        # find each verb
        for tok in sent_doc:
            subj = ""
            verb = ""
            obj = ""
            mod = ""
            prep = ""
            # check if current token is a verb
            if tok.pos == VERB:
                verb = tok.text
                # store right subtree as the object
                obj = ' '.join([t.text for t in tok.rights])
                # for each verb, check children for the subject,
                # auxiliaries, negations, and prepositions
                for child in tok.children:
                    if child.dep_ == 'nsubj':
                        subj = child.text
                    elif child.dep_ == 'aux':
                        mod += child.text + ' '
                    elif child.dep_ == 'neg':
                        mod += child.text + ' '
                    elif child.dep_ == 'prep':
                        prep = child.text
                verb = mod + verb + ' ' + prep
                subj_list.append(subj)
                verb_list.append(verb)
                obj_list.append(obj)
    return subj_list, verb_list, obj_list
It takes a text record, finds the subjects, verbs, and objects, and returns them as three parallel lists. My dataset is around 460k records (247.7 MB).
Example:
Input:
'customer called in needing help with activating their replacement phone did an esn swap in acss customer needing to pay their bill once bill was payed activation went through did a test call and sent out a test text issue resolved '
I want the output to go into a dataframe and be written to a CSV that looks like this:
InteractionID Subject Action Object
0 42671331 customer said lagging
0 42671331 has
0 42671331 been
0 42671331 device has been lagging freezing turned
0 42671331 moving slower
This is the output of printSchema():
root
|-- InteractionID: integer (nullable = true)
|-- Subject: string (nullable = true)
|-- Action: string (nullable = true)
|-- Object: string (nullable = true)
Essentially my program selects InteractionID and runs my UDF on the column with the text to produce Triples, then selects InteractionID and the three exploded columns. This is the output of explain():
== Physical Plan ==
Generate explode(Triples#131.objects), [InteractionID#17, Subject#136, Action#142], false, [Object#149]
+- Generate explode(Triples#131.actions), [InteractionID#17, Triples#131, Subject#136], false, [Action#142]
+- Generate explode(Triples#131.subjects), [InteractionID#17, Triples#131], false, [Subject#136]
+- *(2) Project [InteractionID#17, pythonUDF0#159 AS Triples#131]
+- BatchEvalPython [<lambda>(Notes#65)], [InteractionID#17, Notes#65, pythonUDF0#159]
+- *(1) FileScan csv [InteractionID#17,Notes#65] Batched: false, Format: CSV, Location: InMemory
Upvotes: 3
Views: 1638
Reputation: 10406
The call to .coalesce(1) is most likely what causes the problem, but there are other sources of trouble in what you are trying to do.
1. coalesce: I assume that your goal is to obtain a single CSV file. That's a bad idea: Spark is about parallel computing. With coalesce(1), you put all the data in one partition (one core, one machine), which is why it takes so long. You might even get an out-of-memory error.
2. explode: Your job multiplies the size of the dataset by a lot. Each line generates A*B*C lines, where A is the number of subjects, B the number of actions, and C the number of objects. That's a lot of data for one single partition. If you count 5 elements per array, you multiply the size of the dataset by 5^3 = 125, which means roughly 31 GB of data and, in terms of records, 460K * 125 = 57.5M. That can be a lot for a laptop.
3. Python UDF: The performance gap between PySpark and Scala Spark is generally very small, except when you use UDFs. Indeed, UDFs are known to hurt performance even in Scala compared to the Spark SQL functions that Spark knows how to optimize. A Scala UDF is slower than the built-ins; a Python UDF is catastrophically slower. One way to mitigate this is sketched right after this list.
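To illustrate point 3: Spark's vectorized (Arrow-based) pandas UDFs invoke Python once per batch instead of once per row. Here is a minimal sketch, assuming Spark 3.0+ (the struct-returning scalar form needs it); get_triples and triples_schema are the ones from the question, the rest is illustrative. Independently of the UDF type, note that get_triples as posted calls spacy.load on every record, so hoisting the model load out of the function already saves a lot of time.

import pandas as pd
from pyspark.sql.functions import pandas_udf

@pandas_udf(triples_schema)  # triples_schema from the question
def triples_batch_udf(notes: pd.Series) -> pd.DataFrame:
    # one Python invocation per Arrow batch; the per-row
    # serialization overhead of a plain UDF disappears
    rows = [get_triples(n) if n is not None else ([], [], [])
            for n in notes]
    return pd.DataFrame(rows, columns=['subjects', 'actions', 'objects'])

triples = df.select('InteractionID', triples_batch_udf('Notes').alias('Triples'))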
But these are just pointers. You say the job does not move forward: try calling repartition(1000) right after reading the CSV, and remove coalesce(1). That will cut the job into smaller pieces (you may even try a value larger than 1000), and the Spark UI will show you whether some tasks are completing. A sketch follows.
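A minimal sketch of that suggestion, reusing the code from the question (without coalesce(1), Spark writes one part-file per partition into the output directory):

df = spark.read.format('csv')\
    .options(header='true', inferSchema='true', sep='|')\
    .load('data/VzW_PSSData_07012015.csv')\
    .repartition(1000)  # cut the work into many small tasks

# ... build `triples` exactly as before ...

triples.write.format('csv').save('SPARK_triples_output.csv', header='true', sep='|')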
Upvotes: 4
Reputation: 1707
It looks like you are generating more data than you want. For the input data
1 dog eats meat
1 bat hits ball
when you collect the three lists and explode each one, you will get
1 dog eats meat
1 dog eats ball
1 dog hits meat
1 dog hits ball
1 bat eats meat
1 bat eats ball
1 bat hits meat
1 bat hits ball
You can see what will happen when you have 10 items in each list. For your case, it would be more reasonable to return a list of 3-tuples instead of three lists, as sketched below.
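A minimal sketch of that idea, assuming the get_triples from the question is available (the wrapper and names below are illustrative): return an array of (subject, action, object) structs and explode once, so each verb yields exactly one row.

from pyspark.sql.functions import udf, explode, col
from pyspark.sql.types import ArrayType, StructType, StructField, StringType

triple_schema = ArrayType(StructType([
    StructField('subject', StringType(), True),
    StructField('action', StringType(), True),
    StructField('object', StringType(), True)
]))

def get_triples_tuples(note):
    # hypothetical wrapper: zip the three parallel lists returned
    # by the original get_triples into (subject, action, object) tuples
    res = get_triples(note)
    if res is None:
        return []
    return list(zip(*res))

triples_udf = udf(get_triples_tuples, triple_schema)

triples = df.select('InteractionID', explode(triples_udf('Notes')).alias('t'))\
    .select('InteractionID',
            col('t.subject').alias('Subject'),
            col('t.action').alias('Action'),
            col('t.object').alias('Object'))

With a single explode over structs, Spark emits one row per triple rather than the A*B*C cross product.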
Upvotes: 0