Rkz

Reputation: 1257

Convert a RDD of Tuples of Varying Sizes to a DataFrame in Spark

I am having difficulty converting an RDD of the following structure to a DataFrame in Spark using Python.

df1=[['usr1',('itm1',2),('itm3',3)], ['usr2',('itm2',3), ('itm3',5),('itm22',6)]]

After converting, my dataframe should look like the following:

       usr1  usr2
itm1    2.0   NaN
itm2    NaN   3.0
itm22   NaN   6.0
itm3    3.0   5.0

I was initially thinking of converting the above RDD structure to the following:

df1={'usr1': {'itm1': 2, 'itm3': 3}, 'usr2': {'itm2': 3, 'itm3': 5, 'itm22':6}}

Then use Python's pandas module, pand=pd.DataFrame(df1), and then convert the pandas DataFrame back to a Spark DataFrame using spark_df = context.createDataFrame(pand). However, I believe that by doing this I am converting an RDD to a non-RDD object and then converting back to an RDD, which is not correct. Can someone please help me out with this problem?
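Concretely, with the dict form above, the detour I have in mind looks roughly like this (assuming context is an existing SQLContext; reset_index() is there so the item names survive the round trip):

import pandas as pd

# items become the index, users become columns, NaN where a value is missing
pand = pd.DataFrame(df1)

# back to Spark; without reset_index() the item names would be dropped
spark_df = context.createDataFrame(pand.reset_index())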

Upvotes: 0

Views: 1324

Answers (1)

zero323

Reputation: 330413

With data like this:

rdd = sc.parallelize([
    ['usr1',('itm1',2),('itm3',3)], ['usr2',('itm2',3), ('itm3',5),('itm22',6)]
])

flatten the records:

def to_record(kvs):
    user, *vs = kvs  # For Python 2.x use standard indexing / slicing
    for item, value in vs:
        yield user, item, value

records = rdd.flatMap(to_record)
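
The result is an RDD of (user, item, value) triples, roughly:

records.collect()
## [('usr1', 'itm1', 2), ('usr1', 'itm3', 3),
##  ('usr2', 'itm2', 3), ('usr2', 'itm3', 5), ('usr2', 'itm22', 6)]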

convert to DataFrame:

df = records.toDF(["user", "item", "value"])
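
A quick schema check should show the inferred types (value comes out as a long):

df.printSchema()
## root
##  |-- user: string (nullable = true)
##  |-- item: string (nullable = true)
##  |-- value: long (nullable = true)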

pivot:

result = df.groupBy("item").pivot("user").sum()

result.show()
## +-----+----+----+
## | item|usr1|usr2|
## +-----+----+----+
## | itm1|   2|null|
## | itm2|null|   3|
## | itm3|   3|   5|
## |itm22|null|   6|
## +-----+----+----+
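
If you know the set of users in advance you can pass the pivot values explicitly, which saves Spark an extra pass over the data to compute them:

result = df.groupBy("item").pivot("user", ["usr1", "usr2"]).sum("value")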

Note: Spark DataFrames are designed to handle long and relatively thin data. If you want to generate a wide contingency table, DataFrames won't be useful, especially if the data is dense and you want to keep a separate column per feature.
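
That said, if the pivoted result is small enough to collect to the driver, you can get the exact layout from the question with pandas, roughly:

result.toPandas().set_index("item").sort_index()
##        usr1  usr2
## item
## itm1    2.0   NaN
## itm2    NaN   3.0
## itm22   NaN   6.0
## itm3    3.0   5.0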

Upvotes: 2
