1

Simpy で同期の問題に直面しています。つまり、イベントがコンピューターによって処理された順序で処理されないということです。イベントがコンピューターによってキューに入れられ、並べ替えられ、処理される方法について、既に Simpy のドキュメントに記載されているよりも多くの情報を探しています。それらがトリガーされなければならない時間に関してソートされていることをどこでも見つけました。残りを読む前に、私にアドバイスできるリンクやドキュメントを誰か教えてください。

より具体的には、実世界のシステム (PoolSystem クラスのインスタンス) をモデル化してシミュレートしようとしています。これは、サブサブシステムにさらに分割するか、失敗する可能性のあるサブシステムのプールです (システムの最後のカテゴリ)。アトミックシステムと呼ばれます)。つまり、PoolSystem は、PoolSystem または AtomicSystem のいずれかであるサブシステムで構成されています。

たとえば、車はこの PoolSystem クラスのインスタンスで、エンジンをサブシステムとして持つことができます。しかし、エンジンはピストンやスパーク プラグなど、いくつかのサブシステムに分解される可能性があり、実際にはこれらのサブシステムが故障する可能性があります。この場合、エンジンは PoolSystem インスタンスとして定義され、ピストンとスパーク プラグは AtomicSystem インスタンスとして定義されます。

AtomicSystem クラスと PoolSystem クラスは、同じ標準モデルに基づいています。どちらも次のとおりです。

  • 特定のサブシステムの障害がシステム全体の障害を引き起こす場合 (つまり、他のすべてのサブシステムを中断する必要があることを意味します)、「True」である「クリティカル」ブール属性
  • システムがそのサブシステム (存在する場合) と通信するためのシグナルとして機能する「update_order」イベント
  • サブシステムがシステムに失敗したことを知らせるシグナルである「dysfunction_signal」イベント
  • 特定のシステムが正常に動作しない場合、または上位レベルのシステムによって中断された場合にトリガーされる「中断」イベント
  • サブシステムが更新を終了したことを上位システムに通知するシグナルとして機能する「update_end」イベント
  • 特定のシステムの運用サービスをシミュレートするプロセスである「ライフサイクル」属性

次のスキーマが、今読んだ内容を理解するのに役立つことを願っています: プール システムとして定義された車の内訳

このスキーマでは、自動車はエンジンとタイヤをサブシステムとする PoolSystem インスタンスとして定義されています。タイヤは自動車の故障の直接の原因となる可能性があるため、AtomicSystem インスタンスとして定義されています。エンジンは、サブシステムがピストンとスパーク プラグである別の PoolSystem として定義されます。これらは故障する可能性があるため、AtomicSystem インスタンスとして定義されます。

クラス AtomicSystem は以下にあります。

class AtomicSystem(object):
def __init__(self, env, mtbd, backlog, user_defined_critical=True, ids=None):
    self.env = env                                       # environment()    
    self.mtbd = mtbd                                     # mean time between dysfunction
    self.critical = user_defined_critical                # boolean
    self.ids = ids                                       # list of strings
    self.ttd = self.time_to_dysfunction()                # time before dysfunction
    self.update_order = self.env.event()                 
    self.dysfunction_signal = self.env.event()           
    self.interrupted = self.env.event()
    self.update_end = self.env.event()
    self.lifecycle = self.env.process(self.run(backlog))

def time_to_dysfunction(self):
    return self.mtbd

def run(self, backlog):
    # the atomic system starts service when its update_order event is triggered
    yield self.update_order
    print("t = " + str(self.env.now) + " : " + self.ids[-1] + " starts service.")
    self.update_order = self.env.event()

    # atomic system specifies to higher level system that it has started service
    self.update_end.succeed()
    self.update_end = self.env.event()

    try:
        # as long as the atomic system remains in this while loop, it is said to be in service.
        while True:
            start = self.env.now
            time_out = self.env.timeout(self.ttd)

            # wait for a dysfunction (time_out) or interruption (interrupted) or an update from a higher level system (update_order)
            result = yield time_out | self.interrupted | self.update_order

            if time_out in result:
                print("t = " + str(self.env.now) + " : " + self.ids[-1] + " fails.")

                # if the atomic system fails, trigger dysfunction_signal event destined to be detected by higher level system
                self.dysfunction_signal.succeed()
                # when the atomic system fails, its interrupted event is automatically triggered 
                self.interrupted.succeed()
                if self.ttd > 0:
                    backlog.append({"Dysfunction time": self.env.now, "IDs": self.ids})
                self.ttd = 0

            if self.interrupted.triggered:
                print("t = " + str(self.env.now) + " : " + self.ids[-1] + " interrupts service.")
                if self.ttd > 0:
                    operation_duration = self.env.now - start
                    self.ttd -= operation_duration

                # the atomic system waits for update_order trigger when it has been interrupted
                yield self.update_order


            if self.update_order.triggered:
            # here, the atomic system returns to service
                print("t = " + str(self.env.now) + " : " + self.ids[-1] + " is updated.")
                if self.ttd > 0:
                    operation_duration = self.env.now - start
                    self.ttd -= operation_duration
                self.update_end.succeed()
                self.update_order = self.env.event()
                self.dysfunction_signal = self.env.event()
                self.interrupted = self.env.event()
                self.update_end = self.env.event()

    except:
    # here the atomic system is terminated (end of service)
        print("t = " + str(self.env.now) + " : " + self.ids[-1] + " is terminated.")
        self.env.exit()

