Below is the for loop I am running on a Databricks cluster:
import pandas as pd

datalake_spark_dataframe_downsampled = pd.DataFrame(
    {'IMEI': ['001', '001', '001', '001', '001', '002', '002'],
     'OuterSensorConnected': [0, 0, 0, 1, 0, 0, 0],
     'OuterHumidity': [31.784826, 32.784826, 33.784826, 43.784826, 23.784826, 54.784826, 31.784826],
     'EnergyConsumption': [70, 70, 70, 70, 70, 70, 70],
     'DaysDeploymentDate': [0, 0, 1, 1, 1, 1, 1],
     'label': [0, 0, 1, 1, 0, 0, 0]}  # padded to 7 values to match the other columns
)
datalake_spark_dataframe_downsampled = spark.createDataFrame(datalake_spark_dataframe_downsampled)
# printSchema() of datalake_spark_dataframe_downsampled (Spark DataFrame):
root
 |-- IMEI: string (nullable = true)
 |-- OuterSensorConnected: integer (nullable = false)
 |-- OuterHumidity: float (nullable = true)
 |-- EnergyConsumption: float (nullable = true)
 |-- DaysDeploymentDate: integer (nullable = true)
 |-- label: integer (nullable = false)
from pyspark.sql import functions as sql_function  # assuming sql_function is pyspark.sql.functions

device_ids = datalake_spark_dataframe_downsampled.select(sql_function.collect_set('IMEI').alias('unique_IMEIS')).collect()[0]['unique_IMEIS']
print(device_ids)  # ["001", "002", ..., "030"] -> 30 unique ids
for i in device_ids:
    # filtered_dataset = datalake_spark_dataframe_downsampled.where(datalake_spark_dataframe_downsampled.IMEI.isin([i]))
    # The above filtering is executed inside the function training_models_operation_testing()
    try:
        training_models_operation_testing(i, datalake_spark_dataframe_downsampled, drop_columns_not_used_in_training,
                                           training_split_ratio_value, testing_split_ratio_value, mlflow_folder,
                                           cross_validation_rounds_value, features_column_name,
                                           optimization_metric_value, pretrained_models_T_minus_one,
                                           folder_name_T_minus_one, timestamp_snap, instrumentation_key_value,
                                           canditate_asset_ids, executor, device_ids)
    except Exception as e:
        custom_logging_function("ERROR", instrumentation_key_value, "ERROR EXCEPTION: {0}".format(e))
For the sake of the problem I have attached sample data to give a general idea of how my data looks. Imagine that many more rows and IDs exist; I have created only a few for demonstration.
As you can see, this is a simple function call inside a for loop on a Databricks cluster running PySpark.
Briefly, I first create a list of the unique ids (IMEI column) existing in my dataset; this equals 30, so the for loop runs 30 iterations. In each iteration I execute the steps shown in the snippet above.
The attached code snippet works successfully, but the for loop executes sequentially, one iteration at a time: the function is called for the first id, and only after it completes does it move on to the next id. What I want is to transform the above for loop so that the 30 iterations run concurrently in PySpark, NOT one by one. How could I achieve this in PySpark?
I am open to discussion and to testing ideas, because I understand that what I am asking may not be simple to execute in a Spark environment.
My current logging output (this is something I print myself, in the format below):
Iteration 1
Starting execution...
- Executing the function for id 001
Finished execution...
Iteration 2
Starting execution...
- Executing the function for id 002
Finished execution...
My desired logging output (again in the format I print below):
Starting execution...
- Executing the function for id 001
- Executing the function for id 002
- Executing the function for id 003
- Executing the function for id 004
.
.
.
.
- Executing the function for id 030
Finished execution...
All of them running at the same time (concurrently), rather than one by one.
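For illustration, here is a minimal, untested sketch of the kind of driver-side concurrency I have in mind, using Python's built-in concurrent.futures (I am not sure whether this is the right approach in a Spark environment; the arguments are exactly the ones from the loop above):
from concurrent.futures import ThreadPoolExecutor, as_completed

# Sketch only: submit the per-id calls from a driver-side thread pool so that
# Spark receives all 30 jobs concurrently instead of one after the other.
with ThreadPoolExecutor(max_workers=len(device_ids)) as pool:
    futures = {pool.submit(training_models_operation_testing, i, datalake_spark_dataframe_downsampled,
                           drop_columns_not_used_in_training, training_split_ratio_value,
                           testing_split_ratio_value, mlflow_folder, cross_validation_rounds_value,
                           features_column_name, optimization_metric_value, pretrained_models_T_minus_one,
                           folder_name_T_minus_one, timestamp_snap, instrumentation_key_value,
                           canditate_asset_ids, executor, device_ids): i
               for i in device_ids}
    for future in as_completed(futures):
        try:
            future.result()  # re-raises any exception from that id's run
        except Exception as e:
            custom_logging_function("ERROR", instrumentation_key_value, "ERROR EXCEPTION: {0}".format(e))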
[Update] Based on the answer in the comments (threading module):
"for loop" is linear execution/ Sequential execution and can be considered as single threaded execution.
If you want to run your code concurrently, you need to create multiple threads/processes to execute your code.
Below is the example to achieve multi threading. I didn't test the code, but should work :)
# import the threading library
import threading

# create a list of threads
thread_list = []

# loop over all ids, create a thread for each element, and append it to thread_list
for items in device_ids:
    thread = threading.Thread(target=training_models_operation_testing,
                               args=(items, datalake_spark_dataframe_downsampled, drop_columns_not_used_in_training,
                                     training_split_ratio_value, testing_split_ratio_value, mlflow_folder,
                                     cross_validation_rounds_value, features_column_name,
                                     optimization_metric_value, pretrained_models_T_minus_one,
                                     folder_name_T_minus_one, timestamp_snap, instrumentation_key_value,
                                     canditate_asset_ids, executor, device_ids,))
    thread_list.append(thread)

# start multi-threaded execution
for thread in thread_list:
    thread.start()

# wait for all threads to finish
for thread in thread_list:
    thread.join()

print("Finished executing all threads")