pyspark - Grouping and calculating data

I have the following csv file.

Index,Arrival_Time,Creation_Time,x,y,z,User,Model,Device,gt
0,1424696633908,1424696631913248572,-5.958191,0.6880646,8.135345,a,nexus4,nexus4_1,stand
1,1424696633909,1424696631918283972,-5.95224,0.6702118,8.136536,a,nexus4,nexus4_1,stand
2,1424696633918,1424696631923288855,-5.9950867,0.6535491999999999,8.204376,a,nexus4,nexus4_1,stand
3,1424696633919,1424696631928385290,-5.9427185,0.6761626999999999,8.128204,a,nexus4,nexus4_1,stand

I have to create a RDD where USER MODEL AND GT are PRIMARY KEY, I don't know if I have to do it using them as a tuple.

Then when I have the primary key field I have to calculate AVG, MAX and MIN from 'x','y' and 'z'.

Here is an output:

User,Model,gt,media(x,y,z),desviacion(x,y,z),max(x,y,z),min(x,y,z)
a, nexus4,stand,-3.0,0.7,8.2,2.8,0.14,0.0,-1.0,0.8,8.2,-5.0,0.6,8.2

Any idea about how to group them and for example get the media values from "x"

With my current code I get the following.

# Data loading

    lectura = sc.textFile("Phones_accelerometer.csv")

    datos = lectura.map(lambda x: ((x.split(",")[6], x.split(",")[7], x.split(",")[9]),(x.split(",")[3], x.split(",")[4], x.split(",")[5])))

    sumCount = datos.combineByKey(lambda value: (value, 1), lambda x, value: (x[0] + value, x[1] + 1), lambda x, y: (x[0] + y[0], x[1] + y[1]))

An example of my tuples:

   [(('a', 'nexus4', 'stand'), ('-5.958191', '0.6880646', '8.135345'))]

Upvotes: 0

Views: 1381

Answers (2)

Ramesh Maharjan
Ramesh Maharjan

Reputation: 41957

If you have a csv data in a file as given in the question then you can use sqlContext to read it as a dataframe and cast the appropriate types as

df = sqlContext.read.format("com.databricks.spark.csv").option("header", True).load("path to csv file")
import pyspark.sql.functions as F
import pyspark.sql.types as T
df = df.select(F.col('User'), F.col('Model'), F.col('gt'), F.col('x').cast('float'), F.col('y').cast('float'), F.col('z').cast('float'))

I have selected primary keys and necessary columns only which should give you

+----+------+-----+----------+---------+--------+
|User|Model |gt   |x         |y        |z       |
+----+------+-----+----------+---------+--------+
|a   |nexus4|stand|-5.958191 |0.6880646|8.135345|
|a   |nexus4|stand|-5.95224  |0.6702118|8.136536|
|a   |nexus4|stand|-5.9950867|0.6535492|8.204376|
|a   |nexus4|stand|-5.9427185|0.6761627|8.128204|
+----+------+-----+----------+---------+--------+

All of your requirements: median, deviation, max and min depend on the list of x, y and z when grouped by primary keys: User, Model and gt.

So you would need groupBy and collect_list inbuilt function and a udf function to calculate all of your requiremnts. Final step is to separate them in different columns which are given below

from math import sqrt
def calculation(array):
    num_items = len(array)
    print num_items, sum(array)
    mean = sum(array) / num_items
    differences = [x - mean for x in array]
    sq_differences = [d ** 2 for d in differences]
    ssd = sum(sq_differences)
    variance = ssd / (num_items - 1)
    sd = sqrt(variance)
    return [mean, sd, max(array), min(array)]

calcUdf = F.udf(calculation, T.ArrayType(T.FloatType()))

df.groupBy('User', 'Model', 'gt')\
    .agg(calcUdf(F.collect_list(F.col('x'))).alias('x'), calcUdf(F.collect_list(F.col('y'))).alias('y'), calcUdf(F.collect_list(F.col('z'))).alias('z'))\
    .select(F.col('User'), F.col('Model'), F.col('gt'), F.col('x')[0].alias('median_x'), F.col('y')[0].alias('median_y'), F.col('z')[0].alias('median_z'), F.col('x')[1].alias('deviation_x'), F.col('y')[1].alias('deviation_y'), F.col('z')[1].alias('deviation_z'), F.col('x')[2].alias('max_x'), F.col('y')[2].alias('max_y'), F.col('z')[2].alias('max_z'), F.col('x')[3].alias('min_x'), F.col('y')[3].alias('min_y'), F.col('z')[3].alias('min_z'))\
    .show(truncate=False)

So finally you should have

+----+------+-----+---------+---------+--------+-----------+-----------+-----------+----------+---------+--------+----------+---------+--------+
|User|Model |gt   |median_x |median_y |median_z|deviation_x|deviation_y|deviation_z|max_x     |max_y    |max_z   |min_x     |min_y    |min_z   |
+----+------+-----+---------+---------+--------+-----------+-----------+-----------+----------+---------+--------+----------+---------+--------+
|a   |nexus4|stand|-5.962059|0.6719971|8.151115|0.022922019|0.01436464 |0.0356973  |-5.9427185|0.6880646|8.204376|-5.9950867|0.6535492|8.128204|
+----+------+-----+---------+---------+--------+-----------+-----------+-----------+----------+---------+--------+----------+---------+--------+

I hope the answer is helpful.

Upvotes: 1

David
David

Reputation: 11573

You'll have to used groupByKey to get median. While generally not preferred for performance reasons, finding the median value of a list of numbers can not be parallelized easily. The logic to compute median requires the entire list of numbers. groupByKey is the aggregation method to use when you need to process all the values for a key at the same time

Also, as mentioned in the comments, this task would be easier using Spark DataFrames.

Upvotes: 0

Related Questions