Reputation: 1197
I am running a map function on a DataStream, and inside that map function I want to join two separate DataSets. Is this possible in Flink? I know the map function runs as a separate task for each partition, so I am wondering whether a distributed join is allowed inside it.
Upvotes: 0
Views: 847
Reputation: 711
If you are joining dataSet1 with dataSet2 and dataSet2 is not large, you can broadcast dataSet2 to the map over dataSet1 with the withBroadcastSet operator. Inside the map function, retrieve the broadcast dataSet2 with getRuntimeContext().getBroadcastVariable, then do the join yourself. To speed up the join, put the contents of dataSet2 into a map before broadcasting it. For example:
Map<Integer, String> testMap = new HashMap<>();
DataSet<Map<Integer, String>> dataSet2 = flinkEnv.fromElements(testMap);
dataSet1.map(new TestRichMapper()).withBroadcastSet(dataSet2, "dataSet2");
In the rich map function, you can read dataSet2 back as a map. getBroadcastVariable returns a List, so take its first element:
Map<Integer, String> testMap = getRuntimeContext().<Map<Integer, String>>getBroadcastVariable("dataSet2").get(0);
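To make the "do the join yourself" step concrete, here is a minimal sketch of the lookup logic in plain Java, with Flink left out so it runs on its own. The names BroadcastJoinSketch, Event, joinOne and joinAll are hypothetical and not part of any Flink API. The lookup map stands in for the broadcast map, and joinOne stands in for the body of the mapper.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class BroadcastJoinSketch {

    /** Hypothetical record from the large side: a join key plus a payload. */
    static final class Event {
        final int key;
        final String payload;

        Event(int key, String payload) {
            this.key = key;
            this.payload = payload;
        }
    }

    /**
     * Stands in for the mapper body: one hash lookup per record.
     * Returns null when the key has no match in the small side.
     */
    static String joinOne(Event event, Map<Integer, String> lookup) {
        String match = lookup.get(event.key);
        return match == null ? null : event.payload + "," + match;
    }

    /** Applies joinOne to every record and drops misses (inner-join semantics). */
    static List<String> joinAll(List<Event> events, Map<Integer, String> lookup) {
        List<String> joined = new ArrayList<>();
        for (Event event : events) {
            String row = joinOne(event, lookup);
            if (row != null) {
                joined.add(row);
            }
        }
        return joined;
    }

    public static void main(String[] args) {
        // Small side, assumed to fit in memory on every parallel task.
        Map<Integer, String> lookup = new HashMap<>();
        lookup.put(1, "alice");
        lookup.put(2, "bob");

        // Large side; key 3 has no match and is dropped.
        List<Event> events = new ArrayList<>();
        events.add(new Event(1, "login"));
        events.add(new Event(3, "logout"));
        events.add(new Event(2, "click"));

        System.out.println(joinAll(events, lookup));
    }
}
```

Each lookup is a constant-time hash probe, which is why converting the small side to a map before broadcasting helps. In an actual Flink job, a flatMap is the usual way to emit zero records for a key with no match, rather than returning null from map.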
Upvotes: 1
Reputation: 1197
OK, it turns out you cannot. DataSet joins run in a different execution context (ExecutionEnvironment) than stream processing (StreamExecutionEnvironment), and Flink does not allow operations from different execution contexts to be nested inside one another. Calling crossWithTiny from inside processElement fails with:
java.lang.IllegalArgumentException: The two inputs have different execution contexts.
at org.apache.flink.api.java.DataSet.checkSameExecutionContext(DataSet.java:1799)
at org.apache.flink.api.java.operators.TwoInputOperator.<init>(TwoInputOperator.java:42)
at org.apache.flink.api.java.operators.TwoInputUdfOperator.<init>(TwoInputUdfOperator.java:80)
at org.apache.flink.api.java.operators.CrossOperator.<init>(CrossOperator.java:90)
at org.apache.flink.api.java.operators.CrossOperator$DefaultCross.<init>(CrossOperator.java:150)
at org.apache.flink.api.java.DataSet.crossWithTiny(DataSet.java:1088)
at org.myorg.quickstart.MessageStreamProcessor$MessageProcessor.processElement(MessageStreamProcessor.java:138)
at org.myorg.quickstart.MessageStreamProcessor$MessageProcessor.processElement(MessageStreamProcessor.java:125)
at org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:94)
at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:207)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
Upvotes: 1