51

Python で実装された GoF Observer の例はありますか? 現在、キークラスを介してデバッグコードのビットが含まれているビットコードがあります(現在、魔法のenvが設定されている場合、stderrにメッセージを生成しています)。さらに、このクラスには、後処理のために結果を (メモリに) 格納するだけでなく、結果を段階的に返すためのインターフェイスがあります。(クラス自体は、ssh を介してリモート マシンでコマンドを同時に実行するためのジョブ マネージャーです)。

現在、クラスの使用法は次のようになります。

job = SSHJobMan(hostlist, cmd)
job.start()
while not job.done():
    for each in job.poll():
        incrementally_process(job.results[each])
        time.sleep(0.2) # or other more useful work
post_process(job.results)

別の使用モデルは次のとおりです。

job = SSHJobMan(hostlist, cmd)
job.wait()  # implicitly performs a start()
process(job.results)

これはすべて、現在のユーティリティでは正常に機能します。ただし、柔軟性に欠けます。たとえば、私は現在、簡単な出力形式または進行状況バーを増分結果としてサポートしています。関数の簡単で完全な「マージされたメッセージ」出力もサポートしていpost_process()ます。

ただし、複数の結果/出力ストリーム (ターミナルへの進行状況バー、ログ ファイルへのデバッグと警告、1 つのファイル/ディレクトリへの成功したジョブの出力、失敗したジョブからのエラー メッセージおよびその他の結果) をサポートしたいと考えています。など)。

これは、Observer を必要とする状況のように聞こえます...クラスのインスタンスを他のオブジェクトからの登録を受け入れ、発生時に特定のタイプのイベントでそれらをコールバックします。

SO関連の質問でそれへの言及をいくつか見たので、私はPyPubSubを見ています。ユーティリティに外部依存関係を追加する準備ができているかどうかはわかりませんが、他の人が使いやすくなるのであれば、自分のインターフェースをモデルとして使用する価値があると思います。(プロジェクトは、スタンドアロンのコマンド ライン ユーティリティと、他のスクリプト/ユーティリティを作成するためのクラスの両方として意図されています)。

要するに、私は自分がやりたいことをする方法を知っています...しかし、それを達成する方法はたくさんあります。長期的には、コードの他のユーザーにとって何が最も効果的かについての提案が必要です。

コード自体はclasshにあります。

4

9 に答える 9

60

ただし、柔軟性に欠けます。

ええと... 実際、非同期 API が必要な場合、これは良い設計のように思えます。通常はそうです。loggingおそらく、stderr から、独自のパブリッシュ/サブスクライブ モデルの一種を備えたPython のモジュールに切り替えるだけで済みLogger.addHandler()ます。

オブザーバーをサポートしたい場合は、シンプルに保つことをお勧めします。本当に数行のコードしか必要ありません。

class Event(object):
    pass

class Observable(object):
    def __init__(self):
        self.callbacks = []
    def subscribe(self, callback):
        self.callbacks.append(callback)
    def fire(self, **attrs):
        e = Event()
        e.source = self
        for k, v in attrs.iteritems():
            setattr(e, k, v)
        for fn in self.callbacks:
            fn(e)

Job クラスは をサブクラス化できObservableます。興味のあることが起こったら、電話self.fire(type="progress", percent=50)などをしてください。

于 2009-12-18T01:59:40.757 に答える
16

さらにいくつかのアプローチ...

例: ロギング モジュール

おそらく必要なのは、stderr からlogging、強力なパブリッシュ/サブスクライブ モデルを持つ Python のモジュールに切り替えることだけです。

ログ レコードの作成を開始するのは簡単です。

# producer
import logging

log = logging.getLogger("myjobs")  # that's all the setup you need

class MyJob(object):
    def run(self):
        log.info("starting job")
        n = 10
        for i in range(n):
            log.info("%.1f%% done" % (100.0 * i / n))
        log.info("work complete")

消費者側では、もう少し作業があります。残念ながら、ロガー出力の構成には、7 行のコードが必要です。;)

# consumer
import myjobs, sys, logging

if user_wants_log_output:
    ch = logging.StreamHandler(sys.stderr)
    ch.setLevel(logging.INFO)
    formatter = logging.Formatter(
        "%(asctime)s - %(name)s - %(levelname)s - %(message)s")
    ch.setFormatter(formatter)
    myjobs.log.addHandler(ch)
    myjobs.log.setLevel(logging.INFO)

