2

Kafka トピックからすべてのメッセージを読み取り、新しいトピックに新しいメッセージを書き込むいくつかの Samza ジョブがあります。新しいメッセージを送信するために、Samza に組み込まれている OutgoingMessageEnvelope を使用しています。また、MessageCollector を使用して新しいメッセージを送信します。次のようになります。

collector.send(new OutgoingMessageEnvelope(SystemStream, newMessage))

これを使用して Kafka トピックにパーティションを追加する方法はありますか? ユーザーIDなどでのパーティション化など。

または、もっと良い方法があれば、私はそれを聞きたいです!

4

1 に答える 1

3

パーティショニング キーを使用してメッセージを送信できるはずです。

    public OutgoingMessageEnvelope(SystemStream systemStream,
                               java.lang.Object partitionKey,
                               java.lang.Object key,
                               java.lang.Object message)
Constructs a new OutgoingMessageEnvelope from specified components.
Parameters:
systemStream - Object representing the appropriate stream of which this envelope will be sent on.
partitionKey - A key representing which partition of the systemStream to send this envelope on.
key - A deserialized key to be used for the message.
message - A deserialized message to be sent in this envelope.

この方法を使用すると、データが分割されます。ただし、プログラムでパーティションの数を制御する場合は、kafka API を使用してトピックを作成/変更する必要があると思います

于 2015-09-04T11:21:20.737 に答える