1

総括

  • 私は中規模のdjangoプロジェクトを持っています
  • (DB とは対照的に) メモリ内に多数のプレフィックス ツリーがあります。
  • これらのツリーのノードは、タイムアウトの対象となるエンティティ/オブジェクトを表します。つまり、さまざまな時点でこれらのノードをタイムアウトにする必要があります

デザイン:

  • 基本的に、リセット可能なワンショット タイマーを起動し、関連付けて、タイマーを作成するエンティティ (この場合はツリーのノード) に対して何らかの操作を実行できるコールバックを提供できるようにするタイマー コンストラクトが必要でした。

さまざまなオプションを調べた後、ネイティブに使用できるものは見つかりませんでした (django アプリなど)。Python の Timer オブジェクトは、スケーリング/実行されないため、これには適していません。したがって、以下に基づいて独自のタイマーを作成することにしました。

  1. タイム ホライズンを保持するタイム デルタ オブジェクトのソートされたリスト
  2. 「ダニ」をトリガーするメカニズム

実装の選択肢:

  1. ソートされたデルタ リストの Bisect のラッパーを使用しました: http://code.activestate.com/recipes/577197-sortedcollection/
  2. 目盛りを提供するためにセロリを使用しました-1分の粒度で、ワーカーはタイマークラスによって提供される timer_tick 関数をトリガーします。timer_tick は基本的に、ソートされたリストを通過し、ティックごとにヘッド ノードをデクリメントする必要があります。次に、いずれかのノードが 0 になった場合、コールバックを開始し、ソート済みタイマー リストからそれらのノードを削除します。
  3. タイマーを作成するには、オブジェクトの ID を返す Timer オブジェクトをインスタンス化する必要があります。この ID は db に格納され、タイマーを作成するエンティティを表す DB のエントリに関連付けられます

追加のデータ構造: Timer インスタンス (タイマーの作成ごとにインスタンス化される) を追跡するために、id を obj にマップする WeakRef ディクショナリがあります。

したがって、基本的に、メインの Django プロジェクトのメモリには 2 つのデータ構造があります。

問題文:

セロリ ワーカーはタイマー リストを調べ、id2obj マップを潜在的に変更する必要があるため、セロリ ワーカーとメインの間で状態を共有する方法を見つける必要があるようです。

SO/Google を調べたところ、次の提案が見つかりました

  1. マネジャー
  2. 共有メモリ

残念ながら、bisect ラッパーは、ピクルス化や状態共有にはあまり適していません。辞書を作成し、並べ替えられたリストを辞書内に埋め込もうとすることで、マネージャーのアプローチを試みました..エラーが発生しました(並べ替えられたリストによって保持されているメモリは共有されておらず、" shared" メモリ オブジェクトは機能しません)

最後に...質問:

  1. SortedCollection と Weakref Dict をワーカー スレッドと共有する方法はありますか

代替ソリューション:

ワーカースレッドをシンプルに保つのはどうですか...ティックごとにDBに書き込み、ポストDbシグナルを使用してメインで通知を受け取り、メインで期限切れのタイマーの処理を実行します。もちろん、短所は、並列化が失われることです。

4

1 に答える 1

0

既存の実装に関するいくつかのコメントから始めましょう。

ソートされたデルタ リストの Bisect のラッパーを使用しました: http://code.activestate.com/recipes/577197-sortedcollection/

これにより、O(1) 個の pop が得られますが (リストを時間の逆順で保持している限り)、各挿入は O(N) になります (「キャンセル」API がある場合、任意のジョブを削除するなどのあまり一般的でない操作についても同様です)。 )。ポップとまったく同じ数の挿入を行っているため、アルゴリズム全体がソートされていないリストよりも優れているわけではありません。

heapqこれを a (まさにそれが目的です) に置き換えると、 O(log N) の挿入が得られます。(Pythonheapqには がありませんが、peekこれheap[0]は が と同等heap.peek(0)であるため、必要ないことに注意してください。)

