Reputation: 303
I have an RDD whose rows are lists of tuples; the first two rows look like this:
[[('n', 12.012457082117459), ('s', 0.79112758892014912)],
 [('t', 3.6243409329763652), ('vn', 3.6243409329763652), ('n', 52.743253562212828), ('v', 11.644347760553064)]]
In each tuple, the first value, e.g. 'n', 's', 't', is the desired column name, and the second value, e.g. 12.012, 0.7911..., is the desired value for that column. However, not every column name appears in every list (row) of the RDD. For example, only 'n' and 's' appear in the first row, while there is no 's' in the second row. So I want to convert this RDD to a dataframe, where the value should be 0 for any column that does not show up in the original tuples. In other words, the first two rows might look like this:
n     s      t     vn    omitted.....
12    0.79   0     0     .....
52    0      3.62  3.62  .......
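For a single row, what I want is essentially a dict lookup with a default of 0; a minimal plain-Python illustration using the first row above:

first_row = dict([('n', 12.012457082117459), ('s', 0.79112758892014912)])
first_row.get('n', 0)   # 12.012457082117459
first_row.get('t', 0)   # 0, since 't' does not appear in this row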
I tried the following:
row = Row('l', 'eng', 'q', 'g', 'j', 'b', 'nt', 'z', 'n', 'd', 'f', 'i', 'k', 's', 'vn', 'nz', 'v', 'nrt', 'tg', 'nrfg', 't', 'ng', 'zg', 'a')
df = tup_sum_data.map(row).toDF()
where the strings in Row() are my desired column names. But I got the following error:
TypeError                                 Traceback (most recent call last)
/Users/1/Documents/spark/python/pyspark/sql/types.py in _infer_type(obj)
    968     try:
--> 969         return _infer_schema(obj)
    970     except TypeError:

/Users/1/Documents/spark/python/pyspark/sql/types.py in _infer_schema(row)
    991     else:
--> 992         raise TypeError("Can not infer schema for type: %s" % type(row))
    993

TypeError: Can not infer schema for type: <class 'numpy.float64'>

During handling of the above exception, another exception occurred:

TypeError                                 Traceback (most recent call last)
/Users/1/Documents/spark/python/pyspark/sql/types.py in _infer_type(obj)
    968     try:
--> 969         return _infer_schema(obj)
    970     except TypeError:

/Users/1/Documents/spark/python/pyspark/sql/types.py in _infer_type(obj)
    969         return _infer_schema(obj)
    970     except TypeError:
--> 971         raise TypeError("not supported type: %s" % type(obj))
    972
    973

TypeError: not supported type: <class 'tuple'>
Some lines of the traceback are omitted. Could anyone help me figure out how to deal with this? Thank you!
UPDATE: I converted the data types from np.float64 to float (a sketch of the cast is below the output), and the error went away. However, the dataframe does not look like what I wanted; it looks like this:
+--------------------+
| l|
+--------------------+
|[[n,12.0124570821...|
|[[t,3.62434093297...|
|[[a,0.44628710262...|
|[[n,16.7534769832...|
|[[n,17.6017774340...|
+--------------------+
only showing top 5 rows
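For reference, the np.float64-to-float cast was just a map over the tuples; a minimal sketch, assuming tup_sum_data is the RDD shown at the top:

# Cast each numpy.float64 value to a native Python float, since Spark's
# schema inference does not handle numpy scalar types.
tup_sum_data = tup_sum_data.map(
    lambda row: [(name, float(value)) for name, value in row]
)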
So can anyone help me figure out how to get a correctly formatted dataframe? Thank you!
Upvotes: 0
Views: 5892
Reputation: 20440
The trick is to convert each row's list of (name, value) tuples into a dict, then build a Row that falls back to 0.0 for any missing key, and apply an explicit schema:

from pyspark.sql.types import StructType, StructField, FloatType
from pyspark.sql import Row

data_frame_schema = StructType([
    StructField("n", FloatType()),
    StructField("s", FloatType()),
    StructField("t", FloatType()),
    StructField("v", FloatType()),
    StructField("vn", FloatType())
])

raw_list = [[('n', 12.012457082117459), ('s', 0.79112758892014912)],
            [('t', 3.6243409329763652), ('vn', 3.6243409329763652), ('n', 52.743253562212828), ('v', 11.644347760553064)]]

raw_rdd = sc.parallelize(raw_list)

# Without defaults, missing columns would come out as null instead of 0.0:
# dict_to_row = lambda d: Row(n=d.get("n"), s=d.get("s"), t=d.get("t"), v=d.get("v"), vn=d.get("vn"))

# d.get(key, 0.0) fills in 0.0 for any column that is absent from the row.
dict_to_row = lambda d: Row(n=d.get("n", 0.0), s=d.get("s", 0.0), t=d.get("t", 0.0), v=d.get("v", 0.0), vn=d.get("vn", 0.0))

# Turn each list of tuples into a dict, then into a Row.
row_rdd = raw_rdd.map(lambda l: dict_to_row(dict(l)))
df = spark.createDataFrame(row_rdd, data_frame_schema)
df.show()
Pasting the above into the pyspark shell yields output:
+---------+----------+--------+---------+--------+
| n| s| t| v| vn|
+---------+----------+--------+---------+--------+
|12.012457|0.79112756| 0.0| 0.0| 0.0|
| 52.74325| 0.0|3.624341|11.644348|3.624341|
+---------+----------+--------+---------+--------+
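If you need all 24 column names from your question rather than just these five, the same idea generalizes; a sketch (untested against your full data) that builds both the schema and the rows from one list, mapping each row to a plain tuple in column order (this also sidesteps the fact that Row(**kwargs) sorts its fields alphabetically in Spark 2.x):

col_names = ['l','eng','q','g','j','b','nt','z','n','d','f','i','k','s','vn','nz','v','nrt','tg','nrfg','t','ng','zg','a']

# One FloatType column per name, in the order given above.
schema = StructType([StructField(c, FloatType()) for c in col_names])

def to_tuple(pairs):
    # dict lookup with a 0.0 default for every column absent from this row;
    # float() also guards against stray numpy.float64 values.
    d = dict(pairs)
    return tuple(float(d.get(c, 0.0)) for c in col_names)

df = spark.createDataFrame(raw_rdd.map(to_tuple), schema)
df.show()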
Upvotes: 3