Reputation: 1809
We have three Java POJOs:
class Foo {
    int id;
    String name;
    List<Bar1> list1;
    List<Bar2> list2;
}
class Bar1 {
    int id;
    String field_x;
    String field_y;
}
class Bar2 {
    int id;
    String field_a;
    String field_b;
}
We also have three DataStreams in our Flink job:
class Test {
    public static void main(...) {
        DataStream<Foo> ds1 = ...;
        DataStream<Bar1> ds2 = ...;
        DataStream<Bar2> ds3 = ...;
    }
}
For each id there is exactly one Foo object, but there may be multiple Bar1 and Bar2 objects.
For each Foo in ds1, we want to find all Bar1 objects in ds2 with the same id and put them into list1, and find all Bar2 objects in ds3 with the same id and put them into list2.
What is the best way to go?
Reputation: 18987
Flink's DataStream operators support up to two input streams. There are two common ways to implement operations on three streams:
1. Apply two binary operations in sequence. This is straightforward in your case because Bar1 and Bar2 are not related to each other. It would look roughly as follows:
DataStream<Foo> withList1 = ds1
    .connect(ds2).keyBy("id", "id")
    .process(
        // your processing logic
        new CoProcessFunction<Foo, Bar1, Foo>(){...});
DataStream<Foo> withList1AndList2 = withList1
    .connect(ds3).keyBy("id", "id")
    .process(
        // your processing logic
        new CoProcessFunction<Foo, Bar2, Foo>(){...});
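The elided processing logic in the first CoProcessFunction has to cope with Foo and Bar1 records arriving in either order. Below is a minimal sketch of that per-key logic in plain Java, without Flink on the classpath. FooBar1Joiner, onFoo, onBar1, and current are hypothetical names, the two HashMaps only stand in for what would be keyed state in a real job, and the POJOs are trimmed copies of the ones in the question.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Trimmed copies of the question's POJOs (only the fields used here).
class Foo {
    int id;
    String name;
    List<Bar1> list1 = new ArrayList<>();
}

class Bar1 {
    int id;
    String field_x;
}

// Hypothetical stand-in for the body of a CoProcessFunction<Foo, Bar1, Foo>.
// The maps are an assumption: they play the role of keyed state.
class FooBar1Joiner {
    private final Map<Integer, Foo> fooById = new HashMap<>();
    private final Map<Integer, List<Bar1>> pendingBar1ById = new HashMap<>();

    // Called when a Foo arrives for its id.
    void onFoo(Foo foo) {
        fooById.put(foo.id, foo);
        // Attach any Bar1 records that arrived before the Foo.
        List<Bar1> pending = pendingBar1ById.remove(foo.id);
        if (pending != null) {
            foo.list1.addAll(pending);
        }
    }

    // Called when a Bar1 arrives for its id.
    void onBar1(Bar1 bar) {
        Foo foo = fooById.get(bar.id);
        if (foo != null) {
            // The Foo is already known: attach directly.
            foo.list1.add(bar);
        } else {
            // The Foo has not arrived yet: buffer until it does.
            pendingBar1ById.computeIfAbsent(bar.id, k -> new ArrayList<>()).add(bar);
        }
    }

    // Returns the Foo seen so far for this id, or null if none has arrived.
    Foo current(int id) {
        return fooById.get(id);
    }
}
```

In a real job, onFoo and onBar1 would correspond to processElement1 and processElement2, and the maps would be replaced by Flink keyed state so that the buffers are checkpointed.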
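2. Union all three streams into a single stream with a common data type (for example, a POJO with three fields foo, bar1, and bar2, of which only one is set per record) and process the unioned stream with a single-input operator:
// map Foo to CommonType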
DataStream<CommonType> common1 = ds1.map(new MapFunction<Foo, CommonType>(){...});
// map Bar1 to CommonType
DataStream<CommonType> common2 = ds2.map(new MapFunction<Bar1, CommonType>(){...});
// map Bar2 to CommonType
DataStream<CommonType> common3 = ds3.map(new MapFunction<Bar2, CommonType>(){...});
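// union requires all inputs to have the same type, so union the mapped streams
DataStream<Foo> withList1AndList2 = common1.union(common2, common3)
    // CommonType needs an id field for this to work
    .keyBy("id")
    .process(
        // your processing logic; the type parameters are <key, input, output>
        new KeyedProcessFunction<Tuple, CommonType, Foo>(){...});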
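To make the common-type idea concrete, here is a minimal plain-Java sketch of such a wrapper and of the per-key dispatch a single-input function might perform. It does not use Flink. The layout of CommonType (an id plus the three fields foo, bar1, bar2), the of(...) factory methods, and the PerKeyAssembler class are all assumptions made for illustration, and the POJOs are trimmed copies of the ones in the question.

```java
import java.util.ArrayList;
import java.util.List;

// Trimmed copies of the question's POJOs.
class Foo {
    int id;
    List<Bar1> list1 = new ArrayList<>();
    List<Bar2> list2 = new ArrayList<>();
}

class Bar1 { int id; }

class Bar2 { int id; }

// Assumed wrapper: exactly one of foo, bar1, bar2 is non-null per record.
class CommonType {
    int id;     // shared key, copied from the wrapped object
    Foo foo;
    Bar1 bar1;
    Bar2 bar2;

    static CommonType of(Foo foo) {
        CommonType c = new CommonType();
        c.id = foo.id;
        c.foo = foo;
        return c;
    }

    static CommonType of(Bar1 bar1) {
        CommonType c = new CommonType();
        c.id = bar1.id;
        c.bar1 = bar1;
        return c;
    }

    static CommonType of(Bar2 bar2) {
        CommonType c = new CommonType();
        c.id = bar2.id;
        c.bar2 = bar2;
        return c;
    }
}

// Hypothetical per-key handler: one instance holds everything seen for one id.
class PerKeyAssembler {
    private Foo foo;
    private final List<Bar1> bar1s = new ArrayList<>();
    private final List<Bar2> bar2s = new ArrayList<>();

    // Dispatch on whichever field of the wrapper is set.
    void onElement(CommonType e) {
        if (e.foo != null) {
            foo = e.foo;
        } else if (e.bar1 != null) {
            bar1s.add(e.bar1);
        } else if (e.bar2 != null) {
            bar2s.add(e.bar2);
        }
    }

    // Returns the Foo with both lists filled, or null if no Foo has arrived yet.
    Foo assemble() {
        if (foo == null) {
            return null;
        }
        foo.list1 = new ArrayList<>(bar1s);
        foo.list2 = new ArrayList<>(bar2s);
        return foo;
    }
}
```

The three map functions in the snippet above would then simply call the matching of(...) method, and the body of onElement would live in the function's processElement method.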
You can also just union ds2 and ds3 (after mapping both to a common type, since union requires identical types) and use a single binary operator.
The bigger problem might be identifying when all Bar1 and Bar2 events have been received, so that you can emit a result. Again, there are a few options, depending on your use case:
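1. If Foo knows how many Bar1 and Bar2 events it needs to wait for, the solution is straightforward: count the arrivals and emit once the expected number is reached.
2. If Foo does not know how many events to wait for, you can try to send a notification that signals that the last Bar1 or Bar2 was sent.
3. You can also work with a timeout if you know that all Bar1 or Bar2 events should arrive within x seconds/minutes/etc.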
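The three completion strategies can be combined in one small per-key check. The following plain-Java sketch is only an illustration under stated assumptions: CompletionTracker and all of its methods are hypothetical, the expected count passed to onFoo is not a field of the Foo class in the question, and the deadline is compared against a caller-supplied clock value rather than a Flink timer.

```java
// Hypothetical per-key tracker deciding when a result may be emitted.
class CompletionTracker {
    private int expected = -1;          // -1 means the count is unknown
    private int received = 0;
    private boolean endMarkerSeen = false;
    private final long deadlineMillis;  // absolute time after which we stop waiting

    CompletionTracker(long deadlineMillis) {
        this.deadlineMillis = deadlineMillis;
    }

    // Option 1: the Foo carries the number of Bar events to wait for
    // (pass -1 if it does not know).
    void onFoo(int expectedCount) {
        expected = expectedCount;
    }

    // Called for every Bar1 or Bar2 event of this key.
    void onBar() {
        received++;
    }

    // Option 2: an explicit "last event was sent" notification.
    void onEndMarker() {
        endMarkerSeen = true;
    }

    // True once any of the three conditions holds.
    boolean isComplete(long nowMillis) {
        if (expected >= 0 && received >= expected) {
            return true;                    // option 1: count reached
        }
        if (endMarkerSeen) {
            return true;                    // option 2: end marker received
        }
        return nowMillis >= deadlineMillis; // option 3: timeout expired
    }
}
```

In a Flink job the timeout branch would more likely be driven by a timer registered in the process function, with the result emitted from its onTimer callback, rather than by polling a clock as this sketch does.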