2

Alpakka-FTPを使用していますが、一般的な akka-stream パターンを探しているのかもしれません。FTP コネクタは、ファイルを一覧表示したり、それらを取得したりできます。

def ls(host: String): Source[FtpFile, NotUsed]
def fromPath(host: String, path: Path): Source[ByteString, Future[IOResult]]

理想的には、次のようなストリームを作成したいと思います。

LIST
  .FETCH_ITEM
  .FOREACH(do something)

しかし、上で書いた 2 つの関数でそのようなストリームを作成することはできません。Flowのようなものを使用してそこに到達できるはずだと感じています

Ftp.ls
  .via(some flow that uses the Ftp.fromPath above)
  .runWith(Sink.foreach(do something))

ls上記の関数と関数のみが与えられた場合、これは可能fromPathですか?

編集:

1 つのアクターと を使用して解決することはできますがmapAsync、それでももっと簡単にする必要があると感じています。

class Downloader extends Actor {
  override def receive = {
    case ftpFile: FtpFile =>
      Ftp.fromPath(Paths.get(ftpFile.path), settings)
        .toMat(FileIO.toPath(Paths.get("testHDF.txt")))(Keep.right)
        .run() pipeTo sender
  }
}

val downloader = as.actorOf(Props(new Downloader))

Ftp.ls("test_path", settings)
  .mapAsync(1)(ftpFile => (downloader ? ftpFile) (3.seconds).mapTo[IOResult])
  .runWith(Sink.foreach(res => println("got it!" + res)))
4

1 に答える 1