3

トポロジに問題があります。ワークフローを説明しようとしています... 2 分ごとに ~500k のタプルを発行するソースがあります。これらのタプルはスパウトによって読み取られ、単一のオブジェクトのように 1 回正確に処理される必要があります (トライデントのバッチだと思います)。その後、bolt/function/what else?...タイムスタンプを追加し、タプルを Redis に保存する必要があります。

Jedis オブジェクト (Java 用の Redis ライブラリ) をこの関数クラスに使用して、すべてのタプルを Redis に保存する関数を使用してトライデント トポロジを実装しようとしましたが、展開すると、このオブジェクトで NotSerializable 例外が発生します。

私の質問は、Redis にこのタプルのバッチを書き込む関数を実装するにはどうすればよいですか? Webで読んでいると、関数からRedisに書き込む例や、トライデントのStateオブジェクトを使用する例が見つかりません(おそらく使用する必要があります...)

私の単純なトポロジ:

TridentTopology topology = new TridentTopology();
topology.newStream("myStream", new mySpout()).each(new Fields("field1", "field2"), new myFunction("redis_ip", "6379"));

前もって感謝します

4

2 に答える 2

7

(Redisに関連する特定の問題は他のコメントで解決されているように見えるため、一般的な状態について返信します)

Storm が分散 (または「分割」) データ ソースから (Storm の「スパウト」を介して) 読み取り、多くのノードでデータ ストリームを並行して処理し、必要に応じてそれらの計算を実行することを念頭に置くと、Storm での DB 更新の概念がより明確になります。データのストリーム (「集約」と呼ばれます) を生成し、結果を分散データ ストア (「状態」と呼ばれます) に保存します。集約は、「コンピューティングのもの」を意味する非常に広い用語です。たとえば、ストリームの最小値の計算は、クラスターのノードで現在処理されている新しい値を使用した、以前に既知の最小値の集約として、Storm で見られます。

アグリゲーションとパーティションの概念を念頭に置いて、状態で何かを保存できるようにする Storm の 2 つの主要なプリミティブである partitionPersist と persistentAggregate を見ることができます。他のパーティションは、DAO を介して DB と対話するように少し感じますが、2 つ目は、タプルを「再パーティション化」し (つまり、通常は何らかの groupby ロジックに沿ってクラスター全体に再分散します)、何らかの計算を行います (「集計」 ) DB に何かを読み取ったり保存したりする前に、DB ではなく HashMap と話しているように感じます (その場合、Storm は DB を「MapState」と呼び、マップにキーが 1 つしかない場合は「Snapshot」と呼びます)。

心に留めておくべきもう 1 つのことは、Storm の1 回限りのセマンティックは、各タプルを 1 回だけ処理することによって達成されるわけではないということです。スケーラビリティ上の理由から 2 フェーズ コミットを使用し、大規模になるとネットワーク パーティションが発生する可能性が高くなります。むしろ、Storm は通常、タプルが少なくとも 1 回完全に正常に処理されたことを確認するまで、タプルの再生を続けます。. これと状態の更新との重要な関係は、Storm がプリミティブ (OpaqueMap) を提供することです。このプリミティブ (OpaqueMap) により、冪等な状態の更新が可能になり、これらのリプレイによって以前に保存されたデータが破損することはありません。たとえば、数値 [1,2,3,4,5] を合計している場合、DB に保存される結果は、いくつかの理由で "合計" 操作で数回再生および処理されても、常に 15 になります。一時的な障害。OpaqueMap は、DB にデータを保存するために使用される形式にわずかな影響を与えます。これらのリプレイと不透明なロジックは、Storm にそのように動作するように指示した場合にのみ存在することに注意してください。ただし、通常はそうします。

詳細をお読みになりたい場合は、このテーマに関する 2 つのブログ記事をここに投稿しました。

http://svendvanderveken.wordpress.com/2013/07/30/scalable-real-time-state-update-with-storm/

http://svendvanderveken.wordpress.com/2014/02/05/error-handling-in-storm-trident-topologies/

最後にもう 1 つ: 上記のリプレイで示唆されているように、Storm は本質的に非常に非同期的なメカニズムです。通常、キュー システム (Kafka や 0MQ など) にイベントを投稿するデータ プロデューサーがあり、Storm はそこから読み取ります。その結果、質問で提案されているように、嵐の中からタイムスタンプを割り当てると、望ましい効果が得られる場合とそうでない場合があります。このタイムスタンプは、データの取り込み時間ではなく、「最新の成功した処理時間」を反映し、もちろん同一ではありません。リプレイされたタプルの場合。

于 2014-02-27T06:52:13.217 に答える
1

redis の trident-state を試しましたか。すでにそれを行うコードが github にあります: https://github.com/kstyrc/trident-redis

これがあなたの質問に答えているかどうか教えてください。

于 2014-02-24T10:36:15.800 に答える