Pythonでマルチスレッドアプリケーションを開発しています。次のシナリオがあります。
- DB と通信し、いくつかのデータを大きなチャンクで取得してキューに入れる 2 ~ 3 のプロデューサー スレッドがあります。
- プロデューサースレッドによってフェッチされた大きなチャンクを小さなチャンクに分割し、それらを別のキューに入れる中間ワーカーがあります。
- 中間ワーカー スレッドによって作成されたキューを消費する 5 つのコンシューマ スレッドがあります。
- データ ソースのオブジェクトは、API を介してプロデューサー スレッドによってアクセスされます。これらのデータ ソースは完全に分離されています。したがって、これらのプロデューサは、データ ソース オブジェクトによって提供されるはずのデータの有無のみを理解します。
- これら 3 つのタイプのスレッドを作成し、これらのスレッドで join() を呼び出して、メイン スレッドがこれらのスレッドの完了を待機するようにします。
このようなセットアップでは、スレッドの失敗や例外を感知して何をすべきかを決定する共通のエラー ハンドラが必要です。たとえば、アプリケーションの起動後に ctrl+c を押すと、メイン スレッドは終了しますが、プロデューサー スレッドとコンシューマー スレッドは引き続き実行されます。ctrl+c を押すと、アプリケーション全体がシャットダウンするようにしたいと思います。同様に、データ ソース モジュールで何らかの DB エラーが発生した場合、プロデューサー スレッドはそれを通知される必要があります。
これは私がこれまでに行ったことです:
クラス ThreadManager を作成しました。そのオブジェクトはすべてのスレッドに渡されます。エラー ハンドラ メソッドを作成し、 に渡しましたsys.excepthook
。このハンドラーは例外、エラーをキャッチし、ThreadManager クラスのメソッドを呼び出して実行中のスレッドを制御する必要があります。スニペットは次のとおりです。
class Producer(threading.Thread):
....
def produce():
data = dataSource.getData()
class DataSource:
....
def getData():
raise Exception("critical")
def customHandler(exceptionType, value, stackTrace):
print "In custom handler"
sys.excepthook = customHandler
これで、プロデューサー クラスのスレッドが DataSource クラスの getData() を呼び出すと、例外がスローされます。しかし、この例外は私の customHandler メソッドによってキャッチされることはありません。
私は何が欠けていますか?また、そのようなシナリオでは、他にどのような戦略を適用できますか? 助けてください。これをすべて読むのに十分な忍耐力を持ってくれてありがとう:)