2

Camel の FTP2 コンポーネントに問題があり、コンシューマー アクターが Akka システム内に住んでいます。

基本的な考え方は、ファイルの FTP ディレクトリを監視し、子アクターを生成して各ファイルを個別に処理することです。Akka は、並行性と信頼性の管理に使用されています。親コンシューマ アクターはディレクトリを noop=true でポーリングするため、何も実行されません。その後、子コンシューマ アクターは、'include' Camel オプションでフィルタリングされたファイルをダウンロードすることになっています。ダウンロードが同時に行われることが重要であり、ファイルがメモリにロードされないことが重要です (したがって、localWorkDirectory が使用されます)。

私は簡単な再現を書きました:

package camelrepro;

import java.io.InputStream;

import org.mockftpserver.core.command.Command;
import org.mockftpserver.core.command.ReplyCodes;
import org.mockftpserver.core.session.Session;
import org.mockftpserver.core.session.SessionKeys;
import org.mockftpserver.fake.FakeFtpServer;
import org.mockftpserver.fake.UserAccount;
import org.mockftpserver.fake.command.AbstractFakeCommandHandler;
import org.mockftpserver.fake.filesystem.FileEntry;
import org.mockftpserver.fake.filesystem.UnixFakeFileSystem;

import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.camel.CamelMessage;
import akka.camel.javaapi.UntypedConsumerActor;
import akka.testkit.JavaTestKit;

public class Main {

    public static class ParentActor extends UntypedConsumerActor {

        public ParentActor() {
            System.out.println("Parent started");
        }
        @Override
        public String getEndpointUri() {
            return "ftp://anonymous@localhost:8021?password=password&readLock=changed&initialDelay=0&delay=200&noop=true";
        }

        @Override
        public void onReceive(Object msg) throws Exception {
            if (msg instanceof CamelMessage) {
                getContext().actorOf(new Props(ChildActor.class), "0");
            } else {
                unhandled(msg);
            }
        }
    }

    public static class ChildActor extends UntypedConsumerActor {

        public ChildActor() {
            System.out.println("Child started");
        }

        @Override
        public String getEndpointUri() {
            return "ftp://anonymous@localhost:8021?password=password&readLock=changed&initialDelay=0&delay=200&include=test.txt&localWorkDirectory=/tmp";
        }

        @Override
        public void onReceive(Object msg) throws Exception {
            if (msg instanceof CamelMessage) {
                System.out.println("Child got message");
                CamelMessage camelMsg = (CamelMessage) msg;

                InputStream source = camelMsg.getBodyAs(InputStream.class, getCamelContext());
                System.out.println(source.getClass().getName());
                System.exit(0);
            } else {
                unhandled(msg);
            }
        }
    }

    public static void main(String[] args) {

        ActorSystem system = ActorSystem.create("default");

        FakeFtpServer ftpServer = new FakeFtpServer();
        UnixFakeFileSystem ftpFileSystem = new UnixFakeFileSystem();
        ftpServer.setFileSystem(ftpFileSystem);
        ftpServer.addUserAccount(new UserAccount("anonymous", "password", "/"));
        ftpServer.setServerControlPort(8021);

        // fix bug in PWD handling (either Apache FTP client or mock server depending on opinion)
        ftpServer.setCommandHandler("PWD", new AbstractFakeCommandHandler() {
            @Override
            protected void handle(Command command, Session session) {
                String currentDirectory = (String) session.getAttribute(SessionKeys.CURRENT_DIRECTORY);
                this.replyCodeForFileSystemException = ReplyCodes.READ_FILE_ERROR;
                verifyFileSystemCondition(notNullOrEmpty(currentDirectory), currentDirectory, "filesystem.currentDirectoryNotSet");
                int replyCode = ReplyCodes.PWD_OK;
                String replyText = String.format("\"%s\" OK", currentDirectory.replaceAll("\"", "\"\""));
                session.sendReply(replyCode, replyText);
            }
        });
        ftpFileSystem.add(new FileEntry("/test.txt", "hello world"));
        ftpServer.start();

        new JavaTestKit(system) {{
            getSystem().actorOf(new Props(ParentActor.class));
        }};
    }
}

