2

次のような構造のプログラムがあります。

main():
    LoopingCall(f1).start(1)
    LoopingCall(f2).start(10)
    reactor.run()

f1 には、いくつかのブロッキング操作を処理するいくつかの deferred があります。このようなもの:

f1():
    deferred = some_blocking_operation()
    deferred.addCallback(do_something_with_blocking_operartion_result)
    deffered.addErrback(do_something_in_case_of_error)

f2 はログを記録します。ねじれたコードはありません。

f1 で何が起こっても、f2 は 10 秒ごとにログを記録することを期待します。

しかし、そうではありません。f2 は、50 秒から 2 秒間隔の非常にワイルドな間隔で呼び出されます。

なぜこうなった?

編集1:

リクエストにより、f2 コード:

f2():
    logging.info("Last log: " + str(time.time() - self.last_log_time))
    self.last_log_time = time.time()

f2 はオブジェクトのメソッドですが、このオブジェクトはヘルパーであり、実際には操作を実行しません。オブジェクト f2 は、データ ホルダーの一部です。

編集 2

したがって、ロギング時間 (f2) は、f1 に必要な時間に直接関連しています。f1 が開始から終了まで 30 秒を必要とする場合、ログ間の時間 (f2) も 30 秒になります。f1 が some_blocking_operation() を待っている間に f2 を呼び出すべきではありませんか?

編集 2 の補遺:

f1 へのループ呼び出しをコメントすると、f2 のループ呼び出しが適切に機能する (10 秒ごとにログが記録される) ため、問題は明らかに f1 の結果です。

私がやっていることをよりよく説明するために、f1 のより詳細なスケッチを追加します。

編集3:

これは、f1 で何が起こっているかのより詳細なスケッチです。

class c1():
    def __init__(self):
        helper_object = c2() # f2 is part of c2, it is a helper function in a helper object

    # I timed the read time, it's in the miliseconds, something like 1.5436354746e for example.
    def read_from_pipe(self):
        data = ""
        try:
            while True:
                data += self.input_pipe.read(1)
        except IOError:
            pass
        return data

    # I timed this as well, it's also in the order of miliseconds.
    def write_to_pipe(to_write):
        self.output_pipe.write(to_write)

    def success(self, result):
        self.write_to_pipe("Good")

    def fail(reason):
        self.write_to_pipe("Bad")

    def process_item(self, item):
        deferred = some_blocking_operation_with_item(item)
        deferred.addCallback(do_another_blocking_operation) # This appear to be the longest in terms of time, however, it's still 1 or 2 seconds, not 80 (longest time between f2 calls so far was 83)
        deferred.addCallback(success)
        deffered.addErrback(fail)

    def schedule_input_for_processing(self, input):
        for item in input: # There are no more than 50 items at a time
            process_item(item)

    def f1(self):
        input = self.read_from_pipe() # read is non blocking - I do something like this
        self.schedule_input_for_processing(input)

ワークフローは次のようなものです。

  1. パイプから入力を取得します ( read はブロックされていないため、 f2 には影響しません)
  2. 入力がリストだとします。リストの各要素で何かを行います。
  3. リストの各要素に対して行われる「何か」は、ブロッキング操作です。ただし、それはかなり短く、通常は 0.5 ~ 1 秒以内です。
  4. その操作の完了時に、エラーが発生しなかった場合は、別のブロック操作を行う 2 番目のコールバックを起動します。通常、これには 1 ~ 2 秒かかります。
  5. 出力パイプへの書き込み

個々のブロック操作には少し時間がかかります。私はそれが複合効果であると推測し始めています。すべてinputに 50 個のアイテムがあるため、各アイテムが終了するのに 1 ~ 2 秒かかる場合、取得するログ時間は理にかなっています。

ただし、私の質問は残っています。入力内のアイテムがブロック操作を実行している間に f2() を起動するべきではありませんか?

編集 4

わかりましたので、動作することがわかっているバージョンに戻しました。コールバックは 10 秒ごとに発生していました。今、そうではありません。これは本当に奇妙です。入力のアイテムのサイズがこれと関係があるのではないかと思います。

4

0 に答える 0