bobtt

Reputation: 73

Can't perform 2 successive groupBy in Spark

I am working with Spark in Python.

My problem is: I have a .csv file which contains some data (int1, int2, int3, date). I did a groupByKey on int1. Now I want to perform another groupBy on the date, using the RDD created by the first groupBy.

The problem is that I can't get it to work. Any ideas?

Regards

EDIT 2:

from pyspark import SparkContext
import csv
import sys
import StringIO

sc = SparkContext("local", "Simple App")
file = sc.textFile("histories_2week9.csv")

csvById12Rdd = file.map(lambda (id1, id2, value): ((id1, id2), value)).groupByKey()
csvById1Rdd = csvById12Rdd.map(lambda ((id1, id2), group): (id1, (id2, group))).groupByKey()



def printit(one):
    id1, twos = one
    print("Id1:{}".format(id1))
    for two in twos:
        id2, values = two
        print("Id1:{} Id2:{}".format(id1, id2))
        for value in values:
            print("Id1:{} Id2:{} Value:{}".format(id1, id2, value))

csvById12Rdd.first().foreach(printit)

The CSV lines look like: 31705,48,2,2014-10-28T18:14:09.000Z
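
Incidentally, each element coming out of sc.textFile is a plain string, not a tuple, so it has to be split before building a key/value pair for groupByKey. A minimal plain-Python sketch of that parsing step (no Spark needed; the column meanings are assumed from the sample line above):

```python
# One raw line as produced by sc.textFile - a single string.
line = "31705,48,2,2014-10-28T18:14:09.000Z"

# Split it into the four fields before any keying.
id1, id2, value, date = line.split(",")

# Build the (key, value) pair that groupByKey expects,
# keying on (id1, id2) as in the EDIT 2 snippet.
pair = ((id1, id2), value)

print(pair)  # (('31705', '48'), '2')
```

In Spark this split would go inside the map, e.g. `file.map(lambda line: line.split(","))` before building the key.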

EDIT 3:

I can print my iterator data with this code:

from pyspark import SparkContext

import csv
import sys
import StringIO

sc = SparkContext("local", "Simple App")
file = sc.textFile("histories_2week9.csv")

def go_in_rdd2(x):
    print x[0]
    for i in x[1]:
        print i

counts = file.map(lambda line: (line.split(",")[0],line.split(",")[1:]))
counts = counts.groupByKey()
counts.foreach(go_in_rdd2)

but I still can't do the second groupBy.

Upvotes: 1

Views: 552

Answers (1)

G Quintana

Reputation: 4667

groupByKey returns an RDD of (Key, Iterable[Value]). Can you do it the other way round?

  1. Group by id1 and id2 and get an RDD of ((Id1,Id2), Iterable[Value])
  2. Then group by id1 alone and get an RDD of (Id1, Iterable[(Id2,Iterable[Value])])

Something like:

csv=[(1,1,"One","Un"),(1,2,"Two","Deux"),(2,1,"Three","Trois"),(2,1,"Four","Quatre")]
csvRdd=sc.parallelize(csv)
# Step 1
csvById12Rdd=csvRdd.map(lambda (id1,id2,value1,value2): ((id1,id2),(value1,value2))).groupByKey()
# Step 2
csvById1Rdd=csvById12Rdd.map(lambda ((id1,id2),group):(id1, (id2,group))).groupByKey()
# Print    
def printit(one):
    id1, twos=one
    print("Id1:{}".format(id1))
    for two in twos:
        id2, values=two
        print("Id1:{} Id2:{}".format(id1,id2))
        for value1,value2 in values:
            print("Id1:{} Id2:{} Values:{} {}".format(id1,id2,value1,value2))

csvById1Rdd.foreach(printit)
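
To see the shape these two steps produce without a Spark cluster, the same grouping can be simulated with plain dictionaries (a sketch only; dicts stand in for the RDDs, using the same sample data as above):

```python
from collections import defaultdict

csv = [(1, 1, "One", "Un"), (1, 2, "Two", "Deux"),
       (2, 1, "Three", "Trois"), (2, 1, "Four", "Quatre")]

# Step 1: group value pairs by the composite key (id1, id2),
# mimicking the first groupByKey.
by_id12 = defaultdict(list)
for id1, id2, v1, v2 in csv:
    by_id12[(id1, id2)].append((v1, v2))

# Step 2: regroup by id1 alone, keeping (id2, values) pairs,
# mimicking the second groupByKey.
by_id1 = defaultdict(list)
for (id1, id2), group in by_id12.items():
    by_id1[id1].append((id2, group))

print(by_id1[2])  # [(1, [('Three', 'Trois'), ('Four', 'Quatre')])]
```

The nesting matches step 2 above: each id1 maps to a list of (id2, list-of-values) pairs.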

Upvotes: 0
