Sella

Reputation: 31

How to use partition by in Snowpark with TableFunction

I am trying to use a UDTF in Snowpark but am not able to partition by a column. The SQL query I want is something like this:

select mcount.result from CUSTOMER, table(map_count(name) over (partition by name)) mcount;

Here "map_count" is my JavaScript UDTF. Below is my code snippet in Snowpark:

val session = Session.builder.configs(configs).create
val df = session.table("CUSTOMER")
val window = Window.partitionBy(col("name"))
val result = df.join(TableFunction("map_count"), col("name"))
//result.show()

Any suggestions on how to use a window PARTITION BY with a table function? Is this even supported in Snowpark?

Upvotes: 3

Views: 2159

Answers (3)

Raphael Roth

Reputation: 27373

As of Snowpark for Python 0.8.0, this works as follows. The example computes the median of each group/partition, so the UDTF effectively acts as a UDAF:

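from statistics import median
from snowflake.snowpark.functions import col, udtf
from snowflake.snowpark.types import FloatType, StringType, StructField, StructType

class MyMedian:
    def __init__(self):
        # per-partition buffer; a new instance is created for each partition
        self.values = []

    def process(self, value: float):
        self.values.append(value)
        # emit no rows per input row: the empty loop makes this an empty generator
        for _ in range(0):
            yield

    def end_partition(self):
        # called once per partition; the label matches the output shown below
        yield ("partition_total", median(self.values))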

output_schema = StructType([
    StructField("label", StringType()),
    StructField("median", FloatType())
])

my_median = udtf(
    MyMedian,
    output_schema=output_schema,
    input_types=[FloatType()]
)

example_df = session.create_dataframe(
    [["A", 2.0],
     ["A", 2.0],
     ["A", 4.0],
     ["B", -1.0],
     ["B", 0.0],
     ["B", 1.0]],
    StructType([
        StructField("Key", StringType()),
        StructField("Value", FloatType())
    ])
)
example_df.show()

-------------------
|"KEY"  |"VALUE"  |
-------------------
|A      |2.0      |
|A      |2.0      |
|A      |4.0      |
|B      |-1.0     |
|B      |0.0      |
|B      |1.0      |
-------------------

Now the usage of my_median:

example_df.join_table_function(my_median("VALUE").over(partition_by=col("KEY")))\
    .show()

------------------------------------------------
|"KEY"  |"VALUE"  |"LABEL"          |"MEDIAN"  |
------------------------------------------------
|A      |NULL     |partition_total  |2.0       |
|B      |NULL     |partition_total  |0.0       |
------------------------------------------------
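
The one-row-per-partition result follows from how a partitioned UDTF handler is driven: a fresh instance per partition, `process` once per row, then `end_partition` once. The sketch below imitates that protocol in plain Python, without a Snowflake session. `run_partitioned` is a hypothetical helper written for this illustration and is not part of Snowpark; it also returns only the partition key plus the handler's output columns, whereas the real join returns the input columns as well.

```python
from collections import defaultdict
from statistics import median


class MyMedian:
    def __init__(self):
        self.values = []

    def process(self, value):
        self.values.append(value)
        return []  # no per-row output in this local sketch

    def end_partition(self):
        yield ("partition_total", median(self.values))


def run_partitioned(handler_cls, rows, key_index, value_index):
    """Hypothetical local driver: NOT a Snowpark API, illustration only."""
    # Group the input values by partition key, preserving first-seen order.
    partitions = defaultdict(list)
    for row in rows:
        partitions[row[key_index]].append(row[value_index])

    output = []
    for key, values in partitions.items():
        handler = handler_cls()  # one handler instance per partition
        for value in values:
            for out in handler.process(value):
                output.append((key, *out))
        for out in handler.end_partition():  # called once per partition
            output.append((key, *out))
    return output


rows = [("A", 2.0), ("A", 2.0), ("A", 4.0), ("B", -1.0), ("B", 0.0), ("B", 1.0)]
result = run_partitioned(MyMedian, rows, key_index=0, value_index=1)
print(result)
```

Because `process` emits nothing and `end_partition` emits a single tuple, each key ends up with exactly one output row, which is what makes the UDTF behave like an aggregate.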

Upvotes: 2

orellabac

Reputation: 2645

For now, I think the workaround is to invoke the function through SQL, as in the example below.

I created a dummy CUSTOMER table and a dummy JavaScript table UDF (UDTF), then invoked it with SQL.

Once the DataFrame API supports this, the workaround will be unnecessary, and the DataFrame version will be cleaner.

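import com.snowflake.snowpark.functions._
// session.sql is lazy: an action such as collect() or show() is needed to run the statement
session.sql("ALTER SESSION SET QUERY_TAG='TEST_1'").collect()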
session.sql(""" 
CREATE OR REPLACE FUNCTION MAP_COUNT(NAME STRING) RETURNS TABLE (NUM FLOAT)
LANGUAGE JAVASCRIPT AS 
$$
    {
      processRow: function (row, rowWriter, context) {
        this.ccount = this.ccount + 1;
      },
      finalize: function (rowWriter, context) {
       rowWriter.writeRow({NUM: this.ccount});
      },
      initialize: function(argumentInfo, context) {
       this.ccount = 0;
      }
    }
$$;
""").show()

session.sql("""
CREATE OR REPLACE TABLE CUSTOMER (
CUST_ID INTEGER,
CUST_NAME TEXT
)
""").show()
session.sql("INSERT INTO CUSTOMER SELECT 1, 'John'").show()
session.sql("INSERT INTO CUSTOMER SELECT 2, 'John'").show()
session.sql("INSERT INTO CUSTOMER SELECT 3, 'John'").show()
session.sql("INSERT INTO CUSTOMER SELECT 4, 'Mary'").show()
session.sql("INSERT INTO CUSTOMER SELECT 5, 'Mary'").show()
// Invoke the UDTF through SQL, partitioned by customer name
val res = session.sql("select CUST_NAME,NUM FROM CUSTOMER, TABLE(MAP_COUNT(CUST_NAME) OVER (PARTITION BY CUST_NAME ORDER BY CUST_NAME))")
res.show()
// Output will be
//-----------------------
//|"CUST_NAME"  |"NUM"  |
//-----------------------
//|Mary         |2.0    |
//|John         |3.0    |
//-----------------------
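
The same SQL fallback can be used from Snowpark for Python. This is a minimal sketch, assuming the MAP_COUNT function and CUSTOMER table created above. `partitioned_udtf_sql` is a hypothetical helper written for this illustration (not a Snowpark API), and it interpolates identifiers verbatim, so it should only be given trusted names.

```python
def partitioned_udtf_sql(table, udtf_name, arg_col, partition_col, out_cols):
    """Build a SELECT that calls a UDTF once per partition of `table`.

    Hypothetical helper for illustration; identifiers are not quoted or escaped.
    """
    select_list = ", ".join(out_cols)
    return (
        f"SELECT {select_list} "
        f"FROM {table}, "
        f"TABLE({udtf_name}({arg_col}) OVER (PARTITION BY {partition_col}))"
    )


query = partitioned_udtf_sql(
    table="CUSTOMER",
    udtf_name="MAP_COUNT",
    arg_col="CUST_NAME",
    partition_col="CUST_NAME",
    out_cols=["CUST_NAME", "NUM"],
)
print(query)

# With an existing Snowpark session (assumed, not created here):
# session.sql(query).show()
```

Keeping the query construction in one function makes it easy to swap in the DataFrame API later without touching the callers.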

Upvotes: 0

Khush Bhatia

Reputation: 518

Unfortunately, this is not currently supported in Snowpark. But we are working on it.

Upvotes: 2
