問題タブ [apache-samza]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
exception - サムザ メッセージの送信に失敗しました。例外
aws emr インスタンスで samza を使用していますが、常に次のような例外が発生します。誰か助けてもらえますか?:
org.apache.samza.SamzaException: メッセージの送信に失敗しました。例外: java.lang.IllegalStateException: プロデューサが閉じられた後は送信できません。org.apache.samza.system.kafka.KafkaSystemProducer$$anonfun$send$5.apply(KafkaSystemProducer.scala:120) で org.apache.samza.system.kafka.KafkaSystemProducer$$anonfun$send$5.apply(KafkaSystemProducer. scala:111) org.apache.samza.util.ExponentialSleepStrategy.run(ExponentialSleepStrategy.scala:81) で org.apache.samza.system.kafka.KafkaSystemProducer.send(KafkaSystemProducer.scala:86) で org.apache.samza .system.SystemProducers.send(SystemProducers.scala:87) org.apache.samza.task.TaskInstanceCollector.send(TaskInstanceCollector.scala:61) org.apache.samza.storage.kv.LoggedStore.putAll(LoggedStore.scala) :72) org.apache.samza.storage.kv.SerializedKeyValueStore で。
java - シリアル化されたメッセージを別の protobuf メッセージに挿入できますか
protobuf でエンコードされたメッセージを使用して、kafka/samza ジョブのパイプラインを操作します。パイプラインは、特定のデータ セットに対して非常に長くなる可能性があるため、パイプラインの各ステージにタイムスタンプ/ID を追加して、効率とサービスの正常性を監視したいと考えています。
追加情報は、タッチポイントと呼ばれるスキーマの繰り返しフィールドに追加されます。明らかに、Java/samza でメッセージをデコードし、追加のメッセージを追加して再度シリアル化すると、メッセージのサイズに応じてオーバーヘッドが増加します (逆シリアル化時間が長くなる場合もあります)。パイプの一部は、メッセージをチェックする単なるフィルターです。キーであり、逆シリアル化する必要さえない場合があるため、これらのオーバーヘッドが少ないほど良い.
逆シリアル化せずに2番目のシリアル化されたメッセージを既存のメッセージに挿入することは可能ですか?もしそうなら、これは非常に悪い習慣であり(そうなると思います)、逆シリアル化/追加/する必要がないより良い解決策はありますか?メッセージ パス/フロー時間を監視するためのシリアル化
apache-kafka - Samza タスクが 1 つのパーティションで受信しない
samza タスクの 1 つに不可解な問題があります。1 つのパーティションのメッセージを除いて、正しく動作します。このトピックには 9 つのパーティションがあります。1000 件のメッセージを送信すると、約 890 件しか受信しません。
samza ジョブによって処理されないことがわかっているパーティション キーを使用して kafka-console-consumer を確認しましたが、コンソール コンシューマーにはメッセージが表示されるため、トピックに書き込まれていることがわかり、少なくともバニラ コンシューマーはそれをうまく見てください。
samza でデバッグ ログを有効にしましたが、次のような多くのメッセージが表示されorg.apache.samza.checkpoint.kafka.KafkaCheckpointManager
ます。
チェックポイントの追加 チェックポイント [offsets={SystemStreamPartition [kafka, com.mycompany.indexing.document, 4]=448}] for taskName パーティション 4
パーティション 4 は常に 448 と表示されます。パーティション 0 にも同様のログがありますが、448 と表示されている場合は着実に増加しています。
これを絞り込むのに役立つ興味深い構成情報を喜んで共有しますが、今のところ、何を共有するかについて少し戸惑っています.
私は次のように実行しThreadJobFactory
ています:
samza-kafka_2.10 バージョン 0.9.1
クライアント上の kafka_2.10 バージョン 0.8.2.1
カフカブローカー 0.9.0.0
アップデート
同じパーティション キーを使用してアップストリームの samza ジョブを調べたところ、アップストリームのパーティション 4 で問題が見つかりました。kafkacat で samza チェックポイントのトピックを確認すると、パーティション 4 のチェックポイントが進んでいないことがわかります。最初に私が見る:
それから 1 分後、次のように表示されます。
数値は 2556 を超えません。ただし、resource.mutation
パーティション 4 の実際のトピックを見ると、最後のオフセットの範囲は他のものと同様で、現在は約 61000 であり、増加しています。
エラー メッセージや警告メッセージはまったくありません。パーティション 4 からの消費を停止するだけです。
java - YARN ジョブを強制終了できません
簡単な Samza ジョブがあり、YARN クラスターに送信します。ジョブは 1 つのコンテナーを割り当て、問題なく実行されます。
ただし、ジョブを強制終了しようとすると、RM がジョブが正常に強制終了されたと主張しているにもかかわらず、AM とジョブ コンテナーの両方が NM で実行されたままになります。
NM ログから、次のことがわかります。
ステータスが遷移することFINISHING_CONTAINERS_WAIT
はなくkill -9
、コンテナー プロセスに移行する必要がありました。
私は Samza バージョン0.10.0
と YARN バージョンを使用してHadoop 2.6.0-cdh5.4.9
います。
何か案が?
アップデート:
掘り下げた後、私はこれを見ることができます:
hadoop-yarn - ContainerRequestState [INFO] キューに保留中のリクエストはありません
3 つのノードを持つ MapR (YARN) クラスターを使用しています。データ ストリームの処理のために、クラスターに 6 つの Samza ジョブをデプロイしようとしています。すべてのジョブは正しいです。2〜3個を並行してデプロイしてみましたが、動作します。ただし、6 つの Samza ジョブすべてを並行して展開すると、次のログが表示されます。タスクは引き続き実行され、期待される出力データ ストリームが生成されません。
ResourceManager Web ダッシュボードのノードのステータスは次のとおりです。
誰でもこれを解決する方法を提案できますか? アプリケーションには、それらすべてを並行して実行するのに十分なリソースがない可能性があると思います。どのような変更を試すことができますか?
mysql - Spark / Samza / Storm は過去のコミットを元に戻し、ビューを再生成できますか?
Turning the database inside-outを見たところ、Samza と Redux の類似点に気付きました。すべての状態は、不変オブジェクトのストリームで構成されています。
これにより、事後的にストリームを編集した場合、理論上はトランザクションの新しいリストに基づいてすべての具体化されたビューを再生成し、実際にはデータベースへの過去の変更を「元に戻す」ことができることに気付きました。
例として、次の一連の差分があるとします。
この一連の変更の後、データベースは次のようになります。
番号「3」を元に戻したい場合はどうすればよいでしょうか。新しい差分セットは次のようになります。
そして私たちのデータベース:
これは理論上は良さそうに思えますが、実際に Samza、Storm、または Spark を使用して実行できるのでしょうか? どのトランザクション ストリーム データベースでもこれを実行できますか? このような管理目的の機能に興味があります。クライアントが誤って従業員を削除したり、意図しないレコードを変更したりするサイトがいくつかあります。過去に、データベースへのすべての変更を記録する別のテーブルを作成することでこれを解決し、問題が発生したときに (手動で) このテーブルを見て、何が間違っていたかを把握し、(手動で) データを修正しました。
トランザクション ストリームを見て、悪いものを削除し、「データベースを再生成してください」と言うことができれば、とてもクールです。
java - task.commit.ms を 1ms ごとに設定できますか?
Apache-Samza を使用したプロジェクトがあり、データの重複に問題があります。
これは私のチェックポイント構成です:
ドキュメントでは、これを読むことができます:
task.checkpoint.factory が設定されている場合、このプロパティはチェックポイントが書き込まれる頻度を決定します。値は、ミリ秒単位のチェックポイント間の時間です。チェックポイントの頻度は、障害回復に影響します。コンテナが予期せず (クラッシュやマシンの障害などにより) 失敗し、再起動された場合、最後のチェックポイントで処理を再開します。失敗したコンテナーの最後のチェックポイント以降に処理されたメッセージは、再度処理されます。チェックポイントをより頻繁に設定すると、2 回処理される可能性のあるメッセージの数が減りますが、より多くのリソースが使用されます。
task.commit.ms=20000
250ms または 1msに変更できますか。それは良いですか、それとも非常に悪いですか?私は非常に良いクラスターを持っています。
この Samza (ワーカー) は毎週 1 ~ 3 回クラッシュするため、これを変更する必要があるのはなぜですか。そして今、一時的な解決策は毎回オフセットをコミットすることです。
ドキュメント参照: