user468587

Reputation: 5031

Spark read stream from Kafka using Delta tables

I'm trying to read a stream from a Kafka topic using Spark Structured Streaming/Python. I can read the messages and dump them to a bronze table with the default Kafka message schema, but I cannot cast the key and value from binary to string. I've tried the following approaches, and none of them worked:

Approach 1:

raw_kafka_events = (spark.readStream
    .format("kafka")
    .option("subscribe", TOPIC)
    .option("kafka.bootstrap.servers", KAFKA_BROKER)
    .option("startingOffsets", "earliest")
    .option("kafka.security.protocol", "SSL") \
    .option("kafka.ssl.truststore.location", SSL_TRUST_STORE_FILE_LOCATION) \
    .option("kafka.ssl.keystore.location", SSL_KEY_STORE_FILE_LOCATION) \
    .option("kafka.ssl.keystore.password", SSL_KEY_STORE_PASSWORD) \
    .option("kafka.ssl.truststore.password", SSL_TRUST_STORE_PASSWORD) \
    .option("kafka.ssl.key.password", SSL_KEY_PASSWORD) \
    .option("kafka.ssl.keystore.type", "JKS") \
    .option("kafka.ssl.truststore.type", "JKS") \
    .option("failOnDataLoss", "false") \
    .load()).selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

@dlt.table(
  comment="the raw message from kafa topic",
  table_properties={"pipelines.reset.allowed":"false"}
)
def kafka_bronze():
  return raw_kafka_events

Error:

Failed to merge fields 'key' and 'key'. Failed to merge incompatible data types BinaryType and StringType

Approach 2:

raw_kafka_events = (spark.readStream
    .format("kafka")
    .option("subscribe", TOPIC)
    .option("kafka.bootstrap.servers", KAFKA_BROKER)
    .option("startingOffsets", "earliest")
    .option("kafka.security.protocol", "SSL") \
    .option("kafka.ssl.truststore.location", SSL_TRUST_STORE_FILE_LOCATION) \
    .option("kafka.ssl.keystore.location", SSL_KEY_STORE_FILE_LOCATION) \
    .option("kafka.ssl.keystore.password", SSL_KEY_STORE_PASSWORD) \
    .option("kafka.ssl.truststore.password", SSL_TRUST_STORE_PASSWORD) \
    .option("kafka.ssl.key.password", SSL_KEY_PASSWORD) \
    .option("kafka.ssl.keystore.type", "JKS") \
    .option("kafka.ssl.truststore.type", "JKS") \
    .option("failOnDataLoss", "false") \
    .load())
raw_kafka_events.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

@dlt.table(
  comment="the raw message from kafa topic",
  table_properties={"pipelines.reset.allowed":"false"}
)
def kafka_bronze():
  return raw_kafka_events

No error message, but when I later checked the kafka_bronze table, the key and value columns were still in binary format.

Approach 3 (added a kafka_silver table):

raw_kafka_events = (spark.readStream
    .format("kafka")
    .option("subscribe", TOPIC)
    .option("kafka.bootstrap.servers", KAFKA_BROKER)
    .option("startingOffsets", "earliest")
    .option("kafka.security.protocol", "SSL") \
    .option("kafka.ssl.truststore.location", SSL_TRUST_STORE_FILE_LOCATION) \
    .option("kafka.ssl.keystore.location", SSL_KEY_STORE_FILE_LOCATION) \
    .option("kafka.ssl.keystore.password", SSL_KEY_STORE_PASSWORD) \
    .option("kafka.ssl.truststore.password", SSL_TRUST_STORE_PASSWORD) \
    .option("kafka.ssl.key.password", SSL_KEY_PASSWORD) \
    .option("kafka.ssl.keystore.type", "JKS") \
    .option("kafka.ssl.truststore.type", "JKS") \
    .option("failOnDataLoss", "false") \
    .load())

@dlt.table(
  comment="the raw message from kafa topic",
  table_properties={"pipelines.reset.allowed":"false"}
)
def kafka_bronze():
  return raw_kafka_events

@dlt.table(comment="real schema for kafka payload",
          temporary=False)
def kafka_silver():
  return (
    # kafka streams are (timestamp,value)
    # value contains the kafka payload
        
    dlt.read_stream("kafka_bronze")
    .select(col("key").cast("string"))
    .select(col("value").cast("string")) 
)

Error:

Column 'value' does not exist.

How can I cast the key/value to string after reading them from the Kafka topic? I'd prefer to write the string-valued key/value to the bronze table, but if that's not possible, writing them to the silver table would work too.

Upvotes: 1

Views: 745

Answers (1)

Alex Ott

Reputation: 87069

First, it's recommended to define the raw_kafka_events variable inside the function, so it stays local to that function.
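For example, a minimal sketch of that structure, reusing the question's TOPIC, KAFKA_BROKER, and SSL options (abbreviated here):

import dlt

@dlt.table(
  comment="the raw message from kafka topic",
  table_properties={"pipelines.reset.allowed":"false"}
)
def kafka_bronze():
  # defining the stream inside the function keeps it local to this table
  return (spark.readStream
      .format("kafka")
      .option("subscribe", TOPIC)
      .option("kafka.bootstrap.servers", KAFKA_BROKER)
      .option("startingOffsets", "earliest")
      # ... same kafka.security.protocol / kafka.ssl.* options as in the question ...
      .option("failOnDataLoss", "false")
      .load())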

In the second approach, your problem is that you call raw_kafka_events.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") without assigning the result to the variable. DataFrames are immutable, so selectExpr returns a new DataFrame and leaves the original untouched; you need raw_kafka_events = raw_kafka_events.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").

The second problem is that when you use expressions like CAST(key AS STRING), the resulting fields may get an auto-generated name matching the expression. Alias them explicitly - CAST(key AS STRING) AS key and CAST(value AS STRING) AS value - so the columns keep the names key and value; this should also fix the error from the first approach.
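Putting both fixes together, the line in the second approach would look like this (a sketch):

# selectExpr returns a new DataFrame, so assign the result back;
# explicit aliases keep the column names key and value
raw_kafka_events = raw_kafka_events.selectExpr(
    "CAST(key AS STRING) AS key",
    "CAST(value AS STRING) AS value")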

In the third approach, you have chained select statements:

def kafka_silver():
  return (
    # kafka streams are (timestamp,value)
    # value contains the kafka payload
        
    dlt.read_stream("kafka_bronze")
    .select(col("key").cast("string"))
    .select(col("value").cast("string")) 
)

but after the first select you get a DataFrame with only one column, key, so the second select fails because value no longer exists. You need to change the code to:

dlt.read_stream("kafka_bronze") \
   .select(col("key").cast("string").alias("key"), 
           col("value").cast("string").alias("value"))

Upvotes: 1
