6

ツイストを使用して実行されるアプリケーションのネットワークをシミュレートしようとしています。シミュレーションの一環として、特定のイベントを同期し、各プロセスに大量のデータを供給できるようにしたいと思います。マルチプロセッシングのイベントとキューを使用することにしました。しかし、私のプロセスはハングしています。

問題を説明するために、以下のサンプルコードを記述しました。具体的には(私のSandy Bridgeマシンでは約95%の時間)、'run_in_thread'関数は終了しますが、Ctrl-Cを押すまで'print_done'コールバックは呼び出されません。

さらに、サンプルコードのいくつかの変更を行って、これをより確実に機能させることができます。たとえば、生成されるプロセスの数を減らす、reactor_readyからself.ready.setを呼び出す、deferLaterの遅延を変更するなどです。

ツイストリアクターとQueue.get()やEvent.wait()などのマルチプロセッシング呼び出しのブロックの間のどこかに競合状態があると思いますか?

私が直面している問題は正確には何ですか?私が見逃しているコードのバグはありますか?これを修正できますか、それともマルチプロセッシングイベント/キューと互換性がありませんか?

第二に、spawnProcessやAmpouleのようなものが推奨される代替手段でしょうか?( Mix Python Twisted with multiprocessing?で提案されているように)

編集(要求に応じて):

glib2reactor selectreactor、pollreactor、およびepollreactorを試したすべてのreactorで問題が発生しました。epollreactorは最良の結果をもたらすようであり、以下に示す例では正常に機能するようですが、それでも私のアプリケーションで同じ(または同様の)問題が発生します。今後も調査を続けていきます。

Gentoo Linuxカーネル3.3および3.4​​、python 2.7を実行しており、Twisted 10.2.0、11.0.0、11.1.0、12.0.0、および12.1.0を試しました。

Sandy Bridgeマシンに加えて、デュアルコアAMDマシンでも同じ問題が発生します。

#!/usr/bin/python
# -*- coding: utf-8 *-*

from twisted.internet import reactor
from twisted.internet import threads
from twisted.internet import task

from multiprocessing import Process
from multiprocessing import Event

class TestA(Process):
    def __init__(self):
        super(TestA, self).__init__()
        self.ready = Event()
        self.ready.clear()
        self.start()

    def run(self):
        reactor.callWhenRunning(self.reactor_ready)
        reactor.run()

    def reactor_ready(self, *args):
        task.deferLater(reactor, 1, self.node_ready)
        return args

    def node_ready(self, *args):
        print 'node_ready'
        self.ready.set()
        return args

def reactor_running():
    print 'reactor_running'
    df = threads.deferToThread(run_in_thread)
    df.addCallback(print_done)

def run_in_thread():
    print 'run_in_thread'
    for n in processes:
        n.ready.wait()

def print_done(dfResult=None):
    print 'print_done'
    reactor.stop()

if __name__ == '__main__':
    processes = [TestA() for i in range(8)]
    reactor.callWhenRunning(reactor_running)
    reactor.run()
4

1 に答える 1

10

簡単な答えは「はい」です。ツイストとマルチプロセッシングは相互に互換性がなく、意図したとおりに確実に使用することはできません。

すべてのPOSIXプラットフォームで、子プロセス管理は処理と密接に関連していSIGCHLDます。POSIXシグナルハンドラーはプロセスグローバルであり、シグナルタイプごとに1つしか存在できません。

Twistedとstdlibmultiprocessingの両方にSIGCHLDハンドラーをインストールすることはできません。そのうちの1つだけができます。つまり、子プロセスを確実に管理できるのはそのうちの1つだけです。あなたのサンプルアプリケーションは、それらのどれがその能力を獲得するかを制御していないので、その事実から生じるその振る舞いにはいくつかの非決定論があると私は期待します。

ただし、この例のより直接的な問題は、親プロセスでTwistedをロードしてから、すべての子プロセスを実行するのではなくmultiprocessing、フォークするために使用することです。Twistedは、このような使用をサポートしていません。フォークしてから実行すれば、問題はありません。ただし、新しいプロセス(おそらく、Twistedを使用するPythonプロセス)の実行がないため、Twistedが考慮していないあらゆる種類の追加の共有状態が発生します。あなたの特定のケースでは、この問題を引き起こす共有状態は、を実装するために使用される内部の「ウェイカーfd」です。親とすべての子の間でfdが共有されているため、親が呼び出しの結果を配信するためにメインスレッドをウェイクアップしようとすると、代わりに子プロセスの1つがウェイクアップする可能性があります。deferToThreaddeferToThread。子プロセスには何の役にも立たないので、それは時間の無駄です。その間、親のメインスレッドがウェイクアップすることはなく、スレッド化されたタスクが完了したことに気付くこともありません。

子プロセスを作成するまでTwistedをロードしないことで、この問題を回避できる可能性があります。これにより、Twistedに関する限り、使用法が単一プロセスのユースケースに変わります(各プロセスで最初にロードされ、その後、そのプロセスはまったくフォークに進まないため、フォークとツイスト相互作用はもうありません)。これは、子プロセスを作成するまでTwistedをインポートしないことを意味します。

もちろん、これはツイステッドが行く限りあなたを助けるだけです。使用している他のライブラリでも同様の問題が発生する可能性があります(glib2について言及しました。これは、このように使用しようとすると完全に詰まる別のライブラリの優れた例です)。

multiprocessingモジュールをまったく使用しないことを強くお勧めします。代わりに、forkだけではなく、forkとexecを含むマルチプロセスアプローチを使用してください。アンプルはそのカテゴリーに分類されます。

于 2012-07-01T15:50:15.433 に答える