41

CQRSベースのシステムでのコマンドハンドラー、アグリゲート、リポジトリ、およびイベントストア間の関係の詳細を理解したいと思います。

私がこれまでに理解したこと:

  • コマンドハンドラはバスからコマンドを受け取ります。彼らは、リポジトリから適切なアグリゲートをロードし、アグリゲートのドメインロジックを呼び出す責任があります。終了すると、バスからコマンドを削除します。
  • アグリゲートは、動作と内部状態を提供します。州は決して公開されません。状態を変更する唯一の方法は、動作を使用することです。この動作をモデル化するメソッドは、コマンドのプロパティからイベントを作成し、これらのイベントをアグリゲートに適用します。アグリゲートは、それに応じて内部状態を設定するイベントハンドラーを呼び出します。
  • リポジトリでは、特定のIDにアグリゲートをロードし、新しいアグリゲートを追加するだけです。基本的に、リポジトリはドメインをイベントストアに接続します。
  • イベントストアは、最後になりましたが、データベース(または使用されているストレージ)にイベントを保存し、これらのイベントをいわゆるイベントストリームとしてリロードする役割を果たします。

ここまでは順調ですね。今、私がまだ得ていないいくつかの問題があります:

  • コマンドハンドラーがまだ存在するアグリゲートで動作を呼び出す場合、すべてが非常に簡単です。コマンドハンドラーはリポジトリへの参照を取得し、そのloadByIdメソッドを呼び出して、集計が返されます。しかし、まだ集計がない場合、コマンドハンドラーは何をしますが、集計を作成する必要がありますか?私の理解では、アグリゲートは後でイベントを使用して再構築する必要があります。これは、集約の作成がfooCreatedイベントに応答して行われることを意味します。ただし、任意のイベント(fooCreatedイベントを含む)を保存できるようにするには、集計が必要です。したがって、これは鶏が先か卵が先かという問題のように見えます。イベントなしでアグリゲートを作成することはできませんが、イベントを作成する必要がある唯一のコンポーネントはアグリゲートです。つまり、基本的には次のようになります。新しい集計を作成するにはどうすればよいですか。誰が何をしますか。
  • アグリゲートがイベントをトリガーすると、内部イベントハンドラーがそれに応答し(通常はapplyメソッドを介して呼び出されます)、アグリゲートの状態を変更します。このイベントはどのようにリポジトリに渡されますか?「新しいイベントをリポジトリ/イベントストアに送信してください」アクションを開始したのは誰ですか?骨材自体?集約を監視することによるリポジトリ?内部イベントに登録している他の誰か?...?
  • 最後になりましたが、イベントストリームの概念を正しく理解するのに問題があります。私の想像では、これは単にイベントの順序付きリストのようなものです。重要なのは、それが「順序付けられている」ということです。これは正しいですか?
4

3 に答える 3

41

以下は、私自身の経験と、Lokad.CQRS、NCQRS などのさまざまなフレームワークでの実験に基づいています。これを処理するには複数の方法があると確信しています。私にとって最も意味のあるものを投稿します。

1. 集計の作成:

コマンド ハンドラーが集約を必要とするたびに、リポジトリが使用されます。リポジトリは、イベント ストアからそれぞれのイベントのリストを取得し、オーバーロードされたコンストラクターを呼び出して、イベントを注入します。

var stream = eventStore.LoadStream(id)
var User = new User(stream)

集約が以前に存在しなかった場合、ストリームは空になり、新しく作成されたオブジェクトは元の状態になります。この状態では、いくつかのコマンドのみが集約を有効にすることが許可されていることを確認したい場合がありますUser.Create()

2. 新しいイベントの保存

コマンド処理はUnit of Work内で行われます。コマンドの実行中に、結果として生じるすべてのイベントが集約 ( User.Changes) 内のリストに追加されます。実行が完了すると、変更がイベント ストアに追加されます。以下の例では、これは次の行で発生します。

store.AppendToStream(cmd.UserId, stream.Version, user.Changes)

3. イベントの順序

CustomerMoved後続の 2 つのイベントが間違った順序で再生されるとどうなるか想像してみてください。

疑似コードを使って説明してみます (舞台裏で何が起こるかを示すために、意図的にコマンド ハンドラー内にリポジトリの問題を残しました)。

アプリケーション サービス:

UserCommandHandler
    Handle(CreateUser cmd)
        stream = store.LoadStream(cmd.UserId)
        user = new User(stream.Events)
        user.Create(cmd.UserName, ...)
        store.AppendToStream(cmd.UserId, stream.Version, user.Changes)

    Handle(BlockUser cmd)
        stream = store.LoadStream(cmd.UserId)
        user = new User(stream.Events)
        user.Block(string reason)
        store.AppendToStream(cmd.UserId, stream.Version, user.Changes)

集計:

