
Kafka Streams has an interface, Processor, whose implementations are stateful. An example implementation given in the developer guide is:

public class WordCountProcessor implements Processor<String, String> {

  private ProcessorContext context;
  private KeyValueStore<String, Long> kvStore;

  @Override
  @SuppressWarnings("unchecked")
  public void init(ProcessorContext context) {
      // keep the processor context locally because we need it in punctuate() and commit()
      this.context = context;

      // call this processor's punctuate() method every 1000 time units.
      this.context.schedule(1000);

      // retrieve the key-value store named "Counts"
      kvStore = (KeyValueStore) context.getStateStore("Counts");
  }

  @Override
  public void process(String dummy, String line) {
      String[] words = line.toLowerCase().split(" ");

      for (String word : words) {
          Long oldValue = kvStore.get(word);
          if (oldValue == null) {
              kvStore.put(word, 1L);
          } else {
              kvStore.put(word, oldValue + 1L);
          }
      }
  }

  @Override
  public void punctuate(long timestamp) {
      KeyValueIterator<String, Long> iter = this.kvStore.all();
      while (iter.hasNext()) {
          KeyValue<String, Long> entry = iter.next();
          context.forward(entry.key, entry.value.toString());
      }
      iter.close();
      // commit the current processing progress
      context.commit();
  }

  @Override
  public void close() {
      // close the key-value store
      kvStore.close();
  }

}

The init method initializes WordCountProcessor's internal state, for example by retrieving a key-value store. Other methods, such as process and close, then make use of this state.

It's not clear to me how to reify such an interface in Clojure. How would we pass on the state retrieved by init to process, close, etc.?

Using a closure?

One idea I have is to use a closure:

(let [ctx (atom nil)]
  (reify Processor
    (close [this]
      ;; Do something w/ ctx
      )
    (init [this context]
      (reset! ctx context))
    (process [this k v]
      ;; Do something w/ ctx
      )
    (punctuate [this timestamp]
      ;; Do something w/ ctx
      )))

Annoyingly, each method would have to start from the ProcessorContext object every time, so the code that retrieves the key-value store would be repeated in every method that needs it.

I don't see a (general) way around that, though on a case-by-case basis we can replace the ctx atom with more specific state that the methods need.

Is there a better way?


1 Answer


The main way to do this is to close over atoms. The original class has two fields, so you can close over two (or more) atoms to get the same effect:

(let [ctx (atom nil)
      kv-store (atom nil)]
  (reify Processor
    ,,,
    (init [this context]
      (reset! ctx context)
      (reset! kv-store (.getStateStore context "Counts")))
    ,,,))
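The closed-over atoms can be seen at work without a Kafka dependency in the following minimal sketch. It assumes the old (0.10-era) Processor shape used in the question; MiniContext, MiniProcessor and make-word-count-processor are hypothetical names invented for this illustration, not part of the Kafka API, and a java.util.HashMap stands in for the "Counts" key-value store.

```clojure
(require '[clojure.string :as str])

;; Hypothetical stand-ins for Kafka's ProcessorContext and the old-style
;; Processor interface, so this sketch runs without Kafka on the classpath.
(definterface MiniContext
  (getStateStore [store-name])
  (forward [k v]))

(definterface MiniProcessor
  (init [context])
  (process [k v])
  (punctuate [timestamp])
  (close []))

(defn make-word-count-processor
  "Returns a processor whose state lives in two atoms closed over by reify."
  []
  (let [ctx      (atom nil)   ; set in init, used in punctuate
        kv-store (atom nil)]  ; set in init, used in process and punctuate
    (reify MiniProcessor
      (init [this context]
        (reset! ctx context)
        (reset! kv-store (.getStateStore context "Counts"))
        nil)
      (process [this _ line]
        ;; count each lower-cased, space-separated word in the store
        (doseq [word (str/split (str/lower-case line) #" ")]
          (let [old (.get @kv-store word)]
            (.put @kv-store word (if old (inc old) 1))))
        nil)
      (punctuate [this timestamp]
        ;; forward every (word, count) pair, with the count as a string
        (doseq [e @kv-store]
          (.forward @ctx (key e) (str (val e))))
        nil)
      (close [this] nil))))

;; Usage: a HashMap plays the "Counts" store; forwarded records go into an atom.
(def forwarded (atom []))
(def counts-store (java.util.HashMap.))
(def context
  (reify MiniContext
    (getStateStore [this store-name] counts-store)
    (forward [this k v] (swap! forwarded conj [k v]) nil)))

(def p (make-word-count-processor))
(.init p context)
(.process p nil "The cat and the hat")
(.punctuate p 0)
```

Because init, process and punctuate are all defined inside the same let, they share the two atoms, which play the role of the Java class's two fields.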

If that is still cumbersome, you can add some convenience functions that close over the atoms:

(let [ctx      (atom nil)
      kv-store (atom nil)
      ;; convenience functions that close over the kv-store atom
      kv-get   (fn [k] (.get @kv-store k))
      kv-put   (fn [k v] (.put @kv-store k v))
      ;; realize all entries, then close the store's KeyValueIterator
      kv-all   (fn []
                 (with-open [iter (.all @kv-store)]
                   (doall (iterator-seq iter))))]
  (reify Processor
    ,,,
    (init [this context]
      (reset! ctx context)
      (reset! kv-store (.getStateStore context "Counts")))
    ,,,
    (punctuate [this timestamp]
      (doseq [x (kv-all)]
        ,,,))))

You could also use gen-class instead, but I think reify is the better choice.

answered 2016-12-17T09:53:38.553