Reputation: 21
I am new to Spark ML and got stuck on a task that requires some data normalization. There is very little documentation available online for Spark ML with Java. Any help is much appreciated.
Problem Description :
I have a Dataset<Row> that contains an encoded URL in a column (ENCODED_URL), and I want to create a new column (DECODED_URL) in the existing Dataset that contains the decoded version of ENCODED_URL.
For example:
Current Dataset
ENCODED_URL
https%3A%2F%2Fmywebsite
New Dataset
ENCODED_URL | DECODED_URL
https%3A%2F%2Fmywebsite | https://mywebsite
I tried using withColumn, but had no clue what I should pass as the second argument:
Dataset<Row> newDs = ds.withColumn("new_col",?);
After reading the Spark documentation, I got the idea that it may be possible with SQLTransformer, but I couldn't figure out how to customize it to decode the URL.
This is how I read the information from the CSV:
Dataset<Row> urlDataset = s_spark.read().option("header", true).csv(CSV_FILE).persist(StorageLevel.MEMORY_ONLY());
Upvotes: 2
Views: 1701
Reputation: 1277
The first thing to know is that Spark Datasets are effectively immutable. Whenever you do a transformation, a new Dataset is created and returned. Another thing to keep in mind is the difference between actions and transformations: actions cause Spark to actually start crunching numbers and compute your DataFrame, while transformations add to the definition of a DataFrame but are not computed unless an action is called. An example of an action is DataFrame#count, while an example of a transformation is DataFrame#withColumn. See the full list of actions and transformations in the Spark Scala documentation.
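If the lazy/eager distinction is new to you, Java's own Stream API follows the same model, so a plain-JDK sketch (an analogy only, not Spark itself) can make it concrete: intermediate operations like map are deferred, and nothing runs until a terminal operation forces evaluation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class LazyDemo {
    // Returns how many elements the map() lambda actually touched.
    static int processedCount(boolean runAction) {
        List<String> touched = new ArrayList<>();
        Stream<String> upper = Stream.of("a", "b", "c")
                .map(x -> { touched.add(x); return x.toUpperCase(); }); // "transformation": lazy
        if (runAction) {
            upper.collect(Collectors.toList()); // "action": forces evaluation
        }
        return touched.size();
    }

    public static void main(String[] args) {
        System.out.println(processedCount(false)); // 0 -> map never ran
        System.out.println(processedCount(true));  // 3 -> collect forced it
    }
}
```

Spark's Dataset works the same way conceptually: withColumn only records what to compute, and an action like count triggers the actual work.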
withColumn allows you to either create a new column or replace an existing column in a Dataset (if the first argument is an existing column's name). The docs for withColumn will tell you that the second argument is supposed to be a Column object. Unfortunately, the Column documentation only describes methods available on Column objects but does not link to other ways of creating Column objects, so it's not your fault that you're at a loss for what to do next.
The thing you're looking for is org.apache.spark.sql.functions#regexp_replace
. Putting it all together, your code should look something like this:
...
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.functions;

Dataset<Row> ds = ... // reading from your csv file
ds = ds.withColumn(
    "decoded_url",
    functions.regexp_replace(functions.col("encoded_url"), "^https%3A%2F%2F", "https://"));
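As a side note, regexp_replace uses Java regular-expression syntax, so you can sanity-check a pattern with plain String#replaceAll before running it through Spark. A small standalone check:

```java
class RegexCheck {
    // regexp_replace follows Java regex semantics, so String#replaceAll
    // previews what Spark will compute for each row's value.
    static String decodePrefix(String url) {
        // "^" anchors the pattern to the start of the value, so only a
        // leading https%3A%2F%2F is rewritten.
        return url.replaceAll("^https%3A%2F%2F", "https://");
    }

    public static void main(String[] args) {
        System.out.println(decodePrefix("https%3A%2F%2Fmywebsite")); // https://mywebsite
    }
}
```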
regexp_replace requires that we pass a Column object as the first argument, but nothing requires that it even exist on any Dataset, because Column objects are basically instructions for how to compute a column; they don't actually contain any real data themselves. To illustrate this principle, we could write the above snippet as:
...
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.functions;

Dataset<Row> ds = ... // reading from your csv file
Column myColExpression = functions.regexp_replace(functions.col("encoded_url"), "^https%3A%2F%2F", "https://");
ds = ds.withColumn("decoded_url", myColExpression);
If you wanted, you could reuse myColExpression
on other datasets that have an encoded_url
column.
If you haven't already, you should familiarize yourself with the org.apache.spark.sql.functions
class. It's a util class that's effectively the Spark standard lib for transformations.
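One caveat worth noting: regexp_replace as used above only rewrites the https%3A%2F%2F prefix. If your URLs contain other percent-escapes, full decoding is a job for java.net.URLDecoder. A minimal sketch of just the decode logic (the Spark wiring is left out):

```java
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;

class DecodeDemo {
    // Fully percent-decodes a URL-encoded string, unlike the
    // regexp_replace approach, which only rewrites the scheme prefix.
    static String decode(String encoded) {
        try {
            return URLDecoder.decode(encoded, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException(e); // UTF-8 is always supported
        }
    }

    public static void main(String[] args) {
        System.out.println(decode("https%3A%2F%2Fmywebsite%2Fsearch%3Fq%3Dspark"));
        // https://mywebsite/search?q=spark
    }
}
```

To apply this per row, you could wrap decode in a UDF via functions.udf and pass the resulting column to withColumn; treat that wiring as an assumption to verify against your Spark version. Also be aware that URLDecoder decodes + as a space (form-encoding semantics), which may or may not be what you want for URLs.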
Upvotes: 1