2

私は Scala API 経由で apache flink を使用していますが、ある時点で DataSet[(Int, Int, Int)]. メソッドを使用した結果 writeAsCSV()writeAsText()予期しないものです。ディレクトリを作成します。そのディレクトリには、メソッド呼び出しの最初のパラメーターの場所と名前があります (例: filePath .)。そのディレクトリには、"1" と "2" という名前の 2 つのファイルが出現します。これらのファイルでは、DataSets データを確認できます。DataSet のコンテンツは、これら 2 つのファイルに分割されているようです。この動作を再現して、より簡潔なコード フラグメントを表示しようとしましたが、できませんでした。つまり、予想される位置に予想される名前のファイルが 1 つ作成され、ディレクトリが作成されないことを確認しました。val mas = ma_ groupBy(0,1) sum(2) mas.writeAsCsv("c:\flink\mas.csv" )

「mas.csv」という名前のディレクトリと、その中に「1」と「2」の 2 つのファイルが作成されます。このようなことはいつ起こりますか?flink 9.1 ローカル モード、Windows 7、scala 2.10、eclipse3.0.3 を使用

4

1 に答える 1

6

これは予期される動作です。単一の出力ファイルを取得する場合は、シンクの並列処理を 1 に設定する必要があります。

dataset = dataset.writeAsCsv("filename").setParallelism(1);

DataStream API の場合rebalane()、演算子チェーンを壊すために追加を挿入する必要があります。そうしないと、チェーン全体が dop=1 で実行されるsetParallelism()か、無視される可能性があります。

datastream = datastream.rebalance().writeAsCsv("filename").setParallelism(1);
于 2015-09-15T08:56:57.923 に答える