Eien1no1Yami

Reputation: 11

Read JMS message as json and not text by using Kafka ActiveMQ Source Connector

I've been using Kafka Connect for the last couple of months, and recently I added the ActiveMQ source connector in order to read JMS topic messages that carry a JSON document, put them in a Kafka topic, and then create a stream/table in ksqlDB that uses some of the JSON keys as columns. The problem is that the connector inserts the JMS message body as text with escaped double quotes, so ksqlDB does not recognize it as JSON. I have tried various configuration changes to fix this, but nothing has worked so far. I also want to use JSON formatting rather than Avro in Kafka Connect (there is no Schema Registry running either). For testing purposes I also tried sending JMS messages with the content header set to "application/json", and still no luck.

Here's what my ActiveMQ connector configuration looks like:

 "config": {
   "connector.class": "ActiveMQSourceConnector",
   "tasks.max": "1",
   "kafka.topic": "activemq",
   "activemq.url": "tcp://localhost:61616",
   "activemq.username": "admin",
   "activemq.password": "admin",
   "jms.destination.name": "topic.2",
   "jms.destination.type": "topic",
   "jms.message.format": "json",
   "jms.message.converter": "org.apache.kafka.connect.json.JsonConverter",
   "confluent.license": "",
   "confluent.topic.bootstrap.servers": "localhost:9092"
 }

and here's what my Kafka Connect worker configuration looks like:

bootstrap.servers=localhost:9092

group.id=connect-cluster

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

key.converter.schemas.enable=false
value.converter.schemas.enable=false
offset.storage.topic=connect-offsets
offset.storage.replication.factor=1


config.storage.topic=connect-configs
config.storage.replication.factor=1

status.storage.topic=connect-status
status.storage.replication.factor=1

offset.flush.interval.ms=10000

plugin.path=/opt/kafka_2.13-2.5.0/plugins

Also, here's an example of how Kafka consumes the messages:

{
  "messageID": "ID:plato-46377-1596636746117-4:4:1:1:1",
  "text": "{\"widget\": {     \"debug\": \"on\",    \"window\": {        \"title\": \"Sample Konfabulator Widget\",        \"name\": \"main_window\",        \"width\": 500,        \"height\": 500    },    \"image\": {        \"src\": \"Images/Sun.png\",        \"name\": \"sun1\",        \"hOffset\": 250,        \"vOffset\": 250,        \"alignment\": \"center\"    },    \"text\": {        \"data\": \"Click Here\",        \"size\": 36,        \"style\": \"bold\",        \"name\": \"text1\",        \"hOffset\": 250,        \"vOffset\": 100,        \"alignment\": \"center\",        \"onMouseUp\": \"sun1.opacity = 39\"} }}\n"
}

If any other info is needed, please let me know. Any help would be much appreciated.

UPDATE: Ultimately, the best solution would be to configure the connector so that it does not escape the quotes in the payload. Unfortunately, the escaped quotes are generated by ActiveMQ itself and are not part of the initial message.

So the message would look like this:

{
  "messageID": "ID:plato-46377-1596636746117-4:4:1:1:1",
  "text": {
   "widget": {
    "debug": "on",
    "window": {
      "title": "Sample Konfabulator Widget",
      "name": "main_window",
      "width": 500,
      "height": 500
    },
    "image": {
     "src": "Images/Sun.png",
     "name": "sun1",
     "hOffset": 250,
     "vOffset": 250,
     "alignment": "center"
    }

}

Upvotes: 1

Views: 1393

Answers (1)

Andrew Coates

Reputation: 1893

Welcome Eien1no1Yami!

Looks to me like the issue is that the text field of the message is a string containing the JSON payload you're interested in, but that payload has its double-quotes escaped with a \ char.

I'm assuming the data in ActiveMQ itself does not have the \ char, but it would be good if you could clarify this.
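For what it's worth, this kind of escaping is what any JSON serializer produces when a string field happens to contain JSON text, so it is probably added on the way into Kafka rather than stored in ActiveMQ. A minimal Python sketch of the effect follows; the payload and message ID are made up for illustration, and the code only mimics what the connector and JsonConverter presumably do:

```python
import json

# Hypothetical JMS text body: plain JSON text with no backslashes in it.
payload = '{"widget": {"debug": "on"}}'

# Assumption: the connector wraps the body in a record with a string "text" field.
record = {"messageID": "ID:example", "text": payload}

# Serializing the record escapes the inner quotes, as the JSON grammar requires.
serialized = json.dumps(record)
print(serialized)

# Parsing the record again gives back the original, unescaped body.
assert json.loads(serialized)["text"] == payload
```

If that is what is happening here, the backslashes exist only in the serialized form of the record, and nothing is wrong with the data itself.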

The approaches I see to solving this are to either:

  1. configure the connector to NOT escape the quotes in the payload, so that the message looks more like:
{
  "messageID": "ID:plato-46377-1596636746117-4:4:1:1:1",
  "text": {
   "widget": {
    "debug": "on",
    "window": {
      "title": "Sample Konfabulator Widget",
      "name": "main_window",
      "width": 500,
      "height": 500
    },
    "image": {
     "src": "Images/Sun.png",
     "name": "sun1",
     "hOffset": 250,
     "vOffset": 250,
     "alignment": "center"
    },
    ... etc
}
  2. or somehow have ksqlDB handle the message as it is and still get access to the JSON within the text field.

Does that summarise what you're looking for? If so, please update your question to reflect this. (It's good to include such details in your question so that it's clear what you're asking.)

As for an answer...

  1. I'm no Connect expert, so I can't really comment, and I can't see anything in the connector's config that would let you change the contents of text. Others who know more about Connect may be able to help.

  2. To be able to access the embedded/escaped JSON in ksqlDB you would first need to remove the escaping. See below for ways to do this using ksqlDB.

Using ksqlDB to access escaped JSON

Before we can access the JSON document in text we must remove the escaping.

I can think of two ways off the top of my head:

Write a custom UDF

The best way would be to write a custom UDF (e.g. 'unescape_json', shown as MY_CUSTOM_UDF below) that removes the escaping.

-- Import raw stream with value as simple STRING containing all the payload
CREATE STREAM RAW (
  message STRING
 ) WITH (
  KAFKA_TOPIC=<something>,
  VALUE_FORMAT='KAFKA'
 );


-- Use custom UDF to process this and write it back as a properly formatted JSON document:
CREATE STREAM JSONIFIED AS
  SELECT MY_CUSTOM_UDF(message) FROM RAW;

If written correctly, the custom UDF approach would not suffer from the potential data corruption issues the REPLACE based solution suffers from.
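A real ksqlDB UDF would be written in Java, so the following is only a sketch, in Python, of the logic such a function might implement. It assumes the whole message is valid JSON and that the escaped document lives in a field called text; the name unescape_text_field is hypothetical.

```python
import json

def unescape_text_field(message: str, field: str = "text") -> str:
    """Return the message with the JSON string in `field` replaced by the parsed document."""
    outer = json.loads(message)
    if not isinstance(outer, dict):
        return message  # not a JSON object, nothing to unwrap
    inner = outer.get(field)
    if isinstance(inner, str):
        try:
            # Let a real JSON parser undo the escaping instead of editing characters.
            outer[field] = json.loads(inner)
        except json.JSONDecodeError:
            pass  # the field is ordinary text, so leave it untouched
    return json.dumps(outer)

raw = '{"messageID": "ID:example", "text": "{\\"widget\\": 10}"}'
print(unescape_text_field(raw))
```

Because the unescaping is done by a parser rather than by matching substrings, quotes and braces inside the payload's own string values are handled correctly.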

Using REPLACE to remove escaping

NOTE: this solution is brittle: the character replacement can match and replace things it shouldn't, depending on the content of your message!

Let's work with simpler test data to explain what's needed, e.g. we want to convert:

{
 "messageID": "ID:plato-46377-1596636746117-4:4:1:1:1",
 "text": "{\"widget\": 10}"
}

To:

{
 "messageID": "ID:plato-46377-1596636746117-4:4:1:1:1",
 "text": {"widget": 10}
}

This requires three things:

  1. Replace opening "text": "{ with "text": {
  2. Replace all \" with ".
  3. Replace closing }" with }

We can use the REPLACE function to do this, or the REGEXP_REPLACE function:

-- Import raw stream with value as simple STRING containing all the payload
CREATE STREAM RAW (
  message STRING
 ) WITH (
  KAFKA_TOPIC=<something>,
  VALUE_FORMAT='KAFKA'
 );


-- Use REPLACE to remove the escaping (one call per step above):
CREATE STREAM JSONIFIED AS
  SELECT
    REPLACE(
      REPLACE(
        REPLACE(message,
          '"text": "{', '"text": {'),
        '\"', '"'),
      '}"', '}')
  FROM RAW;

Of course, this solution can corrupt your data if any of the search terms ("text": "{, \" or }") appear anywhere else in your data, e.g.

{
 "messageID": "ID:plato-46377-1596636746117-4:4:1:1:1",
 "text": "{\"widget\": \"hello \\\"} world\"}"
}

Would incorrectly be converted to

{
 "messageID": "ID:plato-46377-1596636746117-4:4:1:1:1",
 "text": {"widget": "hello \\"} world"}
}

which is no longer valid JSON: the \" replacement also matched inside the doubly escaped \\\", so the string value now ends right after hello \\.

This is why a custom UDF would be preferable.
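To make the failure mode concrete, here is a small Python sketch that applies the three replacement steps listed earlier as plain string replacements and checks whether the result still parses. It is an illustration only, not ksqlDB code, and the helper names and message IDs are made up.

```python
import json

def naive_unescape(message: str) -> str:
    """Apply the three REPLACE steps as plain substring replacements."""
    step1 = message.replace('"text": "{', '"text": {')
    step2 = step1.replace('\\"', '"')
    return step2.replace('}"', '}')

def is_valid_json(text: str) -> bool:
    try:
        json.loads(text)
        return True
    except json.JSONDecodeError:
        return False

# Simple payload: the replacements happen to produce valid JSON.
simple = json.dumps({"messageID": "ID:example", "text": json.dumps({"widget": 10})})

# Payload whose string value contains a quote followed by a brace.
tricky = json.dumps({"messageID": "ID:example", "text": json.dumps({"widget": 'hello "} world'})})

print(is_valid_json(naive_unescape(simple)))
print(is_valid_json(naive_unescape(tricky)))

# Parsing the text field with a JSON parser recovers the value intact.
print(json.loads(json.loads(tricky)["text"]))
```

The replacements work on the simple message but break the second one, while parsing the text field handles both.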

Once you've corrected the contents of your input (and written it to a new topic), then you can import your data as normal:

CREATE STREAM DATA (
   messageId STRING,
   text STRUCT<Widget INT>
 ) WITH (
   kafka_topic='JSONIFIED',
   value_format='JSON'
 );

Upvotes: 0