他の操作 (キャンセル、非破壊反復など) O(log N) も行う必要がある場合は、検索ツリーが必要です。いくつかの良いものについては、PyPI をblist参照してください。bintrees


目盛りを提供するためにセロリを使用しました-1分の粒度で、ワーカーはタイマークラスによって提供される timer_tick 関数をトリガーします。timer_tick は基本的に、ソートされたリストを通過し、ティックごとにヘッド ノードをデクリメントする必要があります。次に、いずれかのノードが 0 になった場合、コールバックを開始し、ソート済みタイマー リストからそれらのノードを削除します。

デルタの代わりに目標時間を維持する方がはるかに優れています。ターゲットタイムでは、これを行うだけです:

while q.peek().timestamp <= now():
    process(q.pop())

繰り返しますが、これは O(N) ではなく O(1) であり、はるかに単純であり、キュー上の要素を不変として扱い、ティック時間よりも長くかかる反復で発生する可能性のある問題を回避します (おそらく問題ではありません) 1分刻みで…)。


さて、あなたの主な質問に進みます:

SortedCollection を共有する方法はありますか

はい。ペアの優先ヒープだけが必要な場合は、長さを明示的に追跡する必要があることを除いて、 と同じくらい簡単に(timestamp, id)それを に収めることができます。次に、すべての操作を同期する必要があります。それだけです。multiprocessing.Arraylist

1 分間に 1 回しか刻んでおらず、頻繁に忙しいことが予想される場合は、 a を使用しLockて同期し、スケジュール ワーカー自体を刻むことができます。

しかし、正直なところ、目盛りを完全に削除して、Conditionより柔軟で、概念的には単純です (コードが少し増えますが)。これは、実行する作業がないときに 0% の CPU を使用していることを意味します。負荷がかかっているときに迅速かつスムーズに応答します。例えば:

def schedule_job(timestamp, job):
    job_id = add_job_to_shared_dict(job) # see below
    with scheduler_condition:
        scheduler_heap.push((timestamp, job))
        scheduler_condition.notify_all()

def scheduler_worker_run_once():
    with scheduler_condition:
        while True:
            top = scheduler_heap.peek()
            if top is not None:
                delay = top[0] - now()
                if delay <= 0:
                    break
                scheduler_condition.wait(delay)
            else:
                scheduler_condition.wait()
        top = scheduler_heap.pop()
        if top is not None:
            job = pop_job_from_shared_dict(top[1])
            process_job(job)

とにかく、それは仕事でいっぱいのweakdictに私たちをもたらします.

weakdict はインプロセス オブジェクトへの参照を明示的に格納しているため、プロセス間で共有しても意味がありません。保存したいのは、変更可能なジョブ自体ではなく、ジョブが実際に何であるかを定義する不変オブジェクトです。それから、それはただの古い辞書です。

それでも、昔ながらの dict をプロセス間で共有するのは簡単なことではありません。

これを行う簡単な方法は、メモリ内の の代わりにdbmデータベース (またはデータベースのラッパー) を使用し、. しかし、これは誰かがデータベースを変更しようとするたびにデータベースを再フラッシュして再度開くことを意味し、受け入れられない場合があります。shelvedictLock

たとえば、sqlite3 データベースへの切り替えはやり過ぎのように思えるかもしれませんが、はるかに簡単な場合があります。

一方、ここで実際に行う操作は、「次の ID をこのジョブにマップして ID を返す」と「この ID で指定されたジョブをポップして返す」だけです。それは本当に口述である必要がありますか?キーは整数であり、それらを制御します。、プラス次のキーArrayのシングル、および 、これでほぼ完了です。問題は、キーのオーバーフローに対して何らかのスキームが必要なことです。の代わりに、ロールオーバーして、既に使用されているスロットを確認する必要があります。ValueLocknext_id += 1

with lock:
    next_id += 1
    if next_id == size: next_id = 0
    if arr[next_id] is None:
        arr[next_id] = job
        return next_id

別のオプションは、辞書をメインプロセスに保存し、 a を使用Queueして他のプロセスにクエリを実行させることです。

于 2013-06-06T00:36:04.450 に答える