問題タブ [amazon-kcl]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
amazon-kcl - AWS KCL で processRecords が失敗した場合の対処方法は?
KCL ベースの nodejs でアプリケーションを作成しています。関数ではprocessRecords
、kinesis から取得したレコードを http リクエストで Web サービスに送信しようとしました。ただし、Web サービスが利用できない場合、http 要求は失敗します。kinesis からレコードを取得するために KCL を停止し、pagerduty にアラートを送信したいと考えています。
誰でもそれを行う方法を知っていますか?
amazon-web-services - Amazon KCL チェックポイントとトリム ホライズン
チェックポイントとトリミングは AWS KCL ライブラリでどのように関連していますか?
ドキュメンテーションページの処理の開始、シャットダウン、およびスロットリングには次のように記載されています。
デフォルトでは、KCL は、最後に追加されたレコードであるストリームの先端からレコードの読み取りを開始します。この構成では、受信側のレコード プロセッサが実行される前に、データ生成アプリケーションがレコードをストリームに追加すると、そのレコードは、起動後にレコード プロセッサによって読み取られません。
常にストリームの先頭からデータを読み取るようにレコード プロセッサの動作を変更するには、Amazon Kinesis Streams アプリケーションのプロパティ ファイルに次の値を設定します。
initialPositionInStream = TRIM_HORIZON
Java で Amazon Kinesis クライアント ライブラリ コンシューマーを開発するドキュメント ページには、次のように記載されています。
Streams では、レコード プロセッサが、シャードで既に処理されたレコードを追跡する必要があります。KCL は、checkpointer (IRecordProcessorCheckpointer) を processRecords に渡すことで、この追跡を処理します。レコード プロセッサは、このインターフェイスで checkpoint メソッドを呼び出して、シャード内のレコードの処理がどの程度進んだかを KCL に通知します。ワーカーに障害が発生した場合、KCL はこの情報を使用して、最後に処理された既知のレコードからシャードの処理を再開します。
最初のページは、KCL がストリームの先頭で再開したことを示しているように見えます。2 ページ目は、最後に処理された既知のレコード ( をRecordProcessor
使用して処理済みとしてマークされたcheckpointer
) です。私の場合、最後に処理された既知のレコードから再起動する必要があります。initialPositionInStream を TRIM_HORIZON に設定する必要がありますか?
streaming - DynamoDB ストリームの伝搬レイテンシーを測定するにはどうすればよいですか?
DynamoDB Streams + Kinesis Client Library (KCL) を使用しています。イベントがストリームで作成されてから KCL 側で処理されるまでのレイテンシーを測定するにはどうすればよいですか?
私が知っているように、KCL のMillisBehindLatest
メトリクスは (DynamoDB ストリームではなく) Kinesis ストリームに固有のものです。
approximateCreationDateTime
record 属性には分レベルの概算があり、1 秒未満の遅延システムでの監視には受け入れられません。
DynamoDB ストリームのレイテンシを監視するための便利なメトリクスを教えてください。
amazon-kinesis - kinesis クライアント ワーカー ロジック
IRecordProcessor の processRecords メソッドがいつワーカーから呼び出されるかを理解したいと思います。以前の processRecords への呼び出しがまだ完了していない場合、ワーカーは次の processRecords を呼び出しますか? ワーカーは kinesis から新しいレコードの取得を開始しますか、それとも現在のレコードの実行が終了するまで待機しますか。
基本的に、dbがダウンしているかその他のエラーが発生したため、processRecordsが外部データベースにレコードを保存しているときに例外が発生した場合、長時間待ちたいと思います。ワーカーが以前の処理が完了するまで新しいレコードのフェッチを開始しない場合、問題がないことを確認したいですか?
java - スパーク ストリーミングでキネシス コンシューマー プロパティを変更する方法
KinesisReceiver のドキュメントは次のとおりです。ここでは、 KinesisClientLibConfigurationのすべてのデフォルト値が使用されます。
Spark ストリーミング アプリケーションでプロパティ ファイルを使用して既定値を変更する方法はありますか?
amazon-web-services - N 分ごとにトリガーするラムダ用の dynamodb ストリーム KCL アダプターにチェックポイントを設定する方法
CloudWatch イベントをスケジュールして N 分ごとにラムダをトリガーすることで、DynamoDB Streams Kinesis Adapter を使用して、最後の N 分間 DynamoDB Streams にアクセスしたいと考えていました。次回ラムダがトリガーされたときに、最後に処理されたレコードを追跡して、最後に処理されたレコードから続行できるようにするにはどうすればよいですか。
https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.KCLAdapter.Walkthrough.CompleteProgram.htmlから、 チェックポイントを使用して最後に処理されたレコードを追跡できることがわかりましたが、これがどこにあるのか疑問です保管されていますか?N 分後に次のラムダ トリガーにこのチェックポイントを使用するにはどうすればよいですか?