問題タブ [kafka-consumer-api]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票する
1 に答える
943 参照

message-queue - Kafkaで分散処理と高可用性を同時に実現するには?

n 個のパーティションで構成されるトピックがあります。分散処理を行うために、異なるマシンで実行される 2 つのプロセスを作成します。同じグループ ID でトピックにサブスクライブし、n/2 スレッドを割り当てます。それぞれが単一のストリームを処理します (プロセスごとに n/2 パーティション)。

これで負荷分散が達成されますが、プロセス 1 がクラッシュした場合、プロセス 2 はプロセス 1 に割り当てられたパーティションからメッセージを消費できません。

または、HA を構成し、両方のプロセスで n スレッド/ストリームを開始すると、1 つのノードに障害が発生すると、すべてのパーティションが他のノードによって処理されます。しかし、ここでは、すべてのパーティションが一度に 1 つのノードによって処理されるため、分散に問題があります。

両方を同時に達成する方法はありますか?

0 投票する
0 に答える
272 参照

apache-kafka - sockect 127.0.0.1 の kafka サーバーでエラーが発生しました

私は kafka -storm-cassandra を実行しようとしています。私の場合、tail2kafka 自体がプロデューサーであり、cosumer を開始してトピックを消費すると、以下のエラーがスローされます。私を助けてください。

ありがとう

[2015-05-13 15:28:51,784] エラーのため、/127.0.0.1 のソケットを閉じるエラー (kafka.network.Processor) java.lang.OutOfMemoryError: kafka.api.ProducerRequest$$anonfun$1$ の Java ヒープ領域$anonfun$apply$1.apply(ProducerRequest.scala:45) で kafka.api.ProducerRequest$$anonfun$1$$anonfun$apply$1.apply(ProducerRequest.scala:42) で scala.collection.TraversableLike$$anonfun$map scala.collection.immutable.Range$ByOne$class.foreach(Range.scala: 282) scala.collection.immutable.Range$$anon$1.foreach(Range.scala:27​​4) で scala.collection.TraversableLike$class.map(TraversableLike.scala:206) で scala.collection.immutable.Range.map (Range.scala:39) kafka.api で。ProducerRequest$$anonfun$1.apply(ProducerRequest.scala:42) で kafka.api.ProducerRequest$$anonfun$1.apply(ProducerRequest.scala:38) で scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike. scala:227) で scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:227) で scala.collection.immutable.Range$ByOne$class.foreach(Range.scala:282) で scala.collection .immutable.Range$$anon$1.foreach(Range.scala:27​​4) で scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:227) で scala.collection.immutable.Range.flatMap(Range.scala:39) ) kafka.api.ProducerRequest$.readFrom(ProducerRequest.scala:38) で kafka.api.RequestKeys$$anonfun$1.apply(RequestKeys.scala:36) で kafka.api.RequestKeys$$anonfun$1.apply(RequestKeys .scala:36) kafka.network.RequestChannel$Request.(RequestChannel.scala:53) で kafka.network.Processor.read(SocketServer.scala:353) で kafka.network.Processor.run(SocketServer.scala:245) で Java で.lang.Thread.run(Thread.java:745)

そして、私の消費者コードは

インポート構造体のインポート時間

import kafka.io import kafka.request_type

クラス コンシューマー (kafka.io.IO):

CONSUME_REQUEST_TYPE = kafka.request_type.FETCH

最大サイズ = 1024 * 1024

# 秒。DEFAULT_POLLING_INTERVAL = 2

def init (self, topic, partition=0, host='localhost', port=9092): kafka.io.IO. init (自己、ホスト、ポート)

def consumer(self): """ トピック キューからデータを消費します。 """

def loop(self): """ キューからの着信メッセージをブロッキング方式でループします。pollingチェック間隔を秒単位で設定します。 """

# リクエスト タイプ ID + トピックの長さ + トピック + パーティション + オフセット + 最大サイズ def request_size(self): return 2 + 2 + len(self.topic) + 4 + 8 + 4

def encode_request_size(self): return struct.pack('>i', self.request_size())

def encode_request (self): 長さ = len (self.topic)

def send_consume_request(self): self.write(self.encode_request_size()) self.write(self.encode_request())

def read_data_response(self): buf_length = struct.unpack('>i', self.read(4))[0]

def parse_message_set_from(self, data): メッセージ = [] 処理済み = 0 長さ = len(データ) - 4