Sai Varun Kumar

Reputation: 107

How to ingest data from Eventhub to ADLS using Databricks cluster (Scala)

I want to ingest streaming data from Eventhub to ADLS Gen2 in a specified format.

I have already done batch data ingestion (from a DB to ADLS, and from container to container), but now I want to try streaming data ingestion.

Can you please guide me on where to start and what the next steps are? I have already created an Eventhub, a Databricks instance and a Storage Account in Azure.

Upvotes: 1

Views: 883

Answers (1)

Alex Ott

Reputation: 87154

You just need to follow the documentation for the EventHubs Spark connector (for Scala, for Python). In its simplest form, the code looks as follows (for Python):

from pyspark.sql import functions as F

readConnectionString = "..."
ehConf = {}
# encrypting the connection string is required for connector versions 2.3.15+
ehConf['eventhubs.connectionString'] = sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(readConnectionString)

df = spark.readStream \
  .format("eventhubs") \
  .options(**ehConf) \
  .load()

# cast the binary payload to a string (but this really depends on the
# data format inside the topic)
cdf = df.withColumn("body", F.col("body").cast("string"))

# write data to storage
stream = cdf.writeStream.format("delta")\
  .outputMode("append")\
  .option("checkpointLocation", "/path/to/checkpoint/directory")\
  .start("ADLS location")
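Both the "ADLS location" and the checkpoint directory above are ADLS Gen2 paths. A minimal sketch of building them, assuming the cluster already has access to the storage account (e.g. via a service principal) and using the standard `abfss://<container>@<account>.dfs.core.windows.net/<path>` URI scheme; the helper `adls_path` and the container/account names are hypothetical:

```python
def adls_path(container: str, account: str, *parts: str) -> str:
    """Build an ADLS Gen2 URI (abfss scheme) for Spark readers/writers.

    Hypothetical helper: joins the path parts with '/', dropping stray
    leading/trailing slashes and empty parts.
    """
    path = "/".join(p.strip("/") for p in parts if p.strip("/"))
    return f"abfss://{container}@{account}.dfs.core.windows.net/{path}"


# Hypothetical container/account names; the checkpoint lives apart from the data.
output_path = adls_path("raw", "mystorageaccount", "eventhub", "events")
checkpoint_path = adls_path("raw", "mystorageaccount", "checkpoints", "events")
print(output_path)
# abfss://raw@mystorageaccount.dfs.core.windows.net/eventhub/events
```

These values could then be passed as `.option("checkpointLocation", checkpoint_path)` and `.start(output_path)`. Each streaming query should have its own checkpoint directory.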

You may need to add additional options, such as starting positions, but everything is described well in the documentation.
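For example, the connector's PySpark guide sets the starting position as a JSON string under `eventhubs.startingPosition` (offset `"-1"` means the start of the stream, `"@latest"` the end) and selects a consumer group with `eventhubs.consumerGroup`. The sketch below only assembles that options dictionary in plain Python; the helper `build_eh_conf` is hypothetical, and the encrypted connection string is assumed to come from `EventHubsUtils.encrypt` as in the code above:

```python
import json
from typing import Optional


def build_eh_conf(encrypted_conn_str: str,
                  from_start: bool = True,
                  consumer_group: Optional[str] = None) -> dict:
    """Assemble options for spark.readStream.format("eventhubs").

    Hypothetical helper; the option keys follow the connector's PySpark guide.
    """
    conf = {"eventhubs.connectionString": encrypted_conn_str}
    # "-1" = start of the stream, "@latest" = end of the stream
    starting_position = {
        "offset": "-1" if from_start else "@latest",
        "seqNo": -1,            # not in use here (offset is used instead)
        "enqueuedTime": None,   # not in use here (offset is used instead)
        "isInclusive": True,
    }
    conf["eventhubs.startingPosition"] = json.dumps(starting_position)
    if consumer_group is not None:
        conf["eventhubs.consumerGroup"] = consumer_group
    return conf
```

In a notebook this might be used as `ehConf = build_eh_conf(sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(readConnectionString), consumer_group="$Default")`, followed by `.options(**ehConf)` on the stream reader.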

Upvotes: 1
