3

私は、ftp サーバーを消費するために小さなラクダを書きました。

しかし、しばらく実行した後、例外をスローし、実行を続けますが、それ以上何も消費しません。また、再起動したときに、消費されるのを待っているファイルが多数あると、再びクラッシュします。既に例外ハンドラーを追加しましたが、例外をキャッチしていないようです。

これは私が受け取る例外です:

Caused by: [org.apache.camel.component.file.GenericFileOperationFailedException - File operation failed: 150 Opening ASCII mode data connection for 2386442.XML(3895 bytes).
Accept timed out. Code: 150]
org.apache.camel.component.file.GenericFileOperationFailedException: File operation failed: 150 Opening ASCII mode data connection for 2386442.XML(3895 bytes).
Accept timed out. Code: 150
    at org.apache.camel.component.file.remote.FtpOperations.retrieveFileToStreamInBody(FtpOperations.java:336)
    at org.apache.camel.component.file.remote.FtpOperations.retrieveFile(FtpOperations.java:297)
    at org.apache.camel.component.file.GenericFileConsumer.processExchange(GenericFileConsumer.java:333)
    at org.apache.camel.component.file.remote.RemoteFileConsumer.processExchange(RemoteFileConsumer.java:94)
    at org.apache.camel.component.file.GenericFileConsumer.processBatch(GenericFileConsumer.java:175)
    at org.apache.camel.component.file.GenericFileConsumer.poll(GenericFileConsumer.java:136)
    at org.apache.camel.impl.ScheduledPollConsumer.doRun(ScheduledPollConsumer.java:140)
    at org.apache.camel.impl.ScheduledPollConsumer.run(ScheduledPollConsumer.java:92)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
    at java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:317)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:150)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$101(ScheduledThreadPoolExecutor.java:98)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.runPeriodic(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:204)
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
    at java.lang.Thread.run(Thread.java:662)
Caused by: java.net.SocketTimeoutException: Accept timed out
    at java.net.PlainSocketImpl.socketAccept(Native Method)
    at java.net.PlainSocketImpl.accept(PlainSocketImpl.java:408)
    at java.net.ServerSocket.implAccept(ServerSocket.java:462)
    at java.net.ServerSocket.accept(ServerSocket.java:430)
    at org.apache.commons.net.ftp.FTPClient._openDataConnection_(FTPClient.java:560)
    at org.apache.commons.net.ftp.FTPClient.retrieveFile(FTPClient.java:1442)
    at org.apache.camel.component.file.remote.FtpOperations.retrieveFileToStreamInBody(FtpOperations.java:328)
    ... 16 more
