anonuser1234

Reputation: 533

Deserialize Kafka json message with PySpark Streaming

I have a PySpark application that consumes messages from a Kafka topic. These messages are serialized by org.apache.kafka.connect.json.JsonConverter; I'm using the Confluent Kafka JDBC connector to produce them.

The issue is that when I consume the messages, the ID column comes in as some kind of encoded text such as "ARM=" when it should be a number type.

Here is the code I have now

from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

spark = SparkSession.builder.appName("my app").getOrCreate()
sc = spark.sparkContext
sc.setLogLevel('WARN')
ssc = StreamingContext(sc, 5)

kafka_params = {
    "bootstrap.servers": "kafkahost:9092",
    "group.id": "Deserialize"
}

kafka_stream = KafkaUtils.createDirectStream(ssc, ['mytopic'], kafka_params)
kafka_stream.foreachRDD(lambda rdd: rdd.foreach(lambda x: print(x)))

ssc.start()
ssc.awaitTermination()

I am aware that createDirectStream has a valueDecoder parameter I can set; the problem is that I don't know how to use it for decoding. I also know the schema beforehand, so I can create one if need be.
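For what it's worth, since I know the schema already, I assume I could declare it as a StructType along these lines (the exact Spark types are my guess, since ID should really be numeric once decoded):

from pyspark.sql.types import StructType, StructField, LongType, StringType

# My guess at the payload schema; ID should be a number once it is decoded properly
payload_schema = StructType([
    StructField("ID", LongType(), False),
    StructField("COLUMN1", StringType(), True)
])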

For reference, this is the JSON I am getting when I print out each record with rdd.foreach:

{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "bytes",
        "optional": False,
        "name": "org.apache.kafka.connect.data.Decimal",
        "version": 1,
        "parameters": {
          "scale": "0"
        },
        "field": "ID"
      },
      {
        "type": "string",
        "optional": True,
        "field": "COLUMN1"
      }
    ],
    "optional": False
  },
  "payload": {
    "ID": "AOo=",
    "COLUMN1": "some string"
  }
}

Upvotes: 1

Views: 2409

Answers (2)

anonuser1234

Reputation: 533

So as cricket_007 mentioned, in your Confluent Kafka Connect configuration you have to set value.converter.schemas.enable=false. This will get rid of the schema field and leave you with only the payload JSON. Now for some reason I had an issue where all my number columns were being encoded in this weird format, AOo=. When using JSON to serialize your data, Confluent will convert your number columns using base64, but the real issue starts even before that: for some reason, all my number columns were being converted into bytes. I'm not sure exactly why, but it has something to do with the way Confluent handles Oracle databases. Anyway, the way to fix this is by setting a value decoder in your createDirectStream, such as

kafka_stream = KafkaUtils.createDirectStream(ssc, ['mytopic'], kafka_params, valueDecoder=decoder)

and in your decoder method, you have to decode your message from UTF-8, parse the JSON, and then decode your number column from base64 and then from bytes, like so:

import base64
import json

def decoder(s):
    if s is None:
        return None

    # Decode the raw Kafka message bytes, parse the JSON, then convert the
    # base64-encoded byte representation of ID back into an integer
    loaded_json = json.loads(s.decode('utf-8'))
    loaded_json["ID"] = int.from_bytes(base64.b64decode(loaded_json['ID']), "big")
    return loaded_json
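This works here because the schema reports "scale": "0" for the org.apache.kafka.connect.data.Decimal field, and the bytes are the big-endian unscaled value. If a column had a nonzero scale, I assume a more general decode would look roughly like this (a sketch I haven't tested against the connector):

import base64
from decimal import Decimal

def decode_connect_decimal(encoded, scale=0):
    # Connect's Decimal logical type stores the unscaled value as big-endian
    # two's-complement bytes, which show up base64-encoded in the JSON
    unscaled = int.from_bytes(base64.b64decode(encoded), "big", signed=True)
    return Decimal(unscaled).scaleb(-scale)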

Upvotes: 2

OneCricketeer

Reputation: 191728

In your Connect configuration, you can set value.converter.schemas.enable=false, and then you would only get the "payload" datum of that JSON record.
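If you're running the connector with a properties file, the relevant converter lines might look roughly like this (the connector name is a placeholder and the other required JDBC settings are omitted):

# Hypothetical excerpt of the JDBC source connector config
name=my-jdbc-source
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false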

From there, I assume you would be able to process the message according to any other example of reading streaming JSON in PySpark.
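For example, with Structured Streaming it could look roughly like this (a sketch assuming the spark-sql-kafka package is on the classpath; ID is left as a string here because of the bytes/base64 issue you described):

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType, StructField, StringType

spark = SparkSession.builder.appName("my app").getOrCreate()

# Assumed payload schema; adjust to your real columns
payload_schema = StructType([
    StructField("ID", StringType()),
    StructField("COLUMN1", StringType())
])

df = (spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "kafkahost:9092")
      .option("subscribe", "mytopic")
      .load()
      .select(from_json(col("value").cast("string"), payload_schema).alias("msg"))
      .select("msg.*"))

query = df.writeStream.format("console").start()
query.awaitTermination()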

Otherwise, since you're not using Structured Streaming, there is no schema for you to define. Rather, you would have to at least do something like the following just to parse the records:

import json

# Inside foreachRDD: parse each record and keep only the Connect payload
rdd.map(lambda x: json.loads(x))\
    .map(lambda x: x['payload'])\
    .foreach(lambda x: print(x))

Upvotes: 1
