1

Apache Flink (1.3.1) を使用するのは初めてで、質問があります。より詳細には、flink-core、flink-cep、および flink-streaming ライブラリを使用しています。私のアプリケーションは、RabbitMQ からのメッセージを消費する Akka ActorSystem であり、さまざまなアクターがこのメッセージを処理します。StreamExecutionEnvironment一部のアクターでは、Flink からをインスタンス化し、着信メッセージを処理したいと考えています。したがって、クラスを拡張するカスタム ソース クラスを作成しましたRichSourceFunction。Flink 拡張機能にデータを送信する方法がわかりません。これが私のセットアップです:

public class FlinkExtension {

    private static StreamExecutionEnvironment environment;
    private DataStream<ValueEvent> input;
    private CustomSourceFunction function;

    public FlinkExtension(){

        environment = StreamExecutionEnvironment.getExecutionEnvironment();

        function = new CustomSourceFunction();
        input = environment.addSource(function);

        PatternStream<ValueEvent> patternStream = CEP.pattern(input, _pattern());

        DataStream<String> warnings = patternStream.select(new PatternSelectFunction<ValueEvent, String>() {
            @Override
            public String select(Map<String, List<ValueEvent>> pattern) throws Exception {
                return null; //TODO
            }
        });

        warnings.print();

        try {
            environment.execute();
        } catch(Exception e){
            e.printStackTrace();
        }

    }

    private Pattern<ValueEvent, ?> _pattern(){

        return Pattern.<ValueEvent>begin("first").where(new SimpleCondition<ValueEvent>() {
            @Override
            public boolean filter(ValueEvent value) throws Exception {
                return value.getValue() > 10;
            }
        });
    }

    public void sendData(ValueEvent value){
        function.sendData(value);
    }
}

そして、これは私のカスタムソース関数です:

public class CustomSourceFunction extends RichSourceFunction<ValueEvent> {

    private volatile boolean run = false;
    private SourceContext<ValueEvent> context;

    @Override
    public void open(Configuration parameters){
        run = true;
    }

    @Override
    public void run(SourceContext<ValueEvent> ctx) throws Exception {
        this.context = ctx;

        while (run){

        }
    }

    public void sendData(ValueEvent value){
        this.context.collectWithTimestamp(value, Calendar.getInstance().getTimeInMillis());
    }

    @Override
    public void cancel() {
        run = false;
    }
}

sendDataそのため、FlinkExtensionクラス内のメソッドを外部から呼び出して、データを連続的に書き込みたいと考えていFlinkExtensionます。これが私のJUnitテストで、データを拡張機能に送信してから、データをに書き込む必要がありSourceContextます。

@Test
public void testSendData(){
    FlinkExtension extension = new FlinkExtension();
    extension.sendData(new ValueEvent(30));
}

しかし、テストを実行しても何も起こらず、アプリケーションは の run メソッドでハングしますCustomSourceFunctionCustomSourceFunctionまた、 run メソッドで新しいエンドレス スレッドを作成しようとしました。

要約すると: アプリケーションから Flink インスタンスに継続的にデータを書き込む方法を知っている人はいますか?

4

2 に答える 2

0

問題は、メソッドとメソッドCustomSourceFunctionによって使用されるオブジェクトのインスタンスが異なることです。したがって、オブジェクトはメソッド間で共有されておらず、新しいオブジェクトを追加しても機能しません。runsendDatacontextValueEvent

これを修正するには、runメソッドで使用されるオブジェクト インスタンスをクラスの静的メンバー変数として格納しますCustomSourceFunction。新しい を作成する必要がある場合は、この同じオブジェクト インスタンスValueEventでメソッドを呼び出します。sendData

以下のサンプルコードを参照してください

package RuleSources;

import Rules.Rule;
import org.apache.flink.streaming.api.watermark.Watermark;

import java.util.ArrayList;

public class DynamicRuleSource extends AlertingRuleSource {
    private static DynamicRuleSource sourceObj;

    private SourceContext<Rule> ctx;

    public static DynamicRuleSource getSourceObject() {
        return sourceObj;
    }

    public void run(SourceContext<Rule> ctx) throws Exception {
        this.ctx = ctx;
        sourceObj = this;
        while(true) {
            Thread.sleep(100);
        }
    }

    public void addRule(Rule rule) {
        ctx.collect(rule);
    }

    @Override
    public void cancel() {
    }
}

新しいルールを追加するには

 public static void addRule(Rule rule) throws Exception {
        AlertingRuleSource sourceObject = DynamicRuleSource.getSourceObject();
        sourceObject.addRule(rule);
    }
于 2018-11-13T05:05:55.333 に答える