I have to run a script that takes a few arguments as input and returns some results as output. I first developed it on my local machine, where it works fine, and my goal now is to run it in Databricks in order to parallelize it.
The issue comes when I try to parallelize it. I'm reading the data from an already-mounted data lake (the issue is not there, since I can print the DataFrame after reading it), converting it to a Spark DataFrame, and passing the rows of each material (grouped by `Material`) to the main function:
```python
import os

import numpy as np
import pandas as pd
import scipy.stats as stats
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import StructType, StructField, IntegerType, FloatType

# Output schema of the pandas UDF
schema = StructType([StructField('Material', IntegerType(), True),
                     StructField('Alpha', IntegerType(), True),
                     StructField('Beta', IntegerType(), True),
                     StructField('Sales', IntegerType(), True),
                     StructField('SL', FloatType(), True)])


# Grouped-map pandas UDF: Spark calls main once per Material group,
# passing that group's rows as a pandas DataFrame.
# (PandasUDFType.GROUPED_MAP is deprecated since Spark 3.0 in favour of
# groupby(...).applyInPandas(func, schema).)
@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def main(data):
    material = data['Material'].iloc[0]
    print(material)  # <-------- THIS IS NOT PRINTING
    print('Hello world')  # <------ NEITHER IS THIS
    start = data['start '].iloc[0]
    end = data['end '].iloc[0]
    mu_lt = data['mu_lt'].iloc[0]
    sigma_lt = data['sigma_lt'].iloc[0]
    df = pd.DataFrame(columns=['Material', 'Alpha', 'Beta', 'Sales', 'SL'])
    for beta in range(1, 2):
        for alpha in range(3, 5):
            # Do stuff
            pass  # placeholder so the loop body is valid Python
    return df


if __name__ == '__main__':
    spark = SparkSession.builder.getOrCreate()
    params = pd.read_csv('/dbfs/mnt/input/params_input.csv')
    params_spark = spark.createDataFrame(params)
    params_spark.groupby('Material').apply(main).show()
```
I'm not sure whether I'm passing the DataFrame to the main function correctly, or even declaring the function correctly, but neither the prints nor the DataFrame defined in the main function seem to run. The code throws no error, but no output is returned either.
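Since each Spark group reaches `main` as an ordinary pandas DataFrame, the per-material logic can be dry-run locally with plain pandas before involving Spark. A minimal sketch, assuming a hypothetical `main_local` that mirrors the undecorated body of `main` (with dummy `Sales`/`SL` values standing in for the elided "Do stuff") and a hypothetical `sample` frame with made-up values; the real input from `params_input.csv` has more columns, such as `start ` and `end `:

```python
import pandas as pd


def main_local(data: pd.DataFrame) -> pd.DataFrame:
    # Hypothetical stand-in for the undecorated body of main():
    # each call receives all rows for exactly one material.
    material = data['Material'].iloc[0]
    print(material)  # prints here because this runs in the local process
    rows = []
    for beta in range(1, 2):
        for alpha in range(3, 5):
            # Placeholder for the elided "Do stuff"; Sales and SL are dummy values.
            rows.append({'Material': material, 'Alpha': alpha, 'Beta': beta,
                         'Sales': len(data), 'SL': 0.0})
    return pd.DataFrame(rows, columns=['Material', 'Alpha', 'Beta', 'Sales', 'SL'])


# Hypothetical sample input with made-up values.
sample = pd.DataFrame({
    'Material': [101, 101, 202],
    'mu_lt': [5.0, 5.0, 7.0],
    'sigma_lt': [1.0, 1.0, 2.0],
})

# One call per group, results concatenated: the same contract as a grouped map.
result = pd.concat(
    [main_local(group) for _, group in sample.groupby('Material')],
    ignore_index=True,
)
print(result)
```

The final frame is built only from what the function returns, so an empty return value (like the `df` in `main` when the loop body appends nothing) contributes no rows.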