Reputation: 123
Seemingly simple question, but can't find the answer.
Problem: I'm creating a function that I will pass into map(). It takes a single field and creates three fields out of it. I want the output of map() to give me a new RDD that includes both the fields from the input RDD and the new output fields. How do I do this?
Do I need to add the key of my data to the output of the function so that I can join the output RDD back to my original RDD? Is that the proper/best practice?
from pyspark.sql import Row

def extract_fund_code_from_iv_id(holding):
    # Must include the key of the data for later joining
    iv_id = Row(iv_id_fund_code=holding.iv_id[:2], iv_id_last_code=holding.iv_id[-2:])
    return iv_id
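Concretely, this is the pattern I have in mind (just a sketch; sc is my SparkContext and holdings_rdd stands in for my actual data):

# Placeholder input: an RDD of Rows, each carrying an iv_id key plus other fields
holdings_rdd = sc.parallelize([Row(iv_id="AB1234CD", name="fund_a")])

# Key both the original rows and the extracted fields by iv_id...
keyed_holdings = holdings_rdd.map(lambda h: (h.iv_id, h))
keyed_extracted = holdings_rdd.map(lambda h: (h.iv_id, extract_fund_code_from_iv_id(h)))

# ...so the new fields can be joined back onto the original rows
combined = keyed_holdings.join(keyed_extracted)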
Even more basic: I can't seem to combine two Rows.
row1 = Row(name="joe", age="35")
row2 = Row(state="MA")
print row1, row2
This doesn't return a new Row() like I want it to.
Thanks
Upvotes: 0
Views: 3350
Reputation: 5433
I would really recommend using UserDefinedFunction.
Suppose you wanted to extract a number of features from a column int_col of type int of a DataFrame df. Let's say these features are simply modulo 3 and modulo 2 of said column's content.
We'll import udf and the data type our functions return:
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
Then we'll implement our feature extraction functions:
def modulo_three(col):
    return int(col) % 3

def modulo_two(col):
    return int(col) % 2
and turn them into udfs:
mod3 = udf(modulo_three, IntegerType())
mod2 = udf(modulo_two, IntegerType())
Now we'll compute all additional columns and give them nice names (via alias):
new_columns = [
    mod3(df['int_col']).alias('mod3'),
    mod2(df['int_col']).alias('mod2'),
]
Finally we select these columns plus all columns that already existed before:
new_df = df.select(*(df.columns + new_columns))
new_df will now have two additional columns, mod3 and mod2.
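For reference, here is how the pieces might fit together end to end (the SparkSession setup and the sample data are assumptions for illustration, not part of the original answer):

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

spark = SparkSession.builder.getOrCreate()

# Hypothetical sample data with a single integer column
df = spark.createDataFrame([(7,), (8,), (9,)], ['int_col'])

def modulo_three(col):
    return int(col) % 3

def modulo_two(col):
    return int(col) % 2

mod3 = udf(modulo_three, IntegerType())
mod2 = udf(modulo_two, IntegerType())

new_columns = [
    mod3(df['int_col']).alias('mod3'),
    mod2(df['int_col']).alias('mod2'),
]

# Keep every existing column and append the new ones
new_df = df.select(*(df.columns + new_columns))
new_df.show()
# Expected output:
# +-------+----+----+
# |int_col|mod3|mod2|
# +-------+----+----+
# |      7|   1|   1|
# |      8|   2|   0|
# |      9|   0|   1|
# +-------+----+----+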
Upvotes: 7