Reputation: 11
I've been using Kafka Connect for the last couple of months, and recently I added the ActiveMQ source plugin in order to read JMS topic messages that contain a JSON payload, put them in a Kafka topic, and then create a stream/table in ksqlDB that uses some of the keys of that JSON as columns. The problem is that the plugin inserts the JMS message as text with escaped double quotes, so it's not recognised properly in ksqlDB. I've tried various configuration changes to fix it, but nothing has worked so far. I also want to use JSON formatting rather than Avro in Kafka Connect (there is no Schema Registry running either). For testing purposes I also tried sending JMS messages with the content header set to "application/json", and still no luck.
Here's what my ActiveMQ connector configuration looks like:
"config": {"connector.class":"ActiveMQSourceConnector", "tasks.max":"1", "kafka.topic":"activemq", "activemq.url":"tcp://localhost:61616","activemq.username":"admin","activemq.password":"admin","jms.destination.name":"topic.2","jms.destination.type":"topic","jms.message.format":"json","jms.message.converter":"org.apache.kafka.connect.json.JsonConverter","confluent.license":"","confluent.topic.bootstrap.servers":"localhost:9092"}}
and here's what my Kafka Connect worker configuration looks like:
bootstrap.servers=localhost:9092
group.id=connect-cluster
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
offset.storage.topic=connect-offsets
offset.storage.replication.factor=1
config.storage.topic=connect-configs
config.storage.replication.factor=1
status.storage.topic=connect-status
status.storage.replication.factor=1
offset.flush.interval.ms=10000
plugin.path=/opt/kafka_2.13-2.5.0/plugins
Also, here's an example of how Kafka consumes the messages:
{
"messageID": "ID:plato-46377-1596636746117-4:4:1:1:1",
"text": "{\"widget\": { \"debug\": \"on\", \"window\": { \"title\": \"Sample Konfabulator Widget\", \"name\": \"main_window\", \"width\": 500, \"height\": 500 }, \"image\": { \"src\": \"Images/Sun.png\", \"name\": \"sun1\", \"hOffset\": 250, \"vOffset\": 250, \"alignment\": \"center\" }, \"text\": { \"data\": \"Click Here\", \"size\": 36, \"style\": \"bold\", \"name\": \"text1\", \"hOffset\": 250, \"vOffset\": 100, \"alignment\": \"center\", \"onMouseUp\": \"sun1.opacity = 39\"} }}\n"
}
If any other info is needed please let me know. Any help would be much appreciated.
UPDATE: Ultimately the best solution would be to somehow configure the connector not to escape the quotes in the payload. Also, unfortunately, the escaped quotes are generated by ActiveMQ itself and are not part of the initial message.
So the message would look like this:
{
    "messageID": "ID:plato-46377-1596636746117-4:4:1:1:1",
    "text": {
        "widget": {
            "debug": "on",
            "window": {
                "title": "Sample Konfabulator Widget",
                "name": "main_window",
                "width": 500,
                "height": 500
            },
            "image": {
                "src": "Images/Sun.png",
                "name": "sun1",
                "hOffset": 250,
                "vOffset": 250,
                "alignment": "center"
            }
        }
    }
}
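For reference, the kind of thing I'd eventually like to be able to run in ksqlDB once the message looks like the above is roughly the following (the stream names and the particular columns are just placeholders to illustrate the goal, not something I have working):
-- Declare the nested payload as a STRUCT so individual JSON keys can be used as columns
-- (topic name as per my connector config above)
CREATE STREAM WIDGETS (
    messageID STRING,
    text STRUCT<
        widget STRUCT<
            debug STRING,
            image STRUCT<src STRING, name STRING>
        >
    >
) WITH (
    KAFKA_TOPIC='activemq',
    VALUE_FORMAT='JSON'
);

-- Then pull some of those keys out as columns:
CREATE STREAM WIDGET_COLUMNS AS
    SELECT
        messageID,
        text->widget->debug      AS debug,
        text->widget->image->src AS image_src
    FROM WIDGETS;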
Upvotes: 1
Views: 1393
Reputation: 1893
Welcome Elen1no1Yami!
Looks to me like the issue is that the text field of the message is a string containing the JSON payload you're interested in, but that payload has its double-quotes escaped with a \ char.
I'm assuming the data in ActiveMQ itself does not have the \ char, but it would be good if you could clarify this.
The approaches I see to solving this are to either: configure the connector so that it does not escape the quotes in the payload, or remove the escaping within ksqlDB itself.
Either way, what you seem to be after is for the message on the Kafka topic to look like this:
{
    "messageID": "ID:plato-46377-1596636746117-4:4:1:1:1",
    "text": {
        "widget": {
            "debug": "on",
            "window": {
                "title": "Sample Konfabulator Widget",
                "name": "main_window",
                "width": 500,
                "height": 500
            },
            "image": {
                "src": "Images/Sun.png",
                "name": "sun1",
                "hOffset": 250,
                "vOffset": 250,
                "alignment": "center"
            },
            ... etc
        }
    }
}
This would let you access the data nested within the text field.
Does that summarise what you're looking for? If so, please update your question to reflect this. (It's good to include such details in your question so that it's clear what you're asking.)
As for an answer...
I'm no Connect expert, so I can't really comment, and I can't see anything in the connector's configuration that would allow you to change the contents of the text field. Others who know more about Connect may be able to help more.
To be able to access the embedded/escaped JSON in ksqlDB you would first need to remove the escaping. See below for ways to do this using ksqlDB.
Before we can access the JSON document in the text field we must remove the escaping. I can think of two ways off the top of my head:
The best way would be to write a custom UDF, e.g. unescape_json, that could remove the escaping.
-- Import raw stream with value as simple STRING containing all the payload
CREATE STREAM RAW (
message STRING
) WITH (
KAFKA_TOPIC=<something>,
VALUE_FORMAT='KAFKA'
);
-- Use custom UDF to process this and write it back as a properly formatted JSON document:
CREATE STREAM JSONIFIED AS
SELECT MY_CUSTOM_UDF(message) FROM RAW;
If written correctly, the custom UDF approach would not suffer from the potential data corruption issues the REPLACE-based solution suffers from.
Using REPLACE to remove the escaping
NOTE: this solution is brittle: the character replacement can match and replace things it shouldn't, depending on the content of your message!
Let's work with simpler test data to explain what's needed, e.g. we want to convert:
{
"messageID": "ID:plato-46377-1596636746117-4:4:1:1:1",
"text": "{\"widget\": 10}"
}
To:
{
"messageID": "ID:plato-46377-1596636746117-4:4:1:1:1",
"text": {"widget": 10}
}
This requires three things:
1. replacing "text": "{ with "text": {
2. replacing \" with "
3. replacing }" with }
We can use the REPLACE function to do this, or the REGEXP_REPLACE function:
-- Import raw stream with value as simple STRING containing all the payload
CREATE STREAM RAW (
message STRING
) WITH (
KAFKA_TOPIC=<something>,
VALUE_FORMAT='KAFKA'
);
-- Use REPLACE to remove the escaping and reformat:
CREATE STREAM JSONIFIED AS
SELECT
REPLACE(
REPLACE(
REPLACE(message,
'"text": "{', '"text": {'),
'\"', '"'),
'"}', '}')
FROM RAW;
Of course, this solution suffers from potentially corrupting your data if your data contains any of the search terms ("text": "{, \" or "}) anywhere else, e.g.
{
"messageID": "ID:plato-46377-1596636746117-4:4:1:1:1",
"text": "{\"widget\": \"hello \\\"} world\"}"
}
Would incorrectly be converted to
{
"messageID": "ID:plato-46377-1596636746117-4:4:1:1:1",
"text": {"widget": "hello \\}world"}
}
This is why a custom UDF would be preferable.
Once you've corrected the contents of your input (and written it to a new topic), then you can import your data as normal:
CREATE STREAM DATA (
messageId STRING,
text STRUCT<Widget INT>
) WITH (
kafka_topic='JSONIFIED',
value_format='JSON'
);
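From there you should be able to pull the nested value out as its own column, e.g. something like this (untested sketch; the stream and column names are just illustrative):
-- Derive a stream where the nested widget value is exposed as its own column:
CREATE STREAM DATA_FLATTENED AS
    SELECT
        messageId,
        text->Widget AS widget
    FROM DATA;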
Upvotes: 0