0

私は比較的簡単に見えることをしようとしていますが、いくつかの困難に直面しています。

たくさんのテキストがあり、各行は値です。テキストの各行を分析し、適切なキーを作成してから、KV ペアを発行します。次に、GroupByKey変換を使用します。最後に、キーごとにグループ化されたすべてのテキストを出力したいと思います (キーごとに 1 つのテキスト ファイルを取得できればボーナス ポイントですが、それが可能かどうかはわかりません)。

パイプラインは次のapplyようになります。

    public PCollection<String> apply(PCollection<String> generator) {

        // Returns individuals lines of text as <String,String> KV pairs
        PCollection<KV<String,String>> generatedTextKV = generator.apply(
                ParDo.of(new GeneratorByLineFn()));

        // Groups the <String,String> KV pairs by value
        PCollection<KV<String, Iterable<String>>> groupedText = generatedTextKV.apply(
            GroupByKey.<String, String>create());

        // Hopefully returns output where all of each key's values are together
        PCollection<String> results = groupedText.apply(ParDo.of(new FormatOutputFn()));

        return results;
    }

残念ながら、FormatOutputFn()思い通りに動作させることができません。

各値を反復してIterable<String>出力しても、キーと値のグループ化は保証されません (これについて間違っている場合は修正してください。そうすれば問題は解決します)。次にStringBuilder()、小さなデータセットで動作しますが、当然のことながらjava.lang.OutOfMemoryError: Java heap space、より大きなデータのログにエラーを生成する を使用してみました。変換も試しましたが、K,V ペアの値はではなく、通常Flatten.FlattenIterablesの であるため、どちらも機能しません。PCollectionIterable

共通キーによる分析に関するこの質問を見てきましたが、答えからすると、自分の状況で何をすべきかが正確にはわかりません。を使用する必要があると思いますが、使用Combine.PerKey方法が正確にはわかりません。また、これを行うには事前に作成された方法が必要であると想定していますが、ドキュメントでその事前に作成された方法を見つけることができません。私は正しい場所を見ていないだけだと確信しています。

そして、前述のように、テキスト ファイルの名前がキーで、値がすべてファイル内にあるテキスト ファイル出力を取得する方法があれば、それは素晴らしいことです。しかし、Dataflow でこれができるとは思いません (まだ?)。

読んでくれてありがとう。

4

1 に答える 1

3

Dataflow は現在、PCollection での順序付けの概念をサポートしていません。キーのグループ化を含め、「結果」に順序があるという保証がないことは正しいです。ある時点で PCollection の順序付けプロパティを追加したいと考えていますが、そのタイムラインはまだわかっていません。

基礎となる実装の詳細により、特定の状況で特定のランナーが順序付けされているように見える場合があります。たとえば、FormatOutputFn が書き込みステップと融合している場合、それぞれKV<K, Iterable<V>>が複数<K,V>の に処理され、次の処理が行われる前にファイルに書き込まれるため、グループ化が見られる可能性がKV<K, Iterable<V>>あります。ただし、これは Dataflow がこの特定のケースを最適化するために選択した方法の成果物にすぎず、一般的に依存するべきではありません。

すでにわかっているように、1 つの要素がメモリに収まる場合、FormatOutputFn でそれぞれKV<K, Iterable<V>>を複数の改行を含む 1 つの String に変換できます。

ここではそうではないことを考えると、私が考えることができる最善の解決策は、ファイルを手動で書き込むことです。そのため、FormatOutputFn はそれぞれKV<K, Iterable<V>>を取得し、標準の GCS ライブラリを使用して K という名前のファイルを開き、Iterable<V>それに書き込みます。悪いニュースは、フォールト トレランス セマンティクスがどのように要素を再試行するかを認識する必要があるため、これが少しトリッキーになることです。しかし、幸いなことに、私たちは現在、これらの種類のカスタム シンクをより簡単にするためのライブラリに取り組んでいます。

長さ 0 のファイルについては、ここにすばらしい答えがあります

于 2015-03-20T04:28:53.450 に答える