Reputation: 2561
I have a function below which runs some calculations per customer using fixed DataFrames.
def calculate_fun(customer):
    """
    Instead of loop
    """
    result_output = main_calculation_fun(DataFrame_1[['ColA', 'ColB', 'ColC', 'ColD']], DataFrame_2, customer)
    return pd.DataFrame(result_output)
Currently I run the code like this:
all_result = list(map(calculate_fun, customers['customer_id'].unique().tolist()))
I am using Databricks and can see I have 150+ cores and 400 GB RAM. How can I distribute the customer_ids over the cores? Right now it runs for 2+ days.
I tried the following:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()

df = spark.createDataFrame(data = pd.DataFrame(unique_contracts, columns = ['customer_id']))
DataFrame_1 = spark.createDataFrame(data = DataFrame_1)
DataFrame_2 = spark.createDataFrame(data = DataFrame_2)

def reformat(partitionData):
    for row in partitionData:
        df_result = main_calculation_fun(DataFrame_1[['ColA', 'ColB', 'ColC', 'ColD']], DataFrame_2, row.customer_id)
    return pd.DataFrame(df_result)

df2 = df.rdd.mapPartitions(reformat).toDF(["A", "B", "C", "D", "E"])
But this fails with:
PicklingError: Could not serialize object: TypeError: cannot pickle '_thread.RLock' object
Upvotes: 0
Views: 673
Reputation: 1148
As you are already using pandas DataFrames, it might be easiest to execute your function with applyInPandas.
Here is a sample structure which you have to adapt to your resulting schema. In addition, your input dataframe is expected to be a Spark dataframe, and it should be possible to group it by customer.
from pyspark.sql.types import StructType, StructField, StringType, FloatType

res_schema = StructType([
    StructField('Column 1', FloatType()),
    StructField('Column 2', FloatType()),
    StructField('Column 3', StringType())
])

def calculate_fun(pdf):
    # applyInPandas feeds each group to this function as a pandas DataFrame
    return main_calculation_fun(pdf[['ColA', 'ColB', 'ColC', 'ColD']], DataFrame_2)

all_result = df.groupBy('customer').applyInPandas(calculate_fun, schema=res_schema)
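For completeness, a hedged sketch of how the surrounding pieces might look, assuming the per-customer input columns live in a pandas DataFrame (here DataFrame_1, with a hypothetical 'customer' key column) and that DataFrame_2 stays a plain pandas DataFrame so it can be captured by calculate_fun; a Spark DataFrame cannot be referenced inside a function that runs on the workers, which is exactly what caused your pickling error:

# Input must be a Spark DataFrame containing the grouping key plus the input columns
# ('customer' is an assumed column name here)
df = spark.createDataFrame(DataFrame_1[['customer', 'ColA', 'ColB', 'ColC', 'ColD']])

all_result = df.groupBy('customer').applyInPandas(calculate_fun, schema=res_schema)

# all_result is a lazy Spark DataFrame; the work is only executed on the cluster
# when an action is triggered, e.g. when pulling the result back to pandas:
result_pdf = all_result.toPandas()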
Alternatively, to use the many cores you have, you could parallelise the map call with multiprocessing. I believe it should look something like this:
from multiprocessing import Pool

parallel_processes = 20

with Pool(parallel_processes) as p:
    all_result = p.map(calculate_fun, customers['customer_id'].unique().tolist())
This will spawn 20 parallel processes. You can probably tune this number upward, depending on the resources a single iteration needs.
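Since calculate_fun already returns a pandas DataFrame per customer, the list of results can then be combined in the usual way (a small usage sketch, assuming the per-customer frames share the same columns):

import pandas as pd

# Stack the per-customer result frames into one DataFrame
final_result = pd.concat(all_result, ignore_index=True)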
Careful: In this case, all the processes are executed on the driver node. If your cores are distributed over multiple physical machines, you won't be able to benefit from all of them.
It is also worth looking into whether the calculate_fun function can be rewritten as a UDF, or even fully in PySpark. This would help optimize the execution even further.
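For illustration only, a minimal sketch of what a vectorised pandas UDF could look like, assuming part of main_calculation_fun can be expressed as column-wise operations (score_udf and the arithmetic here are hypothetical placeholders):

import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import FloatType

@pandas_udf(FloatType())
def score_udf(col_a: pd.Series, col_b: pd.Series) -> pd.Series:
    # hypothetical vectorised piece of the per-customer calculation
    return col_a * col_b

# df.withColumn('score', score_udf('ColA', 'ColB')) would then run distributed over the cluster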
Upvotes: 1