2

私は Flume 1.4.0 で作業しており、特定の方法で Flume のコンポーネントを停止しようとしています:

  • まず、ソースを停止します。
  • 次に、チャネル内のすべてのイベントがシンクによって消費されるまで待ちます。
  • すべてのイベントが消費されたら、チャネルとシンクを停止します。

上記のタスクは、で作成されたシャットダウン フックによって実行されますorg.apache.flume.node.Application(実際、私はカスタム を開発していますApplication)。

ソース、チャンネル、シンクへの参照を取得する方法は次のとおりです。

MaterializedConfiguration conf = configurationProvider.getConfiguration();
ImmutableMap<String, SourceRunner> sourcesRef = conf.getSourceRunners();
ImmutableMap<String, Channel> channelsRef = conf.getChannels();
ImmutableMap<String, SinkRunner> sinksRef = conf.getSinkRunners();

ポイントは、私がこれを得ているということですNullPointerException:

2015-02-17 16:03:28,094 (agent-shutdown-hook) [ERROR - org.apache.flume.source.http.HTTPSource.stop(HTTPSource.java:169)] Error while stopping HTTPSource. Exception follows.
java.lang.NullPointerException
    at org.apache.flume.source.http.HTTPSource.stop(HTTPSource.java:165)
    at     org.apache.flume.source.EventDrivenSourceRunner.stop(EventDrivenSourceRunner.java:51)
    at     es.tid.fiware.fiwareconnectors.cygnus.nodes.CygnusApplication$AgentShutdownHook.stopSources(CygnusApplication.java:296)
    at     es.tid.fiware.fiwareconnectors.cygnus.nodes.CygnusApplication$AgentShutdownHook.run(CygnusApplication.java:231)

HTTPSource.java:165ソースの Http サーバー部分を実装する Jetty サーバーを停止することについてです。これは null のように見えます。

162  @Override
163  public void stop() {
164    try {
165      srv.stop();
166      srv.join();
167      srv = null;
168    } catch (Exception ex) {
169      LOG.error("Error while stopping HTTPSource. Exception follows.", ex);
170    }
171    sourceCounter.stop();
172    LOG.info("Http source {} stopped. Metrics: {}", getName(), sourceCounter);
173  }

なぜヌルなのですか?ソースは正常に動作し、HTTP リクエストを受信できます。

これは、Flume のコンポーネントを閉じる適切な方法ではないと思います...そうでない場合、どちらですか?

ありがとう!

4

2 に答える 2

1

これは、srv が複数のスレッドによって共有されているためです (したがって、揮発性宣言です)。Flume は close を呼び出してソースを終了しようとしますが、これが複数回発生します。srv はすでに無効化されているため、stop() の 2 回目の呼び出しは失敗します。

これがあなたのケースで発生し、標準のバニラ Flume エージェントでは発生しない理由は、おそらく SourceCounter を更新していないためです。詳細については、MonitoredCounterGroup を参照してください。

于 2015-03-03T19:49:03.123 に答える
1

修理済み。configurationProvider.getConfiguration()エリックのポインターのおかげで、文が呼び出されるたびに新しい文MaterializedConfigurationが作成されることに気付くまで、コードをデバッグしました。このような実体化された構成は、実行中のソース、チャネル、およびシンクの完全なセットです。したがって、同じソースに対していくつかのコピーがありました... おっと!それにもかかわらず、どういうわけか、Flume は構成の複数の実体化を検出するのに十分スマートであり、複製されたすべてのコンポーネントをシャットダウンすることを見てきました... しかし、これには Jetty サーバーなどの volatile 変数が含まれます。

したがって、これを行う代わりに:

MaterializedConfiguration conf = configurationProvider.getConfiguration();
ImmutableMap<String, SourceRunner> sourcesRef = conf.getSourceRunners();
ImmutableMap<String, Channel> channelsRef = conf.getChannels();
ImmutableMap<String, SinkRunner> sinksRef = conf.getSinkRunners();

これで、必要な参照を取得できhandleConfigurationEvent(MaterializedConfiguration conf)ます (上書きされます):

@Override
@Subscribe
public synchronized void handleConfigurationEvent(MaterializedConfiguration conf) {
    sourcesRef = conf.getSourceRunners();
    channelsRef = conf.getChannels();
    sinksRef = conf.getSinkRunners();
    super.handleConfigurationEvent(conf);
} // handleConfigurationEvent

エリックに再び感謝します!

于 2015-03-04T14:24:08.043 に答える