Reputation: 719
Use case: Read protobuf messages from Kafka, deserialize them, apply some transformations (flatten out some columns), and write to DynamoDB.
Unfortunately, the Flink Kafka connector only supports the csv, json and avro formats, so I had to use the lower-level DataStream API.
Problem: If I can create a table out of the DataStream object, then I can accept a query to run on that table. That would make the transformation part seamless and generic. Is it possible to run a SQL query over a DataStream object?
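For context, reading protobuf on the DataStream side usually means plugging a custom deserializer into the Kafka consumer. Below is a minimal sketch; `MyEventProto` stands in for a protoc-generated class and `"events"` is a placeholder topic name, neither of which comes from the question:

```scala
import java.util.Properties

import org.apache.flink.api.common.serialization.AbstractDeserializationSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

// MyEventProto stands in for your protoc-generated protobuf class
class ProtoDeserializer extends AbstractDeserializationSchema[MyEventProto] {
  // Kafka hands over the raw record value; parseFrom is generated by protoc
  override def deserialize(bytes: Array[Byte]): MyEventProto =
    MyEventProto.parseFrom(bytes)
}

val props = new Properties()
props.setProperty("bootstrap.servers", "localhost:9092") // placeholder broker
props.setProperty("group.id", "proto-reader")            // placeholder group id

val env = StreamExecutionEnvironment.getExecutionEnvironment
// "events" is a placeholder topic name
val myStream: DataStream[MyEventProto] =
  env.addSource(new FlinkKafkaConsumer[MyEventProto]("events", new ProtoDeserializer, props))
```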
Upvotes: 2
Views: 2093
Reputation: 3864
If you have a DataStream of objects, then you can simply register the given DataStream as a Table using StreamTableEnvironment.
This would look more or less like below:
val myStream = ...
val env: StreamExecutionEnvironment = configureFlinkEnv(StreamExecutionEnvironment.getExecutionEnvironment)
val tEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)
// Field expressions are optional; they name and reorder the table's columns
tEnv.registerDataStream("myTable", myStream, [Field expressions])
Then You should be able to query the dynamic table created from Your DataStream.
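For instance, continuing from the snippet above, the query and the conversion back to a stream might look like the sketch below; the column names `userId` and `eventType` are placeholders, not fields from the question:

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

// Run SQL against the registered table; column names are placeholders
val flattened = tEnv.sqlQuery(
  "SELECT userId, eventType FROM myTable WHERE eventType = 'click'")

// Convert the dynamic table back to a DataStream, e.g. for a DynamoDB sink
val resultStream: DataStream[Row] = tEnv.toAppendStream[Row](flattened)
```

`toAppendStream` works here because a simple filter/projection only ever appends rows; an aggregating query that updates earlier results would need `toRetractStream` instead.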
Upvotes: 1