2

ArtemisNetClient を使用して C# で要求応答パターンを実装しようとしていますが、実際のソリューションでより一般的な方法で実装する方法を見つけるのに少し苦労しています。

いくつかの Java の例に基づいて、2 つのコンソール アプリケーションで次のようなことができました。

送信者

static async System.Threading.Tasks.Task Main(string[] args)
{
    var connectionFactory = new ConnectionFactory();
    var endpoint = Endpoint.Create("localhost", 5672, "guest", "guest");
    var connection = await connectionFactory.CreateAsync(endpoint);

    string guid = new Guid().ToString();

    var requestAddress = "TRADE REQ1";
    var responseAddress = "TRADE RESP";

    Message message = new Message("BUY AMD 1000 SHARES");
    message.SetCorrelationId(guid);
    message.ReplyTo = responseAddress;

    var producer = await connection.CreateProducerAsync(requestAddress, RoutingType.Anycast);
    await producer.SendAsync(message);

    var consumer = await connection.CreateConsumerAsync(responseAddress, RoutingType.Anycast);
    var responseMessage = await consumer.ReceiveAsync();

    Console.WriteLine(responseMessage.GetBody<string>());
    
}

レシーバー

static async System.Threading.Tasks.Task Main(string[] args)
{
    // Create connection
    var connectionFactory = new ConnectionFactory();
    var endpoint = Endpoint.Create("localhost", 5672, "guest", "guest");
    var connection = await connectionFactory.CreateAsync(endpoint);

    var requestAddress = "TRADE REQ1";

    // Create consumer to receive trade request messages
    var consumer = await connection.CreateConsumerAsync(requestAddress, RoutingType.Anycast);
    var message = await consumer.ReceiveAsync();

    Console.WriteLine($"Received message: {message.GetBody<string>()}");

    // Confirm trade request and ssend response message
    if (!string.IsNullOrEmpty(message.ReplyTo))
    {
        Message responseMessage = new Message("Confirmed trade request");
        responseMessage.SetCorrelationId(message.CorrelationId);
        var producer = await connection.CreateProducerAsync(message.ReplyTo);
        await producer.SendAsync(responseMessage);
    }
}

これは期待どおりに機能しましたが、リクエスト レスポンス パターンの例がないことを除いて、この記事で説明されている内容のさらに下に何かを記載したいと思います。

詳しく説明すると、現在、通信したい 2 つのサービスがあります。

サービス 1では、メッセージを作成して発行し、応答を待ってインスタンス オブジェクトを強化し、データベースに保存します。私は現在これを持っていますが、await 応答メッセージがありません。

public async Task<Instance> CreateInstance(Instance instance)
{
    await _instanceCollection.InsertOneAsync(instance);

    var @event = new InstanceCreated
    {
        Id = instance.Id,
        SiteUrl = instance.SiteUrl
    };

    await _messageProducer.PublishAsync(@event);

    return instance;
}

応答メッセージの返信をサポートするために、一時的なキュー/接続または何かを設定し、PublishAsync()それを変更する必要があるかもしれないと考えました。Task<Message>しかし、どうすればそれを行うことができますか?コンソール アプリの例のように、新しい connectionfactory +CreateConsumerAsyncなどを実行する必要がありますか?

public class MessageProducer
{
    private readonly IAnonymousProducer _producer;

    public MessageProducer(IAnonymousProducer producer)
    {
        _producer = producer;
    }

    public async Task PublishAsync<T>(T message, string replyTo = null, string correlationId = null)
    {
        var serialized = JsonSerializer.Serialize(message);
        var address = typeof(T).Name;
        var msg = new Message(serialized);
        if (replyTo != null && correlationId != null)
        {
            msg.CorrelationId = correlationId;
            msg.ReplyTo = replyTo;
        }
        await _producer.SendAsync(address, msg);
    }

    public async Task PublishAsync<T>(T message, string routeName, string replyTo = null, string correlationId = null)
    {
        var serialized = JsonSerializer.Serialize(message);
        var address = routeName;
        var msg = new Message(serialized);
        if(replyTo != null && correlationId != null)
        {
            msg.CorrelationId = correlationId;
            msg.ReplyTo = replyTo;
        }
        await _producer.SendAsync(address, msg);
    }
}

サービス 2には、メッセージInstanceCreatedConsumerを受信する がありますが、ここでも応答メッセージを返す方法がありません。

public class InstanceCreatedConsumer : ITypedConsumer<InstanceCreated>
{
    private readonly MessageProducer _messageProducer;
    public InstanceCreatedConsumer(MessageProducer messageProducer)
    {
        _messageProducer = messageProducer;
    }
    public async Task ConsumeAsync(InstanceCreated message, CancellationToken cancellationToken)
    {
        // consume message and return response
    }
}

戻り値で応答メッセージを処理するandを使用してActiveMqExtensionsクラスを拡張できるかもしれないと考えましたが、まだ実現していません。ConsumeAsyncHandleMessage

public static IActiveMqBuilder AddTypedConsumer<TMessage, TConsumer>(this IActiveMqBuilder builder,
    RoutingType routingType)
    where TConsumer : class, ITypedConsumer<TMessage>
{
    builder.Services.TryAddScoped<TConsumer>();
    builder.AddConsumer(typeof(TMessage).Name, routingType, HandleMessage<TMessage, TConsumer>);
    return builder;
}

private static async Task HandleMessage<TMessage, TConsumer>(Message message, IConsumer consumer, IServiceProvider serviceProvider, CancellationToken token)
    where TConsumer : class, ITypedConsumer<TMessage>
{
    try
    {
        var msg = JsonConvert.DeserializeObject<TMessage>(message.GetBody<string>());
        using var scope = serviceProvider.CreateScope();
        var typedConsumer = scope.ServiceProvider.GetService<TConsumer>();
        await typedConsumer.ConsumeAsync(msg, token);
        await consumer.AcceptAsync(message);
    }
    catch(Exception ex)
    {
        // todo
    }
}

ここで達成しようとしていることは完全に間違っていますか、それとも ArtemisNetClient では不可能なのでしょうか?

誰かが例を持っているか、私が正しい道を進んでいるかどうかを確認できるか、別のフレームワークを使用する必要があるかもしれません。

ActiveMQ Artemis などのメッセージを介したこの種のコミュニケーションは初めてなので、アドバイスをいただければ幸いです。

4

1 に答える 1