Reputation: 51
Here is my problem: I've got this RDD:
a = [[u'PNR1',u'TKT1',u'TEST',u'a2',u'a3'],[u'PNR1',u'TKT1',u'TEST',u'a5',u'a6'],[u'PNR1',u'TKT1',u'TEST',u'a8',u'a9']]
rdd= sc.parallelize (a)
Then I try : x: (x[0],x[1],x[2], list(x[3:])))
Finally I should find this:
But the problem is that I can't collect a list.
If anyone can help me I will appreciate it
Upvotes: 3
Views: 13339
Reputation: 60390
This might do your job (or give you some ideas to proceed further)...
One idea is to convert your col4
to a primitive data type, i.e. a string:
from pyspark.sql.functions import collect_list
import pandas as pd
a = [[u'PNR1',u'TKT1',u'TEST',u'a2',u'a3'],[u'PNR1',u'TKT1',u'TEST',u'a5',u'a6'],[u'PNR1',u'TKT1',u'TEST',u'a8',u'a9']]
rdd = sc.parallelize(a)
df = x: (x[0],x[1],x[2], '(' + ' '.join(str(e) for e in x[3:]) + ')')).toDF(["col1","col2","col3","col4"])
#[u'PNR1', u'TKT1', u'TEST', [u'(a2 a3)', u'(a5 a6)', u'(a8 a9)']]
UPDATE (after your own answer):
I really thought the point I had reached above was enough to further adapt it according to your needs, plus that I didn't have time at the moment to do it myself; so, here it is (after modifying my df
definition to get rid of the parentheses, it is just a matter of a single list comprehension):
df = x: (x[0],x[1],x[2], ' '.join(str(e) for e in x[3:]))).toDF(["col1","col2","col3","col4"])
# temp list:
ff = df.groupBy("col1","col2","col3").agg(collect_list("col4")).toPandas().values.tolist()[0]
# [u'PNR1', u'TKT1', u'TEST', [u'a2 a3', u'a5 a6', u'a8 a9']]
# final list of lists:
ll = ff[:-1] + [[x.split(' ') for x in ff[-1]]]
which gives your initially requested result:
[u'PNR1', u'TKT1', u'TEST', [[u'a2', u'a3'], [u'a5', u'a6'], [u'a8', u'a9']]] # requested output
This approach has certain advantages compared with the one provided in your own answer:
Upvotes: 1
Reputation: 51
I finally found a solution, it is not the best way but I can continue working...
from pyspark.sql.functions import udf
from pyspark.sql.functions import *
def example(lista):
d = [[] for x in range(len(lista))]
for index, elem in enumerate(lista):
d[index] = elem.split("@")
return d
example_udf = udf(example, LongType())
a = [[u'PNR1',u'TKT1',u'TEST',u'a2',u'a3'],[u'PNR1',u'TKT1',u'TEST',u'a5',u'a6'],[u'PNR1',u'TKT1',u'TEST',u'a8',u'a9']]
rdd= sc.parallelize (a)
df = rdd.toDF(["col1","col2","col3","col4","col5"])
df2=df.withColumn('col6', concat(col('col4'),lit('@'),col('col5'))).drop(col("col4")).drop(col("col5")).groupBy([col("col1"),col("col2"),col("col3")]).agg(collect_set(col("col6")).alias("col6")) x: (x[0],x[1],x[2],example(x[3]))).collect()
And it gives:
[(u'PNR1', u'TKT1', u'TEST', [[u'a2', u'a3'], [u'a5', u'a6'], [u'a8', u'a9']])]
Hope this solution can help to someone else.
Thanks for all your answers.
Upvotes: 2
Reputation: 56
Since you cannot update to 2.x your only option is RDD API. Replace you current code with: x: ((x[0], x[1], x[2]), list(x[3:]))).groupByKey().toDF()
Upvotes: 0