Reputation: 65
I have PySpark code that reads data from 4 Azure Event Hubs. The code needs to run every 1 minute and should complete within that minute, but after about 1.5 hours of execution the performance degrades and the execution time keeps increasing. The code runs as a Databricks job. The cluster has 3 nodes with 16 GB memory and 4 cores each, and it is always up and running since the code must execute every minute.
Below is the code used to read the data:
df = spark.read.format("eventhubs") \
    .options(**eventhub_configuration) \
    .load() \
    .withColumn("eh_ky", f.lit(key)) \
    .persist()
The code above is used to read data from each of the Event Hubs in a loop, and a union is taken to get a single dataframe. After that, some simple transformations are done. Finally, the offsets are saved by updating a JSON file with the offset information available in the dataframe, without using Spark checkpointing.
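Roughly, the full per-run pattern is as follows (simplified; eventhub_configurations, transform, and save_offsets_to_json are placeholder names for my actual logic):

import pyspark.sql.functions as f
from functools import reduce

dfs = []
for key, cfg in eventhub_configurations.items():
    dfs.append(spark.read.format("eventhubs").options(**cfg).load()
               .withColumn("eh_ky", f.lit(key)).persist())
df = reduce(lambda a, b: a.union(b), dfs)   # union into a single dataframe
result = transform(df)                      # simple transformations
save_offsets_to_json(result)                # offsets written to a JSON file, no Spark checkpointing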
I tried assigning the G1GC garbage collector in the cluster's environment variable configuration, but saw no improvement.
Upvotes: 0
Views: 55
Reputation: 8140
Below are some possible approaches you can try.
persist()
Calling persist() stores the data in memory; if you do not explicitly call unpersist(), memory usage will grow over time. Here is sample Scala code:
streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
  batchDF.persist()
  batchDF.write.format(...).save(...) // location 1
  batchDF.write.format(...).save(...) // location 2
  batchDF.unpersist()
}
You can do the same in PySpark.
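A minimal PySpark sketch of the same pattern (the write format and paths here are placeholders):

def process_batch(batch_df, batch_id):
    batch_df.persist()  # cache once, reuse for both writes
    batch_df.write.format("delta").mode("append").save("/mnt/location1")  # location 1
    batch_df.write.format("delta").mode("append").save("/mnt/location2")  # location 2
    batch_df.unpersist()  # release the cached batch so memory does not keep growing

streaming_df.writeStream.foreachBatch(process_batch).start()

Since you use batch reads rather than structured streaming, the equivalent fix in your job is to call df.unpersist() at the end of each one-minute run, once the offsets have been saved.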
ThreadPoolExecutor
Below is sample code that reads the Event Hubs in parallel threads; alter it as needed.
from concurrent.futures import ThreadPoolExecutor
from functools import reduce
import pyspark.sql.functions as f

def read_eventhub(config, key):
    # read one Event Hub and tag its rows with the hub's key
    return spark.read.format("eventhubs").options(**config).load() \
        .withColumn("eh_ky", f.lit(key))

with ThreadPoolExecutor() as executor:
    dataframes = list(executor.map(lambda item: read_eventhub(*item), eventhub_configs))
df = reduce(lambda df1, df2: df1.union(df2), dataframes)
Here, eventhub_configs is the list of (configuration, key) pairs, one per Event Hub you need to read.
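For illustration, eventhub_configs could be built like this (names and connection strings are placeholders; the connector expects the connection string to be encrypted with EventHubsUtils.encrypt):

encrypt = spark.sparkContext._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt
eventhub_configs = [
    ({"eventhubs.connectionString": encrypt("Endpoint=sb://...;EntityPath=eh1")}, "eh1"),
    ({"eventhubs.connectionString": encrypt("Endpoint=sb://...;EntityPath=eh2")}, "eh2"),
]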
You can also tune the receiverTimeout and operationTimeout configurations; see the sketch below.
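A rough sketch of setting these in the PySpark options dict (I am assuming ISO-8601 duration strings here, matching the Duration values of the Scala EventHubsConf API; verify against the connector docs):

eventhub_configuration["eventhubs.receiverTimeout"] = "PT60S"    # max wait for events per receive call
eventhub_configuration["eventhubs.operationTimeout"] = "PT120S"  # timeout for Event Hubs API operations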
For more information, check this GitHub documentation of the connector.

Upvotes: 0