3

ハンドブレーキを少し自動化したいので、Python で小さなプログラムを書きました。今、サブプロセスとスレッド化モジュールに問題があります。実行するハンドブレーキ プロセスの数を動的に変更したいと考えています。そして、ムービーを取得および配置するための queue モジュールを実装します。

CompressThreadは handbrake クラスの encode メソッドを呼び出し、 encoding 呼び出しをエンコードします_execute。ここで、ハンドブレーキ クラスで読み取った進行状況を、一元化されたコンプレッサー クラスに保存したいと考えています。そのため、進行状況を asocketserverおよび aに公開できwebguiます。いいえ、sqlite3データベースに書き込みますが、これは(スレッドの問題のため)削除する必要があり、プログラムの終了時にのみ保存されます。

データを集中的に保存する唯一の方法は、別のスレッドを作成し、CompressThreadクラスでデータをポーリングすることです。これに関する私の問題は、私のプログラムに 4 つのスレッドがあることです。

より良い解決策はありますか?多分データベースは間違っていないので、削除すべきではありませんか?

コンプレッサークラス:

class CompressThread(threading.Thread):
    """ Manage the queue of movies to be compressed
    """

    def __init__(self):
        threading.Thread.__init__(self)
        self._config = ConfigParser()
        self._config.process_config()
        self._handbrake = self._config.get_handbrake()
        self._lock = threading.Lock()

    def run(self):
        while True:
            movie_id = QUEUE.get()
            return_code = self._handbrake.encode(movie_id)
            print(return_code)
            QUEUE.task_done()


class Compressor(object):
    """ Compresses given mkv file

    Attributes:


    """

    __MAX_THREADS = 1

    def __init__(self):
        self._dest_audio_tracks = None
        self._log = None
        self.settings = None
        self.config = ConfigParser()
        self._database = db.DB()
        self._database.connect()
        self._running = True
        self._threads = []
        try:
            self.handbrake, self._log = self.config.process_config()
            self._log = logging.getLogger("Compressor")
        except ConfigError as error:
            raise Error("Config error: {0}".format(error))

    def process_file(self, input_file, output_file, title):
        if not os.path.exists(input_file):
            self._log.warning("Input file not exists: {0}".format(input_file))
            print("Input file not found: {0}".format(input_file))
        else:
            media_info = mediainfo.Mediainfo.parse(input_file)
            movie_settings = settings.Settings(input_file, title, output_file)
            movie_settings.parse(media_info)
            self._log.info("Added file {0} to list".format(movie_settings.input_file))
            QUEUE.put(self._database.insert_movie(movie_settings))

            print("File added.")

    def start(self):
        self._threads = [CompressThread() for i in range(self.__MAX_THREADS)]
        for thread in self._threads:
            thread.setDaemon(True)
            thread.start()
        while self._running:
            cmd = input("mCompress> ")
            if cmd == "quit":
                self._running = False
            elif cmd == "status":
                print("{0}".format(self._threads))
            elif cmd == "newfile":
                input_file = input("mCompress> newFile> Input filename> ")
                output_file = input("mCompress> newFile> Output filename> ")
                title = input("mCompress> newFile> Title> ")
                self.process_file(input_file, output_file, title)

    def _initialize_logging(self, log_file):
        try:
            self._log_file = open(log_file, "a+")
        except IOError as error:
            log_error = "Could not open log file {0}".format(error)
            self._log.error(log_error)
            raise IOError(log_error)
        self._log_file.seek(0)

if __name__ == "__main__":
    options_parser = OptionsParser()
    args = options_parser.parser.parse_args()
    if args.start:
        Compressor().start()

ハンドブレーキ クラスの一部:

def _execute(self, options):
    command = ["{0}".format(self._location)]
    if self._validate_options(options):
        for option in options:
            command.extend(option.generate_command())
        print(" ".join(command))
        state = 1
        returncode = None
        process = None
        temp_file = tempfile.TemporaryFile()
        try:
            process = subprocess.Popen(command, stdout=temp_file, stderr=temp_file, shell=False)
            temp_file.seek(0)
            while True:
                returncode = process.poll()
                if not returncode:
                    for line in temp_file.readlines():
                        p = re.search("Encoding:.*([0-9]{1,2}\.[0-9]{1,2}) % \(([0-9]{1,2}\.[0-9]{1,2}) fps, avg "
                                      "([0-9]{1,2}\.[0-9]{1,2}) fps, ETA ([0-9]{1,2}h[0-9]{1,2}m[0-9]{1,2})",
                                      line.decode("utf-8"))
                        if p is not None:
                            self._database.update_progress(p.group(1), p.group(2), p.group(3), p.group(4))
                else:
                    break
            temp_file.seek(0)
            print(temp_file.readline())
            self._write_log(temp_file.readlines())
            if returncode == 0:
                state = 5
            else:
                state = 100
                raise ExecuteError("HandBrakeCLI stopped with an exit code not null: {0}".format(returncode))
        except OSError as error:
            state = 105
            raise ExecuteError("CLI command failed: {0}".format(error))
        except KeyboardInterrupt:
            state = 101
        finally:
            try:
                process.kill()
            except:
                pass
            temp_file.close()
            return state
    else:
        raise ExecuteError("No option given")
4

1 に答える 1

2

計画していたことを正確に実行するだけです。

これが 4 つではなく 5 つのスレッドを持っていることを意味する場合は、どうすればよいでしょうか?

どのスレッドも CPU バウンドではありません。つまり、数値を計算したり、文字列を解析したり、その他の計算作業を行ったりするのではなく、I/O、外部プロセス、または別のスレッドを待機しているだけです。したがって、OS がスレッドをスムーズに処理できなくなるほど暴走しない限り、CPU にバインドされていないスレッドをさらに作成しても害はありません。何百ものです。

スレッドのいずれかCPU バウンドである場合、2 つでも多すぎます。CPython では、* スレッドは作業を行うためにグローバル インタープリター ロックを取得する必要があります**。そのため、スレッドは並列に実行されず、作業よりも GIL をめぐる争いに多くの時間を費やします。しかし、その場合でも、CPU バウンド スレッドがいっぱいになるキューで待機することにすべての時間を費やす別の非 CPU バウンド スレッドを追加しても、状況が以前よりも大幅に悪化することはありません。***


データベースに関しては…</p>

SQLite3 自体は、十分に新しいバージョンを使用している限り、マルチスレッドで問題ありません。sqlite3しかし、SQLite3 エンジンの非常に古いバージョンとの下位互換性のために、Pythonモジュールはそうではありません。詳細については、ドキュメントのマルチスレッドを参照してください。私の記憶が正しければ (サイトが一時的にダウンしているようで、確認できません)、pysqlite必要に応じて、スレッドをサポートするサードパーティ モジュール (stdlib モジュールのベース) をビルドできます。

ただし、データベースをあまり頻繁に使用していない場合は、単一のスレッドを実行してデータベースと通信し、他のスレッドをリッスンするキューを使用することは、完全に合理的な設計です。


* および PyPy ですが、必ずしも他の実装では必要ありません。

** 拡張モジュールは、Python から見える値に触れない限り、C で動作するように GIL を解放できます。NumPy のようないくつかのよく知られたモジュールは、これを利用しています。

*** 特に Python 3.1 以前では、待機中のスレッド自体が CPU バウンド スレッドによって妨げられる可能性がありますが、干渉することはありません。

于 2013-11-13T18:51:08.210 に答える