Reputation: 480
In order to build sources & sinks on top of existing ones (as opposed to building them from scratch or with more boilerplate), I'd like to:

- apply a function f: A -> B to a Source<A> in order to get a Source<B> (corresponds to map in FP circles)
- apply a function f: B -> A to a Sink<A> in order to get a Sink<B> (corresponds to contramap in FP circles)

In Scala, this pattern is commonly used by JSON libraries such as Circe.
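To make the intent concrete, here is a minimal sketch of the two operations using hypothetical single-method Source/Sink interfaces (deliberately not Flink's interfaces, just the algebraic shape being asked about):

import java.util.function.Function;

// Hypothetical interfaces, only to pin down the signatures.
interface Source<A> { A poll(); }
interface Sink<A>  { void accept(A value); }

class Adapters {
    // map: given f: A -> B, adapt a Source<A> into a Source<B>
    static <A, B> Source<B> map(Source<A> source, Function<A, B> f) {
        return () -> f.apply(source.poll());
    }

    // contramap: given f: B -> A, adapt a Sink<A> into a Sink<B>
    static <A, B> Sink<B> contramap(Sink<A> sink, Function<B, A> f) {
        return b -> sink.accept(f.apply(b));
    }
}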
Does this pattern apply well to the case at hand (Flink sources & sinks)?
If not, what would be the recommended way of approaching this problem?
As a special instance of the problem, you might consider the following example: using RowData as the data type. The official docs already touch upon this; in particular:
If you want to develop a connector that needs to bridge with DataStream APIs (i.e. if you want to adapt a DataStream connector to the Table API), you need to add this dependency: "org.apache.flink:flink-table-api-java-bridge:1.16-SNAPSHOT"
Here you can see this in action within the Kafka connector:
Finally, see also the original question posted in the User mailing list:
Upvotes: 1
Views: 359
Reputation: 43707
Normally you don't really have to work that hard. If, for example, you have a DataStream<Customer> customerStream where Customer is a Flink POJO, it's enough to do

tableEnv.createTemporaryView("Customers", customerStream);

and then you have a Customers Table you can use like any other Table.
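For instance, a minimal sketch (the Customer POJO and the env/tableEnv variables here are assumptions for illustration):

// Hypothetical POJO: a public no-arg constructor and public fields let
// Flink derive the table schema automatically.
public class Customer {
    public long id;
    public String name;
    public Customer() {}
    public Customer(long id, String name) { this.id = id; this.name = name; }
}

DataStream<Customer> customerStream =
    env.fromElements(new Customer(1L, "Alice"), new Customer(2L, "Bob"));

// Register the stream as a view and query it like any other table.
tableEnv.createTemporaryView("Customers", customerStream);
Table onlyAlice = tableEnv.sqlQuery("SELECT id, name FROM Customers WHERE id = 1");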
You can use toDataStream and toChangelogStream to convert tables to datastreams before sinking them with datastream sinks. If you need to, you can always chain a MapFunction in between to do additional conversion. For example, I once found myself doing this when I had to have the output in a very particular format:
Table hourlyMax = tableEnv.sqlQuery(...);

DataStream<Tuple3<Long, Long, Float>> resultsAsStreamOfTuples =
    tableEnv.toDataStream(hourlyMax)
        .map(row -> new Tuple3<>(
            row.<LocalDateTime>getFieldAs("window_end")
                .atZone(ZoneId.systemDefault())
                .toInstant()
                .toEpochMilli(),
            row.<Long>getFieldAs("driverId"),
            row.<Float>getFieldAs("sumOfTips")))
        .returns(Types.TUPLE(Types.LONG, Types.LONG, Types.FLOAT));

resultsAsStreamOfTuples.addSink(sink);
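For updating (non-append-only) tables, toChangelogStream is the counterpart; a minimal sketch, assuming an updating Table named updatingTable (the filter is just one illustrative use of the change flags):

// Each emitted Row carries a RowKind flag: INSERT, UPDATE_BEFORE,
// UPDATE_AFTER, or DELETE (org.apache.flink.types.Row / RowKind).
DataStream<Row> changelog = tableEnv.toChangelogStream(updatingTable);

changelog
    .filter(row -> row.getKind() != RowKind.UPDATE_BEFORE) // keep latest images only
    .print();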
https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/table/data_stream_api/ describes this in more detail and includes examples.
Of course, all of the above only applies to the special case of converting between streams and tables. But in the more general case, I would recommend chaining a MapFunction instead of creating a custom connector that wraps an existing one.
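In other words, you can get the map/contramap effect from the question at the pipeline level; a sketch assuming a hypothetical existingSource producing As, an existingSink accepting As, and conversion functions aToB / bToA:

// "map" over a source: convert each A to a B right after the source.
DataStream<B> bStream =
    env.fromSource(existingSource, WatermarkStrategy.noWatermarks(), "existing-source")
        .map(a -> aToB(a));

// "contramap" into a sink: convert each B back to an A just before the sink.
bStream
    .map(b -> bToA(b))
    .sinkTo(existingSink);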
Upvotes: 1