Sai

Reputation: 31

In Azure Databricks, writing a PySpark dataframe to Event Hubs is taking too long, as there are 3 million records in the dataframe

An Oracle database table has 3 million records. I need to read it into a dataframe, convert it to JSON format, and send it to Event Hubs for downstream systems.

Below is my PySpark code to connect to the Oracle DB and read the table into a dataframe:

df = spark.read \
            .format("jdbc") \
            .option("url", databaseurl) \
            .option("query","select * from tablename") \
            .option("user", loginusername) \
            .option("password", password) \
            .option("driver", "oracle.jdbc.driver.OracleDriver") \
            .option("oracle.jdbc.timezoneAsRegion", "false") \
            .load()

Then I convert the column names and values of each row into JSON (placed under a new column named body) and send it to Event Hubs.
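Roughly, that conversion step looks like this (a simplified sketch using PySpark's built-in to_json and struct functions, not my exact code):

from pyspark.sql.functions import to_json, struct

# Pack all columns of each row into a single JSON string column named "body",
# which is the column the Event Hubs connector reads from.
df = df.withColumn("body", to_json(struct(*df.columns)))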

I have defined ehconf with the Event Hubs connection string. Below is my write-to-Event-Hubs code:

df.select("body") \
   .write\
   .format("eventhubs") \
   .options(**ehconf) \    
   .save()
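For context, ehconf is assembled roughly like this (the connection string values are placeholders; the encrypt helper comes from the azure-eventhubs-spark connector):

# Placeholder connection string; the real one points at my namespace and event hub.
connectionString = "Endpoint=sb://<namespace>.servicebus.windows.net/;SharedAccessKeyName=<keyName>;SharedAccessKey=<key>;EntityPath=<eventhub>"

# The connector expects the connection string to be encrypted with its helper class.
ehconf = {
    "eventhubs.connectionString": sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(connectionString)
}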

My PySpark code takes 8 hours to send the 3 million records to Event Hubs.

Could you please suggest how to write the PySpark dataframe to Event Hubs faster?

My Event Hub is created under an Event Hubs cluster which has 1 CU of capacity.

Databricks cluster config: mode: Standard; runtime: 10.3; worker type: Standard_D16as_v4 (64 GB memory, 16 cores; min workers: 1, max workers: 5); driver type: Standard_D16as_v4 (64 GB memory, 16 cores)

Upvotes: 1

Views: 387

Answers (1)

restlessmodem

Reputation: 448

The problem is that the JDBC connector uses just one connection to the database by default, so most of your workers are probably idle. You can confirm this in Cluster Settings > Metrics > Ganglia UI.

To actually make use of all the workers, the JDBC connector needs to know how to parallelize retrieving your data. For this you need a field whose values are evenly distributed. For example, if you have a date field and every date has a similar number of records, you can use it to split up the data:

df = spark.read \
  .format("jdbc") \
  .option("url", jdbcUrl) \
  .option("dbtable", tableName) \
  .option("user", jdbcUsername) \
  .option("password", jdbcPassword) \
  .option("numPartitions", 64) \
  .option("partitionColumn", "<dateField>") \
  .option("lowerBound", "2019-01-01") \
  .option("upperBound", "2022-04-07") \
  .load()

You have to define the field name and the min and max values of that field so that the JDBC connector can try to split the work evenly between the workers. numPartitions is the number of individual connections opened; the best value depends on the number of workers in your cluster and how many connections your data source can handle.
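If you don't have a suitable date field, a numeric key column works just as well; you can look up its min and max first and pass them in as the bounds (a sketch only; ID_COLUMN is a placeholder for whatever evenly distributed numeric column your table has):

# Sketch: derive lowerBound/upperBound from the table itself, then do the parallel read.
# ID_COLUMN is a placeholder for a numeric, roughly evenly distributed key column.
bounds = spark.read \
  .format("jdbc") \
  .option("url", jdbcUrl) \
  .option("query", f"select min(ID_COLUMN) as min_id, max(ID_COLUMN) as max_id from {tableName}") \
  .option("user", jdbcUsername) \
  .option("password", jdbcPassword) \
  .load() \
  .first()

df = spark.read \
  .format("jdbc") \
  .option("url", jdbcUrl) \
  .option("dbtable", tableName) \
  .option("user", jdbcUsername) \
  .option("password", jdbcPassword) \
  .option("numPartitions", 64) \
  .option("partitionColumn", "ID_COLUMN") \
  .option("lowerBound", bounds["min_id"]) \
  .option("upperBound", bounds["max_id"]) \
  .load()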

Upvotes: 0
