salvalcantara

Reputation: 480

Obtain Source<B> (Sink<B>) out of Source<A> (Sink<A>) and f:A->B (f:B->A)

In order to build sources & sinks on top of existing ones (as opposed to doing it from scratch or with more boilerplate), I'd like to:

- obtain a Source<B> out of an existing Source<A> and a mapping function f: A -> B
- obtain a Sink<B> out of an existing Sink<A> and a (contra)mapping function f: B -> A

In Scala, this pattern is commonly used by JSON libraries such as Circe, where Decoder[A].map(f: A => B) yields a Decoder[B] and Encoder[A].contramap(f: B => A) yields an Encoder[B].
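For illustration, here is a minimal sketch of the pattern in Java, using hypothetical SimpleSource/SimpleSink interfaces (these are not Flink's actual Source/Sink APIs):

import java.util.function.Function;

// Hypothetical single-method interfaces, just to illustrate the pattern.
interface SimpleSource<A> { A next(); }
interface SimpleSink<A> { void accept(A a); }

class Variance {
    // Sources are covariant: map f: A -> B over what the source produces.
    static <A, B> SimpleSource<B> map(SimpleSource<A> src, Function<A, B> f) {
        return () -> f.apply(src.next());
    }

    // Sinks are contravariant: contramap f: B -> A over what the sink consumes.
    static <A, B> SimpleSink<B> contramap(SimpleSink<A> sink, Function<B, A> f) {
        return b -> sink.accept(f.apply(b));
    }
}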

Does this pattern apply well to the case at hand (Flink sources & sinks)?

Otherwise, what would be the recommended way of approaching this problem?

As a special instance of the problem, consider adapting an existing DataStream connector so that it can be used with the Table API.

The official docs already touch upon this. In particular:

If you want to develop a connector that needs to bridge with DataStream APIs (i.e. if you want to adapt a DataStream connector to the Table API), you need to add this dependency: "org.apache.flink:flink-table-api-java-bridge:1.16-SNAPSHOT"

You can see this in action in the Kafka connector.

Finally, see also the original question posted on the Flink user mailing list.

Upvotes: 1

Views: 359

Answers (1)

David Anderson

Reputation: 43707

Normally you don't really have to work that hard. If, for example, you have a DataStream<Customer> customerStream where Customer is a Flink POJO, it's enough to do

tableEnv.createTemporaryView("Customers", customerStream);

and then you have a Customers Table you can use like any other Table.
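
To make this concrete, here is a minimal, self-contained sketch; the Customer POJO and its fields are assumptions for illustration:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class PojoToTable {
    // A Flink POJO: public class, public no-arg constructor, public fields.
    public static class Customer {
        public Long id;
        public String name;
        public Customer() {}
        public Customer(Long id, String name) { this.id = id; this.name = name; }
    }

    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        DataStream<Customer> customerStream =
            env.fromElements(new Customer(1L, "Alice"), new Customer(2L, "Bob"));

        // Register the stream as a view; the columns are derived from the POJO fields.
        tableEnv.createTemporaryView("Customers", customerStream);

        Table result = tableEnv.sqlQuery("SELECT name FROM Customers");
    }
}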

You can use toDataStream and toChangelogStream to convert tables to datastreams before sinking them with datastream sinks. If you need to, you can always chain a MapFunction in between to do additional conversion. For example, I once found myself doing this when I had to have the output in a very particular format:

import java.time.LocalDateTime;
import java.time.ZoneId;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.Table;

Table hourlyMax = tableEnv.sqlQuery(...);

// Convert the Table to a DataStream<Row>, then reshape each Row into a Tuple3.
DataStream<Tuple3<Long, Long, Float>> resultsAsStreamOfTuples =
    tableEnv.toDataStream(hourlyMax)
        .map(row -> new Tuple3<>(
            // window_end is a TIMESTAMP column, surfaced as a LocalDateTime
            row.<LocalDateTime>getFieldAs("window_end")
               .atZone(ZoneId.systemDefault())
               .toInstant()
               .toEpochMilli(),
            row.<Long>getFieldAs("driverId"),
            row.<Float>getFieldAs("sumOfTips")))
        // lambdas erase generic type information, so declare the result type
        .returns(Types.TUPLE(Types.LONG, Types.LONG, Types.FLOAT));

resultsAsStreamOfTuples.addSink(sink);

https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/table/data_stream_api/ describes this in more detail and includes examples.

Of course, all of the above only applies to the special case of converting between streams and tables. For the more general case, I would recommend chaining a MapFunction (as sketched below) rather than creating a custom connector that wraps an existing connector.
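For instance, here is a minimal sketch of that recommendation; the A and B POJOs are hypothetical, and the print sink stands in for whatever existing sink for A you want to reuse. The chained MapFunction plays the role of f: B -> A:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;

public class ChainedMapBeforeSink {
    // Hypothetical POJOs, just for illustration.
    public static class A { public String payload; }
    public static class B { public String raw; }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<B> streamOfB = env.fromElements(new B(), new B());

        streamOfB
            // f: B -> A, applied as a chained MapFunction...
            .map(new MapFunction<B, A>() {
                @Override
                public A map(B b) {
                    A a = new A();
                    a.payload = b.raw;
                    return a;
                }
            })
            // ...so an existing sink for A can be reused unchanged
            .addSink(new PrintSinkFunction<>());

        env.execute();
    }
}

The same trick works in the other direction for sources: read with the existing source of A and chain a map from A to B right after it.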

Upvotes: 1
