Reputation: 423
I am reading messages from Kafka in JSON format, e.g.:
{"a":1,"b":2}
I then filter the stream to keep only messages where the value of a is 1 and the value of b is 2. Finally, I want to write the filtered stream to a downstream Kafka topic. However, the compiler reports a type mismatch and I don't understand why.
My code is as follows:
val kafkaConsumer = new FlinkKafkaConsumer010(
  params.getRequired("input-topic"),
  new JSONDeserializationSchema(),
  params.getProperties)
val messageStream = env.addSource(kafkaConsumer).rebalance
val filteredStream: DataStream[ObjectNode] = messageStream.filter(jsonNode =>
  jsonNode.get("a").asText.equals("1") && jsonNode.get("b").asText.equals("2"))
filteredStream.addSink(new FlinkKafkaProducer010[Object](
  params.getRequired("output-topic"),
  new SimpleStringSchema,
  params.getProperties))
The error I got is shown in the image below:
I followed the Flink Kafka connector documentation when writing the Kafka sink code: https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/connectors/kafka.html
Upvotes: 0
Views: 532
Reputation: 1881
Adding to what @Dawid already pointed out, you can provide a serialization schema for ObjectNode (assuming it is treated as a POJO; I haven't tested this with other object types) as follows:
TypeInformation<ObjectNode> typeInfo =
    TypeInformation.of(new TypeHint<ObjectNode>() {});
TypeInformationSerializationSchema<ObjectNode> serdeSchema =
    new TypeInformationSerializationSchema<>(typeInfo, env.getConfig());
and then use serdeSchema for the Kafka producer sink as follows (the producer's type parameter must match the schema's, i.e. ObjectNode):
FlinkKafkaProducer010<ObjectNode> kafkaSink =
    new FlinkKafkaProducer010<>(
        BOOTSTRAP_SERVERS,
        "output-topic",
        serdeSchema);
Hopefully this resolves the type mismatch in your Kafka sink.
Upvotes: 1
Reputation: 3422
You have a DataStream of type ObjectNode, so you need to provide a FlinkKafkaProducer010[ObjectNode], e.g.:
stream1.addSink(new FlinkKafkaProducer010[ObjectNode](
  params.getRequired("output-topic"),
  new SerializationSchema[ObjectNode] {
    // Convert each ObjectNode into the bytes to be written to Kafka.
    override def serialize(element: ObjectNode): Array[Byte] = ???
  },
  params.getProperties))
All generic types in Java are invariant, which is why you cannot simply pass a FlinkKafkaProducer010[Object] to a sink that expects ObjectNode elements.
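The invariance point can be seen with plain Java collections. The following is only a minimal sketch: the class and method names (InvarianceDemo, countExact, countAny) are made up for illustration and have nothing to do with Flink.

```java
import java.util.ArrayList;
import java.util.List;

public class InvarianceDemo {
    // Accepts only List<Object>; a List<String> is NOT a List<Object>.
    public static int countExact(List<Object> items) {
        return items.size();
    }

    // A wildcard lets the parameter accept a list of any element type.
    public static int countAny(List<?> items) {
        return items.size();
    }

    public static void main(String[] args) {
        List<String> strings = new ArrayList<>();
        strings.add("a");
        strings.add("b");

        // countExact(strings);  // does not compile: incompatible types
        System.out.println(countAny(strings));
    }
}
```

The same rule applies to the sink: a producer parameterized with Object is not accepted where one parameterized with ObjectNode is required, even though every ObjectNode is an Object.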
Another problem you may run into next is that you also need to provide a SerializationSchema[ObjectNode], whereas SimpleStringSchema implements SerializationSchema[String].
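One possible way to fill in serialize is to write each element's string form as UTF-8 bytes. The sketch below is not Flink code. It declares a local stand-in interface with the same single serialize method so that it compiles on its own, and the names JsonBytesSchemaDemo, toUtf8Bytes and toStringSchema are hypothetical. It also assumes that calling toString() on a Jackson ObjectNode yields the JSON text, which is worth verifying for the Jackson version in use.

```java
import java.nio.charset.StandardCharsets;

public class JsonBytesSchemaDemo {
    // Stand-in for Flink's SerializationSchema<T>, declared locally only so
    // that this sketch compiles without Flink on the classpath.
    interface SerializationSchema<T> {
        byte[] serialize(T element);
    }

    // Hypothetical helper: encodes the element's string form as UTF-8 bytes.
    public static byte[] toUtf8Bytes(Object element) {
        return element.toString().getBytes(StandardCharsets.UTF_8);
    }

    // Hypothetical schema for any element type, built on the helper above.
    static <T> SerializationSchema<T> toStringSchema() {
        return element -> toUtf8Bytes(element);
    }

    public static void main(String[] args) {
        // StringBuilder stands in for ObjectNode here (assumption for the demo).
        SerializationSchema<StringBuilder> schema = toStringSchema();
        byte[] bytes = schema.serialize(new StringBuilder("{\"a\":1,\"b\":2}"));
        System.out.println(new String(bytes, StandardCharsets.UTF_8));
    }
}
```

In the real job, the body of the anonymous SerializationSchema[ObjectNode] could do the equivalent conversion, so that the schema's type parameter matches the stream's element type.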
Upvotes: 1