eedideyahoocom

Reputation: 193

Why is Kafka connect jdbc sink connector reading null values from Kafka topic

I have been struggling to sink data from my Kafka topic into a MySQL database table, and here are the scenarios playing out.

First, the data in the Kafka topic comes from another MySQL database table (via Debezium CDC) with the following connector config:

{
  "name": "smartdevsignupconnector111",  
  "config": {  
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "true",
    "value.converter.schemas.enable": "true",  
    "database.hostname": "mysql1",  
    "database.port": "3306",
    "database.user": "clusterAdmin",
    "database.password": "**********",
    "database.server.id": "184055",  
    "database.server.name": "smartdevdbserver1",  
    "database.include.list": "signup_db",
    "database.history.kafka.bootstrap.servers": "kafka1:9092",  
    "database.history.kafka.topic": "schema-changes.signup_db",
    "schema.history.internal.kafka.topic": "KafkaSchemaHistoryTopic",
    "schema.history.internal.kafka.bootstrap.servers": "kafka1:9092",
    "table.whitelist": "signup_db.users",
    "column.blacklist": "signup_db.users.fullName, signup_db.users.address, signup_db.users.phoneNo, signup_db.users.gender, signup_db.users.userRole, signup_db.users.reason_for_inactive, signup_db.users.firstvisit, signup_db.users.last_changed_PW, signup_db.users.regDate",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "snapshot.mode": "when_needed",
    "topic.creation.enable": "true",
    "topic.prefix": "smartdevdbserver1",
    "topic.creation.default.replication.factor": "1",
    "topic.creation.default.partitions": "1"
  }
}

And here is the data in the Kafka topic:

{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"email"},{"type":"string","optional":false,"field":"password"},{"type":"string","optional":true,"field":"fullName"},{"type":"string","optional":true,"field":"address"},{"type":"string","optional":true,"field":"phoneNo"},{"type":"string","optional":true,"field":"gender"},{"type":"string","optional":false,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"visitor,student,Admin"},"default":"visitor","field":"userRole"},{"type":"string","optional":false,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"ACTIVE,INACTIVE"},"default":"INACTIVE","field":"User_status"},{"type":"string","optional":false,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"visitor,TERMINATED,SUSPENDED_FOR_VIOLATION"},"default":"visitor","field":"reason_for_inactive"},{"type":"string","optional":true,"field":"firstvisit"},{"type":"string","optional":false,"name":"io.debezium.time.ZonedTimestamp","version":1,"default":"1970-01-01T00:00:00Z","field":"last_changed_PW"},{"type":"int64","optional":false,"name":"io.debezium.time.Timestamp","version":1,"default":0,"field":"regDate"},{"type":"string","optional":true,"field":"auth_token"}],"optional":false,"name":"smartdevdbserver1.signup_db.users.Value"},"payload":{"id":22,"email":"[email protected]","password":"$2a$10$qj8AEMZMqRKV.68b0uwFZu0Zx14Pk77/HhmSn/bNmirjuYOwVmhse","fullName":null,"address":null,"phoneNo":null,"gender":null,"userRole":"visitor","User_status":"INACTIVE","reason_for_inactive":"visitor","firstvisit":null,"last_changed_PW":"2022-11-05T20:57:44Z","regDate":1667681864000,"auth_token":null}}

Now this is the JDBC sink connector config:

{
    "name": "resetpassword-sink-connector",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "tasks.max": "1",
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter.schemas.enable": "true",
        "value.converter.schemas.enable": "true", 
        "topics": "smartdevdbserver1.signup_db.users",
        "connection.url": "jdbc:mysql://rpwd_mysql:3306/rpwd_db",
        "connection.user": "rpwd_user",
        "connection.password": "**********",
        "table.name.format": "rpwd_db.users",
        "fields.whitelist": "rpwd_db.users.id,rpwd_db.users.email,rpwd_db.users.password,rpwd_db.users.User_status,rpwd_db.users.auth_token",
        "insert.mode": "upsert",
        "pk.fields": "id",
        "pk.mode": "record_value"
    }
}

But when I run the JDBC sink connector, I get the error:

java.sql.BatchUpdateException: Field 'email' doesn't have a default value

And the logs look like this:

