yasin mohammed

Reputation: 471

Efficient way to group by without using pivot in PySpark

I need to calculate memory utilization using PySpark. I achieved this in pandas using pivot, but now I need to do it in PySpark, where pivoting would be an expensive operation. Is there an alternative to pivot in PySpark for this?

time_stamp          Hostname    kpi kpi_subtype value_current
2019/08/17 10:01:05 Server1     memory  Total       100
2019/08/17 10:01:06 Server1     memory  used        35
2019/08/17 10:01:09 Server1     memory  buffer      8
2019/08/17 10:02:04 Server1     memory  cached      10
2019/08/17 10:01:05 Server2     memory  Total       100
2019/08/17 10:01:06 Server2     memory  used        42
2019/08/17 10:01:09 Server2     memory  buffer      7
2019/08/17 10:02:04 Server2     memory  cached      9
2019/08/17 10:07:05 Server1     memory  Total       100
2019/08/17 10:07:06 Server1     memory  used        35
2019/08/17 10:07:09 Server1     memory  buffer      8
2019/08/17 10:07:04 Server1     memory  cached      10
2019/08/17 10:08:05 Server2     memory  Total       100
2019/08/17 10:08:06 Server2     memory  used        35
2019/08/17 10:08:09 Server2     memory  buffer      8
2019/08/17 10:08:04 Server2     memory  cached      10

This needs to be transformed to:

time_stamp      Hostname    kpi Percentage
2019-08-17 10:05:00 Server1     memory  17
2019-08-17 10:05:00 Server2     memory  26
2019-08-17 10:10:00 Server1     memory  17
2019-08-17 10:10:00 Server2     memory  17

Python code I used:

import numpy as np
import pandas as pd

df3 = pd.read_csv('/home/yasin/Documents/IMI/Data/memorry sample.csv')
df3['time_stamp'] = pd.to_datetime(df3['time_stamp'])
ns5min=5*60*1000000000  # 5 minutes in nanoseconds
# round each timestamp up to the end of its 5-minute window
df3['time_stamp'] = pd.to_datetime(((df3['time_stamp'].astype(np.int64) // ns5min + 1 ) * ns5min))
# one column per kpi_subtype, holding the mean of value_current for each group
df4 = df3.pivot_table('value_current' , ['time_stamp' , 'Hostname' , 'kpi' ], 'kpi_subtype')
df4 = df4.reset_index()
df4['Percentage'] = ((df4['Total'] - (df4['Total'] - df4['used'] + df4['buffer'] + df4['cached'])) / df4['Total']) * 100

I am looking for a way to replicate this in PySpark, and for a more efficient way to do it in Python, since pivot is an expensive operation and I need to run this every 5 minutes on a very large dataset.
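
The timestamp arithmetic in the pandas code can be checked in isolation. The following is a minimal plain-Python sketch of the same rounding, done in seconds rather than nanoseconds; the function name `window_end` is hypothetical, and the timestamps are assumed to be UTC so the epoch arithmetic does not depend on the local timezone.

```python
from datetime import datetime, timezone

WINDOW_SECONDS = 5 * 60  # same 5-minute window as ns5min, expressed in seconds


def window_end(ts: str) -> str:
    """Round a 'YYYY/MM/DD HH:MM:SS' timestamp up to the end of its 5-minute window."""
    # Assumption: treat the naive timestamp as UTC.
    dt = datetime.strptime(ts, "%Y/%m/%d %H:%M:%S").replace(tzinfo=timezone.utc)
    epoch = int(dt.timestamp())
    # Integer-divide to find the window, then step to the next boundary.
    bucket = (epoch // WINDOW_SECONDS + 1) * WINDOW_SECONDS
    return datetime.fromtimestamp(bucket, tz=timezone.utc).strftime("%Y-%m-%d %H:%M:%S")


print(window_end("2019/08/17 10:01:05"))  # 2019-08-17 10:05:00
print(window_end("2019/08/17 10:08:09"))  # 2019-08-17 10:10:00
```

Note that with this floor-plus-one form, a sample that falls exactly on a boundary (for example 10:05:00) is assigned to the following window end (10:10:00).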

Upvotes: 1

Views: 1173

Answers (2)

yasin mohammed

Reputation: 471

The first solution uses pivot in Spark; the second uses a map.

1st Solution

from pyspark.sql.functions import floor, mean, unix_timestamp
from pyspark.sql.types import TimestampType
# `sql` is assumed to be an existing SparkSession (or SQLContext)
df = sql.read.csv("/home/yasin/Documents/IMI/Data/memorry sample.csv", header = "True").withColumn("timestamp", unix_timestamp("time_stamp", "yyyy/MM/dd HH:mm:ss").cast(TimestampType())).drop("time_stamp")
df = df.withColumn("unixtime",unix_timestamp(df["timestamp"],"yyyy/MM/dd HH:mm:ss"))
# floor(...) + 1 rounds up to the end of the 5-minute window, matching the pandas code
df = df.withColumn("unixtime2",((floor(df["unixtime"]/300) + 1)*300).cast("timestamp"))
df = df.groupBy("unixtime2" , "Hostname" , "kpi").pivot("kpi_subtype").agg(mean(df["value_current"]))
df = df.withColumn("Percentage", (df["Total"] - (df["Total"] - df["used"] + df["buffer"] + df["cached"])) /df["Total"] * 100)

2nd Solution

from pyspark.sql.functions import collect_list, create_map, floor, unix_timestamp
from pyspark.sql.types import TimestampType
# `sql` is assumed to be an existing SparkSession (or SQLContext)
df = sql.read.csv("/home/yasin/Documents/IMI/Data/memorry sample.csv", header = "True").withColumn("timestamp", unix_timestamp("time_stamp", "yyyy/MM/dd HH:mm:ss").cast(TimestampType())).drop("time_stamp")
df = df.withColumn("unixtime",unix_timestamp(df["timestamp"],"yyyy/MM/dd HH:mm:ss"))
df = df.withColumn("unixtime2",((floor(df["unixtime"]/300) + 1)*300).cast("timestamp"))
df = df.withColumn("value_current2",df["value_current"].cast("Float"))
df = df.groupBy("unixtime2" , "Hostname" , "kpi").agg(collect_list(create_map("kpi_subtype","value_current2")).alias("mapped"))
# NOTE: the positional indexes below assume collect_list returns Total, used, buffer, cached in that order, which Spark does not guarantee
nn=df.withColumn("formula" ,  ( ( df["mapped"][0]["Total"].cast("Float") - ( df["mapped"][0]["Total"].cast("Float") - df["mapped"][1]["used"].cast("Float")  + df["mapped"][2]["buffer"].cast("Float") + df["mapped"][3]["cached"].cast("Float") ) ) / df["mapped"][0]["Total"].cast("Float") * 100).cast("Float"))

Upvotes: 1

Gelerion

Reputation: 1714

Pivoting is expensive when the list of values that are translated to columns is unknown. Spark has an overloaded pivot method that takes them as an argument.

def pivot(pivotColumn: String, values: Seq[Any])

If they aren't known, Spark must first sort and collect the distinct values from your dataset. Otherwise, the logic is pretty straightforward and described here.

The implementation adds a new logical operator (o.a.s.sql.catalyst.plans.logical.Pivot). That logical operator is translated by a new analyzer rule (o.a.s.sql.catalyst.analysis.Analyzer.ResolvePivot) that currently translates it into an aggregation with lots of if statements, one expression per pivot value.

For example, df.groupBy("A", "B").pivot("C", Seq("small", "large")).sum("D") would be translated into the equivalent of df.groupBy("A", "B").agg(expr("sum(if(C = 'small', D, null))"), expr("sum(if(C = 'large', D, null))")). You could have done this yourself but it would get long and possibly error prone quickly.
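
To make the rewrite concrete, here is a minimal plain-Python sketch (not Spark code) of what that conditional aggregation computes: a single pass over the rows with one accumulator per pivot value per group. The sample rows and the names `rows`, `pivot_values`, and `sums` are made up for illustration. In PySpark, the same shape could presumably be written with one `sum(when(...))` expression per value.

```python
from collections import defaultdict

# Hypothetical rows of (A, B, C, D), mirroring groupBy("A", "B").pivot("C").sum("D").
rows = [
    ("foo", "one", "small", 1),
    ("foo", "one", "large", 2),
    ("foo", "one", "small", 3),
    ("bar", "two", "large", 4),
]
pivot_values = ["small", "large"]  # known up front, so no distinct-value scan is needed

# One accumulator per pivot value per group, like one sum(if(C = v, D, null)) each.
sums = defaultdict(lambda: {v: None for v in pivot_values})
for a, b, c, d in rows:
    acc = sums[(a, b)]  # the group exists even if no pivot value matches
    if c in acc:
        # None plays the role of SQL null: it stays None until a value arrives.
        acc[c] = d if acc[c] is None else acc[c] + d

for key, acc in sorted(sums.items()):
    print(key, acc)
# ('bar', 'two') {'small': None, 'large': 4}
# ('foo', 'one') {'small': 4, 'large': 2}
```

The point of the sketch is that, once the pivot values are given, the work is an ordinary grouped aggregation; the extra cost of an unparameterized pivot comes from discovering those values first.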

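Without pivoting, I would do something like this:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import spark.implicits._

val in = spark.read.option("header", "true").csv("input.csv")
      // parse the string column into a timestamp
      .withColumn("timestamp", unix_timestamp($"time_stamp", "yyyy/MM/dd HH:mm:ss").cast(TimestampType))
      .drop($"time_stamp")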

Now we can group our dataset by the time window with hostname and collect KPI metrics into a map.
There is an excellent answer describing just that.

val joinMap = udf { values: Seq[Map[String, Double]] => values.flatten.toMap }

val grouped = in.groupBy(window($"timestamp", "5 minutes"), $"Hostname")
  .agg(joinMap(collect_list(map($"kpi_subtype", $"value_current".cast(DoubleType)))).as("metrics"))

Output

+------------------------------------------+--------+-------------------------------------------------------------+
|window                                    |Hostname|metrics                                                      |
+------------------------------------------+--------+-------------------------------------------------------------+
|[2019-08-17 10:00:00, 2019-08-17 10:05:00]|Server1 |[Total -> 100.0, used -> 35.0, buffer -> 8.0, cached -> 10.0]|
|[2019-08-17 10:00:00, 2019-08-17 10:05:00]|Server2 |[Total -> 100.0, used -> 42.0, buffer -> 7.0, cached -> 9.0] |
|[2019-08-17 10:05:00, 2019-08-17 10:10:00]|Server1 |[Total -> 100.0, used -> 35.0, buffer -> 8.0, cached -> 10.0]|
|[2019-08-17 10:05:00, 2019-08-17 10:10:00]|Server2 |[Total -> 100.0, used -> 35.0, buffer -> 8.0, cached -> 10.0]|
+------------------------------------------+--------+-------------------------------------------------------------+
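
As a rough illustration of what this grouping step produces, the following plain-Python sketch (not Spark code) buckets a few of the sample rows by 5-minute window start and hostname, then merges the per-row `{kpi_subtype: value}` pairs into one dictionary per group. The names `window_start` and `metrics` are hypothetical.

```python
from collections import defaultdict
from datetime import datetime

# A subset of the sample rows: (time_stamp, Hostname, kpi_subtype, value_current).
rows = [
    ("2019/08/17 10:01:05", "Server1", "Total", 100.0),
    ("2019/08/17 10:01:06", "Server1", "used", 35.0),
    ("2019/08/17 10:01:09", "Server1", "buffer", 8.0),
    ("2019/08/17 10:02:04", "Server1", "cached", 10.0),
    ("2019/08/17 10:01:06", "Server2", "used", 42.0),
]


def window_start(ts):
    """Truncate a timestamp to the start of its 5-minute window."""
    dt = datetime.strptime(ts, "%Y/%m/%d %H:%M:%S")
    return dt.replace(minute=dt.minute - dt.minute % 5, second=0)


# One merged metrics dict per (window, host); a repeated subtype overwrites the earlier value.
metrics = defaultdict(dict)
for ts, host, subtype, value in rows:
    metrics[(window_start(ts), host)][subtype] = value

for (start, host), merged in sorted(metrics.items()):
    print(start, host, merged)
```

If a subtype is reported more than once within a window, this sketch keeps whichever value is seen last. An aggregate such as a mean per subtype would be needed to reproduce what the pivot-based versions compute in that case.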

Now we define some aliases and a simple select statement:

val total = col("metrics")("Total")
val used = col("metrics")("used")
val buffer = col("metrics")("buffer")
val cached = col("metrics")("cached")

val result = grouped.select($"window", $"Hostname",
          ((total - (total - used + buffer + cached)) / total * 100).as("percentage"))

And here we go:

+------------------------------------------+--------+----------+
|window                                    |Hostname|percentage|
+------------------------------------------+--------+----------+
|[2019-08-17 10:00:00, 2019-08-17 10:05:00]|Server1 |17.0      |
|[2019-08-17 10:00:00, 2019-08-17 10:05:00]|Server2 |26.0      |
|[2019-08-17 10:05:00, 2019-08-17 10:10:00]|Server1 |17.0      |
|[2019-08-17 10:05:00, 2019-08-17 10:10:00]|Server2 |17.0      |
+------------------------------------------+--------+----------+
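
The percentage from the question, (Total - (Total - used + buffer + cached)) / Total * 100, simplifies algebraically to (used - buffer - cached) / Total * 100. A small plain-Python check against the sample values follows; the function name `memory_percentage` is hypothetical.

```python
def memory_percentage(total, used, buffer, cached):
    """(Total - (Total - used + buffer + cached)) / Total * 100, in simplified form."""
    # Multiply before dividing so the sample values come out exact.
    return 100 * (used - buffer - cached) / total


print(memory_percentage(100, 35, 8, 10))   # 17.0
print(memory_percentage(100, 42, 7, 9))    # 26.0
print(memory_percentage(200, 70, 16, 20))  # 17.0
```

The grouping of the parentheses matters: dividing only the inner term by Total and subtracting the result from Total gives the same numbers only when Total happens to be 100, as it is in the sample data.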

Upvotes: 3
