現在、クラスター化されたトポロジ モードで Apache Storm 0.9.5 を使用して、Amazon Kinesis レコード (spout) を処理し、それらを Redshift データ ウェアハウス (bolt) に保存しています。私たちの Storm クラスターは AWS にデプロイされており、1 つの Nimbus + UI ノード、1 つの ZooKeeper ノード、および 3 つのスーパーバイザー + ログビューアー ノードで構成されています。当社のトポロジ設定は、複数の Kinesis ストリームの処理をサポートし、ストリームごとに以下が含まれます。
- 着信レコードをリッスンする 1 つの Kinesis ストリーム スパウト
- データ ウェアハウスにレコードを挿入するための 1 つの Redshift ボルト
トポロジー:
final TopologyBuilder topologyBuilder = new TopologyBuilder();
// for every configured kinesis stream
final List<KinesisStreamSpout> kinesisStreamSpouts = kinesisStreamService.getKinesisStreamSpouts();
for (final KinesisStreamSpout kinesisStreamSpout : kinesisStreamSpouts) {
final String spoutId = kinesisStreamSpout.getSpoutId();
topologyBuilder.setSpout(spoutId, kinesisStreamSpout.getKinesisSpout());
// set the corresponding redshift bolt
final String streamName = kinesisStreamSpout.getStreamName();
final RedshiftBolt redshiftBolt = new RedshiftBolt(streamName);
topologyBuilder.setBolt(redshiftBolt.getId(),
redshiftBolt, stormProperties.getNumberOfWorkersPerStream()).shuffleGrouping(spoutId);
}
return topologyBuilder.createTopology();
このシステムの弱点は、入力メッセージの一度だけの処理を保証できないことであり、その結果、同じビジネス キーを持つ複数のレコードがターゲット データベースに挿入されます。問題の規模を把握するために、制御されたテストを実行したところ、すべての入力レコードの約 3 分の 1 が複数回処理のために送信されたことがわかりました。
このスレッド(現在は未回答) に従って、一度だけの処理を保証するために Trident を使用することも検討しましたが、べき等性をシステムに組み込むことがより重要であるという結論に達しました (少なくとも-once セマンティクス) ではなく、複雑さを追加し、パフォーマンスを低下させ、この別の記事で提案されているように状態を生成します。
現在、クラスタリングをサポートする方法で既存のトポロジ内にべき等性を実装する最善の方法についてアドバイスを求めています。これまでのところ、タプル メッセージ ID によって値をキーにする RedisBolt の導入に傾いています。Apache Storm を使用してこれを達成するための既存のパターンはありますか?