maxime G
maxime G

Reputation: 1771

Kafka connect transformation isn't applied

I working in kerberised server , with distributed kafka connect. the connector work well, just the transformation part is totally ignored. I have no Warn or Error or any info in logs about this problem.

My connector without transformation work well :

{
    "name": "hdfs-avro-sink-X",
    "config": {
                "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
                "tasks.max": "1",
                "topics": "Y",
                "hdfs.url": "HA_name",
                "topics.dir": "/data/path",
                "logs.dir": "/tmp/path",
                "flush.size": "8800",
                "rotate.interval.ms": "6000",
                "hive.integration": "true",
                "hive.database": "hive_db_name",
                "hive.metastore.uris": "thrift://url:9083",
                "format.class": "io.confluent.connect.hdfs.avro.AvroFormat",
                "schema.compatibility": "BACKWARD",
                "hive.conf.dir": "/usr/hdp/current/hive/target-conf/",
                "hadoop.conf.dir": "/usr/hdp/current/hadoop/target-conf/",
                "hdfs.authentication.kerberos": "true",
                "connect.hdfs.principal": "principal",
                "connect.hdfs.keytab": "/keytab/path",
                "ssl.truststore.location": "truststore.jks",
                "ssl.truststore.password": "password",
                "security.protocol": "SASL_SSL",
                "ssl.keystore.location": "keystore.jks",
                "ssl.keystore.password": "password",
                "ssl.key.password": "password"               
    }
}

with transformations, transformations is ignored :

{
    "name": "hdfs-avro-sink-X",
    "config": {
                "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
                ... same conf
                "ssl.key.password": "password",
                "transforms": "MaskField",
                "transforms.MaskField.type": "org.apache.kafka.connect.transforms.MaskField$Value",
                "transforms.MaskField.fields": "ano_ass"
    }
}

i can type anything, it's ignored anyway :

{
    "name": "hdfs-avro-sink-X",
    "config": {
                "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
                ... same conf
                "ssl.key.password": "password",
                "transforms": "Masgfdgfdgfdhield",
                "transforms.MaskField.type": "org.apache.kafka.connect.transforms.MaskField$Value",
                "transforms.MaskField.fields": "anfshdghgh_ass"
    }
}

ano_ass fields exist in my avro schema no problem in ranger audit

versions :

Upvotes: 3

Views: 1023

Answers (1)

Robin Moffatt
Robin Moffatt

Reputation: 32140

If you're running 0.10.1 then SMT don't exist yet :) Single Message Transform were added to Apache Kafka in version 0.10.2 with KIP-66, over 2.5 years ago.

You might want to consider running a more up to date release of Kafka, the latest version of which is 2.3.

It looks like the latest version of HDP is 3.1 which includes Apache Kafka 2.0. If you want the latest version of Kafka (2.3) then you can get it from kafka.apache.org, and also as part of Confluent Platform.

Upvotes: 3

Related Questions