2

Pythonでマルチスレッドアプリケーションを開発しています。次のシナリオがあります。

  1. DB と通信し、いくつかのデータを大きなチャンクで取得してキューに入れる 2 ~ 3 のプロデューサー スレッドがあります。
  2. プロデューサースレッドによってフェッチされた大きなチャンクを小さなチャンクに分割し、それらを別のキューに入れる中間ワーカーがあります。
  3. 中間ワーカー スレッドによって作成されたキューを消費する 5 つのコンシューマ スレッドがあります。
  4. データ ソースのオブジェクトは、API を介してプロデューサー スレッドによってアクセスされます。これらのデータ ソースは完全に分離されています。したがって、これらのプロデューサは、データ ソース オブジェクトによって提供されるはずのデータの有無のみを理解します。
  5. これら 3 つのタイプのスレッドを作成し、これらのスレッドで join() を呼び出して、メイン スレッドがこれらのスレッドの完了を待機するようにします。

このようなセットアップでは、スレッドの失敗や例外を感知して何をすべきかを決定する共通のエラー ハンドラが必要です。たとえば、アプリケーションの起動後に ctrl+c を押すと、メイン スレッドは終了しますが、プロデューサー スレッドとコンシューマー スレッドは引き続き実行されます。ctrl+c を押すと、アプリケーション全体がシャットダウンするようにしたいと思います。同様に、データ ソース モジュールで何らかの DB エラーが発生した場合、プロデューサー スレッドはそれを通知される必要があります。

これは私がこれまでに行ったことです:

クラス ThreadManager を作成しました。そのオブジェクトはすべてのスレッドに渡されます。エラー ハンドラ メソッドを作成し、 に渡しましたsys.excepthook。このハンドラーは例外、エラーをキャッチし、ThreadManager クラスのメソッドを呼び出して実行中のスレッドを制御する必要があります。スニペットは次のとおりです。

class Producer(threading.Thread):
    ....
    def produce():
        data = dataSource.getData()

class DataSource:
    ....
    def getData():
        raise Exception("critical")

def customHandler(exceptionType, value, stackTrace):
     print "In custom handler"

sys.excepthook = customHandler

これで、プロデューサー クラスのスレッドが DataSource クラスの getData() を呼び出すと、例外がスローされます。しかし、この例外は私の customHandler メソッドによってキャッチされることはありません。

私は何が欠けていますか?また、そのようなシナリオでは、他にどのような戦略を適用できますか? 助けてください。これをすべて読むのに十分な忍耐力を持ってくれてありがとう:)

4

2 に答える 2

0

Ctrl + Cを押したときに「KeyboardInterrupt」をキャッチして実行することはできません:

for thread in threading.enumerate():
    thread._Thread__stop()
    thread._Thread__delete()
while len(threading.enumerate()) > 1:
    time.sleep(1)
os._exit(0)

スレッド化された各クラスにself.aliveのフラグがあり、理論的にはthread.alive = Falseと呼んで、正常に停止させることができますか?

for thread in threading.enumerate():
    thread.alive = False
    time.sleep(5) # Grace period
    thread._Thread__stop()
    thread._Thread__delete()
while len(threading.enumerate()) > 1:
    time.sleep(1)
os._exit(0)

例:

import os
from threading import *
from time import sleep

class worker(Thread):
    def __init__(self):
        self.alive = True
        Thread.__init__(self)
        self.start()
    def run(self):
        while self.alive:
            sleep(0.1)

runner = worker()

try:
    raw_input('Press ctrl+c!')
except:
    pass
for thread in enumerate():
    thread.alive = False
    sleep(1)
    try:
        thread._Thread__stop()
        thread._Thread__delete()
    except:
        pass
# There will always be 1 thread alive and that's the __main__ thread.
while len(enumerate()) > 1:
    sleep(1)
os._exit(0)

内部システムの例外ハンドラーを変更してみてください。

import sys
origExcepthook = sys.excepthook
def uberexcept(exctype, value, traceback):
    if exctype == KeyboardInterrupt:
        print "Gracefully shutting down all the threads"
        # enumerate() thingie here.
    else:
        origExcepthook(exctype, value, traceback)
sys.excepthook = uberexcept
于 2012-12-03T10:38:36.987 に答える