ExtractField$Key doesn't work with MySqlCdcSource in Confluent Cloud

I'm creating the following connector:

CREATE SOURCE CONNECTOR IF NOT EXISTS `myconn` WITH (
    "name" = 'myconn',
    "connector.class" = 'MySqlCdcSource',
    "tasks.max" = 1,

    --Database config--------------------------

    "database.hostname" = '${dbHost}',
    "database.port"....

    --Kafka config------------------------------

    "kafka.api.key" = '${kafkaApiKey}',
    "kafka.auth.mode"...

    --Connector behavior------------------------

    "output.data.format" = 'AVRO',
    "output.key.format" = 'STRING',
    "key.converter" = 'org.apache.kafka.connect.storage.StringConverter',
    "key.converter.schemas.enable" = true,
    "tombstones.on.delete" = true,
    "null.handling.mode" = 'keep',
    "include.schema.changes" = false,
    "table.include.list" = 'sandbox\.(xxx|yyy)',
    "errors.tolerance" = 'none',
    "errors.log.enable" = true,
    "errors.log.include.messages" = true,

    --Topics configuration----------------------

    "topic.creation.default.cleanup.policy" = 'compact',
    "topic.creation.default.min.insync.replicas"...

    --Predicates--------------------------------

    "predicates" = 'TopicDoestHaveIdField',
    "predicates.TopicDoestHaveIdField.type" = 'org.apache.kafka.connect.transforms.predicates.TopicNameMatches',
    "predicates.TopicDoestHaveIdField.pattern" = 'myconn.sandbox\.(zzz|ooo)$',

    --Transforms--------------------------------

    "transforms" = 'extractKey',

    "transforms.extractKey.type" = 'org.apache.kafka.connect.transforms.ExtractField$Key',
    "transforms.extractKey.field" = 'id',
    "transforms.extractKey.predicate" = 'TopicDoestHaveIdField',
    "transforms.extractKey.negate" = true
);

In local development I'm working with io.debezium.connector.mysql.MySqlConnector and the transformation works correctly. The problem appears when I create the connector in Confluent Cloud (using MySqlCdcSource). It gives me the following error:

The field configured for org.apache.kafka.connect.transforms.ExtractField transform does not exist in the key or value of Kafka records. Please verify the record's key or value has the configured field.

The problem is that it doesn't find the field id, but when I run a test without the transformation, I see this key in the topic: Struct(id=0000), so the field exists. I suppose it is related to types, or some configuration option is missing in my connector. Any ideas?

Upvotes: 0

Views: 359

Answers (1)

The problem was caused by incorrect use of a predicate. Take a look at the following part:

--Predicates--------------------------------

"predicates" = 'TopicDoestHaveIdField',
"predicates.TopicDoestHaveIdField.type" = 'org.apache.kafka.connect.transforms.predicates.TopicNameMatches',
"predicates.TopicDoestHaveIdField.pattern" = 'myconn.sandbox\.(zzz|ooo)$',

--Transforms--------------------------------

"transforms" = 'extractKey',

"transforms.extractKey.type" = 'org.apache.kafka.connect.transforms.ExtractField$Key',
"transforms.extractKey.field" = 'id',
"transforms.extractKey.predicate" = 'TopicDoestHaveIdField',
"transforms.extractKey.negate" = true

As you can see, negate is set to true, so the extractKey transform is applied to every topic that does not match the predicate's pattern. Even though I was using "table.include.list", some additional topics were included, and their records didn't have an id field.
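To make the effect of negate concrete, here is a minimal Python sketch of the selection logic. It is a simulation, not Kafka Connect code: the pattern is copied from the config above, the topic names in the list are hypothetical, and it assumes (as I understand TopicNameMatches to behave) that the pattern must match the whole topic name. Python's re module only approximates Java regular expressions, which is fine for a pattern this simple.

```python
import re

# Pattern copied from "predicates.TopicDoestHaveIdField.pattern".
PATTERN = re.compile(r"myconn.sandbox\.(zzz|ooo)$")


def transform_applies(topic: str, negate: bool = True) -> bool:
    """Return True if the transform would run for records on `topic`."""
    # Assumption: the predicate is true only when the whole topic name matches.
    matches = PATTERN.fullmatch(topic) is not None
    # With negate=true the transform runs where the predicate is false.
    return not matches if negate else matches


# Hypothetical topic names, for illustration only.
topics = [
    "myconn.sandbox.zzz",
    "myconn.sandbox.ooo",
    "myconn.sandbox.other",
    "myconn",
]

# Topics whose records would go through ExtractField$Key.
applied = [t for t in topics if transform_applies(t)]
print(applied)
```

Every topic outside the pattern ends up in the list, so any topic the connector writes to that lacks an id field in its key will trigger the error, even if you never expected that topic to exist.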

In my case the confusion came from the message in the log:

The field configured for org.apache.kafka.connect.transforms.ExtractField transform does not exist in the key or value of Kafka records. Please verify the record's key or value has the configured field.

(No information about the topic involved.) I assumed the message referred to one of the two allowed tables ('zzz' or 'ooo'), but in the end it was a third topic that had also been included.

Upvotes: 2
