ツイストを使用して実行されるアプリケーションのネットワークをシミュレートしようとしています。シミュレーションの一環として、特定のイベントを同期し、各プロセスに大量のデータを供給できるようにしたいと思います。マルチプロセッシングのイベントとキューを使用することにしました。しかし、私のプロセスはハングしています。
問題を説明するために、以下のサンプルコードを記述しました。具体的には(私のSandy Bridgeマシンでは約95%の時間)、'run_in_thread'関数は終了しますが、Ctrl-Cを押すまで'print_done'コールバックは呼び出されません。
さらに、サンプルコードのいくつかの変更を行って、これをより確実に機能させることができます。たとえば、生成されるプロセスの数を減らす、reactor_readyからself.ready.setを呼び出す、deferLaterの遅延を変更するなどです。
ツイストリアクターとQueue.get()やEvent.wait()などのマルチプロセッシング呼び出しのブロックの間のどこかに競合状態があると思いますか?
私が直面している問題は正確には何ですか?私が見逃しているコードのバグはありますか?これを修正できますか、それともマルチプロセッシングイベント/キューと互換性がありませんか?
第二に、spawnProcessやAmpouleのようなものが推奨される代替手段でしょうか?( Mix Python Twisted with multiprocessing?で提案されているように)
編集(要求に応じて):
glib2reactor selectreactor、pollreactor、およびepollreactorを試したすべてのreactorで問題が発生しました。epollreactorは最良の結果をもたらすようであり、以下に示す例では正常に機能するようですが、それでも私のアプリケーションで同じ(または同様の)問題が発生します。今後も調査を続けていきます。
Gentoo Linuxカーネル3.3および3.4、python 2.7を実行しており、Twisted 10.2.0、11.0.0、11.1.0、12.0.0、および12.1.0を試しました。
Sandy Bridgeマシンに加えて、デュアルコアAMDマシンでも同じ問題が発生します。
#!/usr/bin/python
# -*- coding: utf-8 *-*
from twisted.internet import reactor
from twisted.internet import threads
from twisted.internet import task
from multiprocessing import Process
from multiprocessing import Event
class TestA(Process):
def __init__(self):
super(TestA, self).__init__()
self.ready = Event()
self.ready.clear()
self.start()
def run(self):
reactor.callWhenRunning(self.reactor_ready)
reactor.run()
def reactor_ready(self, *args):
task.deferLater(reactor, 1, self.node_ready)
return args
def node_ready(self, *args):
print 'node_ready'
self.ready.set()
return args
def reactor_running():
print 'reactor_running'
df = threads.deferToThread(run_in_thread)
df.addCallback(print_done)
def run_in_thread():
print 'run_in_thread'
for n in processes:
n.ready.wait()
def print_done(dfResult=None):
print 'print_done'
reactor.stop()
if __name__ == '__main__':
processes = [TestA() for i in range(8)]
reactor.callWhenRunning(reactor_running)
reactor.run()