Reputation: 233
We are trying to use the SQL Server JDBC connector with the KafkaAvroSerializer, and we are also providing a custom ProducerInterceptor to encrypt the data before sending it to Kafka.
On the consumer side, we want to use a Flink connector to decrypt the data and then use the appropriate deserializer.
We have a couple of questions in order to achieve that:
1) If we provide a custom ConsumerInterceptor to decrypt the data, should it be passed in through the Properties when we create the DataStream in Flink?
Properties properties = new Properties();
...
properties.setProperty("consumer.interceptor.classes", "OurCustomDecryptConsumerInterceptor");
...
DataStream<String> stream = env.addSource(new FlinkKafkaConsumer011<>("sqlserver-foobar", ???, properties));
Is the above configuration correct, or do I need to set some other property so that I can pass the ConsumerInterceptor to Flink?
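For context, this is roughly the decrypting interceptor we have in mind; it is only a sketch, and decrypt() is a placeholder for our actual cipher logic:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class OurCustomDecryptConsumerInterceptor implements ConsumerInterceptor<byte[], byte[]> {

    @Override
    public ConsumerRecords<byte[], byte[]> onConsume(ConsumerRecords<byte[], byte[]> records) {
        Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> decrypted = new HashMap<>();
        for (TopicPartition partition : records.partitions()) {
            List<ConsumerRecord<byte[], byte[]>> out = new ArrayList<>();
            for (ConsumerRecord<byte[], byte[]> record : records.records(partition)) {
                // Replace the encrypted value bytes with their decrypted form
                // (this simple ConsumerRecord constructor drops timestamp/headers)
                out.add(new ConsumerRecord<>(record.topic(), record.partition(),
                        record.offset(), record.key(), decrypt(record.value())));
            }
            decrypted.put(partition, out);
        }
        return new ConsumerRecords<>(decrypted);
    }

    private byte[] decrypt(byte[] encrypted) {
        // placeholder: apply the inverse of the producer-side encryption here
        return encrypted;
    }

    @Override
    public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) { }

    @Override
    public void close() { }

    @Override
    public void configure(Map<String, ?> configs) { }
}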
2) Another question is about the deserializer in Flink. I looked for examples on the web and found a few code snippets like the following:
public class ConfluentAvroDeserializationSchema implements DeserializationSchema<Type ??> {

    private final String schemaRegistryUrl;
    private final int identityMapCapacity;
    private KafkaAvroDecoder kafkaAvroDecoder;

    public ConfluentAvroDeserializationSchema(String schemaRegistryUrl) {
        this(schemaRegistryUrl, 1000);
    }
So if we are passing data to Kafka using the JDBC connector without any modification (apart from encrypting the data), what data type should we provide during deserialization? We'll be decrypting the data before deserialization.
public class ConfluentAvroDeserializationSchema implements DeserializationSchema<Type ??> {
Thanks in advance,
Upvotes: 0
Views: 406
Reputation: 233
Just adding the end result so that it can help anybody who is looking for the same thing:
import java.io.IOException;

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.serializers.KafkaAvroDecoder;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;

public class ConfluentAvroDeserializationSchema
        implements DeserializationSchema<GenericRecord> {

    private final String schemaRegistryUrl;
    private final int identityMapCapacity;

    // Not serializable, so it is created lazily on the task managers
    private transient KafkaAvroDecoder kafkaAvroDecoder;

    public ConfluentAvroDeserializationSchema(String schemaRegistryUrl) {
        this(schemaRegistryUrl, 1000);
    }

    public ConfluentAvroDeserializationSchema(String schemaRegistryUrl, int identityMapCapacity) {
        this.schemaRegistryUrl = schemaRegistryUrl;
        this.identityMapCapacity = identityMapCapacity;
    }

    @Override
    public GenericRecord deserialize(byte[] bytes) throws IOException {
        if (kafkaAvroDecoder == null) {
            SchemaRegistryClient schemaRegistry =
                    new CachedSchemaRegistryClient(this.schemaRegistryUrl, this.identityMapCapacity);
            this.kafkaAvroDecoder = new KafkaAvroDecoder(schemaRegistry);
        }
        return (GenericRecord) this.kafkaAvroDecoder.fromBytes(bytes);
    }

    @Override
    public boolean isEndOfStream(GenericRecord nextElement) {
        return false;
    }

    @Override
    public TypeInformation<GenericRecord> getProducedType() {
        return TypeExtractor.getForClass(GenericRecord.class);
    }
}
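For completeness, wiring it into the Flink job then looks roughly like the sketch below. The bootstrap servers, group id, Schema Registry URL, and the fully qualified interceptor class name are placeholder values, and I'm assuming the decrypting interceptor is registered with the plain Kafka consumer property interceptor.classes, which the Flink consumer hands to the underlying KafkaConsumer:

import java.util.Properties;

import org.apache.avro.generic.GenericRecord;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;

public class DecryptedAvroJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("group.id", "flink-decrypt-consumer");
        // Handed straight to the underlying KafkaConsumer, which runs the
        // interceptor before the DeserializationSchema sees the bytes
        properties.setProperty("interceptor.classes",
                "com.example.OurCustomDecryptConsumerInterceptor");

        DataStream<GenericRecord> stream = env.addSource(
                new FlinkKafkaConsumer011<>(
                        "sqlserver-foobar",
                        new ConfluentAvroDeserializationSchema("http://localhost:8081"),
                        properties));

        stream.print();
        env.execute("sqlserver-foobar decryption job");
    }
}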
Upvotes: 1