クラス PoolSystem は以下にあります。

class PoolSystem(object):
def __init__(self, env, id, init_subsystems, user_defined_critical=True):
    self.env = env
    self.id = id
    self.subsystems = init_subsystems
    self.working_subsystems = [self.subsystems[key] for key in self.subsystems.keys()]
    self.critical = user_defined_critical
    self.update_order = self.env.event()
    self.dysfunction_signal = simpy.AnyOf(self.env, [syst.dysfunction_signal for syst in self.working_subsystems])
    self.interrupted = self.env.event()
    self.update_end = self.env.event()
    self.lifecycle = self.env.process(self.run())

def start_subsystems(self):
    for key in self.subsystems.keys():
        self.subsystems[key].update_order.succeed()

def run(self):
    user_defined_critical = self.critical

    # the pool system is started here when its update_order event is triggered
    yield self.update_order
    print("t = " + str(self.env.now) + " : " + self.id + " starts service.")
    self.update_order = self.env.event()

    # Here, the pool system starts all of its subsystems (which can be atomic and/or pool systems)
    self.start_subsystems()

    # here, update_end is triggered if all the update_end events of the subsystems have been triggered
    self.update_end = simpy.AllOf(self.env, [self.subsystems[key].update_end for key in self.subsystems.keys()])
    yield self.update_end
    try:
        while True:

            # wait for a dysfunction (dysfunction_signal), interruption (interrupted) or an update from a higher level system (update_order)
            yield self.dysfunction_signal | self.interrupted | self.update_order


            if self.dysfunction_signal.triggered:
                crit = []
                for syst in self.working_subsystems:
                    if syst.dysfunction_signal.triggered:
                        crit.append(syst.critical)
                if True in crit: # if one of the failed subsystems is critical (critical = True), then trigger interrupted event()
                    print("t = " + str(self.env.now) + " : " + self.id + " fails completely.")
                    # pool system is interrupted
                    self.critical = user_defined_critical
                    self.interrupted.succeed()
                else:
                    # no critical subsystem has failed yet so the pool system can continue working (no interruption here)
                    self.critical = False
                    self.working_subsystems = [self.subsystems[key] for key in self.subsystems.keys() if
                                               not self.subsystems[key].interrupted.triggered]
                    if len(self.working_subsystems) is not 0:
                        print("t = " + str(self.env.now) + " : " + self.id + " fails partially.")
                        self.dysfunction_signal = simpy.AnyOf(self.env, [syst.dysfunction_signal for syst in
                                                                         self.working_subsystems])
                    else:
                    # pool system is interrupted if all of its subsystems have failed
                        print("t = " + str(self.env.now) + " : " + self.id + " fails completely (no working EUs).")
                        self.interrupted.succeed()

            if self.interrupted.triggered:
                print("t = " + str(self.env.now) + " : " + self.id + " interrupts service.")
                # interrupt all subsystems
                for key in self.subsystems.keys():
                    if not self.subsystems[key].interrupted.triggered:
                        self.subsystems[key].interrupted.succeed()

                # waits for update_order from higher level system
                yield self.update_order

            if self.update_order.triggered:
                print("t = " + str(self.env.now) + " : " + self.id + " is updated.")
                # update_order has been troggered by higher level system                    
                self.update_order = self.env.event()
                self.start_subsystems()
                self.update_end = simpy.AllOf(self.env,
                                              [self.subsystems[key].update_end for key in self.subsystems.keys()])
                # wait for the end of the update of the subsystems
                yield self.update_end
                print("t = " + str(self.env.now) + " : " + self.id + " receives update-end signal.")
                self.working_subsystems = [self.subsystems[key] for key in self.subsystems.keys()]
                self.dysfunction_signal = simpy.AnyOf(self.env,
                                                      [syst.dysfunction_signal for syst in self.working_subsystems])
                self.interrupted = self.env.event()


    except simpy.Interrupt:
    # here the pool system is terminated, it leaves service.
        for key in self.subsystems.keys():
            self.subsystems[key].lifecycle.interrupt()
        self.env.exit()

