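I am using the Kafka consumer (version 1.3.1) here.
What I am trying to achieve:
There are 10 partitions. Each partition starts at offset 0.
There is a group of consumers (consumers 1, 2, 3, and so on).
From time to time, a consumer may go down or come back up.
So the membership of the group can change. However, each message in each partition must be consumed exactly once by the group (by consumer 1 or 2 or 3).
My code is as follows: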
from kafka import KafkaConsumer

consumer = KafkaConsumer('my_topic',
                         bootstrap_servers=['ip:9092'],
                         auto_offset_reset='earliest',
                         max_partition_fetch_bytes=131072,
                         group_id='writer.test')
Is the configuration above sufficient? Any comments are welcome. Thanks.
Update
I tried the following code. For partition 670, every message can end up being consumed twice, by two consumers in the same group, on every run. Why? Is something wrong?
import time

from kafka import KafkaConsumer

ip = 'ip:9092'  # placeholder broker address, as in the first snippet


def test():
    # Pull from Kafka
    consumer = KafkaConsumer(
        'topic',
        bootstrap_servers=[ip],
        auto_offset_reset='latest',
        max_partition_fetch_bytes=131072,
        auto_commit_interval_ms=500,
        group_id='writer2.test')
    print(consumer.poll())
    for i in range(10000):
        msg = next(consumer)
        # Only report records from partition 670
        if msg.partition == 670:
            print('partition= %s, offset= %s' % (msg.partition, msg.offset))
    consumer.unsubscribe()


if __name__ == "__main__":
    for i in range(10):
        time.sleep(5)
        test()
Output 1:
{}
partition= 670, offset= 224
partition= 670, offset= 225
partition= 670, offset= 226
partition= 670, offset= 227
partition= 670, offset= 228
partition= 670, offset= 229
partition= 670, offset= 230
partition= 670, offset= 231
partition= 670, offset= 232
partition= 670, offset= 233
partition= 670, offset= 234
partition= 670, offset= 235
partition= 670, offset= 236
partition= 670, offset= 237
partition= 670, offset= 238
partition= 670, offset= 239
partition= 670, offset= 240
{}
partition= 670, offset= 241
partition= 670, offset= 242
partition= 670, offset= 243
partition= 670, offset= 244
partition= 670, offset= 245
partition= 670, offset= 246
partition= 670, offset= 247
partition= 670, offset= 248
partition= 670, offset= 249
partition= 670, offset= 250
partition= 670, offset= 251
partition= 670, offset= 252
partition= 670, offset= 253
partition= 670, offset= 254
partition= 670, offset= 255
partition= 670, offset= 256
partition= 670, offset= 257
partition= 670, offset= 258
partition= 670, offset= 259
{}
{}
partition= 670, offset= 241
partition= 670, offset= 242
partition= 670, offset= 243
partition= 670, offset= 244
partition= 670, offset= 245
partition= 670, offset= 246
partition= 670, offset= 247
partition= 670, offset= 248
partition= 670, offset= 249
partition= 670, offset= 250
partition= 670, offset= 251
partition= 670, offset= 252
partition= 670, offset= 253
partition= 670, offset= 254
partition= 670, offset= 255
partition= 670, offset= 256
partition= 670, offset= 257
partition= 670, offset= 258
partition= 670, offset= 259
{}
partition= 670, offset= 241
partition= 670, offset= 242
partition= 670, offset= 243
partition= 670, offset= 244
partition= 670, offset= 245
partition= 670, offset= 246
partition= 670, offset= 247
partition= 670, offset= 248
partition= 670, offset= 249
partition= 670, offset= 250
partition= 670, offset= 251
partition= 670, offset= 252
partition= 670, offset= 253
partition= 670, offset= 254
partition= 670, offset= 255
partition= 670, offset= 256
partition= 670, offset= 257
partition= 670, offset= 258
partition= 670, offset= 259
{}
partition= 670, offset= 241
partition= 670, offset= 242
partition= 670, offset= 243
partition= 670, offset= 244
partition= 670, offset= 245
partition= 670, offset= 246
partition= 670, offset= 247
partition= 670, offset= 248
partition= 670, offset= 249
partition= 670, offset= 250
partition= 670, offset= 251
partition= 670, offset= 252
partition= 670, offset= 253
partition= 670, offset= 254
partition= 670, offset= 255
partition= 670, offset= 256
partition= 670, offset= 257
partition= 670, offset= 258
partition= 670, offset= 259
{}
partition= 670, offset= 241
partition= 670, offset= 242
partition= 670, offset= 243
partition= 670, offset= 244
partition= 670, offset= 245
partition= 670, offset= 246
partition= 670, offset= 247
partition= 670, offset= 248
partition= 670, offset= 249
partition= 670, offset= 250
partition= 670, offset= 251
partition= 670, offset= 252
partition= 670, offset= 253
partition= 670, offset= 254
partition= 670, offset= 255
partition= 670, offset= 256
partition= 670, offset= 257
partition= 670, offset= 258
partition= 670, offset= 259
{}
partition= 670, offset= 241
partition= 670, offset= 242
partition= 670, offset= 243
partition= 670, offset= 244
partition= 670, offset= 245
partition= 670, offset= 246
partition= 670, offset= 247
partition= 670, offset= 248
partition= 670, offset= 249
partition= 670, offset= 250
partition= 670, offset= 251
partition= 670, offset= 252
partition= 670, offset= 253
partition= 670, offset= 254
partition= 670, offset= 255
partition= 670, offset= 256
partition= 670, offset= 257
partition= 670, offset= 258
partition= 670, offset= 259
{}
{}
partition= 670, offset= 241
partition= 670, offset= 242
partition= 670, offset= 243
partition= 670, offset= 244
partition= 670, offset= 245
partition= 670, offset= 246
partition= 670, offset= 247
partition= 670, offset= 248
partition= 670, offset= 249
partition= 670, offset= 250
partition= 670, offset= 251
partition= 670, offset= 252
partition= 670, offset= 253
partition= 670, offset= 254
partition= 670, offset= 255
partition= 670, offset= 256
partition= 670, offset= 257
partition= 670, offset= 258
partition= 670, offset= 259
Running the same file in another window prints the following:
{}
partition= 670, offset= 241
partition= 670, offset= 242
partition= 670, offset= 243
partition= 670, offset= 244
partition= 670, offset= 245
partition= 670, offset= 246
partition= 670, offset= 247
partition= 670, offset= 248
partition= 670, offset= 249
partition= 670, offset= 250
partition= 670, offset= 251
partition= 670, offset= 252
partition= 670, offset= 253
partition= 670, offset= 254
partition= 670, offset= 255
partition= 670, offset= 256
partition= 670, offset= 257
partition= 670, offset= 258
partition= 670, offset= 259
{}
partition= 670, offset= 241
partition= 670, offset= 242
partition= 670, offset= 243
partition= 670, offset= 244
partition= 670, offset= 245
partition= 670, offset= 246
partition= 670, offset= 247
partition= 670, offset= 248
partition= 670, offset= 249
partition= 670, offset= 250
partition= 670, offset= 251
partition= 670, offset= 252
partition= 670, offset= 253
partition= 670, offset= 254
partition= 670, offset= 255
partition= 670, offset= 256
partition= 670, offset= 257
partition= 670, offset= 258
partition= 670, offset= 259
{}
partition= 670, offset= 241
partition= 670, offset= 242
partition= 670, offset= 243
partition= 670, offset= 244
partition= 670, offset= 245
partition= 670, offset= 246
partition= 670, offset= 247
partition= 670, offset= 248
partition= 670, offset= 249
partition= 670, offset= 250
partition= 670, offset= 251
partition= 670, offset= 252
partition= 670, offset= 253
partition= 670, offset= 254
partition= 670, offset= 255
partition= 670, offset= 256
partition= 670, offset= 257
partition= 670, offset= 258
partition= 670, offset= 259
{}
partition= 670, offset= 241
partition= 670, offset= 242
partition= 670, offset= 243
partition= 670, offset= 244
partition= 670, offset= 245
partition= 670, offset= 246
partition= 670, offset= 247
partition= 670, offset= 248
partition= 670, offset= 249
partition= 670, offset= 250
partition= 670, offset= 251
partition= 670, offset= 252
partition= 670, offset= 253
partition= 670, offset= 254
partition= 670, offset= 255
partition= 670, offset= 256
partition= 670, offset= 257
partition= 670, offset= 258
partition= 670, offset= 259
{}
partition= 670, offset= 241
partition= 670, offset= 242
partition= 670, offset= 243
partition= 670, offset= 244
partition= 670, offset= 245
partition= 670, offset= 246
partition= 670, offset= 247
partition= 670, offset= 248
partition= 670, offset= 249
partition= 670, offset= 250
partition= 670, offset= 251
partition= 670, offset= 252
partition= 670, offset= 253
partition= 670, offset= 254
partition= 670, offset= 255
partition= 670, offset= 256
partition= 670, offset= 257
partition= 670, offset= 258
partition= 670, offset= 259
{}
partition= 670, offset= 241
partition= 670, offset= 242
partition= 670, offset= 243
partition= 670, offset= 244
partition= 670, offset= 245
partition= 670, offset= 246
partition= 670, offset= 247
partition= 670, offset= 248
partition= 670, offset= 249
partition= 670, offset= 250
partition= 670, offset= 251
partition= 670, offset= 252
partition= 670, offset= 253
partition= 670, offset= 254
partition= 670, offset= 255
partition= 670, offset= 256
partition= 670, offset= 257
partition= 670, offset= 258
partition= 670, offset= 259
{}
{}
partition= 670, offset= 241
partition= 670, offset= 242
partition= 670, offset= 243
partition= 670, offset= 244
partition= 670, offset= 245
partition= 670, offset= 246
partition= 670, offset= 247
partition= 670, offset= 248
partition= 670, offset= 249
partition= 670, offset= 250
partition= 670, offset= 251
partition= 670, offset= 252
partition= 670, offset= 253
partition= 670, offset= 254
partition= 670, offset= 255
partition= 670, offset= 256
partition= 670, offset= 257
partition= 670, offset= 258
partition= 670, offset= 259
{}
partition= 670, offset= 241
partition= 670, offset= 242
partition= 670, offset= 243
partition= 670, offset= 244
partition= 670, offset= 245
partition= 670, offset= 246
partition= 670, offset= 247
partition= 670, offset= 248
partition= 670, offset= 249
partition= 670, offset= 250
partition= 670, offset= 251
partition= 670, offset= 252
partition= 670, offset= 253
partition= 670, offset= 254
partition= 670, offset= 255
partition= 670, offset= 256
partition= 670, offset= 257
partition= 670, offset= 258
partition= 670, offset= 259
{}
partition= 670, offset= 241
partition= 670, offset= 242
partition= 670, offset= 243
partition= 670, offset= 244
partition= 670, offset= 245
partition= 670, offset= 246
partition= 670, offset= 247
partition= 670, offset= 248
partition= 670, offset= 249
partition= 670, offset= 250
partition= 670, offset= 251
partition= 670, offset= 252
partition= 670, offset= 253
partition= 670, offset= 254
partition= 670, offset= 255
partition= 670, offset= 256
partition= 670, offset= 257
partition= 670, offset= 258
partition= 670, offset= 259
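One variant that might be worth comparing against the output above is to stop relying on the background auto-commit and instead commit explicitly before leaving the group, so that the next consumer resumes after the last processed record rather than at an older committed offset. The following is only a minimal sketch under assumptions, not a confirmed fix: the helper names `drain_and_commit` and `run_against_kafka` are hypothetical, and it assumes kafka-python's `enable_auto_commit=False` option together with `poll()`, `commit()` and `close()`. Even with this pattern, a crash between processing and commit can still lead to redelivery, so it gives at-least-once rather than strict exactly-once behaviour.

```python
def drain_and_commit(consumer, max_messages, timeout_ms=1000):
    """Read records until max_messages is reached or a poll returns nothing,
    then commit the consumed positions and leave the group cleanly.

    `consumer` is expected to behave like kafka.KafkaConsumer created with
    enable_auto_commit=False (hypothetical helper, for illustration only).
    """
    seen = []
    try:
        while len(seen) < max_messages:
            # poll() returns a dict mapping partitions to lists of records
            batch = consumer.poll(timeout_ms=timeout_ms)
            if not batch:
                break
            for records in batch.values():
                for record in records:
                    seen.append((record.partition, record.offset))
        # Commit only after every polled record has been handled
        consumer.commit()
    finally:
        # Close the consumer instead of only unsubscribing
        consumer.close()
    return seen


def run_against_kafka():
    """Wire the helper to a real broker; the address is a placeholder."""
    from kafka import KafkaConsumer

    consumer = KafkaConsumer(
        'topic',
        bootstrap_servers=['ip:9092'],
        auto_offset_reset='latest',
        enable_auto_commit=False,
        group_id='writer2.test')
    for partition, offset in drain_and_commit(consumer, 10000):
        if partition == 670:
            print('partition= %s, offset= %s' % (partition, offset))
```

Calling `run_against_kafka()` in a loop from two windows, as in the test above, would show whether the repeated offsets 241 to 259 still appear once every run ends with an explicit commit and close.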