DAE

Reputation: 123

Combine Row()'s in Spark

Seemingly simple question, but can't find the answer.

Problem: I create a function that I will pass into map() that takes a single field and creates three fields out of it. I want the output of the map() to give me a new RDD containing both the fields from the input RDD and the new fields. How do I do this?

Do I need to add the key of my data into the output of the function so that I can join the output RDD back to my original RDD? Is that the proper/best practice?

from pyspark.sql import Row

def extract_fund_code_from_iv_id(holding):
    # Must include the key of the data for later joining
    return Row(iv_id_fund_code=holding.iv_id[:2], iv_id_last_code=holding.iv_id[-2:])

Even more basic, I can't seem to combine two Rows.

row1 = Row(name="joe", age="35")
row2 = Row(state="MA")
print(row1, row2)

This doesn't return a new Row() like I want it to.

Thanks

Upvotes: 0

Views: 3350

Answers (1)

karlson

Reputation: 5433

I would really recommend using a user-defined function (udf).

Suppose you wanted to extract a number of features from a column int_col of type int of a DataFrame df. Let's say these features are simply modulo 3 and modulo 2 of said column content.

We'll import udf and the return type of our functions.

from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

Then we'll implement our feature extraction functions:

def modulo_three(col):
    return int(col) % 3

def modulo_two(col):
    return int(col) % 2
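
Since these are plain Python functions, they can be sanity-checked locally before being wrapped in udfs. A minimal check (repeating the two definitions so the snippet stands alone):

```python
def modulo_three(col):
    return int(col) % 3

def modulo_two(col):
    return int(col) % 2

# Quick sanity checks on plain values before registering as udfs
assert modulo_three(7) == 1  # 7 = 2 * 3 + 1
assert modulo_two(7) == 1    # 7 is odd
assert modulo_three("9") == 0  # int() coercion handles string input
```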

and turn them into udfs:

mod3 = udf(modulo_three, IntegerType())
mod2 = udf(modulo_two, IntegerType())

Now we'll compute all additional columns and give them nice names (via alias):

new_columns = [
    mod3(df['int_col']).alias('mod3'),
    mod2(df['int_col']).alias('mod2'),
]

Finally we select these columns plus all columns that already existed before:

new_df = df.select(*(df.columns + new_columns))

new_df will now have two additional columns mod3 and mod2.

Upvotes: 7
