1

SinkTask独自のシンク コネクタを作成するために拡張したいと考えています。

フラッシュ中にオフセットを保存し、次にシンク コネクタを起動したときに、保存したオフセットから読み取りを再開したい場合、正しい方法は何ですか?

SinkTaskContextオーバーライドされたのを使用してinitialize(SinkTaskContext context)、独自のオフセットを割り当ててみました。

@Override
public void initialize(SinkTaskContext context) {
  HashMap<TopicPartition, Long> offsetMap = new HashMap<>();
  ...
  context.offset(offsetMap);
}

ただし、パーティションがまだ割り当てられていないため、これは機能しません。例外が発生していました。

次に、コンテキスト ( from initialize()) をグローバル変数に保存し、それを使用してメソッド内でオフセットを割り当てopen(Collection<TopicPartition> partitions)(オーバーライドされた from SinkTask)、内部で行っていたのと同じ方法で使用する必要がありinitializeますか? 例えば:

@Override
public void open(Collection<TopicPartition> partitions) {
  HashMap<TopicPartition, Long> offsetMapNew = new HashMap<>();
  for (TopicPartition tp : partitions) // for each partition assigned
  {
     Long offset = myOffsetMap.get(tp.topic() + "-" + tp.partition());
     if (offset == null) { offset = 0l; } // 0 Long
     offsetMapNew.put(tp, offset);
  }
  mySavedTaskContext.offset(offsetMapNew); // sync offsets ?
}
4

1 に答える 1

0

中にオフセットをリセットするopen() が正しいアプローチですが、まだ解決されていないバグのため、現在は適切に処理されません。

現時点での回避策は、 でオフセットのリセットを処理することput()です。これは少し直感に反するかもしれませんが、独自のオフセットを管理しているため、必要に応じて実際にデータを無視できます。最初のput()呼び出しを取得すると、オフセットのロードとリセットを処理できます。以降のすべてのデータは、リセット時に指定したオフセットから取得されます。これは、HDFS コネクタが現在、正確に 1 回の配信を実装する方法です。(これは、正確に 1 回だけ取得できる方法の良い例ですが、残念ながら比較的複雑なコードです。) 実際、Kafka Connect のオフセット管理機能を駆動したのは HDFS コネクタであったため、リセットを行わないという事実on rebalance はまさにこれが実装で見逃された方法です。

于 2016-07-23T20:56:59.767 に答える