Reputation: 337
I am struggling with some PySpark code. In particular, I'd like to call a function on a Column object, col, which is not iterable.
from pyspark.sql.functions import col, lower, regexp_replace, split
from googletrans import Translator

translator = Translator()

def clean_text(c):
    c = lower(c)
    c = regexp_replace(c, r"^rt ", "")
    c = regexp_replace(c, r"(https?\://)\S+", "")
    c = regexp_replace(c, r"[^a-zA-Z0-9\s]", "")  # remove punctuation
    c = regexp_replace(c, r"\n", " ")
    c = regexp_replace(c, r" +", " ")  # collapse repeated spaces
    # c = translator.translate(c, dest='en', src='auto')
    return c

clean_text_df = uncleanedText.select(clean_text(col("unCleanedCol")).alias("sentence"))
clean_text_df.printSchema()
clean_text_df.show(10)
As soon as I run the code with the line c = translator.translate(c, dest='en', src='auto') uncommented, the error shown by Spark is TypeError: Column is not iterable.

What I would like to do is a word-by-word translation:
From:
+--------------------+
| sentence|
+--------------------+
|ciao team there a...|
|dear itteam i urg...|
|buongiorno segnal...|
|hi team regarding...|
|hello please add ...|
|ciao vorrei effet...|
|buongiorno ho vis...|
+--------------------+
To:
+--------------------+
| sentence|
+--------------------+
|hello team there ...|
|dear itteam i urg...|
|goodmorning segna...|
|hi team regarding...|
|hello please add ...|
|hello would effet...|
|goodmorning I see...|
+--------------------+
The schema of the DataFrame is:

root
 |-- sentence: string (nullable = true)
Could anyone please help me?
Thank you very much
Upvotes: 2
Views: 1522
Reputation: 43544
PySpark is just the Python API written to support Apache Spark. If you want to use custom Python functions, you will have to define a user defined function (udf).
Keep your clean_text() function as is (with the translate line commented out) and try the following:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
from googletrans import Translator

translator = Translator()

def translate(c):
    # googletrans returns a Translated object; .text holds the string
    return translator.translate(c, dest='en', src='auto').text

translateUDF = udf(translate, StringType())

clean_text_df = uncleanedText.select(
    translateUDF(clean_text(col("unCleanedCol"))).alias("sentence")
)
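To sanity-check the UDF before running it on the full DataFrame, you can try it on a small hand-made DataFrame first. A minimal sketch, assuming an active SparkSession named spark and a working googletrans install:

# Hypothetical smoke test with two rows mirroring your schema
test_df = spark.createDataFrame(
    [("ciao team",), ("buongiorno a tutti",)],
    ["unCleanedCol"],
)
test_df.select(
    translateUDF(clean_text(col("unCleanedCol"))).alias("sentence")
).show(truncate=False)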
The other functions in your original clean_text (lower and regexp_replace) are built-in Spark functions and operate on a pyspark.sql.Column.
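That is also why the original call failed: inside clean_text, c is a lazy Column expression rather than a Python string, so googletrans has nothing concrete to work on. You can see this by printing an expression (the exact repr varies by Spark version):

print(lower(col("unCleanedCol")))
# prints something like: Column<'lower(unCleanedCol)'>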
Be aware that using this udf will bring a performance hit, since every row has to be serialized between the JVM and the Python worker. See: Spark functions vs UDF performance?
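If that overhead matters, Spark 2.3+ also offers vectorized pandas UDFs, which hand your function a whole pandas Series per batch instead of one value at a time. A rough sketch, assuming Spark 3.x type-hint style and the same translator object (the per-value network calls to googletrans remain, so batching only trims Spark's serialization cost):

import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import StringType

@pandas_udf(StringType())
def translateBatchUDF(s: pd.Series) -> pd.Series:
    # Still one googletrans call per value; only Spark-side overhead shrinks
    return s.apply(lambda text: translator.translate(text, dest='en', src='auto').text)

clean_text_df = uncleanedText.select(
    translateBatchUDF(clean_text(col("unCleanedCol"))).alias("sentence")
)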
Upvotes: 3