0

Javaで構築された次のkafkaトポロジロジックがあります。

        KStream<String, Event> stream = bldr.stream(topicName,Consumed.with(Serdes.String(), eventSerde()));

        KGroupedStream<String, Event> aggregateByOrderID = stream.map((key, event) -> new KeyValue<>( event.getOrderId())).groupByKey(with(Serdes.String(), eventSerde()));

        final KTable<String, StoreValue> store = aggregateByOrderID.aggregate(() -> new event("", 0.0, "", "", true), (key, value, aggregate) -> {
                    try {
                        aggregate.setQuantity(value.getQuantity());
                        aggregate.setFlag(Boolean.TRUE);
                        aggregate.setOrderId(key);
                        aggregate.setEventId(value.getId());
                        return aggregate;
                    } catch(NumberFormatException | NullPointerException ex) {
                        LOGGER.error("Error occured");
                    }
                    return aggregate;
                }, Materialized.with(Serdes.String(), eventSerde()));

        KGroupedStream<String, Store> aggregateAccountQuantityStore = store.toStream().map((key, value) -> new KeyValue<>(value.getAccountNumber(), value))
                .groupByKey(with(Serdes.String(), eventSerde()));

        final KTable<String, SellingQuantity> finalQuantity = aggregateAccountQuantityStore.aggregate(() -> new SellingQuantity("", 0.0, ""), (key, env, aggr) -> {
                        Double b = aggr.getQuantity() + env.getQuantity();
                        aggr.setQuantity(b);
                        aggr.setAccountNumber(env.getAccountNumber());
                        aggr.setEventId(env.getEventId());
                        return aggr;   
                }, Materialized.with(Serdes.String(), eventSerde()));
                
        finalQuantity.toStream().map((k, event) -> new KeyValue<>(k, event)).to(anotherTopic, Produced.with(Serdes.String(), eventSerde()));

また、SASL プロトコルを介して実装され、Kerberos 認証を使用します。

問題は、トポロジ ロジックを変更するたびに、次のような例外が発生することです。

トピックにアクセスする権限がありません: [KSTREAM-AGGREGATE-STATE-STORE-0000000009-repartition, KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition]

この問題を軽減する方法、またはこれを完全に解決する方法はわかりません。

注:ローカル セットアップでは、セキュリティ プロトコルを実装していないため、問題は発生していません。

4

0 に答える 0