belostoky

Reputation: 974

Apache Flink - how to send and consume POJOs using AWS Kinesis

I want to consume POJOs arriving from Kinesis with Flink.
Is there a standard way to serialize the messages on the sending side and deserialize them in Flink?

Thanks

Upvotes: 3

Views: 651

Answers (1)

belostoky

Reputation: 974

I resolved it by passing a custom deserialization schema to the Kinesis consumer:

DataStream<SamplePojo> kinesis = see.addSource(new FlinkKinesisConsumer<>(
        "my-stream",
        new POJODeserializationSchema(),
        kinesisConsumerConfig));

and a deserialization schema that parses each record as JSON with Jackson:

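import java.io.IOException;

// Assumes Jackson 2.x and a Flink release that ships this class in api.common.serialization;
// older Flink releases keep it in org.apache.flink.streaming.util.serialization.
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;

public class POJODeserializationSchema extends AbstractDeserializationSchema<SamplePojo> {
    // transient: created lazily on the worker instead of being shipped with the schema
    private transient ObjectMapper mapper;

    @Override
    public SamplePojo deserialize(byte[] message) throws IOException {
        if (mapper == null) {
            mapper = new ObjectMapper();
        }

        // Each Kinesis record is expected to hold one JSON document that maps onto SamplePojo
        return mapper.readValue(message, SamplePojo.class);
    }

    @Override
    public boolean isEndOfStream(SamplePojo nextElement) {
        // A Kinesis stream is unbounded, so never signal end of stream
        return false;
    }
}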
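
A note on the null check in deserialize(): Flink serializes the schema instance with Java serialization and ships it to the workers, so a helper that should not travel with it is usually kept transient and rebuilt on first use. Below is a minimal, stdlib-only sketch of that pattern. Utf8Parser, LazySchema and LazyInitDemo are hypothetical names made up for illustration (Utf8Parser stands in for the ObjectMapper), and nothing here is Flink API.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;

// Hypothetical helper that is NOT Serializable (plays the role of the mapper).
class Utf8Parser {
    String parse(byte[] message) {
        return new String(message, StandardCharsets.UTF_8);
    }
}

// Hypothetical class that only mimics the shape of a deserialization schema.
class LazySchema implements Serializable {
    private static final long serialVersionUID = 1L;

    // transient: skipped by Java serialization, so it is null again after shipping
    private transient Utf8Parser parser;

    boolean isParserInitialized() {
        return parser != null;
    }

    String deserialize(byte[] message) {
        if (parser == null) {
            parser = new Utf8Parser(); // rebuilt lazily on first use
        }
        return parser.parse(message);
    }
}

class LazyInitDemo {
    // Serializes and deserializes a value in memory, roughly what happens
    // when an object is shipped from the client to a worker.
    static <T extends Serializable> T roundTrip(T value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                @SuppressWarnings("unchecked")
                T copy = (T) in.readObject();
                return copy;
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("round trip failed", e);
        }
    }

    public static void main(String[] args) {
        LazySchema original = new LazySchema();
        original.deserialize("warm-up".getBytes(StandardCharsets.UTF_8));

        LazySchema shipped = roundTrip(original);
        System.out.println(shipped.isParserInitialized()); // prints false
        System.out.println(shipped.deserialize("hello".getBytes(StandardCharsets.UTF_8))); // prints hello
    }
}
```

Without the transient keyword the round trip would fail with a NotSerializableException, because Utf8Parser does not implement Serializable.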

Upvotes: 2
