必要なjarがMavenによって取得されたEclipse内からflinkを実行しています。私のマシンには 8 つのコアを備えたプロセッサがあり、ストリーミング アプリケーションはその入力から行を読み取り、いくつかの統計を計算する必要があります。
私のマシンでプログラムを実行したとき、flink が CPU のすべてのコアを適切にスレッド化されたコードとして使用することを期待していました。しかし、コアを見ると、コアが 1 つしか使用されていないことがわかります。私は多くのことを試しましたが、次のコードには最後の試み、つまり環境の並列処理の設定を残しています。ストリーム単体などにも設定してみました。
public class SemSeMi {
public static void main(String[] args) throws Exception {
System.out.println("Starting Main!");
System.out.println(org.apache.flink.core.fs.local.LocalFileSystem
.getLocalFileSystem().getWorkingDirectory());
StreamExecutionEnvironment env = StreamExecutionEnvironment
.getExecutionEnvironment();
env.setParallelism(8);
env.socketTextStream("localhost", 9999).flatMap(new SplitterX());
env.execute("Something");
}
public static class SplitterX implements
FlatMapFunction<String, Tuple2<String, Integer>> {
@Override
public void flatMap(String sentence,
Collector<Tuple2<String, Integer>> out) throws Exception {
// Do Nothing!
}
}
}
netcatを使用してプログラムにデータを供給しました:
nc -lk 9999 < fileName
問題は、プログラムをローカルにスケーリングし、利用可能なすべてのコアを使用する方法です。