SinkTask
独自のシンク コネクタを作成するために拡張したいと考えています。
フラッシュ中にオフセットを保存し、次にシンク コネクタを起動したときに、保存したオフセットから読み取りを再開したい場合、正しい方法は何ですか?
SinkTaskContext
オーバーライドされたのを使用してinitialize(SinkTaskContext context)
、独自のオフセットを割り当ててみました。
@Override
public void initialize(SinkTaskContext context) {
HashMap<TopicPartition, Long> offsetMap = new HashMap<>();
...
context.offset(offsetMap);
}
ただし、パーティションがまだ割り当てられていないため、これは機能しません。例外が発生していました。
次に、コンテキスト ( from initialize()
) をグローバル変数に保存し、それを使用してメソッド内でオフセットを割り当てopen(Collection<TopicPartition> partitions)
(オーバーライドされた from SinkTask
)、内部で行っていたのと同じ方法で使用する必要がありinitialize
ますか? 例えば:
@Override
public void open(Collection<TopicPartition> partitions) {
HashMap<TopicPartition, Long> offsetMapNew = new HashMap<>();
for (TopicPartition tp : partitions) // for each partition assigned
{
Long offset = myOffsetMap.get(tp.topic() + "-" + tp.partition());
if (offset == null) { offset = 0l; } // 0 Long
offsetMapNew.put(tp, offset);
}
mySavedTaskContext.offset(offsetMapNew); // sync offsets ?
}