Reputation: 11
Pattern< Tuple3< String, String, String >, ? > pattern = Pattern.<Tuple3< String, String, String > > begin( "start" )
.next( "3" ).where( new FilterFunction< Tuple3< String, String, String > >() {
@Override
public boolean filter ( Tuple3< String, String, String > value ) throws Exception {
return value.f2.equals( "3" );
}
} )
.next( "4" ).subtype(Tuple.getTupleClass( 2 )).where( new FilterFunction< Tuple2< String, String> >() {
@Override
public boolean filter ( Tuple2< String, String > value ) throws Exception {
return value.f1.equals( "3" );
}
} )
subtype(Tuple.getTupleClass( 2 )),and occoured the error
Inferred type 'capture<? extends org.apapche.flink.api.java.tuple.Tuple>' for type parameter 'S' is not within its bound;should extend 'org.apapche.flink.api.java.tuple.Tuple3<java.lang.String,java.lang.String,java.lang.String>'
should I modify this?but how?Pattern< Tuple3< String, String, String >, ? > pattern
update by 2017012
JoinedStreams< Tuple2< String, String >, Tuple3< String, String, String > >.Where< String >.EqualTo
joinedStreams = someStream
.join( otherStream )
.where( value -> value.f1 )
.equalTo( value -> value.f1 );
Pattern< Tuple, ? > pattern = Pattern.< Tuple > begin( "start" )
.subtype( Tuple3.class )
.where( evt -> evt.f2.equals( "3" ) )
.next( "4" )
.subtype( Tuple2.class )
.where( evt -> evt.f1.equals( "3" ) )
.within( Time.seconds( 10 ) );
PatternStream< ...> patternStream = CEP.pattern( joinedStreams, pattern );
I tried this, and don't what should I fill in PatternStream< ...>
.Thanks for anyone who can offer help.
Upvotes: 0
Views: 424
Reputation: 46
what about this:
Pattern<Tuple, ?> pattern = Pattern.<Tuple>begin("start")
.subtype(Tuple3.class)
.where(evt -> evt.f2.equals("3"))
.next("4")
.subtype(Tuple2.class)
.where(evt -> evt.f1.equals("3"))
.within(Time.seconds(10));
If you want to connect two different datastreams.
DataStream<Tuple2> someStream = //...
DataStream<Tuple3> otherStream = //...
ConnectedStreams<Tuple2, Tuple3> connectedStreams = someStream.connect(otherStream);
Then you can use CoMap, CoFlatMap to get the same type, for example transform Tuple2, Tuple3 to String: ConnectedStreams → DataStream
connectedStreams.flatMap(new CoFlatMapFunction<Tuple2, Tuple3, String>() {
@Override
public void flatMap1(Integer value, Collector<String> out) {
out.collect(Tuple2.toString());
}
@Override
public void flatMap2(String value, Collector<String> out) {
for (String word: value.split(" ")) {
out.collect(Tuple3.toString);
}
}
});
Here are some useful links, introducing a good use case:
Upvotes: 0
Reputation: 858
Try this code:
Pattern<Tuple, ?> pattern =
Pattern.<Tuple>begin("start")
.next("3")
.subtype(Tuple3.class)
.where(new FilterFunction<Tuple3>() {
@Override
public boolean filter(Tuple3 value) throws Exception {
return value.f2.equals("3");
}
})
.next("4")
.subtype(Tuple2.class)
.where(new FilterFunction<Tuple2>() {
@Override
public boolean filter(Tuple2 value) throws Exception {
return value.f1.equals("3");
}
});
Start with a common type Tuple
and use concrete types Tuple2
and Tuple3
for subevents. And a data stream for this pattern must have a Tuple
type.
Upvotes: 1