Mamta

Reputation: 41

Read data from Event Hub into a table in Databricks using Scala

I'm trying to read data from an event hub into Databricks and want to give it a structure such as col1, col2, etc.

Issue: only the first record comes through in the proper structure; the rest of the data is not loaded.

The data in the event hub looks exactly like the sample below. There are 3 records, and each record is broken into 2 lines after the DateTime column:

body 24,5300,123456,1,PLAN-QD,PMT,10/09/15 00:00,1253323,INTEREST,LOAN-AS,NULL 32,1300,12458,2,PLAN,PMT,25/09/15 00:00,12532123,INTEREST,LOAN,NULL 36,1400,19458,25,PLAN,PMTS,25/11/15 00:00,92532163,INTEREST,LOAN-DS,NULL


The column headers (not present in the event hub and shown here just for reference, but they should appear in the output table) are: id,Bal,accnum,active,plan,Status,DateTime,Type,Loan,Where

My code is as below:

import org.apache.spark.eventhubs._
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import spark.implicits._

val connectionString = ConnectionStringBuilder("my connection string").setEventHubName("oth-transactions").build //this connection string to read from eventhub

val customEventhubParameters = EventHubsConf(connectionString)

val ConsumerDF = spark.readStream.format("eventhubs").options(customEventhubParameters.toMap).option("checkpointLocation", "/tmp/checkpoint").load()

val OTHDF = ConsumerDF.select($"body" cast "string")

val OTHDF2 = OTHDF.withColumn("temp", split(col("body"), "\,")).select(
  (0 until 52).map(i => col("temp").getItem(i).as(s"col$i")): _*
)

OTHDF2.printSchema

OTHDF2.writeStream.format("delta").outputMode("append").option("checkpointLocation", "/delta/events/_checkpoints/etl-from-json").table("Table_Name")

May I please get some advice on how I can read all these records and load them into a table in Databricks?

Thanks in advance!!

Upvotes: 1

Views: 1286

Answers (1)

Vijay Kumar Sharma

Reputation: 397

You are splitting the body column on "\," instead of ",". Please try splitting on "," as in the code below.

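// Split on a plain comma; the temporary column name must match the one used in select.
val OTHDF2 = OTHDF.withColumn("_tmp", split(col("body"), ","))
               .select($"_tmp".getItem(0).as("id")
                      ,$"_tmp".getItem(1).as("Bal")
                      ,$"_tmp".getItem(2).as("accnum")
                      ,$"_tmp".getItem(3).as("active")
                      ,$"_tmp".getItem(4).as("plan")
                      ,$"_tmp".getItem(5).as("Status")
                      // The sample values look like "10/09/15 00:00", which a plain cast("timestamp")
                      // turns into null, so parse with an explicit pattern (assumed day/month/year).
                      ,to_timestamp($"_tmp".getItem(6), "dd/MM/yy HH:mm").as("DateTime")
                      ,$"_tmp".getItem(7).as("Type")
                      ,$"_tmp".getItem(8).as("Loan")
                      ,$"_tmp".getItem(9).as("Where")
                     )
               // select keeps only the named columns, so _tmp needs no explicit drop
               .writeStream
               .format("csv")
               .outputMode("append")
               .option("checkpointLocation", "/FileStore/checkpointLocation.csv")
               .option("path", "/FileStore/data.csv")
               .start()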
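
Since the question says one event body holds several records, splitting only on "," would still treat the whole body as a single row. Below is a minimal sketch of how the body might first be cut into records, in plain Scala so it can be tried without a cluster. It is not from the original post: `BodyParser` and `parseBody` are hypothetical names, and the code assumes that records are separated by line breaks and that the line break inside a record comes directly after a comma (right after the DateTime field). If the real payload is laid out differently, the regular expressions would need adjusting.

```scala
// Hypothetical helper, not part of the original answer.
// Assumptions: records are separated by line breaks, and the line break
// inside a record comes directly after a comma (after the DateTime field).
object BodyParser {
  def parseBody(body: String): Seq[Array[String]] =
    body
      .replaceAll(",\\R", ",")  // rejoin the two halves of each record
      .split("\\R")             // one array element per record
      .map(_.trim)
      .filter(_.nonEmpty)       // skip blank lines
      .map(_.split(",", -1))    // -1 keeps trailing empty fields
      .toSeq
}

// Example input with two records, each broken after the DateTime column.
val sampleBody =
  "24,5300,123456,1,PLAN-QD,PMT,10/09/15 00:00,\n1253323,INTEREST,LOAN-AS,NULL\n" +
  "32,1300,12458,2,PLAN,PMT,25/09/15 00:00,\n12532123,INTEREST,LOAN,NULL"

val records = BodyParser.parseBody(sampleBody)
records.foreach(r => println(r.mkString(" | ")))
```

In the streaming DataFrame, the same idea could probably be expressed with `regexp_replace`, `split` and `explode` from `org.apache.spark.sql.functions`, so that each record becomes its own row before the `getItem` calls are applied.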

Upvotes: 0
