Reputation: 1020
I was surprised to find that there are no outer joins for DataStream in Flink (DataStream docs).
For DataSet you have all the options: leftOuterJoin, rightOuterJoin and fullOuterJoin, apart from the regular join (DataSet docs). But for DataStream you just have the plain old join.
Is this due to some fundamental properties of the DataStream that make it impossible to have outer joins? Or maybe we can expect this in the (close?) future?
I could really use an outer join on DataStream for the problem I'm working on... Is there any way to achieve a similar behaviour?
Upvotes: 3
Views: 2223
Reputation: 11
Here is a complete working example of how to perform a FULL OUTER JOIN on DataStreams using the Flink Table API. For more details, see the DataStream API integration page under Table API in the official Flink documentation.
Steps
1. Convert both DataStreams into tables.
2. Register the tables as views so that a SQL query can reference them.
3. Execute a full outer join SQL query on the registered views.
4. The result of the SQL query is a table.
5. Convert the result table back to a DataStream using toDataStream.
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

/**
 * Two input data streams
 *
 * <pre>{@code
 * 1. "Alice", "Bob", "John"
 * 2. "Mike", "Sam", "Adam", "Alice"
 *
 * The expected full outer join result is
 * (John), (null)
 * (Bob), (null)
 * (Alice), (Alice)
 * (null), (Mike)
 * (null), (Sam)
 * (null), (Adam)
 * }</pre>
 */
public class DataStreamFullOuterJoinUsingTable {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment streamEnv =
                StreamExecutionEnvironment.getExecutionEnvironment();
        // BATCH mode: both inputs are bounded, so the join result is insert-only.
        streamEnv.setRuntimeMode(RuntimeExecutionMode.BATCH);
        final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv);

        // Convert the first stream into a table and register it as a view for the SQL query.
        DataStream<String> nameStream = streamEnv.fromElements("Alice", "Bob", "John");
        Table nameTable = tableEnv.fromDataStream(nameStream).as("name");
        tableEnv.createTemporaryView("nameTable", nameTable);

        // Convert the second stream into a table and register it as a view for the SQL query.
        DataStream<String> detailStream = streamEnv.fromElements("Mike", "Sam", "Adam", "Alice");
        Table detailTable = tableEnv.fromDataStream(detailStream).as("detail");
        tableEnv.createTemporaryView("detailTable", detailTable);

        // Execute the full outer join SQL query; the result is a table.
        Table result =
                tableEnv.sqlQuery(
                        "SELECT * FROM "
                                + "nameTable FULL OUTER JOIN detailTable "
                                + "ON nameTable.name = detailTable.detail");

        // Convert the result table to a DataStream and map each Row to a String.
        DataStream<String> resultStream =
                tableEnv
                        .toDataStream(result)
                        .map(
                                (MapFunction<Row, String>)
                                        row -> "(" + row.getField(0) + "), (" + row.getField(1) + ")");

        // Print as a sink.
        resultStream.print();
        /*
        (John), (null)
        (Bob), (null)
        (Alice), (Alice)
        (null), (Mike)
        (null), (Sam)
        (null), (Adam)
        */
        streamEnv.execute();
    }
}
Along with flink-streaming-java, you will need the three dependencies below (provided scope) to get DataStream and Table to work together.
1. flink-table-api-java
2. flink-table-api-java-bridge
3. flink-table-planner_2.12
Once you know how to convert dataStream to a Table, execute an SQL query, and then convert it back to a dataStream, you can solve many similar problems.
Upvotes: 0
Reputation: 539
One way would be to go stream -> table -> stream, using the outer join of the Table API (Flink Table API: Outer Join).
Here is a Java example:
DataStream<String> data = env.readTextFile( ... );
DataStream<String> data2Merge = env.readTextFile( ... );
...
// Register both streams as tables and name their columns.
tableEnv.registerDataStream("myDataLeft", data, "left_column1, left_column2");
tableEnv.registerDataStream("myDataRight", data2Merge, "right_column1, right_column2");
String queryLeft = "SELECT left_column1, left_column2 FROM myDataLeft";
String queryRight = "SELECT right_column1, right_column2 FROM myDataRight";
Table tableLeft = tableEnv.sqlQuery(queryLeft);
Table tableRight = tableEnv.sqlQuery(queryRight);
// Full outer join on the first column of each side.
Table fullOuterResult = tableLeft.fullOuterJoin(tableRight, "left_column1 == right_column1").select("left_column1, left_column2, right_column2");
// The Boolean flag marks each message as an add (true) or a retraction (false).
DataStream<Tuple2<Boolean, Row>> retractStream = tableEnv.toRetractStream(fullOuterResult, Row.class);
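To make the retract stream easier to read: an outer join on unbounded input may first emit a null-padded row and later take it back once a match arrives. Below is a minimal plain-Java sketch (no Flink dependency) of how a consumer could fold such messages into the current result. The class RetractStreamSketch, the Message type and the materialize method are hypothetical names for illustration only; in Flink the messages would be the Tuple2<Boolean, Row> elements of the retract stream.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration, not part of the Flink API.
class RetractStreamSketch {

    // Stand-in for Tuple2<Boolean, Row>: a flag plus the row rendered as text.
    static final class Message {
        final boolean isAdd;
        final String row;

        Message(boolean isAdd, String row) {
            this.isAdd = isAdd;
            this.row = row;
        }
    }

    // Applies the messages in order: an add appends the row,
    // a retraction removes one previously added copy of it.
    public static List<String> materialize(List<Message> messages) {
        List<String> current = new ArrayList<>();
        for (Message m : messages) {
            if (m.isAdd) {
                current.add(m.row);
            } else {
                current.remove(m.row);
            }
        }
        return current;
    }

    public static void main(String[] args) {
        List<Message> messages = Arrays.asList(
                new Message(true, "(Alice), (null)"),   // left row arrives, no match yet
                new Message(false, "(Alice), (null)"),  // match arrives: old row is retracted
                new Message(true, "(Alice), (Alice)"),  // joined row replaces it
                new Message(true, "(Bob), (null)"));    // left row that never finds a match
        materialize(messages).forEach(System.out::println);
    }
}
```

A sink that cannot handle retractions would otherwise see both the null-padded row and the joined row for the same key.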
Upvotes: 0
Reputation: 18987
You can implement outer joins using the DataStream.coGroup() transformation. A CoGroupFunction receives two iterators (one for each input), which serve all elements of a certain key and which may be empty if no matching element is found. This makes it possible to implement outer join functionality.
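The idea can be sketched in plain Java without any Flink dependency. In the sketch below, joinOneKey plays the role of the body of a CoGroupFunction for a single key, and fullOuterJoin is only a stand-in for the per-key grouping that Flink performs for you. The class and method names are made up for illustration, and the sample data is borrowed from the Table API answer in this thread.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical illustration, not part of the Flink API.
class CoGroupOuterJoinSketch {

    // What a CoGroupFunction would do for ONE key: it sees all left and all
    // right elements of that key, and either side may be empty.
    public static void joinOneKey(Iterable<String> left, Iterable<String> right, List<String> out) {
        boolean leftEmpty = !left.iterator().hasNext();
        boolean rightEmpty = !right.iterator().hasNext();
        if (rightEmpty) {
            // No match on the right: pad with null (left outer part).
            for (String l : left) {
                out.add("(" + l + "), (null)");
            }
        } else if (leftEmpty) {
            // No match on the left: pad with null (right outer part).
            for (String r : right) {
                out.add("(null), (" + r + ")");
            }
        } else {
            // Both sides present: emit every pair (inner part).
            for (String l : left) {
                for (String r : right) {
                    out.add("(" + l + "), (" + r + ")");
                }
            }
        }
    }

    // Stand-in for Flink's grouping; here each element is its own key.
    public static List<String> fullOuterJoin(List<String> left, List<String> right) {
        Map<String, List<String>> leftByKey = new LinkedHashMap<>();
        Map<String, List<String>> rightByKey = new LinkedHashMap<>();
        for (String l : left) {
            leftByKey.computeIfAbsent(l, k -> new ArrayList<>()).add(l);
        }
        for (String r : right) {
            rightByKey.computeIfAbsent(r, k -> new ArrayList<>()).add(r);
        }
        Set<String> keys = new LinkedHashSet<>(leftByKey.keySet());
        keys.addAll(rightByKey.keySet());
        List<String> out = new ArrayList<>();
        for (String key : keys) {
            List<String> l = leftByKey.getOrDefault(key, Collections.<String>emptyList());
            List<String> r = rightByKey.getOrDefault(key, Collections.<String>emptyList());
            joinOneKey(l, r, out);
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> names = Arrays.asList("Alice", "Bob", "John");
        List<String> details = Arrays.asList("Mike", "Sam", "Adam", "Alice");
        fullOuterJoin(names, details).forEach(System.out::println);
    }
}
```

In Flink itself, the same three branches would go inside the coGroup method of a CoGroupFunction passed to first.coGroup(second).where(...).equalTo(...).window(...).apply(...). Because coGroup on DataStream is evaluated per window, an element only counts as unmatched within its own window.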
First-class support for outer joins might be added to the DataStream API in one of the next releases of Flink. I am not aware of any such efforts at the moment. However, creating an issue in the Apache Flink JIRA could help.
Upvotes: 1