Reputation: 41
I'm trying to read data from Event Hub into Databricks and give it a structure with named columns (col1, col2, etc.).
Issue: only the first record arrives with the proper structure; the rest of the data doesn't load.
body
24,5300,123456,1,PLAN-QD,PMT,10/09/15 00:00,1253323,INTEREST,LOAN-AS,NULL
32,1300,12458,2,PLAN,PMT,25/09/15 00:00,12532123,INTEREST,LOAN,NULL
36,1400,19458,25,PLAN,PMTS,25/11/15 00:00,92532163,INTEREST,LOAN-DS,NULL
The column headers are not present in the Event Hub payload and are listed here only for reference, but they should appear in the output table: id,Bal,accnum,active,plan,Status,DateTime,Type,Loan,Where
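For reference, the target table layout could be sketched as a schema like this (a minimal sketch, not part of the original question; it assumes every field stays a string except DateTime):

import org.apache.spark.sql.types._

// Hypothetical target schema for the parsed records; every field is kept as a
// string except DateTime, which is assumed to become a timestamp downstream.
val targetSchema = StructType(Seq(
  StructField("id", StringType),
  StructField("Bal", StringType),
  StructField("accnum", StringType),
  StructField("active", StringType),
  StructField("plan", StringType),
  StructField("Status", StringType),
  StructField("DateTime", TimestampType),
  StructField("Type", StringType),
  StructField("Loan", StringType),
  StructField("Where", StringType)
))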
My code is below:
import org.apache.spark.eventhubs._
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import spark.implicits._

// Connection string to read from Event Hub
val connectionString = ConnectionStringBuilder("my connection string").setEventHubName("oth-transactions").build
val customEventhubParameters = EventHubsConf(connectionString)

val ConsumerDF = spark.readStream
  .format("eventhubs")
  .options(customEventhubParameters.toMap)
  .option("checkpointLocation", "/tmp/checkpoint")
  .load()

val OTHDF = ConsumerDF.select($"body" cast "string")

val OTHDF2 = OTHDF.withColumn("temp", split(col("body"), "\,")).select(
  (0 until 52).map(i => col("temp").getItem(i).as(s"col$i")): _*
)

OTHDF2.printSchema

OTHDF2.writeStream
  .format("delta")
  .outputMode("append")
  .option("checkpointLocation", "/delta/events/_checkpoints/etl-from-json")
  .table("Table_Name")
May I please get some advice on how I can read all these records and load them into a table in Databricks?
Thanks in advance!
Upvotes: 1
Views: 1286
Reputation: 397
You are splitting the body column using "\," instead of ","; please try the following:
// Create the split column as "_tmp" so the getItem references below match it.
val OTHDF2 = OTHDF.withColumn("_tmp", split(col("body"), ","))
  .select($"_tmp".getItem(0).as("id")
    ,$"_tmp".getItem(1).as("Bal")
    ,$"_tmp".getItem(2).as("accnum")
    ,$"_tmp".getItem(3).as("active")
    ,$"_tmp".getItem(4).as("plan")
    ,$"_tmp".getItem(5).as("Status")
    ,$"_tmp".getItem(6).cast("timestamp").as("DateTime")
    ,$"_tmp".getItem(7).as("Type")
    ,$"_tmp".getItem(8).as("Loan")
    ,$"_tmp".getItem(9).as("Where")
  )
  .drop("_tmp")
  .writeStream
  .format("csv")
  .outputMode("append")
  .option("checkpointLocation", "/FileStore/checkpointLocation.csv")
  .option("path", "/FileStore/data.csv")
  .start()
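One caveat worth adding: a plain cast("timestamp") will likely return null for values like 25/09/15 00:00, because Spark's default string-to-timestamp cast expects ISO-style dates (yyyy-MM-dd HH:mm:ss). A hedged alternative for the DateTime line above, assuming the sample dates are day/month/two-digit-year:

import org.apache.spark.sql.functions.to_timestamp

// Replaces the plain cast above; "dd/MM/yy HH:mm" matches sample rows such as
// 25/09/15 00:00. Adjust the pattern if the feed actually uses month-first dates.
to_timestamp($"_tmp".getItem(6), "dd/MM/yy HH:mm").as("DateTime")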
Upvotes: 0