Sameer Gaware

Reputation: 21

Create new column from existing column in Dataset - Apache Spark Java

I am new to Spark ML and got stuck on a task that requires some data normalization, and there is very little documentation available online for Spark ML in Java. Any help is much appreciated.

Problem Description :

I have a Dataset that contains encoded URLs in a column (ENCODED_URL), and I want to add a new column (DECODED_URL) to the existing Dataset that contains the decoded version of ENCODED_URL.

For example:
Current Dataset

ENCODED_URL
https%3A%2F%2Fmywebsite

New Dataset

ENCODED_URL                         | DECODED_URL  
https%3A%2F%2Fmywebsite             | https://mywebsite

I tried using withColumn but had no clue what I should pass as the second argument:

Dataset<Row> newDs = ds.withColumn("new_col",?);

After reading the Spark documentation, I got the idea that it may be possible with SQLTransformer, but I couldn't figure out how to customize it to decode the URL.

This is how I read the information from the CSV:

Dataset<Row> urlDataset = s_spark.read().option("header", true).csv(CSV_FILE).persist(StorageLevel.MEMORY_ONLY());

Upvotes: 2

Views: 1701

Answers (1)

Michael Wu

Reputation: 1277

A Spark primer

The first thing to know is that Spark Datasets are effectively immutable: whenever you do a transformation, a new Dataset is created and returned. Another thing to keep in mind is the difference between actions and transformations -- actions cause Spark to actually start crunching numbers and compute your DataFrame, while transformations add to the definition of a DataFrame but are not computed until an action is called. An example of an action is DataFrame#count, while an example of a transformation is DataFrame#withColumn. See the full list of actions and transformations in the Spark Scala documentation.
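
A minimal sketch of the distinction, assuming a Dataset with the ENCODED_URL column from your question -- nothing runs until count is called:

import org.apache.spark.sql.functions;

Dataset<Row> ds = ... // any Dataset with an ENCODED_URL column
Dataset<Row> withLength = ds.withColumn(
    "url_length",
    functions.length(functions.col("ENCODED_URL"))); // transformation: only recorded
long n = withLength.count(); // action: triggers the actual computation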

A solution

withColumn allows you to either create a new column or replace an existing one in a Dataset (the latter if the first argument is an existing column's name). The docs for withColumn will tell you that the second argument is supposed to be a Column object. Unfortunately, the Column documentation only describes the methods available on Column objects and does not link to other ways of creating them, so it's not your fault that you're at a loss for what to do next.

The thing you're looking for is org.apache.spark.sql.functions#regexp_replace. Putting it all together, your code should look something like this:

...
import org.apache.spark.sql.functions;

Dataset<Row> ds = ... // reading from your csv file
ds = ds.withColumn(
    "decoded_url",
    functions.regexp_replace(functions.col("encoded_url"), "^https%3A%2F%2F", "https://"));

regexp_replace requires that we pass a Column object as the first argument, but nothing requires that the column even exist on any Dataset, because Column objects are basically instructions for how to compute a column; they don't actually contain any real data themselves. To illustrate this principle, we could write the above snippet as:

...
import org.apache.spark.sql.Column;
import org.apache.spark.sql.functions;

Dataset<Row> ds = ... // reading from your csv file
Column myColExpression = functions.regexp_replace(functions.col("encoded_url"), "^https%3A%2F%2F", "https://");
ds = ds.withColumn("decoded_url", myColExpression);

If you wanted, you could reuse myColExpression on other datasets that have an encoded_url column.
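
Note that regexp_replace as written only rewrites the https%3A%2F%2F prefix. If your URLs contain other percent-encoded characters, one option (a sketch, not the only approach) is a user-defined function built on java.net.URLDecoder; the s_spark and urlDataset names below are borrowed from your question:

import java.net.URLDecoder;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.types.DataTypes;

// Register a UDF that applies full percent-decoding, not just the scheme prefix.
s_spark.udf().register(
    "decode_url",
    (UDF1<String, String>) url -> url == null ? null : URLDecoder.decode(url, "UTF-8"),
    DataTypes.StringType);

Dataset<Row> decoded = urlDataset.withColumn(
    "DECODED_URL",
    functions.callUDF("decode_url", functions.col("ENCODED_URL")));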

Suggestion

If you haven't already, you should familiarize yourself with the org.apache.spark.sql.functions class. It's a utility class that's effectively the Spark standard library for column transformations.
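
For example, the built-ins compose into Column expressions without any UDFs -- the column names here are made up purely for illustration:

import org.apache.spark.sql.functions;

ds = ds.withColumn(
    "full_name",
    functions.concat_ws(" ", functions.col("first_name"), functions.col("last_name")));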

Upvotes: 1
