Sachin Sharma

Reputation: 382

SparkException while porting pyspark code to scala for Spark 2.4.3

I already have working code in Python, and following a new request I'm now rewriting the same code in Scala. However, I am running into a few errors.

Python Code

from pyspark.ml.feature import StopWordsRemover, RegexTokenizer
from pyspark.ml import Pipeline
from pyspark.sql.functions import *

#Step-1: Split fits_assembly_name column#

Initial dataframe df looks as following

+-------+-----------------+----------------+----------------------------------------------------------------------------------+
|Itemno |fits_model_number|fits_assembly_id|fits_assembly_name                                                                |
+-------+-----------------+----------------+----------------------------------------------------------------------------------+
|0450056|                 |44011           |BODY, DECALS - Z19VFK99LK/LE (702498)                                             |
|0450056|                 |135502          |DRIVE TRAIN, TRANSMISSION (6 SPEED) - V08AB26/GB26/LB26 ALL OPTIONS (49VICTRANS08)|
+-------+-----------------+----------------+----------------------------------------------------------------------------------+


df0 = df.withColumn('new_col', split(regexp_replace('fits_assembly_name', r'^(.*?)\s+-\s+(\S+)(.*)$', '$1$3\0$2'),'\0')) \
    .selectExpr(
        'Itemno'
      , 'fits_model_number'    
      , 'fits_assembly_id'
      , 'fits_assembly_name'
      , "coalesce(new_col[0], fits_assembly_name) as assembly_name"
      , "coalesce(new_col[1], '') as models"
)

display(df0)

OUTPUT:
+-------+-----------------+----------------+----------------------------------------------------------------------------------+--------------------------------------------------------------+-----------------+
|Itemno |fits_model_number|fits_assembly_id|fits_assembly_name                                                                |assembly_name                                                 |models           |
+-------+-----------------+----------------+----------------------------------------------------------------------------------+--------------------------------------------------------------+-----------------+
|0450056|                 |44011           |BODY, DECALS - Z19VFK99LK/LE (702498)                                             |BODY, DECALS (702498)                                         |Z19VFK99LK/LE    |
|0450056|                 |135502          |DRIVE TRAIN, TRANSMISSION (6 SPEED) - V08AB26/GB26/LB26 ALL OPTIONS (49VICTRANS08)|DRIVE TRAIN, TRANSMISSION (6 SPEED) ALL OPTIONS (49VICTRANS08)|V08AB26/GB26/LB26|
+-------+-----------------+----------------+----------------------------------------------------------------------------------+--------------------------------------------------------------+-----------------+

#Step-2: convert string into array of strings
df1 = df0.withColumn('temp0', split('fits_model_number', r'(?:(?![/_])\p{Punct}|\s)+')) \
        .withColumn('temp0', expr("filter(temp0, x -> x <> '')"))

I am not sure how to translate split(regexp_replace('fits_assembly_name', r'^(.*?)\s+-\s+(\S+)(.*)$', '$1$3\0$2'), '\0') into Scala. I tried split(regexp_replace($"fits_assembly_name", """^(.*?)\s+-\s+(\S+)(.*)$""", """$1$3\0$2"""), """\0"""), but I get a compile-time error when running display(df0).

Could you please help me with what needs to change in step 1 and step 2 so that this logic runs in Scala?

P.S.: So far, this is how I started in Scala:

import org.apache.spark.ml.feature.StopWordsRemover
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._
import spark.implicits._

// Step-1: Split fits_assembly_name column to remove model#

val df0 = df.withColumn("new_col", split(regexp_replace($"fits_assembly_name", """^(.*?)\s+-\s+(\S+)(.*)$""", """$1$3\0$2"""),"""\0""")).selectExpr(
        "Itemno"
      , "fits_model_number"    
      , "fits_assembly_id"
      , "fits_assembly_name"
      , "coalesce(new_col[0], fits_assembly_name) as assembly_name"
      , "coalesce(new_col[1], '') as models"
)

but I get an error when running display(df0).

Upvotes: 1

Views: 249

Answers (1)

The fourth bird

Reputation: 163577

To get both new columns from new_col, you could change the pattern to use 4 capturing groups, so that the separator itself is captured, and use $1$4$2$3 as the replacement. That moves the model token to the end of the string and keeps the dash separator in front of it.

Also, the single-quoted strings from the Python version have to be written with double (or triple) quotes in Scala.

^(.*?)(\s+-\s+)(\S+)(.*)$

The rewritten string can then be split on the pattern \s+-\s+, which matches a dash with one or more whitespace characters on each side.
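Splitting on the dash also sidesteps the \0 separator from the Python version, which does not carry over literally. In a non-raw Python string, '\0' is the single NUL character, while a triple-quoted Scala string """\0""" is a backslash followed by the character 0, which java.util.regex rejects as an incomplete octal escape. That is a likely cause of the error in the question, though the exact message was not posted. A minimal plain-Scala sketch (no Spark session; the object and method names are only illustrative) of the difference, and of a more literal translation that uses "\u0000":

```scala
import java.util.regex.Pattern
import scala.util.Try

object NulSeparatorSketch {
  // Triple-quoted Scala strings do no escape processing:
  // this is two characters, a backslash followed by '0'.
  val rawSeparator: String = """\0"""

  // A regular Scala string with a Unicode escape: the single NUL character (U+0000),
  // which is what '\0' denotes in a non-raw Python string.
  val nulSeparator: String = "\u0000"

  // True if java.util.regex accepts the string as a pattern.
  def compiles(regex: String): Boolean = Try(Pattern.compile(regex)).isSuccess

  // Plain-string version of the original Python step: move the model token
  // behind a NUL separator, then split on that separator.
  def splitOnNul(name: String): List[String] =
    name
      .replaceAll("""^(.*?)\s+-\s+(\S+)(.*)$""", "$1$3" + nulSeparator + "$2")
      .split(nulSeparator, -1)
      .toList

  def main(args: Array[String]): Unit = {
    println(rawSeparator.length)    // 2
    println(compiles(rawSeparator)) // false
    println(compiles(nulSeparator)) // true
    println(splitOnNul("BODY, DECALS - Z19VFK99LK/LE (702498)"))
  }
}
```

If you prefer to stay close to the Python code, passing "$1$3\u0000$2" and "\u0000" to regexp_replace and split should behave the same way, assuming those functions hand the strings to the Java regex engine unchanged.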

The code might look like

val df0 = df
  .withColumn(
    "new_col", split(
      regexp_replace(
        $"fits_assembly_name",
        """^(.*?)(\s+-\s+)(\S+)(.*)$""",
        "$1$4$2$3"
      ),
      """\s+-\s+"""
    )
  )
  .selectExpr(
    "Itemno"
    , "fits_model_number"
    , "fits_assembly_id"
    , "fits_assembly_name"
    , "coalesce(new_col[0], fits_assembly_name) as assembly_name"
    , "coalesce(new_col[1], '') as models"
  )
df0.show(false)

Output

+-------+-----------------+----------------+----------------------------------------------------------------------------------+--------------------------------------------------------------+-----------------+
|Itemno |fits_model_number|fits_assembly_id|fits_assembly_name                                                                |assembly_name                                                 |models           |
+-------+-----------------+----------------+----------------------------------------------------------------------------------+--------------------------------------------------------------+-----------------+
|0450056|a,b              |44011           |BODY, DECALS - Z19VFK99LK/LE (702498)                                             |BODY, DECALS (702498)                                         |Z19VFK99LK/LE    |
|0450056|c.d              |135502          |DRIVE TRAIN, TRANSMISSION (6 SPEED) - V08AB26/GB26/LB26 ALL OPTIONS (49VICTRANS08)|DRIVE TRAIN, TRANSMISSION (6 SPEED) ALL OPTIONS (49VICTRANS08)|V08AB26/GB26/LB26|
+-------+-----------------+----------------+----------------------------------------------------------------------------------+--------------------------------------------------------------+-----------------+
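To check what the group reordering does without a Spark session, the same pattern and replacement can be applied to plain strings with String.replaceAll and String.split, which use the same Java regex syntax. This is only a sketch: GroupReorderSketch and splitAssemblyName are hypothetical names, and null input (which coalesce handles in the DataFrame version) is not covered.

```scala
object GroupReorderSketch {
  // Same pattern, replacement and split separator as in the Spark code above.
  private val pattern = """^(.*?)(\s+-\s+)(\S+)(.*)$"""
  private val replacement = "$1$4$2$3"
  private val separator = """\s+-\s+"""

  // Returns (assembly_name, models), mirroring the two coalesce(...) expressions.
  def splitAssemblyName(name: String): (String, String) = {
    // Step 1: move the model token (group 3) behind the rest of the name.
    val reordered = name.replaceAll(pattern, replacement)
    // Step 2: split on the dash separator; limit -1 keeps trailing empty strings.
    val parts = reordered.split(separator, -1).toList
    // The first element always exists; a missing second element becomes "".
    (parts.head, parts.lift(1).getOrElse(""))
  }

  def main(args: Array[String]): Unit = {
    Seq(
      "BODY, DECALS - Z19VFK99LK/LE (702498)",
      "DRIVE TRAIN, TRANSMISSION (6 SPEED) - V08AB26/GB26/LB26 ALL OPTIONS (49VICTRANS08)",
      "ENGINE" // no " - " separator: the name is returned unchanged
    ).foreach(name => println(splitAssemblyName(name)))
  }
}
```

A name without a dash separator does not match the pattern, so it comes back unchanged with an empty models value, which corresponds to the coalesce(new_col[1], '') fallback.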

For the second part, there is no data for fits_model_number in the question, but the pattern that you use, (?:(?![/_])\p{Punct}|\s)+, matches a run of one or more characters, each of which is either a whitespace character or a punctuation character other than / or _.

You could update the code to:

val df1 = df0
  .withColumn("temp0", split(
    col("fits_model_number"),
    """(?:(?![/_])\p{Punct}|\s)+""")
  )
  .withColumn(
    "temp0",
    expr("filter(temp0, x -> x <> '')")
  )
df1.show()

Output (with some test data)

+-------+-----------------+----------------+--------------------+--------------------+-----------------+------------+
| Itemno|fits_model_number|fits_assembly_id|  fits_assembly_name|       assembly_name|           models|       temp0|
+-------+-----------------+----------------+--------------------+--------------------+-----------------+------------+
|0450056|         a,b,test|           44011|BODY, DECALS - Z1...|BODY, DECALS (702...|    Z19VFK99LK/LE|[a, b, test]|
|0450056|              c.d|          135502|DRIVE TRAIN, TRAN...|DRIVE TRAIN, TRAN...|V08AB26/GB26/LB26|      [c, d]|
+-------+-----------------+----------------+--------------------+--------------------+-----------------+------------+
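The split pattern and the filter step can also be tried on plain strings. The sketch below uses hypothetical names (TokenSplitSketch, rawSplit, tokens) and assumes that Spark's split keeps empty strings the way String.split(regex, -1) does, which is what makes the filter(temp0, x -> x <> '') step useful.

```scala
object TokenSplitSketch {
  // Same separator pattern as in the Spark code above: runs of whitespace
  // and punctuation, where '/' and '_' do not count as separators.
  private val separators = """(?:(?![/_])\p{Punct}|\s)+"""

  // Split only; limit -1 keeps leading and trailing empty strings.
  def rawSplit(s: String): List[String] = s.split(separators, -1).toList

  // Split, then drop empty strings, like filter(temp0, x -> x <> '').
  def tokens(s: String): List[String] = rawSplit(s).filter(_.nonEmpty)

  def main(args: Array[String]): Unit = {
    println(tokens("a,b,test"))
    println(tokens("c.d"))
    // '/' and '_' stay inside the token; ", " is a single separator run.
    println(tokens("V08AB26/GB26_X, Y"))
    // Leading and trailing separators produce empty strings before filtering.
    println(rawSplit(",a,b,"))
    println(tokens(",a,b,"))
  }
}
```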

Upvotes: 1
