Reputation: 107
I want to ingest streaming data from Event Hubs to ADLS Gen2 in a specified format.
I have done batch data ingestion, from a database to ADLS and from container to container, but now I want to try streaming data ingestion.
Can you please guide me on where to start and how to proceed? I have already created an Event Hub, a Databricks instance, and a storage account in Azure.
Upvotes: 1
Views: 883
Reputation: 87154
You just need to follow the documentation (for Scala, for Python) for the Event Hubs Spark connector. In the simplest case the code looks like the following (for Python):
from pyspark.sql import functions as F

readConnectionString = "..."

ehConf = {}
# encrypting the connection string is required for connector versions 2.3.15+
ehConf['eventhubs.connectionString'] = sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(readConnectionString)
df = spark.readStream \
.format("eventhubs") \
.options(**ehConf) \
.load()
# cast the binary payload to a string (how you decode it really
# depends on the data format inside the topic)
cdf = df.withColumn("body", F.col("body").cast("string"))
# write data to storage
stream = cdf.writeStream.format("delta")\
.outputMode("append")\
.option("checkpointLocation", "/path/to/checkpoint/directory")\
.start("ADLS location")
You may need to add more options, such as starting positions, etc., but everything is well described in the documentation.
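For example, per the connector documentation, in Python the starting position is passed as a JSON string. A sketch that starts reading from the beginning of the Event Hub:
import json

# offset "-1" means the start of the stream; only one of offset /
# seqNo / enqueuedTime is actually used, the others are ignored
startingEventPosition = {
    "offset": "-1",
    "seqNo": -1,            # not in use
    "enqueuedTime": None,   # not in use
    "isInclusive": True
}

ehConf["eventhubs.startingPosition"] = json.dumps(startingEventPosition)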
Upvotes: 1