バージョンを示す Maven の依存関係:

    <dependencies>
        <dependency>
            <groupId>com.typesafe.akka</groupId>
            <artifactId>akka-actor_2.10</artifactId>
            <version>2.1.0</version>
        </dependency>
        <dependency>
            <groupId>com.typesafe.akka</groupId>
            <artifactId>akka-remote_2.10</artifactId>
            <version>2.1.0</version>
        </dependency>
        <dependency>
            <groupId>com.typesafe.akka</groupId>
            <artifactId>akka-camel_2.10</artifactId>
            <version>2.1.0</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>com.typesafe.akka</groupId>
            <artifactId>akka-testkit_2.10</artifactId>
            <version>2.1.0</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.camel</groupId>
            <artifactId>camel-ftp</artifactId>
            <version>2.10.3</version>
        </dependency>
        <dependency>
            <groupId>org.mockftpserver</groupId>
            <artifactId>MockFtpServer</artifactId>
            <version>2.4</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-io</artifactId>
            <version>1.3.2</version>
        </dependency>
        <dependency>
            <groupId>commons-net</groupId>
            <artifactId>commons-net</artifactId>
            <version>3.2</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>1.7.2</version>
        </dependency>
    </dependencies>

BufferedInputStream が標準出力に書き込まれることを期待しており、ByteArrayInputStream がそうでないことを確認します。

しかし、代わりに、ファイルが見つからないという例外が表示されます。

[ERROR] [02/15/2013 10:53:32.951] [default-akka.actor.default-dispatcher-7] [akka://default/user/$a/0] Error during type conversion from type: org.apache.camel.component.file.remote.RemoteFile to the required type: java.io.InputStream with value GenericFile[test.txt] due java.io.FileNotFoundException: /tmp/test.txt (No such file or directory)
org.apache.camel.TypeConversionException: Error during type conversion from type: org.apache.camel.component.file.remote.RemoteFile to the required type: java.io.InputStream with value GenericFile[test.txt] due java.io.FileNotFoundException: /tmp/test.txt (No such file or directory)
    at org.apache.camel.impl.converter.BaseTypeConverterRegistry.mandatoryConvertTo(BaseTypeConverterRegistry.java:162)

数回、それは機能し、どこかでレースである可能性があるのではないかと思いました. しかし、ほとんどの場合失敗します。

手がかり、アイデア、提案はありますか?

FWIW:

uname -a: Linux 3.2.0-37-generic #58-Ubuntu SMP Thu Jan 24 15:28:10 UTC 2013 x86_64 x86_64 x86_64 GNU/Linux
java: 1.7.0_11-b21
4

3 に答える 3

3

上記の問題の解決策を見つけました。

これは、子コンシューマーautoAck()が true を返すという事実です (これはデフォルトで行われます)。そのような場合、akka-camel はCamelMessageファイア アンド フォーゲットを送信し、クリーンアップを続行します。InputStream一方、子アクターは、によって呼び出された型コンバーターの 1 つによって開かれるまで、実際には開かれませんgetBodyAs()。そのため、子アクターが を介してファイルを開くことgetBodyAs()と、非同期でメッセージを送信した後にファイルを削除する Camel クリーンアップとの間で競合が発生します。

したがって、修正はautoAck()false を返すようにオーバーライドし、子メッセージ ハンドラーの最後でAck.getInstance()(または必要に応じて) 送信することです。new Status.Failure(<cause>)

于 2013-02-18T14:38:21.960 に答える
1

2.10.3 には ftp コンポーネントに関する問題があるため、Camel 2.10.2 を使用してください。

于 2013-02-15T15:38:21.303 に答える
0

localWorkDirectory=/tmp を使用する場合、そのディレクトリはルーティング中にファイルを一時的に格納するためのものです。Camel Exchange が完了すると、ファイルは削除されます。これが非同期イベントである Akka でどのように機能するかはわかりません。そのため、Camel Exchange が完了した後に Akka onReceive が非同期で呼び出される可能性があるため、一時ファイルが削除されます。

Camel では、より永続的な性質のファイルの場所にファイルをルーティングします。

 from("ftp:...")
   .to("file:inbox")

そして、代わりに Akka が ("file:inbox") から消費するようにすることができます。

于 2013-02-16T08:10:02.743 に答える