1

zmq に「ファイル ディスパッチャー」を実装しようとしています (実際には jeromq、jni は避けたいと思います)。

必要なのは、着信ファイルをプロセッサに負荷分散することです。

  • 各ファイルは 1 つのプロセッサのみで処理されます
  • ファイルは潜在的に大きいため、ファイル転送を管理する必要があります

理想的にはhttps://github.com/zeromq/filemqのようなものが欲しいのですが、

  • パブリッシュ/サブスクライブではなく、プッシュ/プル動作を使用する
  • ディスクに書き込むのではなく、受信したファイルを処理できる

私の考えは、taskvent/tasksink と asyncsrv のサンプルを組み合わせて使用​​することです。

クライアント側:

  • 処理するファイルを通知する 1 つの PULL ソケット
  • (非同期) ファイル転送チャンクごとに処理する 1 つの DEALER ソケット

サーバ側:

  • 着信ファイル (名前) をディスパッチするための 1 つの PUSH ソケット
  • ファイル要求を処理するための 1 つの ROUTER ソケット
  • クライアントのファイル転送を管理し、インプロセス プロキシ経由でルーターに接続されている数人の DEALER ワーカー

私の最初の質問は次のとおりです。これは正しい方法のように思えますか? もっと簡単なものはありますか?

2 番目の質問は、現在の実装が実際のファイル データの送信でスタックしてしまうことです。

  • クライアントはサーバーから通知を受け、リクエストを発行します。
  • サーバーワーカーはリクエストを取得し、応答をインプロセスキューに書き戻しますが、応答はサーバーから送信されないようで (wireshark では表示されません)、クライアントは応答を待っている poller.poll でスタックします。

ソケットがいっぱいになってデータがドロップするという問題ではありません。一度に送信される非常に小さなファイルから始めています。

洞察はありますか?

ありがとう!

==================

ラフィアンのアドバイスに従って、コードを簡素化し、プッシュ/プルの余分なソケットを削除しました (あなたがそれを言うのは理にかなっています)。

「機能していない」ソケットが残っています!

これが私の現在のコードです。今のところ範囲外の多くの欠陥があります (クライアント ID、次のチャンクなど)。

今のところ、私は両方の男がそのシーケンスでお互いに大雑把に話しているようにしようとしています.

  • サーバ

    object FileDispatcher extends App
    {
      val context = ZMQ.context(1)
    
      // server is the frontend that pushes filenames to clients and receives requests
      val server = context.socket(ZMQ.ROUTER)
      server.bind("tcp://*:5565")
      // backend handles clients requests
      val backend = context.socket(ZMQ.DEALER)
      backend.bind("inproc://backend")
    
      // files to dispatch given in arguments
      args.toList.foreach { filepath =>
        println(s"publish $filepath")
        server.send("newfile".getBytes(), ZMQ.SNDMORE)
        server.send(filepath.getBytes(), 0)
      }
    
      // multithreaded server: router hands out requests to DEALER workers via a inproc queue
      val NB_WORKERS = 1
      val workers = List.fill(NB_WORKERS)(new Thread(new ServerWorker(context)))
      workers foreach (_.start)
      ZMQ.proxy(server, backend, null)
    }
    
    class ServerWorker(ctx: ZMQ.Context) extends Runnable 
    {
      override def run() 
      {
        val worker = ctx.socket(ZMQ.DEALER)
        worker.connect("inproc://backend")
    
        while (true) 
        {
          val zmsg = ZMsg.recvMsg(worker) 
          zmsg.pop // drop inner queue envelope (?)
          val cmd = zmsg.pop //cmd is used to continue/stop
          cmd.toString match {
            case "get" => 
              val file = zmsg.pop.toString
              println(s"clientReq: cmd: $cmd , file:$file")
              //1- brute force: ignore cmd and send full file in one go!
              worker.send("eof".getBytes, ZMQ.SNDMORE) //header indicates this is the last chunk
              val bytes = io.Source.fromFile(file).mkString("").getBytes //dirty read, for testing only!
              worker.send(bytes, 0)
              println(s"${bytes.size} bytes sent for $file: "+new String(bytes))
            case x => println("cmd "+x+" not implemented!")
          }
        }
      }
    }
    
  • クライアント

    object FileHandler extends App
    {
      val context = ZMQ.context(1)
    
      // client is notified of new files then fetches file from server
      val client = context.socket(ZMQ.DEALER)
      client.connect("tcp://*:5565")
      val poller = new ZMQ.Poller(1) //"poll" responses
      poller.register(client, ZMQ.Poller.POLLIN)
    
      while (true) 
      {
        poller.poll
        val zmsg = ZMsg.recvMsg(client)
        val cmd = zmsg.pop
        val data = zmsg.pop
        // header is the command/action
        cmd.toString match {
          case "newfile" => startDownload(data.toString)// message content is the filename to fetch
          case "chunk" => gotChunk(data.toString, zmsg.pop.getData) //filename, chunk
          case "eof" => endDownload(data.toString, zmsg.pop.getData) //filename, last chunk
        }
      }
    
      def startDownload(filename: String) 
      {
        println("got notification: start download for "+filename)
        client.send("get".getBytes, ZMQ.SNDMORE) //command header
        client.send(filename.getBytes, 0) 
      }
    
      def gotChunk(filename: String, bytes: Array[Byte]) 
      {
        println("got chunk for "+filename+": "+new String(bytes)) //callback the user here
        client.send("next".getBytes, ZMQ.SNDMORE)
        client.send(filename.getBytes, 0) 
      }
    
      def endDownload(filename: String, bytes: Array[Byte]) 
      {
        println("got eof for "+filename+": "+new String(bytes)) //callback the user here
      }
    }
    
4

1 に答える 1