Kafka Streams has an interface, Processor, the implementation of which is stateful. An example implementation given in the developer guide is:
    public class WordCountProcessor implements Processor<String, String> {

        private ProcessorContext context;
        private KeyValueStore<String, Long> kvStore;

        @Override
        @SuppressWarnings("unchecked")
        public void init(ProcessorContext context) {
            // keep the processor context locally because we need it in punctuate() and commit()
            this.context = context;

            // call this processor's punctuate() method every 1000 time units.
            this.context.schedule(1000);

            // retrieve the key-value store named "Counts"
            kvStore = (KeyValueStore) context.getStateStore("Counts");
        }

        @Override
        public void process(String dummy, String line) {
            String[] words = line.toLowerCase().split(" ");

            for (String word : words) {
                Long oldValue = kvStore.get(word);

                if (oldValue == null) {
                    kvStore.put(word, 1L);
                } else {
                    kvStore.put(word, oldValue + 1L);
                }
            }
        }

        @Override
        public void punctuate(long timestamp) {
            KeyValueIterator<String, Long> iter = this.kvStore.all();

            while (iter.hasNext()) {
                KeyValue<String, Long> entry = iter.next();
                context.forward(entry.key, entry.value.toString());
            }

            iter.close();

            // commit the current processing progress
            context.commit();
        }

        @Override
        public void close() {
            // close the key-value store
            kvStore.close();
        }
    }
The init method initializes WordCountProcessor's internal state, such as retrieving a key-value store. Other methods, like process and close, make use of this state.
It's not clear to me how to reify such an interface in Clojure. How would we pass on the state retrieved by init to process, close, etc.?
Using a closure?
One idea I have is to use a closure:
    (let [ctx (atom nil)]
      (reify Processor
        (close [this]
          ;; Do something w/ ctx
          )
        (init [this context]
          (reset! ctx context))
        (process [this k v]
          ;; Do something w/ ctx
          )
        (punctuate [this timestamp]
          ;; Do something w/ ctx
          )))
Annoyingly, we'd have to start from the ProcessorContext object each time, so the code that retrieves the key-value store would be repeated in every method that needs it.
I don't see a (general) way around that, though on a case-by-case basis we can replace the ctx atom with more specific state that the methods need.
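To make the case-by-case variant concrete, here is a rough sketch of what I have in mind for the word-count example. It assumes the same (older) Processor interface as the Java code above, with kafka-streams on the classpath; the namespace and the function name word-count-processor are made up for illustration, and the store name "Counts" is taken from the Java example. The atom holds a map with both the context and the store, so the lookup happens only once, in init:

```clojure
(ns example.word-count ; hypothetical namespace
  (:require [clojure.string :as str])
  (:import [org.apache.kafka.streams KeyValue]
           [org.apache.kafka.streams.processor Processor ProcessorContext]
           [org.apache.kafka.streams.state KeyValueStore]))

(defn word-count-processor
  "Returns a new Processor. Each call gets its own state atom,
  so separate processor instances do not share state."
  []
  (let [state (atom nil)] ; becomes {:ctx ..., :store ...} once init has run
    (reify Processor
      (init [_ context]
        ;; call punctuate every 1000 time units, as in the Java example
        (.schedule context 1000)
        ;; look the store up once and keep it next to the context
        (reset! state {:ctx   context
                       :store (.getStateStore context "Counts")}))

      (process [_ _ line]
        (let [^KeyValueStore store (:store @state)]
          (doseq [word (str/split (str/lower-case line) #" ")]
            ;; a missing key counts as 0
            (.put store word (inc (or (.get store word) 0))))))

      (punctuate [_ timestamp]
        (let [{:keys [^ProcessorContext ctx ^KeyValueStore store]} @state]
          ;; with-open closes the iterator once the doseq has finished
          (with-open [iter (.all store)]
            (doseq [^KeyValue entry (iterator-seq iter)]
              (.forward ctx (.-key entry) (str (.-value entry)))))
          ;; commit the current processing progress
          (.commit ctx)))

      (close [_]
        (.close ^KeyValueStore (:store @state))))))
```

This removes the repeated store lookup, but the shape of the map is still specific to this one processor, which is why I don't consider it a general solution.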
Is there a better way?