

How to count distinct based on a condition over a window aggregation in PySpark?

This is a sample dataframe of the data that I have:

from pyspark.sql.functions import *
from pyspark.sql.types import StringType, IntegerType, DateType, StructType, StructField
from datetime import datetime
from pyspark.sql import Window

data2 = [
  (datetime.strptime("2020/12/29", "%Y/%m/%d"), "Store B", "Product 1", 0),
  (datetime.strptime("2020/12/29", "%Y/%m/%d"), "Store B", "Product 2", 1),
  (datetime.strptime("2020/12/31", "%Y/%m/%d"), "Store A", "Product 2", 1),
  (datetime.strptime("2020/12/31", "%Y/%m/%d"), "Store A", "Product 3", 1),
  (datetime.strptime("2021/01/01", "%Y/%m/%d"), "Store A", "Product 1", 1),
  (datetime.strptime("2021/01/01", "%Y/%m/%d"), "Store A", "Product 2", 3),
  (datetime.strptime("2021/01/01", "%Y/%m/%d"), "Store A", "Product 3", 2),
  (datetime.strptime("2021/01/01", "%Y/%m/%d"), "Store B", "Product 1", 10),
  (datetime.strptime("2021/01/01", "%Y/%m/%d"), "Store B", "Product 2", 15),
  (datetime.strptime("2021/01/01", "%Y/%m/%d"), "Store B", "Product 3", 9),
  (datetime.strptime("2021/01/02", "%Y/%m/%d"), "Store A", "Product 1", 0),
  (datetime.strptime("2021/01/03", "%Y/%m/%d"), "Store A", "Product 2", 2),
]

schema = StructType([
    StructField("date", DateType(), True),
    StructField("store", StringType(), True),
    StructField("product", StringType(), True),
    StructField("stock_c", IntegerType(), True),
])
df = spark.createDataFrame(data=data2, schema=schema)
df.printSchema()
 |-- date: date (nullable = true)
 |-- store: string (nullable = true)
 |-- product: string (nullable = true)
 |-- stock_c: integer (nullable = true)

|date      |store  |product  |stock_c|
|2020-12-29|Store B|Product 1|0      |
|2020-12-29|Store B|Product 2|1      |
|2020-12-31|Store A|Product 2|1      |
|2020-12-31|Store A|Product 3|1      |
|2021-01-01|Store A|Product 1|1      |
|2021-01-01|Store A|Product 2|3      |
|2021-01-01|Store A|Product 3|2      |
|2021-01-01|Store B|Product 1|10     |
|2021-01-01|Store B|Product 2|15     |
|2021-01-01|Store B|Product 3|9      |
|2021-01-02|Store A|Product 1|0      |
|2021-01-03|Store A|Product 2|2      |

Column stock_c represents the cumulative stock of the product in the store.

I want to create two new columns. The first one tells me how many products the store has or has had in the past; this is easy. The second one is the number of products that have stock on that day in that store, and this is the one I can't solve.

This is the code that I used:

windowStore = Window.partitionBy("store").orderBy("date")

df \
.withColumn("num_products", approx_count_distinct("product").over(windowStore)) \
.withColumn("num_products_with_stock", approx_count_distinct(when(col("stock_c") > 0, col("product"))).over(windowStore)) \
.show()

This is what I get:

|      date|  store|  product|stock_c|num_products|num_products_with_stock|
|2020-12-31|Store A|Product 2|      1|           2|                      2|
|2020-12-31|Store A|Product 3|      1|           2|                      2|
|2021-01-01|Store A|Product 1|      1|           3|                      3|
|2021-01-01|Store A|Product 2|      3|           3|                      3|
|2021-01-01|Store A|Product 3|      2|           3|                      3|
|2021-01-02|Store A|Product 1|      0|           3|                      3|
|2021-01-03|Store A|Product 2|      2|           3|                      3|
|2020-12-29|Store B|Product 1|      0|           2|                      1|
|2020-12-29|Store B|Product 2|      1|           2|                      1|
|2021-01-01|Store B|Product 1|     10|           3|                      3|
|2021-01-01|Store B|Product 2|     15|           3|                      3|
|2021-01-01|Store B|Product 3|      9|           3|                      3|

This is what I would like to get:

|      date|  store|  product|stock_c|num_products|num_products_with_stock|
|2020-12-31|Store A|Product 2|      1|           2|                      2|
|2020-12-31|Store A|Product 3|      1|           2|                      2|
|2021-01-01|Store A|Product 1|      1|           3|                      3|
|2021-01-01|Store A|Product 2|      3|           3|                      3|
|2021-01-01|Store A|Product 3|      2|           3|                      3|
|2021-01-02|Store A|Product 1|      0|           3|                      2|
|2021-01-03|Store A|Product 2|      2|           3|                      2|
|2020-12-29|Store B|Product 1|      0|           2|                      1|
|2020-12-29|Store B|Product 2|      1|           2|                      1|
|2021-01-01|Store B|Product 1|     10|           3|                      3|
|2021-01-01|Store B|Product 2|     15|           3|                      3|
|2021-01-01|Store B|Product 3|      9|           3|                      3|

The key is in these two rows: Product 1 has run out of stock, so the count should drop to 2 products with stock (Product 2 and Product 3).

|2021-01-02|Store A|Product 1|      0|           3|                      2|
|2021-01-03|Store A|Product 2|      2|           3|                      2|
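The rule behind the expected table can be stated without Spark: for each store and date, take the most recent stock_c of every product seen so far and count the ones above 0. A plain-Python reference sketch of that rule (the function name products_with_stock is hypothetical), which can be used to check a Spark solution against the expected values:

```python
from collections import defaultdict
from datetime import date

# (date, store, product, stock_c) rows from the question
rows = [
    (date(2020, 12, 29), "Store B", "Product 1", 0),
    (date(2020, 12, 29), "Store B", "Product 2", 1),
    (date(2020, 12, 31), "Store A", "Product 2", 1),
    (date(2020, 12, 31), "Store A", "Product 3", 1),
    (date(2021, 1, 1), "Store A", "Product 1", 1),
    (date(2021, 1, 1), "Store A", "Product 2", 3),
    (date(2021, 1, 1), "Store A", "Product 3", 2),
    (date(2021, 1, 1), "Store B", "Product 1", 10),
    (date(2021, 1, 1), "Store B", "Product 2", 15),
    (date(2021, 1, 1), "Store B", "Product 3", 9),
    (date(2021, 1, 2), "Store A", "Product 1", 0),
    (date(2021, 1, 3), "Store A", "Product 2", 2),
]


def products_with_stock(rows):
    """Return {(store, date): number of products whose latest stock_c > 0}."""
    by_store = defaultdict(list)
    for d, store, product, stock in rows:
        by_store[store].append((d, product, stock))

    result = {}
    for store, events in by_store.items():
        events.sort()  # chronological order
        latest = {}  # product -> last known cumulative stock
        i = 0
        for d in sorted({e[0] for e in events}):
            # Apply every record of day d before counting
            while i < len(events) and events[i][0] == d:
                _, product, stock = events[i]
                latest[product] = stock
                i += 1
            result[(store, d)] = sum(1 for s in latest.values() if s > 0)
    return result


for key, count in sorted(products_with_stock(rows).items()):
    print(key, count)
```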

How can I achieve what I want?

Thanks in advance.


Answers (2)



I finally solved it with the help of @danimille.

First, I filled in the missing dates so that every product of a store has a row for every day, carried the last known stock_c forward, and then counted the products with stock using a helper column called has_stock:

from datetime import timedelta
from pyspark.sql import Window
from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, TimestampType

def dates_between(t1, t2):
    # Every day from t1 to t2, both included
    return [t1 + timedelta(days=x) for x in range(0, int((t2 - t1).days) + 1)]

dates_between_udf = F.udf(dates_between, ArrayType(TimestampType()))

# Quick-and-dirty hack: store dates as timestamps so they match the UDF output type
df = df.withColumn("date", F.to_timestamp(F.to_date("date")))

# Every (store, product, date) combination between each store's first and last date
date_filler = (df
  .withColumn("max_date", F.max("date").over(Window.partitionBy("store")))
  .withColumn("min_date", F.min("date").over(Window.partitionBy("store")))
  .withColumn("products", F.collect_set("product").over(Window.partitionBy("store")))
  .withColumn("dates", dates_between_udf(F.col("min_date"), F.col("max_date")))
  .select("store", "products", "dates")
  .withColumn("product", F.explode("products"))
  .withColumn("date", F.explode("dates"))
  .drop("products", "dates")
  .distinct()  # one row per combination, otherwise the join duplicates rows
)

windowStore = Window.partitionBy("store").orderBy("date")

df = (df
  .join(date_filler, on=["store", "product", "date"], how="full")
  # Carry the last known cumulative stock forward to the filled-in dates
  .withColumn("stock_c", F.last("stock_c", ignorenulls=True).over(
      Window.partitionBy("store", "product").orderBy(F.col("date"))))
  # Dates before a product's first record have no stock
  .na.fill(0, "stock_c")
  .withColumn("num_products", F.approx_count_distinct("product").over(windowStore))
  .withColumn("has_stock", F.when(F.col("stock_c") > 0, 1).otherwise(0))
  .withColumn("num_products_with_stock", F.sum("has_stock").over(Window.partitionBy("store", "date")))
)

The result is the following:

|  store|  product|               date|stock_c|num_products|num_products_with_stock|has_stock|
|Store A|Product 1|2020-12-31 00:00:00|      0|           3|                      2|        0|
|Store A|Product 2|2020-12-31 00:00:00|      1|           3|                      2|        1|
|Store A|Product 3|2020-12-31 00:00:00|      1|           3|                      2|        1|
|Store A|Product 1|2021-01-01 00:00:00|      1|           3|                      3|        1|
|Store A|Product 2|2021-01-01 00:00:00|      3|           3|                      3|        1|
|Store A|Product 3|2021-01-01 00:00:00|      2|           3|                      3|        1|
|Store A|Product 1|2021-01-02 00:00:00|      0|           3|                      2|        0|
|Store A|Product 2|2021-01-02 00:00:00|      3|           3|                      2|        1|
|Store A|Product 3|2021-01-02 00:00:00|      2|           3|                      2|        1|
|Store A|Product 1|2021-01-03 00:00:00|      0|           3|                      2|        0|
|Store A|Product 2|2021-01-03 00:00:00|      2|           3|                      2|        1|
|Store A|Product 3|2021-01-03 00:00:00|      2|           3|                      2|        1|
|Store B|Product 1|2020-12-29 00:00:00|      0|           3|                      1|        0|
|Store B|Product 2|2020-12-29 00:00:00|      1|           3|                      1|        1|
|Store B|Product 3|2020-12-29 00:00:00|      0|           3|                      1|        0|
|Store B|Product 1|2020-12-30 00:00:00|      0|           3|                      1|        0|
|Store B|Product 2|2020-12-30 00:00:00|      1|           3|                      1|        1|
|Store B|Product 3|2020-12-30 00:00:00|      0|           3|                      1|        0|
|Store B|Product 1|2020-12-31 00:00:00|      0|           3|                      1|        0|
|Store B|Product 2|2020-12-31 00:00:00|      1|           3|                      1|        1|
only showing top 20 rows




Below is the code I used to solve the num_products_with_stock column. I created a new conditional column, hasItem, that replaces the product with None when stock_c is 0. After that, the code is very close to yours, except that F.approx_count_distinct is applied to this new column over a window partitioned by store and date.

from pyspark.sql import functions as F
from pyspark.sql import Window as W

window1 = W.partitionBy("store").orderBy("date")
window2 = W.partitionBy(["store", "date"]).orderBy("date")

df = (df
        .withColumn("num_products", F.approx_count_distinct("product").over(window1))
        .withColumn('hasItem', F.when(F.col('stock_c') > 0, F.col('product')).otherwise(None))
        .withColumn("num_products_with_stock", F.approx_count_distinct(F.col("hasItem")).over(window2))
     )

Hope this solves your issue!
