Reputation: 5031
I'm trying to read a stream from a Kafka topic using Spark Streaming with Python. I can read the messages and dump them to a bronze table with the default Kafka message schema, but I cannot cast the key and value from binary to string. I've tried the following approaches, and none of them worked:
Approach 1:
raw_kafka_events = (spark.readStream
    .format("kafka")
    .option("subscribe", TOPIC)
    .option("kafka.bootstrap.servers", KAFKA_BROKER)
    .option("startingOffsets", "earliest")
    .option("kafka.security.protocol", "SSL")
    .option("kafka.ssl.truststore.location", SSL_TRUST_STORE_FILE_LOCATION)
    .option("kafka.ssl.keystore.location", SSL_KEY_STORE_FILE_LOCATION)
    .option("kafka.ssl.keystore.password", SSL_KEY_STORE_PASSWORD)
    .option("kafka.ssl.truststore.password", SSL_TRUST_STORE_PASSWORD)
    .option("kafka.ssl.key.password", SSL_KEY_PASSWORD)
    .option("kafka.ssl.keystore.type", "JKS")
    .option("kafka.ssl.truststore.type", "JKS")
    .option("failOnDataLoss", "false")
    .load()).selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
@dlt.table(
    comment="the raw message from kafka topic",
    table_properties={"pipelines.reset.allowed": "false"}
)
def kafka_bronze():
    return raw_kafka_events
Error:
Failed to merge fields 'key' and 'key'. Failed to merge incompatible data types BinaryType and StringType
Approach 2:
raw_kafka_events = (spark.readStream
    .format("kafka")
    .option("subscribe", TOPIC)
    .option("kafka.bootstrap.servers", KAFKA_BROKER)
    .option("startingOffsets", "earliest")
    .option("kafka.security.protocol", "SSL")
    .option("kafka.ssl.truststore.location", SSL_TRUST_STORE_FILE_LOCATION)
    .option("kafka.ssl.keystore.location", SSL_KEY_STORE_FILE_LOCATION)
    .option("kafka.ssl.keystore.password", SSL_KEY_STORE_PASSWORD)
    .option("kafka.ssl.truststore.password", SSL_TRUST_STORE_PASSWORD)
    .option("kafka.ssl.key.password", SSL_KEY_PASSWORD)
    .option("kafka.ssl.keystore.type", "JKS")
    .option("kafka.ssl.truststore.type", "JKS")
    .option("failOnDataLoss", "false")
    .load())
raw_kafka_events.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
@dlt.table(
    comment="the raw message from kafka topic",
    table_properties={"pipelines.reset.allowed": "false"}
)
def kafka_bronze():
    return raw_kafka_events
There was no error message, but when I later checked the kafka_bronze table, the key and value columns were still in binary format.
Approach 3: added a kafka_silver table:
raw_kafka_events = (spark.readStream
    .format("kafka")
    .option("subscribe", TOPIC)
    .option("kafka.bootstrap.servers", KAFKA_BROKER)
    .option("startingOffsets", "earliest")
    .option("kafka.security.protocol", "SSL")
    .option("kafka.ssl.truststore.location", SSL_TRUST_STORE_FILE_LOCATION)
    .option("kafka.ssl.keystore.location", SSL_KEY_STORE_FILE_LOCATION)
    .option("kafka.ssl.keystore.password", SSL_KEY_STORE_PASSWORD)
    .option("kafka.ssl.truststore.password", SSL_TRUST_STORE_PASSWORD)
    .option("kafka.ssl.key.password", SSL_KEY_PASSWORD)
    .option("kafka.ssl.keystore.type", "JKS")
    .option("kafka.ssl.truststore.type", "JKS")
    .option("failOnDataLoss", "false")
    .load())
@dlt.table(
    comment="the raw message from kafka topic",
    table_properties={"pipelines.reset.allowed": "false"}
)
def kafka_bronze():
    return raw_kafka_events
@dlt.table(
    comment="real schema for kafka payload",
    temporary=False
)
def kafka_silver():
    return (
        # kafka streams are (timestamp, value)
        # value contains the kafka payload
        dlt.read_stream("kafka_bronze")
        .select(col("key").cast("string"))
        .select(col("value").cast("string"))
    )
Error:
Column 'value' does not exist.
How can I cast the key/value to string after reading them from the Kafka topic? I'd prefer to write the string-valued key/value to the bronze table, but if that's impossible, I can write them to the silver table instead.
Upvotes: 1
Views: 745
Reputation: 87069
First, it's recommended to define the raw_kafka_events variable inside the function, so it's local to that function.
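For example, a minimal sketch of that structure (the Kafka SSL options from the question are omitted here for brevity):
import dlt

@dlt.table(
    comment="the raw message from kafka topic",
    table_properties={"pipelines.reset.allowed": "false"}
)
def kafka_bronze():
    # defined inside the function, so it stays local to it
    raw_kafka_events = (spark.readStream
        .format("kafka")
        .option("subscribe", TOPIC)
        .option("kafka.bootstrap.servers", KAFKA_BROKER)
        .option("startingOffsets", "earliest")
        .option("failOnDataLoss", "false")
        .load())
    return raw_kafka_events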
In the second approach, your problem is that you call raw_kafka_events.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") without assigning the result back to the variable. It should be raw_kafka_events = raw_kafka_events.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").
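To illustrate why the reassignment matters (a tiny sketch, assuming a DataFrame df with binary key/value columns):
# selectExpr returns a *new* DataFrame; it does not modify df in place
decoded = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
# df still has binary columns here; only `decoded` has string columns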
The second problem is that when you use an expression like CAST(key AS STRING), the resulting field gets a new name matching that expression. Change it to CAST(key AS STRING) as key and CAST(value AS STRING) as value - this should also fix the merge error from the first approach.
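Putting both fixes together, the corrected line for the second approach would look like this:
# assign the result back, and alias the casts so the column names stay key/value
raw_kafka_events = raw_kafka_events.selectExpr(
    "CAST(key AS STRING) as key",
    "CAST(value AS STRING) as value")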
In the third approach, you have select statements chained:
def kafka_silver():
    return (
        # kafka streams are (timestamp, value)
        # value contains the kafka payload
        dlt.read_stream("kafka_bronze")
        .select(col("key").cast("string"))
        .select(col("value").cast("string"))
    )
but after the first select you get a dataframe with only one column - key - so the second select fails because value no longer exists. You need to change the code to:
dlt.read_stream("kafka_bronze") \
    .select(col("key").cast("string").alias("key"),
            col("value").cast("string").alias("value"))
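Putting it all together, a sketch of the full corrected pipeline based on the code from the question (bronze keeps the raw binary columns, silver casts them to strings in a single select):
import dlt
from pyspark.sql.functions import col

@dlt.table(
    comment="the raw message from kafka topic",
    table_properties={"pipelines.reset.allowed": "false"}
)
def kafka_bronze():
    # read defined locally; key/value stay binary in the bronze table
    return (spark.readStream
        .format("kafka")
        .option("subscribe", TOPIC)
        .option("kafka.bootstrap.servers", KAFKA_BROKER)
        .option("startingOffsets", "earliest")
        .option("kafka.security.protocol", "SSL")
        .option("kafka.ssl.truststore.location", SSL_TRUST_STORE_FILE_LOCATION)
        .option("kafka.ssl.keystore.location", SSL_KEY_STORE_FILE_LOCATION)
        .option("kafka.ssl.keystore.password", SSL_KEY_STORE_PASSWORD)
        .option("kafka.ssl.truststore.password", SSL_TRUST_STORE_PASSWORD)
        .option("kafka.ssl.key.password", SSL_KEY_PASSWORD)
        .option("kafka.ssl.keystore.type", "JKS")
        .option("kafka.ssl.truststore.type", "JKS")
        .option("failOnDataLoss", "false")
        .load())

@dlt.table(comment="real schema for kafka payload")
def kafka_silver():
    # a single select keeps both columns, aliased back to key/value
    return (dlt.read_stream("kafka_bronze")
        .select(col("key").cast("string").alias("key"),
                col("value").cast("string").alias("value")))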
Upvotes: 1