Eu (AtomicSystem から継承) と ModSat (PoolSystem から継承) の 2 つのクラスを定義しました。基本的に、いくつかの Eu オブジェクト (2 つのシステム レベルのみ) から modsat オブジェクトを構築しています。以下のコードを投稿しました。

class Eu(AtomicSystem):
def __init__(self, env, identity, mtbd, backlog, critical=True, ids=None):
    self.id = identity
    ids.append(self.id)
    AtomicSystem.__init__(self, env, mtbd, backlog, critical, ids)

class ModSat(PoolSystem):
def __init__(self, env, digit_id, eu_mtbds_criticals, backlog, critical=True):
    identity = "ModSat" + str(digit_id)
    self.eus = self.initialize(env, identity, eu_mtbds_criticals, backlog)
    PoolSystem.__init__(self, env, identity, self.eus, critical)

def initialize(self, env, identity, eu_mtbds_criticals, backlog):
    eus = {}
    for i in range(1, len(eu_mtbds_criticals) + 1):
        eu_id = "EU" + str(i) + ":" + identity
        eu = Eu(env, eu_id, eu_mtbds_criticals[i - 1][0], backlog, eu_mtbds_criticals[i - 1][1], [identity])
        eus[eu_id] = eu
    return eus

最後に、ModSat オブジェクトをテストし、modsat オブジェクトの正常な動作に影響を与えずに、故障した modsat オブジェクトのサブシステム (Eu タイプ) の 1 つを簡単に交換できるかどうかを確認したいと思いました。modsat オブジェクトとやり取りできるシミュレート関数を作成しました。以下で定義された 2 つの modsat オブジェクトを使用してテストを実行しました。

backlog = []
eu_mtbds_criticals1 = [[5, False], [11, False], [19, False]]
eu_mtbds_criticals2 = [[4, False], [27, False], [38, False]]
env = simpy.Environment()
sat1 = ModSat(env, 1, eu_mtbds_criticals1, backlog, True)
sat2 = ModSat(env, 2, eu_mtbds_criticals2, backlog, True)
constellation = {'ModSat1': sat1, 'ModSat2': sat2}
env.process(simulate(constellation, env, backlog))
env.run(until=100)

最初のテストは、次のシミュレート関数を使用した非常に単純なものでした。

def simulate(constellation, env, backlog):
for key in constellation.keys():
    # start service of each ModSat object included in the constellation dictionary, 
    # by triggering their update_order event.
    constellation[key].update_order.succeed()

# wait for a while to be sure that the modsat objects have been completely simulated.
yield env.timeout(50)

すべてのイベントがコンピューターによって正しい順序でトリガーおよび処理されているように見えるため、出力は私が望んでいたものです。

# the 1st update_order event of PoolSystem is triggered
t = 0 : ModSat1 starts service.
t = 0 : ModSat2 starts service.
# the 1st update_order event of AtomicSystem is triggered
t = 0 : EU1:ModSat1 starts service.
t = 0 : EU3:ModSat1 starts service.
t = 0 : EU2:ModSat1 starts service.
t = 0 : EU2:ModSat2 starts service.
t = 0 : EU1:ModSat2 starts service.
t = 0 : EU3:ModSat2 starts service.
# 1st failure here. Since critical attribute of EU1:ModSat2 is set to False ModSat2 is not interrupted (partial failure)
t = 4 : EU1:ModSat2 fails.
t = 4 : EU1:ModSat2 interrupts service.
t = 4 : ModSat2 fails partially.
# 2nd failure here
t = 5 : EU1:ModSat1 fails.
t = 5 : EU1:ModSat1 interrupts service.
t = 5 : ModSat1 fails partially.
t = 11 : EU2:ModSat1 fails.
t = 11 : EU2:ModSat1 interrupts service.
t = 11 : ModSat1 fails partially.
# here the last failure of ModSat1: ModSat1 is interrupted because it has no more working Eus
t = 19 : EU3:ModSat1 fails.
t = 19 : EU3:ModSat1 interrupts service.
t = 19 : ModSat1 fails completely (no working EUs).
t = 19 : ModSat1 interrupts service.
t = 27 : EU2:ModSat2 fails.
t = 27 : EU2:ModSat2 interrupts service.
t = 27 : ModSat2 fails partially.
# here the last failure of ModSat2: ModSat2 is interrupted because it has no more working Eus
t = 38 : EU3:ModSat2 fails.
t = 38 : EU3:ModSat2 interrupts service.
t = 38 : ModSat2 fails completely (no working EUs).
t = 38 : ModSat2 interrupts service.

ここで、次のシミュレート関数を使用してコードをテストしたいと思います。

