Vinh Nguyen

Reputation: 53

PySpark task is taking too long to run with three explode functions

I am trying to run a Spark program for semantic processing, but it gets stuck on Stage 2. What could be causing this?

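# imports needed by the code below
import time
import spacy
from spacy.symbols import VERB
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.functions import explode, udf
from pyspark.sql.types import ArrayType, StringType, StructField, StructType

# create Spark session ("Semantic Processing" is the app name, not the master URL)
spark = SparkSession.builder.appName("Semantic Processing")\
        .config('spark.master', 'local')\
        .getOrCreate()
sqlc = SQLContext(spark)
spark.conf.set("spark.sql.debug.maxToStringFields", 1000)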

# read in csv
df = sqlc.read.format('csv')\
    .options(header='true', inferSchema='true', sep='|')\
    .load('data/VzW_PSSData_07012015.csv')

# udf for extracting triples

triples_schema = StructType([
    StructField('subjects', ArrayType(StringType(), True), True),
    StructField('actions', ArrayType(StringType(), True), True),
    StructField('objects', ArrayType(StringType(), True), True)
])

triples_udf = udf(lambda x: get_triples(x), triples_schema)

start = time.time()

triples = df.select('InteractionID', triples_udf('Notes').alias('Triples'))
triples = triples.withColumn('Subject', explode('Triples.subjects'))\
                    .withColumn('Action', explode('Triples.actions'))\
                    .withColumn('Object', explode('Triples.objects'))\
                    .select('InteractionID', 'Subject', 'Action', 'Object')

triples.explain()

# write to csv
triples.coalesce(1).write.format('csv').save('SPARK_triples_output.csv', header='true', sep='|')

This is what get_triples() does:

def get_triples(note):
    """extract subject, verb, and object"""
    nlp = spacy.load("en_core_web_sm")

    if note is None:
        return None
    else:
        doc = nlp(str(note))
        subj_list = []
        verb_list = []
        obj_list = []
        for sent in doc.sents:
            sent = clean_text(str(sent))
            doc = nlp(sent)
            # find each verb
            for tok in doc:
                subj = ""
                verb = ""
                obj = ""
                mod = ""
                prep = ""
                # check if current token is a verb
                if tok.pos == VERB:
                    verb = tok.text

                    # store right tree into object
                    obj = ' '.join([t.text for t in tok.rights])

                    # for each verb, check children for objects: dobj and prep
                    # find pred, modifiers, negs, adv for verb
                    for child in tok.children:
                        if child.dep_ == 'nsubj':
                            subj = child.text

                        elif (child.dep_ == 'aux'):
                            mod += child.text + ' '

                        elif (child.dep_ == 'neg'):
                            mod += child.text + ' '

                        elif (child.dep_ == 'prep'):
                            prep = child.text

                    verb = mod + verb + ' ' + prep

                    subj_list.append(subj)
                    verb_list.append(verb)
                    obj_list.append(obj)

        return subj_list, verb_list, obj_list

It reads in a text record, finds subjects, verbs, and objects, and returns three lists: subjects, verbs, objects. My dataset is around 460k records of 247.7MB.

Example:

Input:
'customer called in needing help with activating their  replacement phone did an esn swap  in acss customer needing to pay their bill once bill was payed activation went through did a test call and sent out a test text issue resolved '

I want to put the output into a dataframe and write it to a CSV that looks like this:

InteractionID   Subject     Action              Object
0   42671331    customer    said                lagging
0   42671331                has 
0   42671331                been    
0   42671331    device      has been lagging    freezing turned
0   42671331                moving              slower

This is the output for printSchema:

root                                                                            
 |-- InteractionID: integer (nullable = true)
 |-- Subject: string (nullable = true)
 |-- Action: string (nullable = true)
 |-- Object: string (nullable = true)

Essentially my program is doing:

  1. Reading in a CSV file
  2. Declaring a UDF to semantically process a string
  3. Creating a dataframe by selecting InteractionID and running my UDF on the column with the text
  4. Exploding the three array columns within the root column Triples
  5. Selecting InteractionID and the three exploded columns
  6. Writing to a new CSV

And this is the output for explain:

== Physical Plan ==                                                             
Generate explode(Triples#131.objects), [InteractionID#17, Subject#136, Action#142], false, [Object#149]
+- Generate explode(Triples#131.actions), [InteractionID#17, Triples#131, Subject#136], false, [Action#142]
   +- Generate explode(Triples#131.subjects), [InteractionID#17, Triples#131], false, [Subject#136]
      +- *(2) Project [InteractionID#17, pythonUDF0#159 AS Triples#131]
         +- BatchEvalPython [<lambda>(Notes#65)], [InteractionID#17, Notes#65, pythonUDF0#159]
            +- *(1) FileScan csv [InteractionID#17,Notes#65] Batched: false, Format: CSV, Location: InMemory

Upvotes: 3

Views: 1638

Answers (2)

Oli

Reputation: 10406

The call to .coalesce(1) is most likely what causes the problem, but there are other sources of problems in what you are trying to do.

1. coalesce: I assume that your goal is to obtain a single CSV file. That's a bad idea. Spark is about parallel computing. With coalesce(1), you put all the data in one partition (one core, one machine), which is why it is taking so long. You might even get an out-of-memory error.

2. explode: Your job multiplies the size of the dataset considerably. Each line generates A*B*C lines, where A is the number of subjects, B is the number of objects and C the number of actions. That is a lot of data for a single partition. If you count 5 elements per array, you end up multiplying the size of the dataset by 5^3=125, which for a 247.7MB input means a dataset of roughly 31GB. In terms of records, 460K*125=57.5M. That can be a lot for a laptop.

3. python UDF: The performance gap between pyspark and scala-spark is generally very small, except when you use UDFs. UDFs are known to degrade performance even in Scala, compared to Spark SQL functions that Spark knows how to optimize. A Scala UDF is slower than the built-in functions; a Python UDF is catastrophically slower.
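To make the arithmetic in point 2 concrete, here is a minimal pure-Python sketch of what three chained explodes do to one record. It assumes each array holds 5 entries (the same illustrative figure used above); the helper name exploded_rows and the placeholder values are hypothetical and not part of the original program.

```python
from itertools import product


def exploded_rows(subjects, actions, objects):
    """Rows produced when three array columns are exploded one after another.

    Each explode emits one row per array element for every existing row,
    so the result is the Cartesian product of the three arrays.
    """
    return list(product(subjects, actions, objects))


# Hypothetical record with 5 entries per array (an assumed, illustrative size).
subjects = [f"s{i}" for i in range(5)]
actions = [f"a{i}" for i in range(5)]
objects = [f"o{i}" for i in range(5)]

rows_per_record = len(exploded_rows(subjects, actions, objects))
total_rows = 460_000 * rows_per_record  # 460K input records, as in the question

print(rows_per_record)  # 125
print(total_rows)       # 57500000
```

The real arrays will vary in length from note to note, so this is only an order-of-magnitude estimate.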

But these are just pointers. You say that the job does not move forward. Try calling repartition(1000) right after reading the CSV, and remove coalesce(1). This cuts the job into smaller pieces (you may even try a value larger than 1000), and you can then check in the Spark UI whether some tasks are completing.

Upvotes: 4

Fang Zhang

Reputation: 1707

It looks like you are generating more data than you want. For example, given this data:

1 dog eats meat
1 bat hits ball

When you collect the values into three lists and explode each of them, you will get

1 dog eats meat
1 dog eats ball
1 dog hits meat
1 dog hits ball
1 bat eats meat
1 bat eats ball
1 bat hits meat
1 bat hits ball

You can see what will happen when you have 10 items in each list. For your case, it would be more reasonable to return a list of 3-tuples instead of 3 lists.
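A minimal sketch of that suggestion in plain Python, using the dog/bat example. The helper name to_triples is hypothetical. It relies on the three lists being built in lockstep, which is the case in get_triples() since all three are appended to together for each verb.

```python
def to_triples(subjects, actions, objects):
    """Pair up the i-th subject, action and object instead of crossing them."""
    return list(zip(subjects, actions, objects))


subjects = ["dog", "bat"]
actions = ["eats", "hits"]
objects = ["meat", "ball"]

triples = to_triples(subjects, actions, objects)

# In PySpark, a UDF returning this list would declare an ArrayType of a
# three-field StructType as its return type, so a single explode() yields
# one row per triple rather than one row per combination.
for triple in triples:
    print(1, *triple)
# 1 dog eats meat
# 1 bat hits ball
```

With this shape, a record with N verbs produces N rows instead of N^3.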

Upvotes: 0
