2

Amazon SQS から読み取り、ユーザーが必要な数の並列プロセスを作成する Python スクリプトを作成しました。Django BaseCommand を継承しており、これがコードです。

def handle(self, *args, **kwargs):
    self.set_up(*args, **kwargs)
    process_queue = JoinableQueue(self.threads)
    process_pool = Pool(
        self.threads,
        self.worker_process,
        (process_queue,)
    )

    is_queue_empty = False
    while not is_queue_empty:
        message = self.get_next_message()
        if len(message) == 0:
            is_queue_empty = True
        else:
            process_queue.put(message[0])
    process_queue.join()
    raise CommandError('Number retries exceeded retry limit')

def worker_process(self, process_queue):
    while True:
        message = process_queue.get(True)
        message_tuple = (message)
        self.process_message(message_tuple)
        process_queue.task_done()

これは正常に機能しており、タスクが完了するとすべてのプロセスが強制終了されます。ただし、ボイラーパイプを使用してデータを抽出する特定のアクティビティではありません。

from boilerpipe.extract import Extractor
extractor = Extractor(extractor='DefaultExtractor', html=soup_html)
extractor.getText()

ボイルパイプ コードを調べると、Extractor のコンストラクターに次のコードがあることがわかりました。

lock = threading.Lock()
class Extractor():
    def __init__():
        # code
        try:
            # code
            lock.acquire()
            # code
        finally:
            lock.release()

完全なコードはこれです

  1. プロセスが強制終了されないのはなぜですか、マルチ処理のやり方に何か問題がありますか?
  2. それとも、このスレッドのロックが問題を引き起こしているのでしょうか (私にはよくわかりませんが、何が問題なのかを考えただけです)。

アドバイスをください、事前に感謝します。

4

0 に答える 0