myjobs.MyJob().run()

一方、logging パッケージには驚くほど多くのものがあります。ログ データを一連のローテーション ファイル、電子メール アドレス、および Windows イベント ログに送信する必要がある場合は、問題ありません。

例: 最も単純なオブザーバー

ただし、ライブラリをまったく使用する必要はありません。オブザーバーをサポートする非常に簡単な方法は、何もしないメソッドを呼び出すことです。

# producer
class MyJob(object):
    def on_progress(self, pct):
        """Called when progress is made. pct is the percent complete.
        By default this does nothing. The user may override this method
        or even just assign to it."""
        pass

    def run(self):
        n = 10
        for i in range(n):
            self.on_progress(100.0 * i / n)
        self.on_progress(100.0)

# consumer
import sys, myjobs
job = myjobs.MyJob()
job.on_progress = lambda pct: sys.stdout.write("%.1f%% done\n" % pct)
job.run()

時々、ラムダを書く代わりに、単に と言うことができますjob.on_progress = progressBar.update。これは素晴らしいことです。

これは、それが得られるのと同じくらい簡単です。1 つの欠点は、同じイベントにサブスクライブする複数のリスナーを自然にサポートしないことです。

例: C# ライクなイベント

少しのサポート コードを使用すると、Python で C# のようなイベントを取得できます。コードは次のとおりです。

# glue code
class event(object):
    def __init__(self, func):
        self.__doc__ = func.__doc__
        self._key = ' ' + func.__name__
    def __get__(self, obj, cls):
        try:
            return obj.__dict__[self._key]
        except KeyError, exc:
            be = obj.__dict__[self._key] = boundevent()
            return be

class boundevent(object):
    def __init__(self):
        self._fns = []
    def __iadd__(self, fn):
        self._fns.append(fn)
        return self
    def __isub__(self, fn):
        self._fns.remove(fn)
        return self
    def __call__(self, *args, **kwargs):
        for f in self._fns[:]:
            f(*args, **kwargs)

プロデューサーは、デコレーターを使用してイベントを宣言します。

# producer
class MyJob(object):
    @event
    def progress(pct):
        """Called when progress is made. pct is the percent complete."""

    def run(self):
        n = 10
        for i in range(n+1):
            self.progress(100.0 * i / n)

#consumer
import sys, myjobs
job = myjobs.MyJob()
job.progress += lambda pct: sys.stdout.write("%.1f%% done\n" % pct)
job.run()

