Divzz

Reputation: 65

Reading data from Event Hubs with a PySpark batch read hangs after a few hours of repeated execution

I have PySpark code that reads data from 4 Azure Event Hubs. The code needs to run every minute, and each run should complete within 1 minute, but after about 1.5 hours of repeated execution the run time starts to increase. The code runs as a Databricks job. The cluster has 3 nodes, each with 16 GB of memory and 4 cores, and it is always up and running because the code needs to execute every minute.

Below is the code used to read the data:

df = spark.read.format("eventhubs") \
                .options(**eventhub_configuration) \
                .load() \
                .withColumn("eh_ky",f.lit(key)) \
                .persist()

The code above is run in a loop to read from all the Event Hubs, and the results are unioned into a single DataFrame. After that, some simple transformations are applied. Finally, the offsets are saved by updating a JSON file with the offset information available in the DataFrame, without using Spark checkpointing.

  1. Is there any way to resolve the performance degradation after about 1.5 hours of continuous execution?
  2. Adding more Event Hubs as sources increases the execution time even though the data volume is small. How can this be resolved?
  3. The code waits 5 minutes before failing the Event Hub read when there is a connectivity issue. How can the timeout be reduced to a few seconds?

I tried setting the G1GC garbage collector through the cluster's environment variable configuration, but it made no difference.

Upvotes: 0

Views: 55

Answers (1)

JayashankarGS

Reputation: 8140

Below are some things you can try.

  1. persist() caches the data in memory. If you do not explicitly unpersist it, memory usage will keep growing over time.

Here is sample Scala code:

streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
  batchDF.persist()
  batchDF.write.format(...).save(...)  // location 1
  batchDF.write.format(...).save(...)  // location 2
  batchDF.unpersist()
}

You can do the same in PySpark.
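As a rough PySpark counterpart to the Scala snippet, the sketch below wraps the work in try/finally so the cache is released even when a write fails. The helper name write_twice and its two callback parameters are hypothetical, chosen only for illustration; the only DataFrame methods it relies on are persist() and unpersist().

```python
def write_twice(batch_df, write_first, write_second):
    """Cache batch_df, run two writes against it, and always release the cache.

    batch_df is expected to be a Spark DataFrame; write_first and
    write_second are hypothetical callables that each take the DataFrame
    and write it somewhere (for example, to two different locations).
    """
    batch_df.persist()
    try:
        write_first(batch_df)   # location 1
        write_second(batch_df)  # location 2
    finally:
        # Runs on success and on failure, so cached blocks do not
        # accumulate across the repeated one-minute executions.
        batch_df.unpersist()
```

In the question's code, each per-Event-Hub DataFrame is persisted inside the loop, so each of them would need a matching unpersist() once the run has finished with it. Calling spark.catalog.clearCache() at the end of each run is a blunter alternative that drops everything cached in the session.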

  2. Instead of reading the Event Hubs one after another in a loop, you can read them in parallel using ThreadPoolExecutor.

Below is sample code; adapt it to your setup.

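from concurrent.futures import ThreadPoolExecutor
from functools import reduce

from pyspark.sql import functions as f

def read_eventhub(key, config):
    # Build the batch DataFrame for one Event Hub and tag each row with its key.
    return spark.read.format("eventhubs").options(**config).load().withColumn("eh_ky", f.lit(key))

# Each entry of eventhub_configs is unpacked into (key, config) by read_eventhub(*cfg).
with ThreadPoolExecutor() as executor:
    dataframes = list(executor.map(lambda cfg: read_eventhub(*cfg), eventhub_configs))

# Combine the per-hub DataFrames into a single DataFrame.
df = reduce(lambda df1, df2: df1.union(df2), dataframes)

Here, eventhub_configs is the list of Event Hub sources you need, with one (key, config) tuple per Event Hub.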

  3. To reduce the timeout, set the receiverTimeout and operationTimeout configurations. Check the connector's GitHub documentation for more information.
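A minimal sketch of setting these two timeouts from PySpark follows. It assumes the connector reads them from the option keys "eventhubs.receiverTimeout" and "eventhubs.operationTimeout" as ISO-8601 duration strings; verify both the key names and the value format against the connector documentation for your version. The helper names iso8601_duration and with_timeouts are hypothetical.

```python
from datetime import timedelta


def iso8601_duration(delta: timedelta) -> str:
    """Format a timedelta as an ISO-8601 duration such as 'PT0H0M10S'."""
    total = int(delta.total_seconds())
    hours, rem = divmod(total, 3600)
    minutes, seconds = divmod(rem, 60)
    return f"PT{hours}H{minutes}M{seconds}S"


def with_timeouts(config: dict, receiver: timedelta, operation: timedelta) -> dict:
    """Return a copy of config with the two timeout options added.

    Assumption: the option keys below are the ones the Event Hubs
    connector expects; check them against its documentation.
    """
    updated = dict(config)  # leave the caller's dict untouched
    updated["eventhubs.receiverTimeout"] = iso8601_duration(receiver)
    updated["eventhubs.operationTimeout"] = iso8601_duration(operation)
    return updated
```

For example, with_timeouts(eventhub_configuration, timedelta(seconds=5), timedelta(seconds=10)) returns a new dictionary that can be passed to .options(**...) in place of the original configuration.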

Upvotes: 0