def simulate(constellation, env, backlog):
    for key in constellation.keys():
    # start service of each ModSat object included in the constellation dictionary, 
    # by triggering their update_order event.
        constellation[key].update_order.succeed()


    # detect failure
    request_signal = simpy.AnyOf(env, [constellation[key].dysfunction_signal for key in constellation.keys()])
    yield request_signal

    # The servicer's backlog is updated with the first item of the backlog list
    print("t = " + str(env.now) + " : a service request is detected.")
    servicer_backlog = []
    servicer_backlog.append(backlog[0])
    del backlog[0]

    # the next line models the servicer time of service
    yield env.timeout(5)

    # The servicer gets the ID of the failed Eu to replace from its backlog
    sat_id = servicer_backlog[0]['IDs'][0]
    eu_id =  servicer_backlog[0]['IDs'][1]
    failed_eu = constellation[sat_id].eus[eu_id]
    # the servicer gives the values of the attributes of the failed EU to the new EU
    new_eu = Eu(failed_eu.env, failed_eu.id, failed_eu.mtbd, backlog, failed_eu.critical, failed_eu.ids)
    # the failed eu is terminated (its service ends)
    failed_eu.lifecycle.interrupt()
    # the new EU replaces the failed_eu
    constellation[sat_id].eus[eu_id] = new_eu
    # the modsat concerned by the replacement has its update_order event triggered
    constellation[sat_id].update_order.succeed()
    print("t = " + str(env.now) + " : a service is provided")

上記のシミュレート関数は、最初に失敗した Eu を新しいものに置き換えることをモデル化しているだけです。出力は次のとおりです。

# the 1st update_order event of PoolSystem is triggered
t = 0 : ModSat1 starts service.
t = 0 : ModSat2 starts service.
# the 1st update_order event of AtomicSystem is triggered
t = 0 : EU3:ModSat1 starts service.
t = 0 : EU2:ModSat1 starts service.
t = 0 : EU1:ModSat1 starts service.
t = 0 : EU1:ModSat2 starts service.
t = 0 : EU2:ModSat2 starts service.
t = 0 : EU3:ModSat2 starts service.
t = 0 : ModSat1 receives update-end signal.
t = 0 : ModSat2 receives update-end signal.
# the first Eu of modsat2 fails, and its failure is detected by the simulate function
t = 4 : EU1:ModSat2 fails.
t = 4 : EU1:ModSat2 interrupts service.
t = 4 : a service request is detected.
t = 4 : ModSat2 fails partially.
# HERE IS MY CONCERN: at time t = 5, EU1 of modsat1 fails and interrupts service. However, there should be a line "t = 5 : ModSat1 fails partially" which does not appear... 
t = 5 : EU1:ModSat1 fails.
t = 5 : EU1:ModSat1 interrupts service.
t = 9 : a service is provided
t = 9 : EU1:ModSat2 is terminated.
t = 9 : ModSat2 is updated.
t = 9 : EU1:ModSat2 starts service.
t = 9 : EU2:ModSat2 is updated.
t = 9 : EU3:ModSat2 is updated.
t = 9 : ModSat2 receives update-end signal.
t = 11 : EU2:ModSat1 fails.
t = 11 : EU2:ModSat1 interrupts service.
t = 13 : EU1:ModSat2 fails.
t = 13 : EU1:ModSat2 interrupts service.
t = 13 : ModSat2 fails partially.
t = 19 : EU3:ModSat1 fails.
t = 19 : EU3:ModSat1 interrupts service.
t = 27 : EU2:ModSat2 fails.
t = 27 : EU2:ModSat2 interrupts service.
t = 27 : ModSat2 fails partially.
t = 38 : EU3:ModSat2 fails.
t = 38 : EU3:ModSat2 interrupts service.
t = 38 : ModSat2 fails completely (no working EUs).
t = 38 : ModSat2 interrupts service.

上記のように、「t = 5 : EU1:ModSat1 がサービスを中断する」という行の後に、「t = 5 : ModSat1 が部分的に失敗する」という行があるはずです。しかし代わりに、コンピューターはシミュレート関数の「yield env.timeout(5)」の後の最初の行に直接ジャンプします。

ここで何が起こっているのか理解できません。これは、Simpy によってイベントのキューがどのように定義およびソートされるかについての知識が不足しているためだと思います。ここで何が起こっているのかについてのヒントをオンラインで見つけることができませんでした。この種の質問は、stackoverflow や他のフォーラムで見たことがありません。喜んで助けていただければ幸いです。

私のコードは説明するにはかなり長いので、投稿したコードのコメントで十分であることを願っています :\

どうもありがとうございました!

4

1 に答える 1

0

私は(ついに)SimPyで時間についてのガイドを始めました。まだ WIP ですが、ここで議論をフォローできます。

于 2016-12-12T11:07:48.850 に答える