Reputation: 15293
I have this dataframe path_df:
path_df.show()
+---------------+-------------+----+
|FromComponentID|ToComponentID|Cost|
+---------------+-------------+----+
| 160| 163|27.0|
| 160| 183|27.0|
| 161| 162|22.0|
| 161| 170|31.0|
| 162| 161|22.0|
| 162| 167|24.0|
| 163| 160|27.0|
| 163| 164|27.0|
| 164| 163|27.0|
| 164| 165|35.0|
| 165| 164|35.0|
| 165| 166|33.0|
| 166| 165|33.0|
| 166| 167|31.0|
| 167| 162|24.0|
| 167| 166|31.0|
| 167| 168|27.0|
| 168| 167|27.0|
| 168| 169|23.0|
| 169| 168|23.0|
+---------------+-------------+----+
only showing top 20 rows
From this, I want to build a dictionary, as follows:
{FromComponentID:{ToComponentID:Cost}}
For my current data, it would be:
{160 : {163 : 27,
183 : 27},
161 : {162 : 22,
170 : 31},
162 : {161 : 22,
167 : 24},
...
167 : {162 : 24,
166 : 31,
168 : 27},
168 : {167 : 27,
169 : 23},
169 : {168 : 23}
}
Can I do that using only PySpark, and if so, how? Or is it better to extract my data and process it directly with Python?
Upvotes: 1
Views: 19863
Reputation: 2590
You can do all of this with DataFrame transformations and UDFs. The only slightly annoying part is that, because you technically have two different types of dictionaries (one where key=integer and value=dictionary, the other where key=integer and value=float), you have to define two UDFs with different return types. Here is one possible way to do this:
from pyspark.sql.functions import udf,collect_list,create_map
from pyspark.sql.types import MapType,IntegerType,FloatType
data = [[160,163,27.0],[160,183,27.0],[161,162,22.0],
[161,170,31.0],[162,161,22.0],[162,167,24.0],
[163,160,27.0],[163,164,27.0],[164,163,27.0],
[164,165,35.0],[165,164,35.0],[165,166,33.0],
[166,165,33.0],[166,167,31.0],[167,162,24.0],
[167,166,31.0],[167,168,27.0],[168,167,27.0],
[168,169,23.0],[169,168,23.0]]
cols = ['FromComponentID','ToComponentID','Cost']
df = spark.createDataFrame(data,cols)
combineMap = udf(lambda maps: {key: f[key] for f in maps for key in f},
                 MapType(IntegerType(), FloatType()))
combineDeepMap = udf(lambda maps: {key: f[key] for f in maps for key in f},
                     MapType(IntegerType(), MapType(IntegerType(), FloatType())))
mapdf = df.groupBy('FromComponentID')\
          .agg(collect_list(create_map('ToComponentID', 'Cost')).alias('maps'))\
          .agg(combineDeepMap(collect_list(create_map('FromComponentID', combineMap('maps')))))
result_dict = mapdf.collect()[0][0]
For a large dataset, this should offer some performance gains over a solution that requires the data to be collected onto a single node. But since Spark still has to serialize the UDF, there won't be huge gains over an RDD-based solution.
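The merge that both UDF lambdas perform can be sanity-checked in plain Python, on a couple of hypothetical sample maps shaped like what collect_list(create_map(...)) produces:

```python
# same merge the combineMap UDF performs: fold a list of small dicts into one
maps = [{163: 27.0}, {183: 27.0}]
combined = {key: f[key] for f in maps for key in f}
# combined == {163: 27.0, 183: 27.0}
```

If the same key appeared in more than one map, the last occurrence would win, which is fine here because each (FromComponentID, ToComponentID) pair occurs once.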
Update:
An RDD solution is a lot more compact but, in my opinion, it is not as clean. This is because PySpark doesn't easily support storing large dictionaries as RDDs. The solution is to store the data as a distributed list of tuples and then convert it to a dictionary when you collect it to a single node. Here is one possible solution:
maprdd = df.rdd.groupBy(lambda x: x[0]).map(lambda x: (x[0], {y[1]: y[2] for y in x[1]}))
result_dict = dict(maprdd.collect())
Again, this should offer performance gains over a pure-Python implementation on a single node, and it might not be that different from the DataFrame implementation, but my expectation is that the DataFrame version will be more performant.
Upvotes: 8
Reputation: 1670
The easiest way I know is the one below (but it has a Pandas dependency):
path_df.toPandas().set_index('FromComponentID').T.to_dict('list')
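Note that this one-liner maps each FromComponentID to a flat [ToComponentID, Cost] list rather than the nested {ToComponentID: Cost} shape the question asks for, and with repeated FromComponentID values only one row per key survives. A hedged Pandas sketch (sample values taken from the question) that keeps all rows and produces the nested shape could look like:

```python
import pandas as pd

# a small frame mirroring path_df's schema (values from the question)
pdf = pd.DataFrame({'FromComponentID': [160, 160, 161],
                    'ToComponentID':   [163, 183, 162],
                    'Cost':            [27.0, 27.0, 22.0]})

# group per source node so repeated FromComponentID values are all kept
result_dict = {src: dict(zip(grp['ToComponentID'], grp['Cost']))
               for src, grp in pdf.groupby('FromComponentID')}
# result_dict == {160: {163: 27.0, 183: 27.0}, 161: {162: 22.0}}
```

The same pattern applies after path_df.toPandas() on the real data.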
Upvotes: 2
Reputation: 109
You can try it this way:
df_prod = spark.read.csv('/path/to/sample.csv',inferSchema=True,header=True)
rdd = df_prod.rdd.map(lambda x: {x['FromComponentID']:{x['ToComponentID']:x['Cost']}})
rdd.collect()
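Keep in mind that rdd.collect() here returns a list of single-entry dicts (one per row), not one merged mapping, so rows sharing a FromComponentID stay separate. Merging them on the driver is straightforward; a plain-Python sketch on output shaped like that collect would return:

```python
# single-entry dicts, shaped like the output of rdd.collect() above
rows = [{160: {163: 27.0}}, {160: {183: 27.0}}, {161: {162: 22.0}}]

merged = {}
for d in rows:
    for src, inner in d.items():
        # accumulate all (to, cost) pairs sharing the same source id
        merged.setdefault(src, {}).update(inner)
# merged == {160: {163: 27.0, 183: 27.0}, 161: {162: 22.0}}
```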
Upvotes: 0