connect           | 2022-11-06 10:23:26,357 INFO   ||  Attempting to open connection #1 to MySql   [io.confluent.connect.jdbc.util.CachedConnectionProvider]
connect           | 2022-11-06 10:23:50,563 INFO   ||  JdbcDbWriter Connected   [io.confluent.connect.jdbc.sink.JdbcDbWriter]
connect           | 2022-11-06 10:23:53,616 INFO   ||  Checking MySql dialect for existence of TABLE "rpwd_db"."users"   [io.confluent.connect.jdbc.dialect.GenericDatabaseDialect]
connect           | 2022-11-06 10:23:54,966 INFO   ||  Using MySql dialect TABLE "rpwd_db"."users" present   [io.confluent.connect.jdbc.dialect.GenericDatabaseDialect]
connect           | 2022-11-06 10:23:55,597 INFO   ||  Checking MySql dialect for type of TABLE "rpwd_db"."users"   [io.confluent.connect.jdbc.dialect.GenericDatabaseDialect]
connect           | 2022-11-06 10:23:55,640 INFO   ||  Setting metadata for table "rpwd_db"."users" to Table{name='"rpwd_db"."users"', type=TABLE columns=[Column{'id', isPrimaryKey=true, allowsNull=false, sqlType=INT}, Column{'email', isPrimaryKey=false, allowsNull=false, sqlType=VARCHAR}, Column{'User_status', isPrimaryKey=false, allowsNull=false, sqlType=ENUM}, Column{'auth_token', isPrimaryKey=false, allowsNull=true, sqlType=VARCHAR}, Column{'password', isPrimaryKey=false, allowsNull=false, sqlType=VARCHAR}]}   [io.confluent.connect.jdbc.util.TableDefinitions]
connect           | 2022-11-06 10:23:56,355 WARN   ||  Write of 4 records failed, remainingRetries=10   [io.confluent.connect.jdbc.sink.JdbcSinkTask]
connect           | java.sql.BatchUpdateException: Field 'email' doesn't have a default value
connect           |     at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
connect           |     at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
connect           |     at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
connect           |     at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490)
connect           |     at com.mysql.cj.util.Util.handleNewInstance(Util.java:192)
connect           |     at com.mysql.cj.util.Util.getInstance(Util.java:167)
connect           |     at com.mysql.cj.util.Util.getInstance(Util.java:174)
connect           |     at com.mysql.cj.jdbc.exceptions.SQLError.createBatchUpdateException(SQLError.java:224)
connect           |     at com.mysql.cj.jdbc.ClientPreparedStatement.executeBatchSerially(ClientPreparedStatement.java:816)
connect           |     at com.mysql.cj.jdbc.ClientPreparedStatement.executeBatchInternal(ClientPreparedStatement.java:418)
connect           |     at com.mysql.cj.jdbc.StatementImpl.executeBatch(StatementImpl.java:795)
connect           |     at io.confluent.connect.jdbc.sink.BufferedRecords.executeUpdates(BufferedRecords.java:196)
connect           |     at io.confluent.connect.jdbc.sink.BufferedRecords.flush(BufferedRecords.java:186)
connect           |     at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:80)
connect           |     at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:84)
connect           |     at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
connect           |     at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
connect           |     at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
connect           |     at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
connect           |     at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)
connect           |     at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:244)
connect           |     at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
connect           |     at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
connect           |     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
connect           |     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
connect           |     at java.base/java.lang.Thread.run(Thread.java:829)
connect           | Caused by: java.sql.SQLException: Field 'email' doesn't have a default value
connect           |     at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:129)
connect           |     at com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:122)
connect           |     at com.mysql.cj.jdbc.ClientPreparedStatement.executeInternal(ClientPreparedStatement.java:916)
connect           |     at com.mysql.cj.jdbc.ClientPreparedStatement.executeUpdateInternal(ClientPreparedStatement.java:1061)
connect           |     at com.mysql.cj.jdbc.ClientPreparedStatement.executeBatchSerially(ClientPreparedStatement.java:795)
connect           |     ... 17 more
connect           | 2022-11-06 10:23:56,589 INFO   ||  Closing connection #1 to MySql   [io.confluent.connect.jdbc.util.CachedConnectionProvider]

Note that I created the table (users) that the sink connector needs to insert its data into myself.

Also, even if I remove the email field, the next field (password) reports the same error, and so on.

If I instead use auto.create (after dropping the pre-created users table), the JDBC sink connector creates the table, but only one column (the primary key, id) is created and populated; the rest of the fields are ignored.

