Reputation: 533
I'm trying to KafkaSink my DataStream and for that I'm using the below code:
KafkaSink<Events> sink = KafkaSink.<Events>builder()
.setBootstrapServers(LOCAL_KAFKA_BROKER)
.setRecordSerializer(KafkaRecordSerializationSchema.builder()
.setTopic(OUTPUT_KAFKA_TOPIC)
.setValueSerializationSchema(new SimpleStringSchema())
.build())
.setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
.build();
Here, SimpleStringSchema() is not suitable as I'm returning a POJO of Events. Here is the POJO I've been using.
public Events(Date windowStart, Date windowEnd,
String metric, String eventId,
long count) {
this.windowStart = windowStart;
this.windowEnd = windowEnd;
this.metric = metric;
this.eventId = eventId;
this.count = count;
}
public Date getWindowStart() {
return windowStart;
}
public void setWindowStart(Date windowStart) {
this.windowStart = windowStart;
}
public Date getWindowEnd() {
return windowEnd;
}
public void setWindowEnd(Date windowEnd) {
this.windowEnd = windowEnd;
}
public String getMetric() {
return metric;
}
public void setMetric(String metric) {
this.metric = metric;
}
public String getEventId() {
return eventId;
}
public void setEventId(String eventId) {
this.eventId = eventId;
}
public long getCount() {
return count;
}
public void setCount(long count) {
this.count = count;
}
public String toString() {
StringBuilder sb = new StringBuilder("Events{");
sb.append("windowStart=").append(windowStart);
sb.append(", windowEnd=").append(windowEnd);
sb.append(", metric=").append(metric);
sb.append(", eventId=").append(eventId);
sb.append(", count=").append(count);
sb.append("}");
return sb.toString();
}
For the POJO, I'm not able to come up with the SerializationSchema, that can be used here. I tried the below:
public class EventsSerializationSchema implements DeserializationSchema<Events>, SerializationSchema<Events> {
private static final ObjectMapper objectMapper = new ObjectMapper();
private String topic;
public EventsSerializationSchema(){
}
public EventsSerializationSchema(String topic) {
this.topic = topic;
}
@Override
public ProducerRecord<byte[], byte[]> serialize(
final Events events, @Nullable final Long timestamp) {
try {
//if topic is null, default topic will be used
return new ProducerRecord<>(topic, objectMapper.writeValueAsBytes(events));
} catch (JsonProcessingException e) {
throw new IllegalArgumentException("Could not serialize record: " + events, e);
}
}
}
But, this is not working as I'm not sure on how to serialize this. Can someone please help on this? P.S: As I'm using Flink 1.14, FlinkKafkaPublisher is deprecated in this version.
Thanks in Advance
Upvotes: 2
Views: 2038
Reputation: 43717
EventsSerializationSchema
is implementing the wrong interface. You want to implement either SerializationSchema
or KafkaSerializationSchema
, depending on whether you'd rather implement
byte[] serialize(T element)
or
ProducerRecord<byte[], byte[]> serialize(T element, @Nullable Long timestamp)
.
See KafkaProducerJob.java and UsageRecordSerializationSchema.java for an example.
Upvotes: 2