Reputation: 475
I am using PySpark version 3.1.1-amzn-0 on EMR. Inside a foreachBatch handler I iterate over the micro-batch DataFrame, and for each record I filter some data from a Delta table DataFrame and produce the result to a Kafka topic. This runs sequentially, and I would like to process multiple records in parallel for better performance. How can I do this?
Below is my current approach:
from pyspark.sql.functions import lit

# Reading the Delta table
deltaDf = spark.read \
    .format("delta") \
    .load("s3://deltaLake/")

# `schema` and `produceDFMessagesInKafka()` are defined elsewhere in my job
def processEachMicroBatch(batchDF, epochId):
    kafka_df_rows = batchDF.collect()
    for index, row in enumerate(kafka_df_rows):
        anon_id = row['anon_id']
        pro_id = row['pro_id']
        prof_id = row['prof_id']
        key = anon_id[0:3]
        filteredDF = deltaDf.filter(
            (deltaDf.pro_id == pro_id)
            & (deltaDf.key == key)
            & (deltaDf.anon_id == anon_id)
            & (deltaDf.prof_id == prof_id)
        )
        if filteredDF.rdd.isEmpty():
            exec_status_data = [(pro_id, anon_id, prof_id)]
            exec_status_df = spark.createDataFrame(data=exec_status_data, schema=schema)
            exec_status_df = exec_status_df.withColumn("dataNotFound", lit(True))
            produceDFMessagesInKafka(exec_status_df, 'kafkaTopic')
        else:
            produceDFMessagesInKafka(filteredDF, 'kafkaTopic')
Upvotes: 0
Views: 603
Reputation: 824
IMHO you're doing by hand too much of the lifting that Spark can handle for you, which kills the parallelization Spark offers out of the box.
I would advise the following approach instead:
# use withColumn() to handle "key = anon_id[0:3]"

def filter_delta_with_batch(delta_df, batch_df):
    filtered_df = delta_df.join(
        batch_df,
        (delta_df.pro_id == batch_df.pro_id)
        & (delta_df.key == batch_df.key)
        & (delta_df.anon_id == batch_df.anon_id)
        & (delta_df.prof_id == batch_df.prof_id),
        how='leftsemi',
    )
    return filtered_df

# process filtered_df with kafka
You might worry that a join would be inefficient here, but leftsemi and leftanti are powerful Spark join types for exactly this kind of filtering. Of course, actual performance depends on how the data is partitioned and on the shuffle the join induces.
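To make this concrete, here is a minimal sketch of what the whole foreachBatch handler could look like with this approach: a leftsemi join for the matching rows and a leftanti join to reproduce the "dataNotFound" case from your code. Column names, deltaDf and produceDFMessagesInKafka() are taken from the question and assumed to exist; the rest is illustrative, not a drop-in implementation.

from pyspark.sql.functions import substring, lit

def processEachMicroBatch(batchDF, epochId):
    # derive the join key on the batch side instead of in a Python loop
    keyed_batch_df = batchDF.withColumn("key", substring("anon_id", 1, 3))

    join_cond = (
        (deltaDf.pro_id == keyed_batch_df.pro_id)
        & (deltaDf.key == keyed_batch_df.key)
        & (deltaDf.anon_id == keyed_batch_df.anon_id)
        & (deltaDf.prof_id == keyed_batch_df.prof_id)
    )

    # delta rows that have a matching batch record
    matched_df = deltaDf.join(keyed_batch_df, join_cond, how="leftsemi")

    # batch records with no matching delta row (the "dataNotFound" case)
    not_found_df = (
        keyed_batch_df.join(deltaDf, join_cond, how="leftanti")
        .select("pro_id", "anon_id", "prof_id")
        .withColumn("dataNotFound", lit(True))
    )

    produceDFMessagesInKafka(matched_df, "kafkaTopic")
    produceDFMessagesInKafka(not_found_df, "kafkaTopic")

Everything stays as DataFrame operations, so Spark can parallelize the joins and the Kafka writes across executors instead of looping row by row on the driver.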
Upvotes: 1