Omri. B

Reputation: 462

Kafka connect RabbitMQ unable to use insert field transform: Only Struct objects supported for [field insertion], found: [B

I'm trying to use the InsertField Kafka Connect transformation with the RabbitMQ source connector. My configuration:

    "config": {
        "connector.class": "io.confluent.connect.rabbitmq.RabbitMQSourceConnector",
        "confluent.topic.bootstrap.servers": "kafka:29092",
        "topic.creation.default.replication.factor": 1,
        "topic.creation.default.partitions": 1,
        "tasks.max": "2",
        "kafka.topic": "test",
        "rabbitmq.queue": "events",
        "rabbitmq.host": "rabbitmq",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "transforms": "InsertField",
        "transforms.InsertField.type": "org.apache.kafka.connect.transforms.InsertField$Value",
        "transforms.InsertField.static.field": "MessageSource",
        "transforms.InsertField.static.value": "Kafka Connect framework"
    }

I have also tried using ByteArrayConverter as the value converter. Using Python, I send a message as follows:

        msg = json.dumps(body)
        self.channel.basic_publish(exchange="", routing_key="events", body=msg)

Calling encode() on the message to turn it into a byte array does not work either. The exception I'm receiving is:

Caused by: org.apache.kafka.connect.errors.DataException: Only Struct objects supported for [field insertion], found: [B
    at org.apache.kafka.connect.transforms.util.Requirements.requireStruct(Requirements.java:52)
    at org.apache.kafka.connect.transforms.InsertField.applyWithSchema(InsertField.java:162)
    at org.apache.kafka.connect.transforms.InsertField.apply(InsertField.java:133)
    at org.apache.kafka.connect.runtime.TransformationChain.lambda$apply$0(TransformationChain.java:50)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
    ... 11 more

I understand the error and thought that using JsonConverter would solve it, but I was wrong. I've also set "value.converter.schemas.enable": "false", to no avail. I would appreciate any help. I don't mind sending the data as JSON or as bytes; I just want a key:value pair added to the event. Thanks.

Upvotes: 0

Views: 1428

Answers (1)

OneCricketeer

Reputation: 191864

As the error indicates, you can only insert fields into Structs. To get a Struct from the String/Bytes schemas that the RabbitMQ connector produces, you must chain a HoistField transform before the InsertField one.
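As a rough sketch of that chaining, the snippet below builds the connector config in Python and prints it as JSON. The transform aliases `Hoist` and `AddSource`, the wrapper field name `data`, and the helper `with_transforms` are all made up for illustration; only the two transform class names and the `static.field` / `static.value` settings come from Kafka Connect and the question.

```python
import json

# Transform chain: HoistField wraps the raw value in a Struct first,
# then InsertField adds the static field to that Struct.
# "Hoist", "AddSource" and "data" are hypothetical names.
transforms = {
    "transforms": "Hoist,AddSource",
    "transforms.Hoist.type": "org.apache.kafka.connect.transforms.HoistField$Value",
    "transforms.Hoist.field": "data",
    "transforms.AddSource.type": "org.apache.kafka.connect.transforms.InsertField$Value",
    "transforms.AddSource.static.field": "MessageSource",
    "transforms.AddSource.static.value": "Kafka Connect framework",
}


def with_transforms(base_config):
    """Return a copy of base_config with the transform chain merged in."""
    merged = dict(base_config)
    merged.update(transforms)
    return merged


# A trimmed-down version of the config from the question.
base = {
    "connector.class": "io.confluent.connect.rabbitmq.RabbitMQSourceConnector",
    "kafka.topic": "test",
    "rabbitmq.queue": "events",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
}

connector_config = with_transforms(base)
print(json.dumps({"config": connector_config}, indent=4))
```

The order in `transforms` matters: transforms run left to right, so the hoist has to come first. With JsonConverter on the output side, the hoisted bytes field will most likely show up base64-encoded in the resulting JSON, so check what your consumers expect.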

To get a Struct from JsonConverter, your JSON needs two top-level fields named schema and payload, and the connector needs:

    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "true"

https://www.confluent.io/blog/kafka-connect-deep-dive-converters-serialization-explained/
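For reference, here is a minimal sketch of what such a schema/payload envelope looks like when built on the Python side. The helper `wrap_with_schema` and the sample `body` are hypothetical, and the type mapping only covers flat dicts of str/int/float/bool values. It only illustrates the envelope shape; whether it is enough on its own depends on where the converter sits relative to the transform in your pipeline.

```python
import json


def wrap_with_schema(body):
    """Wrap a flat dict in the schema/payload envelope used by JsonConverter.

    Hypothetical helper: only str, int, float and bool values are handled.
    """
    # Python type -> Kafka Connect JSON schema type name
    type_names = {bool: "boolean", int: "int64", float: "double", str: "string"}
    fields = [
        {"field": name, "type": type_names[type(value)], "optional": False}
        for name, value in body.items()
    ]
    return {
        "schema": {"type": "struct", "fields": fields, "optional": False},
        "payload": body,
    }


body = {"id": 1, "event": "created"}  # made-up sample event
envelope = wrap_with_schema(body)
msg = json.dumps(envelope)
print(msg)
```

The resulting `msg` string could then be passed as the `body` argument of `basic_publish`, in place of the plain `json.dumps(body)` from the question.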

Alternatively, use Kafka headers for "source" information rather than trying to inject it into the value.
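One way to do the header approach, assuming your Kafka Connect version ships the InsertHeader transform (older releases do not), might look like the sketch below. The alias `AddSourceHeader` is made up, and the header name simply reuses the field name from the question.

```python
import json

# Adds a static record header instead of modifying the value,
# so the value can stay as raw bytes and no Struct is needed.
# "AddSourceHeader" is a hypothetical alias.
header_transform = {
    "transforms": "AddSourceHeader",
    "transforms.AddSourceHeader.type": "org.apache.kafka.connect.transforms.InsertHeader",
    "transforms.AddSourceHeader.header": "MessageSource",
    "transforms.AddSourceHeader.value.literal": "Kafka Connect framework",
}

print(json.dumps(header_transform, indent=4))
```

These keys would replace the `transforms.*` entries in the connector config from the question.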

Upvotes: 1
