0

Rhino Service Bus の使用。処理を処理するバックエンド アプリがあり、バックエンドにメッセージを発行する別のアプリ (クライアント UI) があります。バックエンドに Saga があり、その間に Saga がそれ自体にメッセージを発行し、処理を独自のスレッドで実行できる複数の小さなタスクに分割します。問題は、メッセージが Orchestrates インターフェイス経由でサブスクライブされている場合、メッセージが常に破棄されることです。ConsumerOf を使用して別のクラスでサブスクライブでき、コンシューマーはメッセージを受け取ります。

namespace Sagas
{
public class MoveJobSaga: ISaga<MoveJobState>,
    InitiatedBy<TriggerMoveJobCommand>,
    Orchestrates<TriggerMoveTerminalCommand>
{
    private readonly IServiceBus _bus;
    private readonly ITerminalFilesService _tfService;

    public MoveJobSaga(IServiceBus bus)
    {
        _bus = bus;
        State = new MoveJobState();
    }

    public MoveJobState State { get; set; }
    public Guid Id { get; set; }
    public bool IsCompleted { get; set; }

    public void Consume(TriggerMoveJobCommand message)
    {
        State.TerminalsToProcess = message.Terminals.Count();
        State.JobId = message.JobId;
        foreach (var terminal in message.Terminals)
        {
            _bus.Publish(new TriggerMoveTerminalCommand() 
            { 
                CorrelationId = message.CorrelationId,
                Name = terminal.Name
            });
        }
    }

    public void Consume(TriggerMoveTerminalCommand message)
    {
        var result = _tfService.MoveTerminalFiles(message.SourceTifDir, message.TargetTifDir, message.SourceDatDir, message.TargetDatDir);
        State.TerminalsProcessed++;

        if (State.TerminalsToProcess == State.TerminalsProcessed)
        {
            _bus.Publish(new MoveJobCompletedEvent() 
            { 
               Success = State.Success, 
               JobId = State.JobId });
        }
    }
}

public class MoveJobState
{
    public MoveJobState()
    {
        Success = true;
    }
    public int TerminalsToProcess { get; set; }
    public int TerminalsProcessed { get; set; }
    public int JobId { get; set; }
    public bool Success { get; set; }
}
}

ホスト構成:

<rhino.esb>
<bus threadCount="1" numberOfRetries="5" endpoint="msmq://localhost/myapp.host" />
<messages />
</rhino.esb>

ブートストラップ:

public class HostBootStrapper: StructureMapBootStrapper
{
    protected override void ConfigureContainer()
    {
        base.ConfigureContainer();
        Container.Configure(sm =>
        {
            sm.For<ISagaPersister<MoveJobSaga>>().Use<InMemorySagaPersister<MoveJobSaga>>();
            sm.Scan(x =>
            {
                x.TheCallingAssembly();
                x.WithDefaultConventions();
            });
        });
    }
}
4

1 に答える 1

0

ISagaPersister をシングルトンとして登録する必要がありました。

            sm.For<ISagaPersister<MoveJobSaga>>()
                .Singleton()
                .Use<InMemorySagaPersister<MoveJobSaga>>();

また、サガで使用されるすべてのメッセージの発行をサガの外に移動する必要がありました。そこで、開始メッセージをトリガーし、ジョブ内の各メッセージをトリガーするサービス クラスを作成しました。

于 2014-01-20T19:25:57.950 に答える