11

(この例ではpyprocessingモジュールを使用していますが、処理を multiprocessing に置き換えると、 python 2.6を実行するかmultiprocessing backportを使用する場合におそらく機能するはずです)

私は現在、UNIXソケットをリッスンし(processing.connection.Listenerを使用)、接続を受け入れ、リクエストを処理するスレッドを生成するプログラムを持っています。ある時点でプロセスを正常に終了したいのですが、accept() 呼び出しがブロックされているため、うまくキャンセルする方法がありません。少なくともここ(OS X)で機能する方法が1つあります。シグナルハンドラーを設定し、次のように別のスレッドからプロセスにシグナルを送信します。

import processing
from processing.connection import Listener
import threading
import time
import os
import signal
import socket
import errno

# This is actually called by the connection handler.
def closeme():
    time.sleep(1)
    print 'Closing socket...'
    listener.close()
    os.kill(processing.currentProcess().getPid(), signal.SIGPIPE)

oldsig = signal.signal(signal.SIGPIPE, lambda s, f: None)

listener = Listener('/tmp/asdf', 'AF_UNIX')
# This is a thread that handles one already accepted connection, left out for brevity
threading.Thread(target=closeme).start()
print 'Accepting...'
try:
    listener.accept()
except socket.error, e:
    if e.args[0] != errno.EINTR:
        raise
# Cleanup here...
print 'Done...'

私が考えた他の唯一の方法は、接続 (listener._listener._socket) の奥深くに到達し、非ブロック オプションを設定することです...しかし、それにはおそらくいくつかの副作用があり、一般的に本当に怖いです。

これを達成するためのよりエレガントな(そしておそらく正しい!)方法を誰かが持っていますか?OS X、Linux、BSD への移植性が必要ですが、Windows への移植性などは必要ありません。

明確化:ありがとうございます!いつものように、私の元の質問のあいまいさが明らかになりました:)

  • リスニングをキャンセルした後にクリーンアップを実行する必要があり、そのプロセスを実際に終了したいとは限りません。
  • 同じ親から生成されていない他のプロセスからこのプロセスにアクセスできるようにする必要があるため、キューが扱いにくくなります
  • スレッドの理由は次のとおりです。
    • それらは共有状態にアクセスします。実際には多かれ少なかれ一般的なメモリ内データベースなので、別の方法で実行できると思います。
    • 同時に複数の接続を受け入れることができる必要がありますが、実際のスレッドはほとんどの場合、何かをブロックしています。受け入れられた接続ごとに、新しいスレッドが生成されます。これは、I/O ops ですべてのクライアントをブロックしないようにするためです。

スレッドとプロセスに関しては、スレッドを使用してブロッキング ops を非ブロッキングにし、プロセスを使用してマルチプロセッシングを有効にします。

4

5 に答える 5

3

それがselectの目的ではありませんか??

selectがブロックしないことを示している場合にのみ、ソケットでacceptを呼び出します...

選択にはタイムアウトがあるため、時々中断して、シャットダウンする時間かどうかを確認できます....

于 2009-01-10T19:03:29.077 に答える
3

回避できると思ったのですが、次のようにする必要があるようです。

from processing import connection
connection.Listener.fileno = lambda self: self._listener._socket.fileno()

import select

l = connection.Listener('/tmp/x', 'AF_UNIX')
r, w, e = select.select((l, ), (), ())
if l in r:
  print "Accepting..."
  c = l.accept()
  # ...

これがデメテルの法則を破り、邪悪なモンキー パッチを導入することは承知していますが、これがこれを達成するための最も移植しやすい方法であるように思われます。誰かがよりエレガントなソリューションを持っているなら、私はそれを聞いてうれしいです:)

于 2009-01-21T13:46:04.070 に答える
1

私はマルチプロセッシング モジュールを初めて使用しますが、プロセッシング モジュールとスレッド モジュールを混在させることは直感に反しているように思えます。同じ問題を解決することを目的としているのではないでしょうか?

とにかく、リッスン関数をプロセス自体にラップするのはどうですか? これがコードの残りの部分にどのように影響するかはわかりませんが、これはよりクリーンな代替手段になる可能性があります。

from multiprocessing import Process
from multiprocessing.connection import Listener


class ListenForConn(Process):

    def run(self):
        listener = Listener('/tmp/asdf', 'AF_UNIX')
        listener.accept()

        # do your other handling here


listen_process = ListenForConn()
listen_process.start()

print listen_process.is_alive()

listen_process.terminate()
listen_process.join()

print listen_process.is_alive()
print 'No more listen process.'
于 2008-12-11T03:46:03.413 に答える
0

おそらく理想的ではありませんが、シグナルハンドラまたはプロセスを終了しているスレッドからソケットにデータを送信することで、ブロックを解放できます。

編集:これを実装する別の方法は、タイムアウトをサポートしているように見えるため、 Connection Queuesを使用することです (申し訳ありませんが、最初の読み取りでコードを読み違えました)。

于 2008-12-10T22:14:28.683 に答える