Javaで構築された次のkafkaトポロジロジックがあります。
KStream<String, Event> stream = bldr.stream(topicName,Consumed.with(Serdes.String(), eventSerde()));
KGroupedStream<String, Event> aggregateByOrderID = stream.map((key, event) -> new KeyValue<>( event.getOrderId())).groupByKey(with(Serdes.String(), eventSerde()));
final KTable<String, StoreValue> store = aggregateByOrderID.aggregate(() -> new event("", 0.0, "", "", true), (key, value, aggregate) -> {
try {
aggregate.setQuantity(value.getQuantity());
aggregate.setFlag(Boolean.TRUE);
aggregate.setOrderId(key);
aggregate.setEventId(value.getId());
return aggregate;
} catch(NumberFormatException | NullPointerException ex) {
LOGGER.error("Error occured");
}
return aggregate;
}, Materialized.with(Serdes.String(), eventSerde()));
KGroupedStream<String, Store> aggregateAccountQuantityStore = store.toStream().map((key, value) -> new KeyValue<>(value.getAccountNumber(), value))
.groupByKey(with(Serdes.String(), eventSerde()));
final KTable<String, SellingQuantity> finalQuantity = aggregateAccountQuantityStore.aggregate(() -> new SellingQuantity("", 0.0, ""), (key, env, aggr) -> {
Double b = aggr.getQuantity() + env.getQuantity();
aggr.setQuantity(b);
aggr.setAccountNumber(env.getAccountNumber());
aggr.setEventId(env.getEventId());
return aggr;
}, Materialized.with(Serdes.String(), eventSerde()));
finalQuantity.toStream().map((k, event) -> new KeyValue<>(k, event)).to(anotherTopic, Produced.with(Serdes.String(), eventSerde()));
また、SASL プロトコルを介して実装され、Kerberos 認証を使用します。
問題は、トポロジ ロジックを変更するたびに、次のような例外が発生することです。
トピックにアクセスする権限がありません: [KSTREAM-AGGREGATE-STATE-STORE-0000000009-repartition, KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition]
この問題を軽減する方法、またはこれを完全に解決する方法はわかりません。
注:ローカル セットアップでは、セキュリティ プロトコルを実装していないため、問題は発生していません。