1

多くの(> 1000)ワーカー(プロセス)がいくつかの作業を行い、作業結果をデータベースに保存したいと考えています。作業の結果はJSONオブジェクトです。ワーカーは1秒あたり1〜5のJSONオブジェクトを生成します。データベースセーバーは分離されたプロセスです。JSONオブジェクトをワーカーからセーバーに転送するための単方向接続はmultiprocessing.Pipeです。パイプの数は労働者の数と同じです。

セーバープロセスでは、定期的に次のことを行います。

def recv_data(self):
    data = []
    for pipe in self.data_pipe_pool:
        if pipe.poll():
            data.append(pipe.recv())
    return data

self.data_pipe_pool-ワーカーからのパイプのリスト。

100人までのワーカーを実行すれば、すべて正常に動作します。1000を超えるワーカーを実行すると、例外が発生します。

2013-02-13T15:17:40.731429
Traceback (most recent call last):
  File "saver.py", line 44, in run
    profile = self.poll_data()
  File "saver.py", line 116, in poll_data
    ret = self.recv_data()
  File "saver_unit.py", line 127, in recv_data
    if pipe.poll():
IOError: handle out of range in select()

私はこれがselect()電話によるものであることを知っています:

FD_SETSIZEは通常、GNU/Linuxシステムでは1024に定義されています

しかし、どこで呼ばれselectますか?の場合pipe.poll()、なぜFD_SETSIZE制限を超えるのpipe.poll()ですか、個別に1つのパイプを呼び出していますか?この呼び出しでPython言語ソースをどこで見ることができますselectか?

FD_SETSIZE制限を超えない、または使用しない回避策は何selectですか?

4

4 に答える 4

2

selectマニュアルページを確認すると、次のことがわかります。

fdの値が負またはFD_SETSIZE以上のFD_CLR()またはFD_SET()を実行すると、未定義の動作が発生します。

これはselect、呼び出しの舞台裏で使用されておりpoll(可能性が高いと思われる)、ファイル記述子がより大きい場合FD_SETSIZE(1000を超えるパイプがある場合)、結果は何でもかまいません。

于 2013-02-13T12:43:32.353 に答える
0

ワーカーからサーバーへのパイプが 1 つあるように聞こえるので、beanstalkdのようなものを使用することを考えたことはありますか? 通常、1000 のワーカーと、結果キューから読み取ってデータベースに格納する 10 のサーバーがあります。

利点は、ワーカーがサーバーと通信するよりも作業に多くの時間を費やしている可能性が高いことですが、そのサーバーに対してパイプが開かれています。

何千ものワーカーが取得する「ジョブキュー」と、結果を保存する「結果キュー」を持つことができます。その後、多くのサーバーが「results-queue」から取得して DB に保存することができます。

これは、「少なくとも、ファイル ハンドルが不足することはない」と言うには長い道のりです。

于 2013-02-13T14:17:40.500 に答える
0

同様の問題がありました。各ワーカーに multiprocessing.Queue を使用し、ワーカー数が約 600 の場合、キューのメソッドは で失敗しましIOError: handle out of range in select()た。

問題の原因となった問題を公式のバグトラッカーで見つけました。2.7.4 で修正されました (私は 2.7.3 を持っていました)。pythonパッケージを更新すると問題が解決しました。

于 2016-08-05T17:30:37.340 に答える
0

epollを使用してこの問題を解決しました。解決策は非常に簡単です:

def set_data_pipe_poll(self, data_pipe_poll):
    self.epoll = select.epoll()
    for p in data_pipe_poll:
        self.epoll.register(p, select.EPOLLIN)
    self.data_pipe_poll = data_pipe_poll

def recv_data(self):
    data = []
    events = self.epoll.poll(timeout = 0)
    for fileno, _ in events:
        p = filter(lambda x: x.fileno() == fileno, self.data_pipe_poll)[0]
        data.append(p.recv())
    return data

私が電話するとき、私は電話しepoll.poll()ませんselect

于 2013-03-20T13:58:05.117 に答える