1

必要なjarがMavenによって取得されたEclipse内からflinkを実行しています。私のマシンには 8 つのコアを備えたプロセッサがあり、ストリーミング アプリケーションはその入力から行を読み取り、いくつかの統計を計算する必要があります。

私のマシンでプログラムを実行したとき、flink が CPU のすべてのコアを適切にスレッド化されたコードとして使用することを期待していました。しかし、コアを見ると、コアが 1 つしか使用されていないことがわかります。私は多くのことを試しましたが、次のコードには最後の試み、つまり環境の並列処理の設定を残しています。ストリーム単体などにも設定してみました。

public class SemSeMi {


    public static void main(String[] args) throws Exception {
        System.out.println("Starting Main!");

        System.out.println(org.apache.flink.core.fs.local.LocalFileSystem
                .getLocalFileSystem().getWorkingDirectory());

        StreamExecutionEnvironment env = StreamExecutionEnvironment
                .getExecutionEnvironment();

        env.setParallelism(8);

        env.socketTextStream("localhost", 9999).flatMap(new SplitterX());

        env.execute("Something");       
    }

    public static class SplitterX implements
            FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String sentence,
                Collector<Tuple2<String, Integer>> out) throws Exception {
            // Do Nothing!

        }
    }
} 

netcatを使用してプログラムにデータを供給しました:

 nc -lk 9999 < fileName

問題は、プログラムをローカルにスケーリングし、利用可能なすべてのコアを使用する方法です。

4

1 に答える 1

2

並列度を明示的に指定する必要はありません。デフォルト設定で実行されるジョブは、並列処理を使用可能なコア数に自動的に設定します。

1あなたの場合、ソケットからの読み取りは分散できないため、ソースは並列処理で実行されます。ただし、flatMap操作のために、システムは 8 つのインスタンスをインスタンス化します。ロギングをオンにすると、それも表示されます。flatMapこれで、入力データがラウンドロビン方式でタスクに分散されます。各flatMapタスクは、個別のスレッドによって実行されます。

単一のコアにしか負荷がかからない理由は、SplitterXが何も機能しないためだと思います。それぞれの文字数をカウントしString、結果をコンソールに出力する次のコードを試してください。

public static void main(String[] args) throws Exception {
    System.out.println("Starting Main!");

    System.out.println(org.apache.flink.core.fs.local.LocalFileSystem
        .getLocalFileSystem().getWorkingDirectory());

    StreamExecutionEnvironment env = StreamExecutionEnvironment
        .getExecutionEnvironment();

    env.socketTextStream("localhost", 9999).flatMap(new SplitterX()).print();

    env.execute("Something");
}

public static class SplitterX implements
    FlatMapFunction<String, Tuple2<String, Integer>> {
    @Override
    public void flatMap(String sentence,
                        Collector<Tuple2<String, Integer>> out) throws Exception {
        out.collect(Tuple2.of(sentence, sentence.length()));

    }
}

各行の先頭にある数字は、どのタスクが結果を出力したかを示しています。

于 2016-01-15T16:30:10.327 に答える