Vedant

Reputation: 523

Aggregation in PySpark

The format of my data is

UserId\tItemId:Score,ItemId:Score
UserId\tItemId:Score,ItemId:Score,ItemId:Score

and so on.

I am trying to normalize the scores by subtracting the mean and dividing by the standard deviation. My data is on S3, about 300 files of roughly 30 MB each. I am using PySpark. This is my attempt:

lines = sc.textFile("s3n://data-files/clustering")

Itr1 = lines.map(lambda x:str(x))

Itr1.take(3)

['1\t1:0.1,2:0.2', '2\t3:0.4,4:0.6', '3\t5:0.8,6:0.1']


Itr2 = Itr1.map(lambda x: x.split("\t"))

Itr2.take(3)

[['1', '1:0.1,2:0.2'], ['2', '3:0.4,4:0.6'], ['3', '5:0.8,6:0.1']]

ItemRecScore = Itr2.map(lambda x:[x[1]])

ItemRecScore.take(3)

[['1:0.1,2:0.2'], ['3:0.4,4:0.6'], ['5:0.8,6:0.1']]

ItemRecScoreClean = ItemRecScore.map(lambda x: x[0].replace(':',' '))

ItemRecScoreClean.take(3)

['1 0.1,2 0.2', '3 0.4,4 0.6', '5 0.8,6 0.1']

1) How do I extract just the scores so that I can call mean() and stdev() on them to compute the parameters?

2) How do I transform the scores?

I am new to PySpark, so apologies if this is an obvious, straightforward task. Any pointers or tutorials that show how to manipulate and aggregate data in PySpark would help.

Upvotes: 2

Views: 1793

Answers (2)

Ravinder Karra

Reputation: 307

Maybe this ...

import numpy as np

itr1 = sc.textFile('hdfs:///user/rkarra777/data/mean.txt')
# Split off the part after the tab, then break it into "id:score" pairs.
# Note: the separator is a real tab character, so split on '\t', not '\\t'.
itr2 = itr1.map(lambda x: x.split('\t')[1].split(','))
# Keep just the score from each pair, converted to a float.
itr3 = itr2.map(lambda x: [float(k.split(':')[1]) for k in x])
# Per-row mean of the scores.
itr4 = itr3.map(lambda x: np.mean(x))
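If you want to normalize each row by its own mean and standard deviation (rather than global values), the same structure extends with a small helper. This is only a sketch: `normalize_row` is a name I am introducing here, written as a plain function so it runs without Spark.

```python
import numpy as np

def normalize_row(scores):
    # z-score within one row: subtract the row's mean and divide by its
    # standard deviation (np.std is the population stdev by default).
    a = np.asarray(scores, dtype=float)
    return ((a - a.mean()) / a.std()).tolist()

# On the RDD above this would be applied as: itr5 = itr3.map(normalize_row)
```

Note this divides by zero for a row with identical scores, so a real job would guard that case.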

Upvotes: 0

Holden

Reputation: 7452

Since you have multiple scores on each line of input, we can use flatMap to extract all of the item IDs and scores from each line, giving back an RDD where each element is a single item:score value. From there we can extract just the score and convert it to a float so that PySpark's numerical methods can work on it. After that, we can simply call stats() on the RDD to get the summary information you are interested in.

inputData = sc.textFile("s3n://data-files/clustering")
# One element per "item:score" pair across the whole data set.
idScores = inputData.flatMap(lambda x: x.split("\t")[1].split(","))
# Keep just the score, as a float, so the numeric methods apply.
scores = idScores.map(lambda x: float(x.split(":")[1]))
print(scores.stats())
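For the second question, once the global mean and standard deviation are known, a second map applies the z-score transform. Here is a plain-Python sketch over the sample lines from the question, so it runs without a SparkContext; each list comprehension corresponds to the flatMap/map step above, and on Spark the last line would be `normalized = scores.map(lambda s: (s - mu) / sigma)` with `mu` and `sigma` taken from `scores.stats()`.

```python
import statistics

lines = ['1\t1:0.1,2:0.2', '2\t3:0.4,4:0.6', '3\t5:0.8,6:0.1']

# flatMap equivalent: pull every "id:score" pair out of each line.
pairs = [p for line in lines for p in line.split('\t')[1].split(',')]

# map equivalent: keep just the score as a float.
scores = [float(p.split(':')[1]) for p in pairs]

mu = statistics.mean(scores)
sigma = statistics.pstdev(scores)  # population stdev, matching RDD stats()

# Normalize: subtract the mean, divide by the standard deviation.
normalized = [(s - mu) / sigma for s in scores]
```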

Upvotes: 2