これは上記の「単純なオブザーバー」コードとまったく同じように機能しますが、 を使用して好きなだけリスナーを追加できます+=。(C# とは異なり、イベント ハンドラーの型はありませんnew EventHandler(foo.bar)。イベントをサブスクライブするときに必要がなく、イベントを発生させる前に null をチェックする必要もありません。C# と同様に、イベントは例外をスケルチしません。)

選び方

logging必要なものがすべて揃っている場合は、それを使用してください。それ以外の場合は、自分にとって最も簡単なことを行ってください。注意すべき重要なことは、大きな外部依存を引き受ける必要がないということです。

于 2009-12-18T04:48:29.387 に答える
9

何かを観察しているという理由だけでオブジェクトが生き続けない実装はどうですか? 以下に、次の機能を備えたオブザーバー パターンの実装を示します。

  1. 使い方はpythonicです。.barインスタンスのバインドされたメソッドにオブザーバーを追加するにはfoo、単にfoo.bar.addObserver(observer).
  2. オブザーバーは、オブザーバーであることによって生かされているわけではありません。つまり、オブザーバー コードは強い参照を使用しません。
  3. サブクラス化は必要ありません (記述子 ftw)。
  4. ハッシュ不可能な型で使用できます。
  5. 1回のレッスンで何度でもご利用いただけます。
  6. (ボーナス) 今日の時点で、コードは githubの適切なダウンロード可能でインストール可能なパッケージに存在します。

コードは次のとおりです ( github パッケージまたはPyPI パッケージには最新の実装があります)。

import weakref
import functools

class ObservableMethod(object):
    """
    A proxy for a bound method which can be observed.

    I behave like a bound method, but other bound methods can subscribe to be
    called whenever I am called.
    """

    def __init__(self, obj, func):
        self.func = func
        functools.update_wrapper(self, func)
        self.objectWeakRef = weakref.ref(obj)
        self.callbacks = {}  #observing object ID -> weak ref, methodNames

    def addObserver(self, boundMethod):
        """
        Register a bound method to observe this ObservableMethod.

        The observing method will be called whenever this ObservableMethod is
        called, and with the same arguments and keyword arguments. If a
        boundMethod has already been registered to as a callback, trying to add
        it again does nothing. In other words, there is no way to sign up an
        observer to be called back multiple times.
        """
        obj = boundMethod.__self__
        ID = id(obj)
        if ID in self.callbacks:
            s = self.callbacks[ID][1]
        else:
            wr = weakref.ref(obj, Cleanup(ID, self.callbacks))
            s = set()
            self.callbacks[ID] = (wr, s)
        s.add(boundMethod.__name__)

    def discardObserver(self, boundMethod):
        """
        Un-register a bound method.
        """
        obj = boundMethod.__self__
        if id(obj) in self.callbacks:
            self.callbacks[id(obj)][1].discard(boundMethod.__name__)

    def __call__(self, *arg, **kw):
        """
        Invoke the method which I proxy, and all of it's callbacks.

        The callbacks are called with the same *args and **kw as the main
        method.
        """
        result = self.func(self.objectWeakRef(), *arg, **kw)
        for ID in self.callbacks:
            wr, methodNames = self.callbacks[ID]
            obj = wr()
            for methodName in methodNames:
                getattr(obj, methodName)(*arg, **kw)
        return result

    @property
    def __self__(self):
        """
        Get a strong reference to the object owning this ObservableMethod

        This is needed so that ObservableMethod instances can observe other
        ObservableMethod instances.
        """
        return self.objectWeakRef()


class ObservableMethodDescriptor(object):

    def __init__(self, func):
        """
        To each instance of the class using this descriptor, I associate an
        ObservableMethod.
        """
        self.instances = {}  # Instance id -> (weak ref, Observablemethod)
        self._func = func

    def __get__(self, inst, cls):
        if inst is None:
            return self
        ID = id(inst)
        if ID in self.instances:
            wr, om = self.instances[ID]
            if not wr():
                msg = "Object id %d should have been cleaned up"%(ID,)
                raise RuntimeError(msg)
        else:
            wr = weakref.ref(inst, Cleanup(ID, self.instances))
            om = ObservableMethod(inst, self._func)
            self.instances[ID] = (wr, om)
        return om

    def __set__(self, inst, val):
        raise RuntimeError("Assigning to ObservableMethod not supported")


def event(func):
    return ObservableMethodDescriptor(func)


class Cleanup(object):
    """
    I manage remove elements from a dict whenever I'm called.

    Use me as a weakref.ref callback to remove an object's id from a dict
    when that object is garbage collected.
    """
    def __init__(self, key, d):
        self.key = key
        self.d = d

    def __call__(self, wr):
        del self.d[self.key]

これを使用するには、観測可能にしたいメソッドを で装飾するだけです@event。これが例です

class Foo(object):
    def __init__(self, name):
        self.name = name

    @event
    def bar(self):
        print("%s called bar"%(self.name,))

    def baz(self):
        print("%s called baz"%(self.name,))

a = Foo('a')
b = Foo('b')
a.bar.addObserver(b.bar)
a.bar()
于 2014-05-15T03:24:56.493 に答える
5

ウィキペディアから:

from collections import defaultdict

class Observable (defaultdict):

  def __init__ (self):
      defaultdict.__init__(self, object)

  def emit (self, *args):
      '''Pass parameters to all observers and update states.'''
      for subscriber in self:
          response = subscriber(*args)
          self[subscriber] = response

  def subscribe (self, subscriber):
      '''Add a new subscriber to self.'''
      self[subscriber]

  def stat (self):
      '''Return a tuple containing the state of each observer.'''
      return tuple(self.values())

Observable はこのように使用されます。

myObservable = Observable ()

# subscribe some inlined functions.
# myObservable[lambda x, y: x * y] would also work here.
myObservable.subscribe(lambda x, y: x * y)
myObservable.subscribe(lambda x, y: float(x) / y)
myObservable.subscribe(lambda x, y: x + y)
myObservable.subscribe(lambda x, y: x - y)

# emit parameters to each observer
myObservable.emit(6, 2)

# get updated values
myObservable.stat()         # returns: (8, 3.0, 4, 12)
于 2009-12-14T23:57:24.550 に答える
4

Jason の回答に基づいて、C# のようなイベントの例を、ドキュメントとテストを含む本格的な Python モジュールとして実装しました。私は派手なpythonicのものを愛しています:)

したがって、すぐに使用できるソリューションが必要な場合は、github のコードを使用できます。

于 2013-08-02T16:38:59.053 に答える
2

例:ツイスト ログ オブザーバー

オブザーバー (ディクショナリを受け入れる呼び出し可能オブジェクト) を登録して、yourCallable()(他のオブザーバーに加えて) すべてのログ イベントを受け取るには:

twisted.python.log.addObserver(yourCallable)

例:完全な生産者/消費者の例

Twisted-Python メーリング リストから:

#!/usr/bin/env python
"""Serve as a sample implementation of a twisted producer/consumer
system, with a simple TCP server which asks the user how many random
integers they want, and it sends the result set back to the user, one
result per line."""

import random

from zope.interface import implements
from twisted.internet import interfaces, reactor
from twisted.internet.protocol import Factory
from twisted.protocols.basic import LineReceiver

class Producer:
    """Send back the requested number of random integers to the client."""
    implements(interfaces.IPushProducer)
    def __init__(self, proto, cnt):
        self._proto = proto
        self._goal = cnt
        self._produced = 0
        self._paused = False
    def pauseProducing(self):
        """When we've produced data too fast, pauseProducing() will be
called (reentrantly from within resumeProducing's transport.write
method, most likely), so set a flag that causes production to pause
temporarily."""
        self._paused = True
        print('pausing connection from %s' % (self._proto.transport.getPeer()))
    def resumeProducing(self):
        self._paused = False
        while not self._paused and self._produced < self._goal:
            next_int = random.randint(0, 10000)
            self._proto.transport.write('%d\r\n' % (next_int))
            self._produced += 1
        if self._produced == self._goal:
            self._proto.transport.unregisterProducer()
            self._proto.transport.loseConnection()
    def stopProducing(self):
        pass

class ServeRandom(LineReceiver):
    """Serve up random data."""
    def connectionMade(self):
        print('connection made from %s' % (self.transport.getPeer()))
        self.transport.write('how many random integers do you want?\r\n')
    def lineReceived(self, line):
        cnt = int(line.strip())
        producer = Producer(self, cnt)
        self.transport.registerProducer(producer, True)
        producer.resumeProducing()
    def connectionLost(self, reason):
        print('connection lost from %s' % (self.transport.getPeer()))
factory = Factory()
factory.protocol = ServeRandom
reactor.listenTCP(1234, factory)
print('listening on 1234...')
reactor.run()
于 2009-12-15T00:09:43.060 に答える
2

OPは、「Pythonで実装されたGoF Observerの例はありますか?」と尋ねます。これはPython 3.7 での例です。この Observable クラスは、 1 つのオブザーバブル多くのオブザーバーの間の関係を作成するという要件を満たしながら、それらの構造から独立したままにします。

from functools import partial
from dataclasses import dataclass, field
import sys
from typing import List, Callable


@dataclass
class Observable:
    observers: List[Callable] = field(default_factory=list)

    def register(self, observer: Callable):
        self.observers.append(observer)

    def deregister(self, observer: Callable):
        self.observers.remove(observer)

    def notify(self, *args, **kwargs):
        for observer in self.observers:
            observer(*args, **kwargs)


def usage_demo():
    observable = Observable()

    # Register two anonymous observers using lambda.
    observable.register(
        lambda *args, **kwargs: print(f'Observer 1 called with args={args}, kwargs={kwargs}'))
    observable.register(
        lambda *args, **kwargs: print(f'Observer 2 called with args={args}, kwargs={kwargs}'))

    # Create an observer function, register it, then deregister it.
    def callable_3():
        print('Observer 3 NOT called.')

    observable.register(callable_3)
    observable.deregister(callable_3)

    # Create a general purpose observer function and register four observers.
    def callable_x(*args, **kwargs):
        print(f'{args[0]} observer called with args={args}, kwargs={kwargs}')

    for gui_field in ['Form field 4', 'Form field 5', 'Form field 6', 'Form field 7']:
        observable.register(partial(callable_x, gui_field))

    observable.notify('test')


if __name__ == '__main__':
    sys.exit(usage_demo())
于 2018-10-22T12:37:03.970 に答える