私の質問は、(1)私の問題を解決するためのより良い戦略がありますか(2)それが機能し、信頼できる方法で集計を分割しないように私の解決策を微調整/改善することが可能ですか(3重要度が低いもの)どうすればよいですか私はそれをよりインテリジェントにデバッグしますか?アグリゲーターが実行しているwtfを把握することは困難です。これは、サイズが原因でデバッグが難しい巨大なバッチでのみ失敗するためです。これらのいずれかに対する回答は非常に役立ちます。最も重要なのは最初の2つです。
問題は、入ってくるCSVファイルを単一の塊として扱うために必要であり、すべてのレコードが集約されるまでアグリゲーターを停止させたくないということを、ラクダに正しく表現していないことだと思います。
100万行のCSVファイルをダイジェストし、いくつかの主要なプライマリフィールドのデータを分割して集計し、集計されたレコードをテーブルに書き込むルートを作成しています
残念ながら、テーブルの主要な制約に違反しています(これは集約キーにも対応しています)。これは、アグリゲーターが入力全体の終了を待機していないことを意味します。
数千レコードの小さなファイルでは問題なく動作しますが、実際に本番環境で直面する大きなサイズ(1,000,000レコード)では失敗します。
まず、CSVアンマーシャル後の分割でJavaHeapメモリエラーが発生して失敗します。これは.streaming()で修正します。これはアグリゲーターに影響を与え、アグリゲーターの「完了」が早すぎます。
説明する:
A 1
A 2
B 2
--- aggregator split ---
B 1
A 2
--> A(3),B(2) ... A(2),B(1) = constraint violation because 2 lots of A's etc.
when what I want is A(5),B(3)
100、1000などの例で、それが正常に正しく機能することを記録します。ただし、処理する必要のある実際のサイズである1,000,000レコードを処理する場合、最初にsplit()はOutOfJavaHeapSpace例外を取得します。
ヒープサイズを変更するだけで短期的な解決策になり、次のレコードの上限が来るまで問題を押し戻すことができると感じたので、分割で.streaming()を使用して問題を回避しました。
残念ながら、現在、アグリゲーターはレコードをドリップフィードされており、大きな手がかりになっていないため、早期に完了して別のアグリゲーションを実行しているようです。これは、私の主な制約に違反しています。
from( file://inbox )
.unmarshall().bindy().
.split().body().streaming()
.setHeader( "X" Expression building string of primary-key fields)
.aggregate( header("X") ... ).completionTimeout( 15000 )
etc.
問題の一部は、ストリーミングスプリットが一定時間より長くタイムアウトしないことに依存していることだと思います。これは絶対確実ではありません。たとえば、システムタスクがこれを合理的に引き起こす可能性があります。また、これを増やすたびにタイムアウトすると、このようなもののデバッグとテストがますます長くなります。
おそらくより良い解決策は、入ってくるCSVファイルのサイズを読み取り、すべてのレコードが処理されるまでアグリゲーターが完了できないようにすることです。しかし、これをラクダでどのように表現するかはわかりません。
おそらく私は、この問題にどのようにアプローチ/説明すべきかについて、根本的な状態の誤解を持っているだけです。私が知らないはるかに良い(より単純な)アプローチがあるかもしれません。
また、非常に多くのレコードが入っているため、実際に手動でデバッグして何が起こっているのかを把握することはできません(そうすると、アグリゲーターのタイムアウトも解除されます)