Nitin Pandey

Reputation: 719

Flink : DataStream to Table

Use case: Read protobuf messages from Kafka, deserialize them, apply some transformation (flatten out some columns), and write to DynamoDB.

Unfortunately, the Flink Kafka connector only supports the CSV, JSON, and Avro formats, so I had to use the lower-level DataStream API.

Problem: If I could create a table from the DataStream object, I could accept a SQL query to run on that table, which would make the transformation step seamless and generic. Is it possible to run a SQL query over a DataStream object?

Upvotes: 2

Views: 2093

Answers (1)

Dominik Wosiński

Reputation: 3864

If you have a DataStream of objects, you can simply register that DataStream as a Table using StreamTableEnvironment.

This would look more or less like the following:

val myStream = ...
val env: StreamExecutionEnvironment = configureFlinkEnv(StreamExecutionEnvironment.getExecutionEnvironment)
val tEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)
// The field expressions name the table's columns; 'userId and 'payload are example names.
tEnv.registerDataStream("myTable", myStream, 'userId, 'payload)

Then you should be able to query the dynamic table created from your DataStream.
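For completeness, here is a minimal, self-contained sketch of the whole round trip with the pre-1.11 Flink Scala Table API: register a stream, run a SQL query over it, and convert the result back to a DataStream. The event type `MyEvent`, the table name `myTable`, and the column names are illustrative assumptions; in the real job the stream would come from the Kafka consumer and protobuf deserializer, and the output would go to the DynamoDB sink.

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.Table
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

// Hypothetical event type standing in for the deserialized protobuf message.
case class MyEvent(userId: String, payload: String)

object SqlOverDataStream {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = StreamTableEnvironment.create(env)

    // In the real job this stream comes from Kafka; fromElements is just for the sketch.
    val myStream: DataStream[MyEvent] =
      env.fromElements(MyEvent("u1", "a"), MyEvent("u2", "b"))

    // Register the stream as a dynamic table with named columns.
    tEnv.registerDataStream("myTable", myStream, 'userId, 'payload)

    // Run an arbitrary SQL query over the registered table.
    val result: Table = tEnv.sqlQuery("SELECT userId, payload FROM myTable")

    // Convert back to a DataStream (append-only here) for the downstream sink.
    val out: DataStream[Row] = tEnv.toAppendStream[Row](result)
    out.print()

    env.execute("sql-over-datastream")
  }
}
```

Since the query only selects and filters (no aggregation over updates), `toAppendStream` is sufficient; a query producing updates would need `toRetractStream` instead.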

Upvotes: 1
