zmq に「ファイル ディスパッチャー」を実装しようとしています (実際には jeromq、jni は避けたいと思います)。
必要なのは、着信ファイルをプロセッサに負荷分散することです。
- 各ファイルは 1 つのプロセッサのみで処理されます
- ファイルは潜在的に大きいため、ファイル転送を管理する必要があります
理想的にはhttps://github.com/zeromq/filemqのようなものが欲しいのですが、
- パブリッシュ/サブスクライブではなく、プッシュ/プル動作を使用する
- ディスクに書き込むのではなく、受信したファイルを処理できる
私の考えは、taskvent/tasksink と asyncsrv のサンプルを組み合わせて使用することです。
クライアント側:
- 処理するファイルを通知する 1 つの PULL ソケット
- (非同期) ファイル転送チャンクごとに処理する 1 つの DEALER ソケット
サーバ側:
- 着信ファイル (名前) をディスパッチするための 1 つの PUSH ソケット
- ファイル要求を処理するための 1 つの ROUTER ソケット
- クライアントのファイル転送を管理し、インプロセス プロキシ経由でルーターに接続されている数人の DEALER ワーカー
私の最初の質問は次のとおりです。これは正しい方法のように思えますか? もっと簡単なものはありますか?
2 番目の質問は、現在の実装が実際のファイル データの送信でスタックしてしまうことです。
- クライアントはサーバーから通知を受け、リクエストを発行します。
- サーバーワーカーはリクエストを取得し、応答をインプロセスキューに書き戻しますが、応答はサーバーから送信されないようで (wireshark では表示されません)、クライアントは応答を待っている poller.poll でスタックします。
ソケットがいっぱいになってデータがドロップするという問題ではありません。一度に送信される非常に小さなファイルから始めています。
洞察はありますか?
ありがとう!
==================
ラフィアンのアドバイスに従って、コードを簡素化し、プッシュ/プルの余分なソケットを削除しました (あなたがそれを言うのは理にかなっています)。
「機能していない」ソケットが残っています!
これが私の現在のコードです。今のところ範囲外の多くの欠陥があります (クライアント ID、次のチャンクなど)。
今のところ、私は両方の男がそのシーケンスでお互いに大雑把に話しているようにしようとしています.
サーバ
object FileDispatcher extends App { val context = ZMQ.context(1) // server is the frontend that pushes filenames to clients and receives requests val server = context.socket(ZMQ.ROUTER) server.bind("tcp://*:5565") // backend handles clients requests val backend = context.socket(ZMQ.DEALER) backend.bind("inproc://backend") // files to dispatch given in arguments args.toList.foreach { filepath => println(s"publish $filepath") server.send("newfile".getBytes(), ZMQ.SNDMORE) server.send(filepath.getBytes(), 0) } // multithreaded server: router hands out requests to DEALER workers via a inproc queue val NB_WORKERS = 1 val workers = List.fill(NB_WORKERS)(new Thread(new ServerWorker(context))) workers foreach (_.start) ZMQ.proxy(server, backend, null) } class ServerWorker(ctx: ZMQ.Context) extends Runnable { override def run() { val worker = ctx.socket(ZMQ.DEALER) worker.connect("inproc://backend") while (true) { val zmsg = ZMsg.recvMsg(worker) zmsg.pop // drop inner queue envelope (?) val cmd = zmsg.pop //cmd is used to continue/stop cmd.toString match { case "get" => val file = zmsg.pop.toString println(s"clientReq: cmd: $cmd , file:$file") //1- brute force: ignore cmd and send full file in one go! worker.send("eof".getBytes, ZMQ.SNDMORE) //header indicates this is the last chunk val bytes = io.Source.fromFile(file).mkString("").getBytes //dirty read, for testing only! worker.send(bytes, 0) println(s"${bytes.size} bytes sent for $file: "+new String(bytes)) case x => println("cmd "+x+" not implemented!") } } } }
クライアント
object FileHandler extends App { val context = ZMQ.context(1) // client is notified of new files then fetches file from server val client = context.socket(ZMQ.DEALER) client.connect("tcp://*:5565") val poller = new ZMQ.Poller(1) //"poll" responses poller.register(client, ZMQ.Poller.POLLIN) while (true) { poller.poll val zmsg = ZMsg.recvMsg(client) val cmd = zmsg.pop val data = zmsg.pop // header is the command/action cmd.toString match { case "newfile" => startDownload(data.toString)// message content is the filename to fetch case "chunk" => gotChunk(data.toString, zmsg.pop.getData) //filename, chunk case "eof" => endDownload(data.toString, zmsg.pop.getData) //filename, last chunk } } def startDownload(filename: String) { println("got notification: start download for "+filename) client.send("get".getBytes, ZMQ.SNDMORE) //command header client.send(filename.getBytes, 0) } def gotChunk(filename: String, bytes: Array[Byte]) { println("got chunk for "+filename+": "+new String(bytes)) //callback the user here client.send("next".getBytes, ZMQ.SNDMORE) client.send(filename.getBytes, 0) } def endDownload(filename: String, bytes: Array[Byte]) { println("got eof for "+filename+": "+new String(bytes)) //callback the user here } }