user3756702

Reputation: 215

Spark Python reduceByKey

I would like to count how often each user views each category. I am a newbie in Spark and Python. Here is the demo data:

dataSource = sc.parallelize( [("user1", "film"), ("user1", "film"), ("user2", "film"), ("user2", "books"), ("user2", "books")] )

I reduced this by the user key and collected all the categories. Then I split them so I could count later:

dataReduced = dataSource.reduceByKey(lambda x, y: x + "," + y)
catSplitted = dataReduced.map(lambda uv: (uv[1].split(","), uv[0]))  # (list of categories, user)

The output format for each user looks like this -> (['cat1', 'cat1', 'cat2', ..., 'catn'], user)

Could someone please tell me how to count the categories with Spark and Python or do you have a different way to solve this problem?
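
For reference, the per-user counts I am after can be produced in plain Python like this (a rough sketch over the demo data; I would like the equivalent in Spark):

# plain-Python sketch of the desired result (not Spark): count categories per user
from collections import Counter

views = [("user1", "film"), ("user1", "film"), ("user2", "film"),
         ("user2", "books"), ("user2", "books")]
perUser = {}
for user, category in views:
    perUser.setdefault(user, Counter())[category] += 1
print(perUser)
# {'user1': Counter({'film': 2}), 'user2': Counter({'books': 2, 'film': 1})}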

Upvotes: 1

Views: 7434

Answers (3)

Elior Malul

Reputation: 691

Another approach (more efficient and easier to read, IMO). It is more efficient because the Spark DAG will not need to repartition by category after the user-based partitioning, and easier to read because it uses DataFrames rather than RDDs.

First, make a hash column based on user and category:

import pyspark.sql.functions as F
df = spark.createDataFrame([("u1", "f"), ("u1", "f"), ("u2", "f"), ("u2", "b"), ("u2", "b")], schema=['u', 'c'])
df = df.withColumn('hash', F.hash('u', 'c'))  # hash over both user and category

Second, we partition by hash, and aggregate locally:

from pyspark.sql import Window
win = Window.partitionBy('hash')
df.withColumn('count', F.count('hash').over(win)).distinct().show()
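
For comparison, the same per-(user, category) counts can usually be obtained with a plain groupBy, which avoids the extra hash column (a sketch using the same df as above):

# sketch: equivalent aggregation with a regular groupBy (no hash column needed)
df.groupBy('u', 'c').count().show()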

Upvotes: 0

user3756702

Reputation: 215

Now I've got the result I expected. But I guess it isn't ideal to concatenate the key the way I did. Maybe someone has another solution or any suggestions? (One tuple-key alternative is sketched after the code below.)

# count the category views per user
# data
dataSource = sc.parallelize( [("user1", "film"), ("user1", "film"), ("user2", "film"), ("user2", "books"), ("user2", "books")] )
# create (key, value) pairs | concatenate user and category as the key
dataKeyValue = dataSource.map(lambda uc: (uc[0] + "," + uc[1], 1))
# reduce
dataReduced = dataKeyValue.reduceByKey(lambda x, y: x + y)
# result => [('user2,books', 2), ('user1,film', 2), ('user2,film', 1)]
# split the key again
cleanResult = dataReduced.map(lambda kv: (kv[0].split(","), kv[1]))
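
One possible alternative (a sketch, untested): keep the (user, category) pair as a tuple key, so there is no string concatenation and no split afterwards:

# use the (user, category) tuple directly as the key
pairCounts = dataSource.map(lambda uc: (uc, 1)).reduceByKey(lambda a, b: a + b)
# pairCounts.collect() => e.g. [(('user2', 'books'), 2), (('user1', 'film'), 2), (('user2', 'film'), 1)]
# or, if a plain Python dict on the driver is enough:
# dict(dataSource.countByValue()) => {('user1', 'film'): 2, ('user2', 'film'): 1, ('user2', 'books'): 2}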

Upvotes: 2

haraprasadj

Reputation: 1087

In pure Python:

ds = [('user1', ['film', 'film', 'books']), ('user2', ['film', 'books', 'books'])]
ds1 = [(x, tuple(set((z, y.count(z)) for z in y))) for (x, y) in ds]
print(ds1)

which returns:

[('user1', (('film', 2), ('books', 1))), ('user2', (('film', 1), ('books', 2)))]

In Spark, it should be as follows (not sure, as I don't have access to Spark right now):

dataReduced = dataSource.reduceByKey(lambda x, y: x + "," + y)
catSplitted = dataReduced.map(lambda uv: (uv[0], uv[1].split(",")))
catCounted = catSplitted.map(lambda xy: (xy[0], tuple(set((z, xy[1].count(z)) for z in xy[1]))))

Hope this helps. If not, you can look into how to get the same Python functionality with Spark commands; the basic logic should work.
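
If the grouping is to happen in Spark rather than in plain Python, one option is groupByKey plus collections.Counter over each user's categories (again a sketch, untested):

# group the categories per user, then count them with collections.Counter
from collections import Counter

counted = dataSource.groupByKey().mapValues(lambda cats: dict(Counter(cats)))
# counted.collect() => e.g. [('user1', {'film': 2}), ('user2', {'film': 1, 'books': 2})]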

Upvotes: 0