mysql> select * from users;
+----+
| id |
+----+
|  1 |
|  8 |
| 15 |
| 22 |
| 29 |
+----+
5 rows in set (0.02 sec)

Below are the logs for auto.create:

connect           | 2022-11-09 09:56:01,676 INFO   ||  Initializing writer using SQL dialect: MySqlDatabaseDialect   [io.confluent.connect.jdbc.sink.JdbcSinkTask]
connect           | 2022-11-09 09:56:01,691 INFO   ||  WorkerSinkTask{id=resetpassword-sink-connector-0} Sink task finished initialization and start   [org.apache.kafka.connect.runtime.WorkerSinkTask]
connect           | 2022-11-09 09:56:01,705 INFO   ||  WorkerSinkTask{id=resetpassword-sink-connector-0} Executing sink task   [org.apache.kafka.connect.runtime.WorkerSinkTask]
connect           | 2022-11-09 09:56:01,755 INFO   ||  [Consumer clientId=connector-consumer-resetpassword-sink-connector-0, groupId=connect-resetpassword-sink-connector] Resetting the last seen epoch of partition users-0 to 0 since the associated topicId changed from null to hjdRgZ2PTgWqZxW53PaAfg   [org.apache.kafka.clients.Metadata]
connect           | 2022-11-09 09:56:01,756 INFO   ||  [Consumer clientId=connector-consumer-resetpassword-sink-connector-0, groupId=connect-resetpassword-sink-connector] Cluster ID: 2vOSZymlQ42roGNU_nYGFg   [org.apache.kafka.clients.Metadata]
connect           | 2022-11-09 09:56:01,765 INFO   ||  [Consumer clientId=connector-consumer-resetpassword-sink-connector-0, groupId=connect-resetpassword-sink-connector] Discovered group coordinator kafka1:9092 (id: 2147483646 rack: null)   [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator]
connect           | 2022-11-09 09:56:01,770 INFO   ||  [Consumer clientId=connector-consumer-resetpassword-sink-connector-0, groupId=connect-resetpassword-sink-connector] (Re-)joining group   [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator]
connect           | 2022-11-09 09:56:01,938 INFO   ||  [Consumer clientId=connector-consumer-resetpassword-sink-connector-0, groupId=connect-resetpassword-sink-connector] Request joining group due to: need to re-join with the given member-id: connector-consumer-resetpassword-sink-connector-0-ff424264-403a-4259-8e5a-5171d9500692   [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator]        
kafka1            | 2022-11-09 09:56:01,956 - INFO  [data-plane-kafka-request-handler-4:Logging@66] - [GroupCoordinator 1]: Preparing to rebalance group connect-resetpassword-sink-connector in state PreparingRebalance with old generation 0 (__consumer_offsets-21) (reason: Adding new member connector-consumer-resetpassword-sink-connector-0-ff424264-403a-4259-8e5a-5171d9500692 with group instance id None; client reason: rebalance failed due to MemberIdRequiredException)
connect           | 2022-11-09 09:56:01,941 INFO   ||  [Consumer clientId=connector-consumer-resetpassword-sink-connector-0, groupId=connect-resetpassword-sink-connector] Request joining group due to: rebalance failed due to 'The group member needs to have a valid member id before actually entering a consumer group.' (MemberIdRequiredException)   [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator]
connect           | 2022-11-09 09:56:01,941 INFO   ||  [Consumer clientId=connector-consumer-resetpassword-sink-connector-0, groupId=connect-resetpassword-sink-connector] (Re-)joining group   [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator]
kafka1            | 2022-11-09 09:56:02,150 - INFO  [executor-Rebalance:Logging@66] - [GroupCoordinator 1]: Stabilized group connect-resetpassword-sink-connector generation 1 (__consumer_offsets-21) with 1 members
connect           | 2022-11-09 09:56:02,166 INFO   ||  [Consumer clientId=connector-consumer-resetpassword-sink-connector-0, groupId=connect-resetpassword-sink-connector] Successfully joined group with generation Generation{generationId=1, memberId='connector-consumer-resetpassword-sink-connector-0-ff424264-403a-4259-8e5a-5171d9500692', protocol='range'}   [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator]
connect           | 2022-11-09 09:56:02,196 INFO   ||  [Consumer clientId=connector-consumer-resetpassword-sink-connector-0, groupId=connect-resetpassword-sink-connector] Finished assignment for group at generation 1: {connector-consumer-resetpassword-sink-connector-0-ff424264-403a-4259-8e5a-5171d9500692=Assignment(partitions=[users-0])}   [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator]
kafka1            | 2022-11-09 09:56:02,239 - INFO  [data-plane-kafka-request-handler-5:Logging@66] - [GroupCoordinator 1]: Assignment received from leader connector-consumer-resetpassword-sink-connector-0-ff424264-403a-4259-8e5a-5171d9500692 for group connect-resetpassword-sink-connector for generation 1. The group has 1 members, 0 of which are static.
connect           | 2022-11-09 09:56:02,254 INFO   ||  [Consumer clientId=connector-consumer-resetpassword-sink-connector-0, groupId=connect-resetpassword-sink-connector] Successfully synced group in generation Generation{generationId=1, memberId='connector-consumer-resetpassword-sink-connector-0-ff424264-403a-4259-8e5a-5171d9500692', protocol='range'}   [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator]
connect           | 2022-11-09 09:56:02,255 INFO   ||  [Consumer clientId=connector-consumer-resetpassword-sink-connector-0, groupId=connect-resetpassword-sink-connector] Notifying assignor about the new Assignment(partitions=[users-0])   [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator]
connect           | 2022-11-09 09:56:02,255 INFO   ||  [Consumer clientId=connector-consumer-resetpassword-sink-connector-0, groupId=connect-resetpassword-sink-connector] Adding newly assigned partitions: users-0   [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator]
connect           | 2022-11-09 09:56:02,414 INFO   ||  [Consumer clientId=connector-consumer-resetpassword-sink-connector-0, groupId=connect-resetpassword-sink-connector] Found no committed offset for partition users-0   [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator]
connect           | 2022-11-09 09:56:02,433 INFO   ||  [Consumer clientId=connector-consumer-resetpassword-sink-connector-0, groupId=connect-resetpassword-sink-connector] Resetting offset for partition users-0 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[kafka1:9092 (id: 1 rack: null)], epoch=0}}.   [org.apache.kafka.clients.consumer.internals.SubscriptionState]
connect           | 2022-11-09 09:56:02,574 INFO   ||  Attempting to open connection #1 to MySql   [io.confluent.connect.jdbc.util.CachedConnectionProvider]
connect           | 2022-11-09 09:56:05,960 INFO   ||  JdbcDbWriter Connected   [io.confluent.connect.jdbc.sink.JdbcDbWriter]
connect           | 2022-11-09 09:56:06,168 INFO   ||  Checking MySql dialect for existence of table "rpwd_db"."users"   [io.confluent.connect.jdbc.dialect.MySqlDatabaseDialect]
connect           | 2022-11-09 09:56:06,583 INFO   ||  Using MySql dialect table "rpwd_db"."users" present   [io.confluent.connect.jdbc.dialect.MySqlDatabaseDialect]
connect           | 2022-11-09 09:56:06,768 INFO   ||  Setting metadata for table "rpwd_db"."users" to Table{name='"rpwd_db"."users"', columns=[Column{'id', isPrimaryKey=true, allowsNull=false, sqlType=INT}]}   [io.confluent.connect.jdbc.util.TableDefinitions]

So either way, the sink connector seems to deal with the primary key column only, as evidenced in the "Setting metadata for table" line at the end of the logs, which sets metadata only for the "id" column while ignoring the other four columns:

Setting metadata for table "rpwd_db"."users" to Table{name='"rpwd_db"."users"', columns=[Column{'id', isPrimaryKey=true, allowsNull=false, sqlType=INT}]}   [io.confluent.connect.jdbc.util.TableDefinitions]

I know there is something I am missing; can someone walk me through what it is?

Upvotes: 0

Views: 2219

Answers (1)

eedideyahoocom

Reputation: 193

The problem was with this line:

"fields.whitelist": "rpwd_db.users.id,rpwd_db.users.email,rpwd_db.users.password,rpwd_db.users.User_status,rpwd_db.users.auth_token",

I removed the database/table prefixes before the actual column names and left it like this:

"fields.whitelist": "id,email,password,User_status,auth_token",

I also removed the prefix in table.name.format, changing it from:

"table.name.format": "rpwd_db.users"

to:

"table.name.format": "users",

Upvotes: 0
