Reputation: 442
I want to publish events to Kafka using the Kafka Connect JDBC Source Connector (Postgres).
I have an outbox table where I am storing the payload ID and the payload as bytes, after serializing them using KafkaAvroSerializer.
The object being serialized is an Avro-generated SpecificRecord class, e.g. EmployeeCreatedEvent.
The data types for the outbox table in Postgres:
payload bytea,
payload_id bytea
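For context, this is roughly how the payload bytes are produced before being inserted into the outbox table (a sketch; the registry URL and topic name are placeholders):
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import java.util.Map;

KafkaAvroSerializer serializer = new KafkaAvroSerializer();
serializer.configure(Map.of("schema.registry.url", "http://localhost:8081"), false); // false = value serializer
byte[] payload = serializer.serialize("mytopic", employeeCreatedEvent); // a SpecificRecord instance
// payload (and the serialized payload ID) are then written into the bytea columns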
I have written a custom SMT (Single Message Transform) for Kafka Connect. The code deserializes the data, payload and payload_id, to GenericData.Record.
But I am getting the below error:
Caused by: org.apache.kafka.connect.errors.DataException: Invalid type for STRUCT: class org.apache.avro.generic.GenericData$Record
My environment: Confluent 6.0.1
Config:
key.converter=io.confluent.connect.avro.AvroConverter
value.converter=io.confluent.connect.avro.AvroConverter
The ConnectRecord value has 2 elements, subject_id and subject, and they are byte[]. I want to use key=payload_id and value=payload. If I do:
final byte[] subjectId = (byte[]) values.get("subject_id");
final byte[] retrievedPayload = (byte[]) values.get("subject");
I get the Exception: DataException: Invalid type for STRUCT: class [B
I am fetching the schema from the schema registry and converting it to a Connect schema before creating my new ConnectRecord:
record.newRecord("mytopic", record.kafkaPartition(), derivedKeySchema, values.get("subject_id"), derivedValueSchema, values.get("subject"), record.timestamp());
I retrieve the schema from the schema registry at the start and use it while creating the new Connect Record.
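The schema lookup and conversion look roughly like this (a sketch; the registry URL and subject name are placeholders, exception handling omitted):
import io.confluent.connect.avro.AvroData;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
import org.apache.kafka.connect.data.Schema;

CachedSchemaRegistryClient registryClient = new CachedSchemaRegistryClient("http://localhost:8081", 100);
SchemaMetadata metadata = registryClient.getLatestSchemaMetadata("mytopic-value");
org.apache.avro.Schema avroSchema = new org.apache.avro.Schema.Parser().parse(metadata.getSchema());
Schema derivedValueSchema = new AvroData(100).toConnectSchema(avroSchema); // Connect schema used in newRecord(...)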
Full Stack Trace:
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:206)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132)
at org.apache.kafka.connect.runtime.WorkerSourceTask.convertTransformedRecord(WorkerSourceTask.java:311)
at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:340)
at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:264)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.DataException: Invalid type for STRUCT: class [B
at io.confluent.connect.avro.AvroData.fromConnectData(AvroData.java:597)
at io.confluent.connect.avro.AvroData.fromConnectData(AvroData.java:344)
at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:87)
at org.apache.kafka.connect.storage.Converter.fromConnectData(Converter.java:63)
at org.apache.kafka.connect.runtime.WorkerSourceTask.lambda$convertTransformedRecord$1(WorkerSourceTask.java:311)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:190)
Can anyone please provide a solution? Also, is it possible to convert a SpecificRecord object to JSON? If so, I could store the events as JSON instead of bytes in the outbox table.
Thanks.
Upvotes: 1
Views: 1426
Reputation: 191671
The data types for the outbox table in Postgres:
I assume those columns are only for the record value? If not, you need an ID + bytea for the record key. Maybe also the record timestamp, and you could add the topic name, too.
Honestly, I see no value in pulling the ID out into a database column since it's part of the payload already, and you could likely extract it with a database query if needed (I'm not sure of the byte functions in Postgres, but it seems like something that could be done).
In any case, you need to use ByteArrayConverter to get access to the binary data, then a Connect transform to get the data into the Struct value that the JDBC connector expects.
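Something along these lines in the connector config (the transform alias and class name here are placeholders):
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
transforms=extractPayload
transforms.extractPayload.type=com.example.MyTransform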
I have written a custom SMT (Single Message Transform) for Kafka Connect. The code deserializes the data, payload and payload_id, to GenericData.Record.
Okay, well that defines the value of the Connect record, but Connect will interpret this as just a byte array unless you also called AvroData.toConnectSchema somewhere in your transform.
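For example, something along these lines inside the transform (a sketch; it assumes you already have the payload's Avro schema and the deserialized record):
import io.confluent.connect.avro.AvroData;
import org.apache.kafka.connect.data.SchemaAndValue;

AvroData avroData = new AvroData(100); // schema cache size
// avroSchema is the org.apache.avro.Schema for the payload; genericRecord is the deserialized GenericData.Record
SchemaAndValue converted = avroData.toConnectData(avroSchema, genericRecord);
// converted.schema() is a Connect Schema and converted.value() is a Struct,
// which is what the downstream AvroConverter can handle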
Alternatively, here's some pseudocode for doing this with the raw bytes:
// note: Schema, SchemaBuilder and Struct below are Connect API imports
// (org.apache.kafka.connect.data.*), not Avro types
// class MyTransform<R extends ConnectRecord<R>> implements Transformation<R> {
@Override
public R apply(R r) {
    final byte[] valueAsBytes = (byte[]) r.value();
    ByteBuffer b = ByteBuffer.wrap(valueAsBytes);
    b.get();               // skip the leading byte
    int id = b.getInt();   // next four bytes as an int ID
    byte[] payload = new byte[b.remaining()];
    b.get(payload);        // the rest of the buffer is the payload

    // TODO: adjust the fields to the payload you actually want to forward
    Schema valueSchema = SchemaBuilder.struct()
            .field("id", Schema.INT32_SCHEMA)
            .field("payload", Schema.BYTES_SCHEMA)
            .build();
    Struct updatedValue = new Struct(valueSchema)
            .put("id", id)
            .put("payload", payload);

    return r.newRecord(r.topic(), r.kafkaPartition(),
            r.keySchema(), r.key(),
            valueSchema, updatedValue,
            r.timestamp());
}
If you still get an error like "Invalid type for STRUCT: class [B", that's because the ByteArrayConverter also got applied after the transform somehow. In that case you'd be better off just using a Kafka Streams job: consume with the Bytes serde, manipulate the bytes, and produce actual Avro (or your preferred JSON) payloads, then use Connect as you normally would, without any transforms.
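A bare-bones sketch of that Streams approach (topic names are placeholders and the value mapping is left as a pass-through):
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;

StreamsBuilder builder = new StreamsBuilder();
builder.stream("outbox-raw", Consumed.with(Serdes.ByteArray(), Serdes.ByteArray()))
        .map((key, value) -> {
            // unpack the raw bytes here (e.g. split payload_id and payload),
            // then build the key/value you actually want to publish
            return KeyValue.pair(key, value); // placeholder pass-through
        })
        // with an Avro serde on the output topic you'd be producing real Avro records
        .to("mytopic", Produced.with(Serdes.ByteArray(), Serdes.ByteArray()));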
Upvotes: 0