Caused by: [org.apache.camel.component.file.GenericFileOperationFailedException - Cannot retrieve file: GenericFile[2386448.XML] from: Endpoint[ftp://1.1.1.1?delay=15000&delete=true&disconnect=true&exclude=((?i).*pdf$)&password=******&username=user]
org.apache.camel.component.file.GenericFileOperationFailedException: Cannot retrieve file: GenericFile[2386448.XML] from:   Endpoint[ftp://1.1.1.1?delay=15000&delete=true&disconnect=true&exclude=((?i).*pdf$)&password=******&username=user]
   at org.apache.camel.component.file.GenericFileConsumer.processExchange(GenericFileConsumer.java:338)
  at org.apache.camel.component.file.remote.RemoteFileConsumer.processExchange(RemoteFileConsumer.java:94)
    at org.apache.camel.component.file.GenericFileConsumer.processBatch(GenericFileConsumer.java:175)
    at org.apache.camel.component.file.GenericFileConsumer.poll(GenericFileConsumer.java:136)
    at org.apache.camel.impl.ScheduledPollConsumer.doRun(ScheduledPollConsumer.java:140)
    at org.apache.camel.impl.ScheduledPollConsumer.run(ScheduledPollConsumer.java:92)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
    at java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:317)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:150)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$101(ScheduledThreadPoolExecutor.java:98)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.runPeriodic(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:204)
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
    at java.lang.Thread.run(Thread.java:662)

これは、Java DSL を使用して作成したルートです。

    // XML Predicate
    // only allows names without spaces
    Predicate xmlPredicate = header(RssUtils.CAMEL_FILE_NAME).regex(
            "([\\S]+(\\.(?i)(xml))$)");
    // Images Predicate
    // only allows names without spaces
    Predicate imgPredicate = header(RssUtils.CAMEL_FILE_NAME).regex(
            "([\\S]+(\\.(?i)(jpg|png|gif))$)");

    onException(SchemaValidationException.class).to(
            "file://" + props.getProperty(RssUtils.ROOT_DIR)
                    + "/errors/SchemaValidationException");

    onException(GenericFileOperationFailedException.class).to(
            "file://" + props.getProperty(RssUtils.ROOT_DIR)
                    + "/errors/GenericFileExceptions");

    from(
            "ftp://"
                    + props.getProperty(RssUtils.FTP_URL)
                    + "?username="
                    + props.getProperty(RssUtils.FTP_USER)
                    + "&password="
                    + props.getProperty(RssUtils.FTP_PWD)
                    + "&disconnect=true&delete=true&exclude=((?i).*pdf$)&delay="
                    + props.getProperty(RssUtils.FTP_DELAY))
            .choice()
            .when(xmlPredicate)
            .to("jms:xmlQueue")
    .to("jms:archiveQueue")
            .when(imgPredicate)
            .to("file://" + props.getProperty(RssUtils.ROOT_DIR) + "/img")
            .otherwise()
            .to("file://" + props.getProperty(RssUtils.ROOT_DIR)
                    + "/errors/other");

    from("jms:xmlQueue").to("validator:FtpXmlValidator.xsd")
            .to("xslt://XmlToRssConverter.xsl")
            .process(rssFeedProcessor)
            .to("file://" + props.getProperty(RssUtils.ROOT_DIR) + "/rss/");

from("jms:archiveQueue")
    .to("file://" + props.getProperty(RssUtils.ROOT_DIR) + "/archive/");

この種の af 動作を回避するためにできることはありますか? テストするのは本当に難しいので、誰かが私のコードの欠陥を見つけてくれることを願っています。かなり長い間探していますが、しっかりしたものは見つかりません。たぶん、この問題をデバッグできる方法はありますか?

誰かが彼の考えを与えることができると私が見つけたいくつかのことがあります:

  • onException を使用する場合は、handled(true) を使用します
  • コンシューマの最大バッチ サイズを設定できますか? (これにスロットルを使用できますか?)
  • 私は Java DSL を使用しているため、最後に明示的な try catch を使用します

ここで間違ったことを言っても撃たないでください。私は Camel を学んでいるだけです。したがって、上記のコードについて提案がある場合は、それをいただければ幸いです。

よろしくお願いします!

4

2 に答える 2

1

ここにあるのは FTP の問題です。それが Apache Camel で発生しているという事実は、ほとんど無関係です。

爆弾の証拠部分は次のとおりです。

java.net.PlainSocketImpl.socketAccept(ネイティブメソッド)で java.net.PlainSocketImpl.accept(PlainSocketImpl.java:408)で java.net.ServerSocket.implAccept(ServerSocket.java:462)で java.net.ServerSocket.acceptで(ServerSocket.java:430) org.apache.commons.net.ftp.FTPClient. openDataConnection (FTPClient.java:560)

org.apache.commons.net.ftp.FTPClientのopenDataConnectionメソッドは、アクティブ モードの FTP をサポートするためにあります。パッシブ モードはコマンドと同じポートを使用するだけなので、別のポート接続は必要ありません。

パッシブ モードに切り替えてみてください (Apache Camel では passiveMode = true)。

于 2015-10-23T11:12:58.067 に答える
0

表面的には、ルートが失敗する理由を見なくても、処理して続行すること、つまり、この例外を処理して中断したところからルートを続行することのように思えます。ドキュメントごと:

Camel 2.3 で利用可能 Camel 2.3では、例外が発生しなかったかのように元のルートでルーティングを 処理および続行できる
新しいオプションを導入しました。continued

たとえば、IDontCareException がスローされた場合に無視して続行するには、次のようにします。

onException(IDontCareException).continued(true);

ここで何が起こるか:

Camel は例外をキャッチし、. . . 無視して、元のルートでルーティングを続行してください。でも 。. . 元のルートでルーティングを続行する前に、最初にその [onException] ルートをルーティングします。

これを試してみると、問題が解決する場合があります。上記で暗示したように、根本的な問題が何であるかによっては、これは適切な解決策というよりは応急処置のようなものになる可能性があります. より良いアプローチは、FTP コンシューマーが失敗している理由を突き止めることです。一見すると、という名前のファイルが見つからないように見えます2386448.XML

根本原因を特定したら、 を使用して、次choiceのように適切なタイミングで異なる動作を行うことができます。

.choice()
    .when(isValidFtpResponse())
         .to(DIRECT_CONTINUE_FTP_ROUTE)
    .otherwise()
        .setBody(constant(null))
        .log(ERROR, "FTP failed: ${headers}")
.end()

うまくいけば、いくつかのアイデアが得られ、この問題を解決するのに役立ちます.

于 2012-09-11T16:37:00.200 に答える