Reputation: 533
I have a PySpark application that consumes messages from a Kafka topic; the messages are serialized by org.apache.kafka.connect.json.JsonConverter. I'm using the Confluent Kafka JDBC connector to produce them.
The issue is that when I consume the messages, the ID column comes through as some kind of encoded text such as "ARM=" when it should be a number type.
Here is the code I have now
from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

spark = SparkSession.builder.appName("my app").getOrCreate()
sc = spark.sparkContext
sc.setLogLevel('WARN')
ssc = StreamingContext(sc, 5)  # 5-second batch interval

kafka_params = {
    "bootstrap.servers": "kafkahost:9092",
    "group.id": "Deserialize"
}

kafka_stream = KafkaUtils.createDirectStream(ssc, ['mytopic'], kafka_params)
kafka_stream.foreachRDD(lambda rdd: rdd.foreach(lambda x: print(x)))

ssc.start()
ssc.awaitTermination()
I am aware that createDirectStream has a valueDecoder parameter I can set; the problem is that I don't know how to use it for decoding. I also know the schema beforehand, so I will be able to create one if need be.
For reference, this is the JSON I get when I print each record with rdd.foreach:
{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "bytes",
        "optional": false,
        "name": "org.apache.kafka.connect.data.Decimal",
        "version": 1,
        "parameters": {
          "scale": "0"
        },
        "field": "ID"
      },
      {
        "type": "string",
        "optional": true,
        "field": "COLUMN1"
      }
    ],
    "optional": false
  },
  "payload": {
    "ID": "AOo=",
    "COLUMN1": "some string"
  }
}
Upvotes: 1
Views: 2409
Reputation: 533
So, as cricket_007 mentioned, in your Kafka Connect configuration you have to set value.converter.schemas.enable=false. This will get rid of the schema field and leave you with only the payload JSON.
Now, for some reason, all of my number columns were being encoded in this weird format such as AOo=. When serializing your data as JSON, Confluent will convert those columns to base64 text, but the real issue starts even before that: for some reason all my number columns were being converted into bytes (the schema in the question shows the ID field mapped to org.apache.kafka.connect.data.Decimal, which is a bytes type). I'm not sure exactly why it does this, but it has something to do with the way Confluent handles Oracle databases. Anyway, the way to fix it is by setting a value decoder in your createDirectStream, such as
kafka_stream = KafkaUtils.createDirectStream(ssc, ['mytopic'], kafka_params, valueDecoder=decoder)
and in your decoder method, you have to decode the message from UTF-8, parse the JSON, and then decode your number column from base64 and then from bytes, like so:
import base64
import json

def decoder(s):
    if s is None:
        return None
    # The raw message bytes arrive here; decode UTF-8 and parse the JSON
    loaded_json = json.loads(s.decode('utf-8'))
    # ID is a base64 string of big-endian bytes; turn it back into an int
    loaded_json["ID"] = int.from_bytes(base64.b64decode(loaded_json['ID']), "big")
    return loaded_json
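If a column ever came through with a nonzero scale in its Decimal schema parameters, the decoded integer would also need to be scaled back down. A minimal sketch of that variant (the decode_decimal helper is hypothetical, not part of the answer above; it assumes Connect's usual big-endian, signed unscaled-value encoding):

import base64
import decimal

def decode_decimal(b64_value, scale):
    # Connect's Decimal logical type carries the unscaled value as bytes
    unscaled = int.from_bytes(base64.b64decode(b64_value), "big", signed=True)
    # Shift the decimal point back by the schema's scale
    return decimal.Decimal(unscaled).scaleb(-scale)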
Upvotes: 2
Reputation: 191728
In your Connect configuration, you can set value.converter.schemas.enable=false, and then you would only get the "payload" datum of that JSON record.
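For instance, a JDBC source connector definition might carry that setting alongside the converter itself. A sketch (the connector name and connection URL are placeholders, not taken from the question):

{
  "name": "my-jdbc-source",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "connection.url": "jdbc:oracle:thin:@//dbhost:1521/mydb",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false"
  }
}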
From there, I assume you would be able to process the message according to any other example of reading streaming JSON in PySpark.
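For example, with Structured Streaming that could look roughly like the sketch below (assuming the spark-sql-kafka package is on the classpath and schemas.enable has been turned off as above, so each message value is just the payload JSON; the host, topic, and column names come from the question, everything else is illustrative):

from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import StringType, StructField, StructType

spark = SparkSession.builder.appName("my app").getOrCreate()

# With schemas disabled, each Kafka message value is just the payload JSON
payload_schema = StructType([
    StructField("ID", StringType()),      # still base64 text until decoded below
    StructField("COLUMN1", StringType())
])

df = (spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "kafkahost:9092")
      .option("subscribe", "mytopic")
      .load()
      .select(F.from_json(F.col("value").cast("string"), payload_schema).alias("msg"))
      .select("msg.*")
      # one possible way to turn the base64-encoded bytes back into a number
      .withColumn("ID", F.conv(F.hex(F.unbase64(F.col("ID"))), 16, 10).cast("long")))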
Otherwise, since you're not using Structured Streaming, there is no schema for you to define. Rather, you would have to at least do something like this just to parse the records:
rdd.map(lambda x: json.loads(x)) \
   .map(lambda x: x['payload']) \
   .foreach(lambda x: print(x))
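Wired into the DStream from the question, that might look like the following sketch (assuming each record arrives as a (key, value) pair, which is the default for createDirectStream, so the JSON string is the second element):

import json

def handle_rdd(rdd):
    # The value holds the Connect JSON envelope; keep only its "payload" part
    rdd.map(lambda x: json.loads(x[1])) \
       .map(lambda x: x['payload']) \
       .foreach(lambda x: print(x))

kafka_stream.foreachRDD(handle_rdd)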
Upvotes: 1