0
Pattern< Tuple3< String, String, String >, ? > pattern = Pattern.<Tuple3< String, String, String > > begin( "start" )            
            .next( "3" ).where( new FilterFunction< Tuple3< String, String, String > >() {
                @Override
                public boolean filter ( Tuple3< String, String, String > value ) throws Exception {
                    return value.f2.equals( "3" );
                }
            } )
            .next( "4" ).subtype(Tuple.getTupleClass( 2 )).where( new FilterFunction< Tuple2< String, String> >() {
                @Override
                public boolean filter ( Tuple2< String, String > value ) throws Exception {
                    return value.f1.equals( "3" );
                }
            } )

subtype(Tuple.getTupleClass( 2 ))、およびエラーが発生しました Inferred type 'capture<? extends org.apapche.flink.api.java.tuple.Tuple>' for type parameter 'S' is not within its bound;should extend 'org.apapche.flink.api.java.tuple.Tuple3<java.lang.String,java.lang.String,java.lang.String>'

これを変更する必要がありますか?しかし、どのように?Pattern< Tuple3< String, String, String >, ? > pattern


2017年までに更新012

JoinedStreams< Tuple2< String, String >, Tuple3< String, String, String > >.Where< String >.EqualTo
        joinedStreams = someStream
        .join( otherStream )
        .where( value -> value.f1 )
        .equalTo( value -> value.f1 );

Pattern< Tuple, ? > pattern = Pattern.< Tuple > begin( "start" )
        .subtype( Tuple3.class )
        .where( evt -> evt.f2.equals( "3" ) )
        .next( "4" )
        .subtype( Tuple2.class )
        .where( evt -> evt.f1.equals( "3" ) )
        .within( Time.seconds( 10 ) );

PatternStream< ...> patternStream = CEP.pattern( joinedStreams, pattern );

私はこれを試しましたが、何を記入する必要はありませんPatternStream< ...>.助けを提供できる人に感謝します.

4

2 に答える 2

0

これはどうですか:

    Pattern<Tuple, ?> pattern = Pattern.<Tuple>begin("start")
            .subtype(Tuple3.class)
            .where(evt -> evt.f2.equals("3"))
            .next("4")
            .subtype(Tuple2.class)
            .where(evt -> evt.f1.equals("3"))
            .within(Time.seconds(10));
  1. begin の後に next を追加する必要はありません
  2. サブタイプの文字通りの意味に注意してください。tuple3 と tuple2 はタプルを拡張する必要があります。

2 つの異なるデータストリームを接続する場合。

DataStream<Tuple2> someStream = //...
DataStream<Tuple3> otherStream = //...

ConnectedStreams<Tuple2, Tuple3> connectedStreams = someStream.connect(otherStream);

次に、CoMap、CoFlatMap を使用して同じ型を取得できます。たとえば、Tuple2、Tuple3 を String に変換します: ConnectedStreams → DataStream

connectedStreams.flatMap(new CoFlatMapFunction<Tuple2, Tuple3, String>() {

   @Override
   public void flatMap1(Integer value, Collector<String> out) {
       out.collect(Tuple2.toString());
   }

   @Override
   public void flatMap2(String value, Collector<String> out) {
       for (String word: value.split(" ")) {
         out.collect(Tuple3.toString);
       }
   }
});

良いユースケースを紹介する便利なリンクを次に示します。

  1. Apache Flink による複合イベント処理 (CEP) の紹介
  2. 私が翻訳した中国語版
于 2017-01-10T03:21:05.793 に答える