私は Flume 1.4.0 で作業しており、特定の方法で Flume のコンポーネントを停止しようとしています:
- まず、ソースを停止します。
- 次に、チャネル内のすべてのイベントがシンクによって消費されるまで待ちます。
- すべてのイベントが消費されたら、チャネルとシンクを停止します。
上記のタスクは、で作成されたシャットダウン フックによって実行されますorg.apache.flume.node.Application
(実際、私はカスタム を開発していますApplication
)。
ソース、チャンネル、シンクへの参照を取得する方法は次のとおりです。
MaterializedConfiguration conf = configurationProvider.getConfiguration();
ImmutableMap<String, SourceRunner> sourcesRef = conf.getSourceRunners();
ImmutableMap<String, Channel> channelsRef = conf.getChannels();
ImmutableMap<String, SinkRunner> sinksRef = conf.getSinkRunners();
ポイントは、私がこれを得ているということですNullPointerException
:
2015-02-17 16:03:28,094 (agent-shutdown-hook) [ERROR - org.apache.flume.source.http.HTTPSource.stop(HTTPSource.java:169)] Error while stopping HTTPSource. Exception follows.
java.lang.NullPointerException
at org.apache.flume.source.http.HTTPSource.stop(HTTPSource.java:165)
at org.apache.flume.source.EventDrivenSourceRunner.stop(EventDrivenSourceRunner.java:51)
at es.tid.fiware.fiwareconnectors.cygnus.nodes.CygnusApplication$AgentShutdownHook.stopSources(CygnusApplication.java:296)
at es.tid.fiware.fiwareconnectors.cygnus.nodes.CygnusApplication$AgentShutdownHook.run(CygnusApplication.java:231)
HTTPSource.java:165
ソースの Http サーバー部分を実装する Jetty サーバーを停止することについてです。これは null のように見えます。
162 @Override
163 public void stop() {
164 try {
165 srv.stop();
166 srv.join();
167 srv = null;
168 } catch (Exception ex) {
169 LOG.error("Error while stopping HTTPSource. Exception follows.", ex);
170 }
171 sourceCounter.stop();
172 LOG.info("Http source {} stopped. Metrics: {}", getName(), sourceCounter);
173 }
なぜヌルなのですか?ソースは正常に動作し、HTTP リクエストを受信できます。
これは、Flume のコンポーネントを閉じる適切な方法ではないと思います...そうでない場合、どちらですか?
ありがとう!