Arduino

Reputation: 373

Unable to use Pandas UDF in Databricks

I have to run a script that takes a few arguments as input and returns some results as output. I first developed it on my local machine, where it works fine, and my goal now is to run it in Databricks in order to parallelize it.

The issue comes when I try to parallelize it. I'm reading the data from an already mounted Data Lake (the issue is not there, as I'm able to print the DataFrame after reading it), converting it to a Spark DataFrame, and passing the rows to the main function grouped by material:

import pandas as pd
import os
import numpy as np
import scipy.stats as stats

from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import StructType,StructField,IntegerType,FloatType

# Pandas udf
schema = StructType([StructField('Material', IntegerType(), True),
                    StructField('Alpha', IntegerType(), True),
                    StructField('Beta', IntegerType(), True),
                    StructField('Sales', IntegerType(), True),
                    StructField('SL', FloatType(), True)])

@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def main(data):
    material = data['Material'].iloc[0]
    print(material)      #<-------- THIS IS NOT PRINTING
    print('Hello world')   #<------ NEITHER IS THIS

    start = data['start '].iloc[0]
    end = data['end '].iloc[0]
    mu_lt = data['mu_lt'].iloc[0]
    sigma_lt = data['sigma_lt'].iloc[0]
    
    df = pd.DataFrame(columns=('Material', 'Alpha', 'Beta', 'Sales', 'SL'))
    
    for beta in range(1, 2):
        for alpha in range(3, 5):
            # Do stuff (computation omitted)
            pass
    
    return df


if __name__ == '__main__':
  spark = SparkSession.builder.getOrCreate()
  params = pd.read_csv('/dbfs/mnt/input/params_input.csv')
  params_spark = spark.createDataFrame(params) 

  params_spark.groupby('Material').apply(main).show()

I'm not sure whether I'm passing the DF to the main function correctly, or even declaring it right, but neither the prints nor the DF defined in the main function seem to run. The code throws no error, but no output is returned either.

Upvotes: 0

Views: 952

Answers (1)

Frank

Reputation: 512

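print() calls inside a pandas UDF run on the executors, so their output ends up in the executor logs rather than in the notebook. One way to see a message is to return it as an extra column. Try this: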

@pandas_udf('y int, ds int, store_id string, product_id string, log string', PandasUDFType.GROUPED_MAP)
def train_predict(pdf):
    # The data must be a list of rows (one inner list per row); a flat list raises a ValueError.
    return pd.DataFrame([[3, 5, 'store123', 'product123', 'My log message']], columns=['y', 'ds', 'store_id', 'product_id', 'log'])
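
The idea of returning the message as a column can be checked without a cluster. Below is a minimal sketch, assuming the question's output columns plus a hypothetical `log` column; the function name `main_with_log`, the sample `params` data, and the placeholder values for `Sales` and `SL` are illustrative only, since the real computation is not shown in the question. The per-group function is plain pandas, so it can be debugged locally first.

```python
import pandas as pd

# Hypothetical output columns: the question's schema plus a 'log' column.
OUTPUT_COLUMNS = ['Material', 'Alpha', 'Beta', 'Sales', 'SL', 'log']


def main_with_log(data: pd.DataFrame) -> pd.DataFrame:
    """Process one Material group and return rows that carry a log message."""
    material = int(data['Material'].iloc[0])
    rows = []
    for beta in range(1, 2):
        for alpha in range(3, 5):
            # Placeholder values: the real computation is elided in the question.
            rows.append((material, alpha, beta, 0, 0.0,
                         f'processed material {material}'))
    return pd.DataFrame(rows, columns=OUTPUT_COLUMNS)


# Local check with plain pandas, no Spark required (sample data is made up).
params = pd.DataFrame({'Material': [10, 10, 20], 'mu_lt': [1.0, 1.0, 2.0]})
result = pd.concat(
    [main_with_log(group) for _, group in params.groupby('Material')],
    ignore_index=True,
)
print(result)

# On Spark 3.x the same function could probably be handed to Spark like this
# (assumes a Spark DataFrame named params_spark, as in the question):
# schema = 'Material int, Alpha int, Beta int, Sales int, SL float, log string'
# params_spark.groupby('Material').applyInPandas(main_with_log, schema=schema).show()
```

Once the function behaves as expected locally, the `log` column shows up in the `.show()` output on the cluster, which sidesteps the missing prints.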

Upvotes: 1