User
    created = false
    blocked = false

    Changes = new List<Event>

    ctor(eventStream)
        isNewEvent = false
        foreach (event in eventStream)
            this.Apply(event, isNewEvent)

    Create(userName, ...)
        if (this.created) throw "User already exists"
        isNewEvent = true
        this.Apply(new UserCreated(...), isNewEvent)

    Block(reason)
        if (!this.created) throw "No such user"
        if (this.blocked) throw "User is already blocked"
        isNewEvent = true
        this.Apply(new UserBlocked(...), isNewEvent)

    Apply(userCreatedEvent, isNewEvent)
        this.created = true
        if (isNewEvent) this.Changes.Add(userCreatedEvent)

    Apply(userBlockedEvent, isNewEvent)
        this.blocked = true
        if (isNewEvent) this.Changes.Add(userBlockedEvent)

アップデート:

余談ですが、Yves の回答は、数年前のUdi Dahanによる興味深い記事を思い出させてくれました。

于 2012-09-11T08:04:44.807 に答える
12

デニスの優れた答えの小さなバリエーション:

  • 「創造的な」ユースケース (つまり、新しい集約をスピンオフする必要がある) を扱うときは、その責任を移動できる別の集約またはファクトリを見つけてみてください。これは、ハイドレートするためにイベントを取得するアクタ (またはその件に関してリハイドレートするためのその他のメカニズム) を持つことと競合しません。ファクトリは単なる静的メソッド (「コンテキスト」/「意図」のキャプチャに適している) である場合もあれば、別の集約のインスタンス メソッドである場合もあり (「データ」継承に適している)、明示的なファクトリ オブジェクトである場合もあります ("複雑な」作成ロジック)。
  • 内部リストを配列として返す明示的な GetChanges() メソッドを集約に提供するのが好きです。集計が 1 回の実行を超えてメモリ内にとどまる場合は、AcceptChanges() メソッドも追加して、内部リストをクリアする必要があることを示します (通常、物事がイベント ストアにフラッシュされた後に呼び出されます)。ここでは、プル (GetChanges/Changes) またはプッシュ (.net イベントまたは IObservable を考えてください) ベースのモデルのいずれかを使用できます。トランザクションのセマンティクス、技術、ニーズなどに大きく依存します...
  • イベントストリームはリンクされたリストです。各リビジョン (イベント/変更セット) は、前のリビジョン (別名、親) を指します。イベントストリームは、特定の集計に発生した一連のイベント/変更です。順序は、集約境界内でのみ保証されます。
于 2012-09-11T08:26:17.117 に答える
1

私は yves-reynhout と dennis-traub にほぼ同意しますが、私がこれを行う方法をお見せしたいと思います。イベントを自分自身に適用したり、自分自身を再水和したりする責任を自分の集合体から取り除きたいのです。そうしないと、コードの重複が多くなります。すべての集約コンストラクターは同じように見えます。

UserAggregate:
    ctor(eventStream)
         foreach (event in eventStream)
            this.Apply(event)


OrderAggregate:
    ctor(eventStream)
         foreach (event in eventStream)
            this.Apply(event)


ProfileAggregate:
    ctor(eventStream)
         foreach (event in eventStream)
            this.Apply(event)

これらの責任は、コマンド ディスパッチャに任せることができます。コマンドは集合体によって直接処理されます。

Command dispatcher class

    dispatchCommand(command) method:
        newEvents = ConcurentProofFunctionCaller.executeFunctionUntilSucceeds(tryToDispatchCommand)
        EventDispatcher.dispatchEvents(newEvents)

    tryToDispatchCommand(command) method:
        aggregateClass = CommandSubscriber.getAggregateClassForCommand(command)
        aggregate = AggregateRepository.loadAggregate(aggregateClass, command.getAggregateId())
        newEvents = CommandApplier.applyCommandOnAggregate(aggregate, command)
        AggregateRepository.saveAggregate(command.getAggregateId(), aggregate, newEvents)

ConcurentProofFunctionCaller class

    executeFunctionUntilSucceeds(pureFunction) method:
        do this n times
            try
                call result=pureFunction()
                return result
            catch(ConcurentWriteException)
                continue
        throw TooManyRetries    

AggregateRepository class

     loadAggregate(aggregateClass, aggregateId) method:
         aggregate = new aggregateClass
         priorEvents = EventStore.loadEvents()
         this.applyEventsOnAggregate(aggregate, priorEvents)

     saveAggregate(aggregateId, aggregate, newEvents)
        this.applyEventsOnAggregate(aggregate, newEvents)
        EventStore.saveEventsForAggregate(aggregateId, newEvents, priorEvents.version)

SomeAggregate class
    handleCommand1(command1) method:
        return new SomeEvent or throw someException BUT don't change state!
    applySomeEvent(SomeEvent) method:
        changeStateSomehow() and not throw any exception and don't return anything!

これは PHP アプリケーションから投影された疑似コードであることに注意してください。実際のコードには何かが注入され、他の責任は他のクラスでリファクタリングされる必要があります。アイデアは、集約を可能な限りクリーンに保ち、コードの重複を避けることです。

集計に関するいくつかの重要な側面:

  1. コマンド ハンドラーは状態を変更しないでください。イベントを生成するか、例外をスローします
  2. event apply は例外をスローせず、何も返すべきではありません。内部状態のみを変更します

これのオープンソースの PHP 実装は、ここにあります。

于 2016-09-23T07:28:26.787 に答える