Hazriq Ishak
Hazriq Ishak

Reputation: 335

How to stream data from Kafka topic to Delta table using Spark Structured Streaming

I'm trying to understand databricks delta and thinking to do a POC using Kafka. Basically the plan is to consume data from Kafka and insert it to the databricks delta table.

These are the steps that I did:

  1. Create a delta table on databricks.
%sql
CREATE TABLE hazriq_delta_trial2 (
  value STRING
)
USING delta
LOCATION '/delta/hazriq_delta_trial2'
  1. Consume data from Kafka.
import org.apache.spark.sql.types._
    
val kafkaBrokers = "broker1:port,broker2:port,broker3:port"
val kafkaTopic = "kafkapoc"
    
val kafka2 = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", kafkaBrokers)
  .option("subscribe", kafkaTopic)
  .option("startingOffsets", "earliest")
  .option("maxOffsetsPerTrigger", 100)
  .load()
  .select($"value")
  .withColumn("Value", $"value".cast(StringType))
  .writeStream
  .option("checkpointLocation", "/delta/hazriq_delta_trial2/_checkpoints/test")
  .table("hazriq_delta_trial2")

However, when I query the table, it is empty.

I can confirm that the data is coming. I verify it by seeing the spike in the graph when I produce a message to the Kafka topic.

incoming data

Am I missing something?

I need help on how I can insert the data that I get from Kafka into the table.

Upvotes: 7

Views: 4813

Answers (2)

Michael Heil
Michael Heil

Reputation: 18495

Below is a working example on how to read data from Kafka and stream it into a delta table. I was using Spark 3.0.1 and delta-core 0.7.0 (if you are on Spark 2.4 version you need to use 0.6.0).

Streaming data from Kafka into Delta table

val spark = SparkSession.builder()
  .appName("Kafka2Delta")
  .master("local[*]")
  .getOrCreate()

// in production this should be a more reliable location such as HDFS
val deltaPath = "file:///tmp/delta/table"

val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "test")
  .option("startingOffsets", "earliest")
  .option("failOnDataLoss", "false")
  .load()
  .selectExpr("CAST(value AS STRING) as value")

val query: StreamingQuery = df.writeStream
  .format("delta")
  .option("checkpointLocation", "/path/to/sparkCheckpoint")
  .start(deltaPath)

query.awaitTermination()

For testing, I have simply produced characters "a", "b", "c" and "d" as values into the Kafka topic. Obviously, you can build some more sophisticated Dataframes if the Kafka input data is e.g. a JSON string.

Checking data in Delta table

val table = spark.read
  .format("delta")
  .load(deltaPath)
  .createOrReplaceTempView("testTable")

spark.sql("SELECT * FROM testTable").show(false)

// result
+-----+
|value|
+-----+
|a    |
|b    |
|c    |
|d    |
+-----+

Files created in deltaPath

>/tmp/delta/table$ ll
total 44
drwxrwxr-x 3 x x 4096 Jan 11 17:12 ./
drwxrwxr-x 3 x x 4096 Jan 11 17:10 ../
drwxrwxr-x 2 x x 4096 Jan 11 17:12 _delta_log/
-rw-r--r-- 1 x x  414 Jan 11 17:12 part-00000-0a0ae7fb-2995-4da4-8284-1ab85899fe9c-c000.snappy.parquet
-rw-r--r-- 1 x x   12 Jan 11 17:12 .part-00000-0a0ae7fb-2995-4da4-8284-1ab85899fe9c-c000.snappy.parquet.crc
-rw-r--r-- 1 x x  306 Jan 11 17:12 part-00000-37eb0bb2-cd27-42a4-9db3-b79cb046b638-c000.snappy.parquet
-rw-r--r-- 1 x x   12 Jan 11 17:12 .part-00000-37eb0bb2-cd27-42a4-9db3-b79cb046b638-c000.snappy.parquet.crc
-rw-r--r-- 1 x x  414 Jan 11 17:12 part-00000-8d6b4236-1a12-4054-b016-3db7a007cbab-c000.snappy.parquet
-rw-r--r-- 1 x x   12 Jan 11 17:12 .part-00000-8d6b4236-1a12-4054-b016-3db7a007cbab-c000.snappy.parquet.crc
-rw-r--r-- 1 x x  407 Jan 11 17:12 part-00000-d2612eaa-3f48-4708-bf90-31dd3d83f124-c000.snappy.parquet
-rw-r--r-- 1 x x   12 Jan 11 17:12 .part-00000-d2612eaa-3f48-4708-bf90-31dd3d83f124-c000.snappy.parquet.crc


Upvotes: 4

Eric Bellet
Eric Bellet

Reputation: 2045

1) Try to verify if you have access to Kafka from your Spark cluster, sometimes you need to allow the access from some ips in Kafka.

2) Try to change this .option("startingOffsets", "earliest") to this .option("startingOffsets", "latest")

3) Try also

val kafka2 = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", kafkaBrokers)
  .option("subscribe", kafkaTopic)
  .option("startingOffsets", "earliest")
  .load()
  .select($"value")
  .withColumn("Value", $"value".cast(StringType))
  .writeStream
  .format("delta")
  .outputMode("append")
  .option("checkpointLocation", "/delta/hazriq_delta_trial2/_checkpoints/test")
  .start("hazriq_delta_trial2")

Upvotes: 0

Related Questions