user2825083

Reputation: 77

pyspark dataframe from rdd containing key and values as list of lists

I have a RDD like below with keys and values as list of list containing some parameters.

(32719, [[[u'200.73.55.34', u'192.16.48.217', 0, 6, 10163, 443, 0], [u'177.207.76.243', u'192.16.58.8', 0, 6, 59575, 80, 0]]])
(32897, [[[u'200.73.55.34', u'193.16.48.217', 0, 6, 10163, 443, 0], [u'167.207.76.243', u'194.16.58.8', 0, 6, 59575, 80, 0]]])

I want to create a dataframe with rows and columns as below

32719, u'200.73.55.34', u'192.16.48.217', 0, 6, 10163, 443, 0
32719, u'177.207.76.243', u'192.16.58.8', 0, 6, 59575, 80, 0
32897, u'200.73.55.34', u'193.16.48.217', 0, 6, 10163, 443, 0
32897, u'167.207.76.243', u'194.16.58.8', 0, 6, 59575, 80, 0

Or, alternatively, a dataframe of all the values grouped by the key. How can I do this?

Upvotes: 0

Views: 1283

Answers (2)

Himaprasoon

Reputation: 2659

Use flatMapValues:

a = [(32719, [[[u'200.73.55.34', u'192.16.48.217', 0, 6, 10163, 443, 0], [u'177.207.76.243', u'192.16.58.8', 0, 6, 59575, 80, 0]]]),
     (32897, [[[u'200.73.55.34', u'193.16.48.217', 0, 6, 10163, 443, 0], [u'167.207.76.243', u'194.16.58.8', 0, 6, 59575, 80, 0]]])]

rdd = sc.parallelize(a)

rdd.flatMapValues(lambda x: x[0]).map(lambda x: [x[0]] + x[1]).toDF().show()

Output

+-------+----------------+---------------+----+----+-------+-----+----+
|  _1   |       _2       |      _3       | _4 | _5 |  _6   | _7  | _8 |
+-------+----------------+---------------+----+----+-------+-----+----+
| 32719 | 200.73.55.34   | 192.16.48.217 |  0 |  6 | 10163 | 443 |  0 |
| 32719 | 177.207.76.243 | 192.16.58.8   |  0 |  6 | 59575 |  80 |  0 |
| 32897 | 200.73.55.34   | 193.16.48.217 |  0 |  6 | 10163 | 443 |  0 |
| 32897 | 167.207.76.243 | 194.16.58.8   |  0 |  6 | 59575 |  80 |  0 |
+-------+----------------+---------------+----+----+-------+-----+----+
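The two lambdas in that pipeline are plain Python functions, so their behavior can be checked without a running SparkContext. A minimal simulation (assuming the same sample data; flatMapValues pairs the key with each element of the value, and the map prepends the key):

```python
a = [(32719, [[[u'200.73.55.34', u'192.16.48.217', 0, 6, 10163, 443, 0],
               [u'177.207.76.243', u'192.16.58.8', 0, 6, 59575, 80, 0]]]),
     (32897, [[[u'200.73.55.34', u'193.16.48.217', 0, 6, 10163, 443, 0],
               [u'167.207.76.243', u'194.16.58.8', 0, 6, 59575, 80, 0]]])]

# flatMapValues(lambda x: x[0]) emits one (key, inner_list) pair per inner list
flat = [(k, v) for k, vals in a for v in vals[0]]

# map(lambda x: [x[0]] + x[1]) prepends the key, giving one flat row per pair
rows = [[k] + v for k, v in flat]

for r in rows:
    print(r)
```

Each of the four rows printed matches one row of the DataFrame above. To get meaningful column names instead of _1 ... _8, a list of names can be passed to toDF, as in the other answer.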

Upvotes: 3

Suresh

Reputation: 5870

You can map over the values to prepend the key to each row, then create the dataframe. Here is one way:

>>> dat1 = [(32719, [[u'200.73.55.34', u'192.16.48.217', 0, 6, 10163, 443, 0], [u'177.207.76.243', u'192.16.58.8', 0, 6, 59575, 80, 0]]),(32897, [[u'200.73.55.34', u'193.16.48.217', 0, 6, 10163, 443, 0], [u'167.207.76.243', u'194.16.58.8', 0, 6, 59575, 80, 0]]])]

>>> rdd1 = sc.parallelize(dat1).map(lambda x: [[x[0]] + i for i in x[1]]).flatMap(lambda x: x)
>>> df = rdd1.toDF(['col1','col2','col3','col4','col5','col6','col7','col8'])
>>> df.show()
+-----+--------------+-------------+----+----+-----+----+----+
| col1|          col2|         col3|col4|col5| col6|col7|col8|
+-----+--------------+-------------+----+----+-----+----+----+
|32719|  200.73.55.34|192.16.48.217|   0|   6|10163| 443|   0|
|32719|177.207.76.243|  192.16.58.8|   0|   6|59575|  80|   0|
|32897|  200.73.55.34|193.16.48.217|   0|   6|10163| 443|   0|
|32897|167.207.76.243|  194.16.58.8|   0|   6|59575|  80|   0|
+-----+--------------+-------------+----+----+-----+----+----+

Upvotes: 0
