サービスが処理する大量の通知があるため、MSMQ バインディングを使用する WCF サービスを作成したいと考えています。クライアントがサービスによって停止されないこと、および通知が発生した順序で処理されることが重要です。したがって、キューの実装です。
もう 1 つの考慮事項は、回復力です。MSMQ 自体をクラスター化してキューをより堅牢にすることができることはわかっていますが、別のサーバーでサービスのインスタンスを実行できるようにしたいので、サーバーがクラッシュした場合に通知がキューに蓄積されず、別のサーバーが処理を続行します。 .
MSMQ バインディングを試してみたところ、サービスの複数のインスタンスが同じキューでリッスンしている可能性があり、そのままにしておくと、利用可能なサービス全体に負荷が分散された一種のラウンド ロビンを実行することになります。これは素晴らしいことですが、インスタンスが異なればリクエストの処理にかかる時間が異なるため、キューの順序付けが失われてしまいます。
私は単純なコンソール アプリを使って実験を行ってきました。これは以下の壮大なコード ダンプです。実行すると、次のような出力が得られます。
host1 open
host2 open
S1: 01
S1: 03
S1: 05
S2: 02
S1: 06
S1: 08
S1: 09
S2: 04
S1: 10
host1 closed
S2: 07
host2 closed
私がしたいことは次のとおりです。
host1 open
host2 open
S1: 01
<pause while S2 completes>
S2: 02
S1: 03
<pause while S2 completes>
S2: 04
S1: 05
S1: 06
etc.
S2 が完了していないため、まだ失敗して、処理中のメッセージをキューに返す可能性があると考えていました。したがって、S1 が別のメッセージをキューから取り出すことを許可してはなりません。私のキューはトランザクションでありTransactionScopeRequired = true
、サービスを設定しようとしましたが、役に立ちませんでした。
これは可能ですか?私はそれについて間違った方法で進んでいますか?何らかの中央同期メカニズムなしでフェイルオーバー サービスを構築する他の方法はありますか?
class WcfMsmqProgram
{
private const string QueueName = "testq1";
static void Main()
{
// Create a transactional queue
string qPath = ".\\private$\\" + QueueName;
if (!MessageQueue.Exists(qPath))
MessageQueue.Create(qPath, true);
else
new MessageQueue(qPath).Purge();
// S1 processes as fast as it can
IService s1 = new ServiceImpl("S1");
// S2 is slow
IService s2 = new ServiceImpl("S2", 2000);
// MSMQ binding
NetMsmqBinding binding = new NetMsmqBinding(NetMsmqSecurityMode.None);
// Host S1
ServiceHost host1 = new ServiceHost(s1, new Uri("net.msmq://localhost/private"));
ConfigureService(host1, binding);
host1.Open();
Console.WriteLine("host1 open");
// Host S2
ServiceHost host2 = new ServiceHost(s2, new Uri("net.msmq://localhost/private"));
ConfigureService(host2, binding);
host2.Open();
Console.WriteLine("host2 open");
// Create a client
ChannelFactory<IService> factory = new ChannelFactory<IService>(binding, new EndpointAddress("net.msmq://localhost/private/" + QueueName));
IService client = factory.CreateChannel();
// Periodically call the service with a new number
int counter = 1;
using (Timer t = new Timer(o => client.EchoNumber(counter++), null, 0, 500))
{
// Enter to stop
Console.ReadLine();
}
host1.Close();
Console.WriteLine("host1 closed");
host2.Close();
Console.WriteLine("host2 closed");
// Wait for exit
Console.ReadLine();
}
static void ConfigureService(ServiceHost host, NetMsmqBinding binding)
{
var endpoint = host.AddServiceEndpoint(typeof(IService), binding, QueueName);
}
[ServiceContract]
interface IService
{
[OperationContract(IsOneWay = true)]
void EchoNumber(int number);
}
[ServiceBehavior(InstanceContextMode = InstanceContextMode.Single)]
class ServiceImpl : IService
{
public ServiceImpl(string name, int sleep = 0)
{
this.name = name;
this.sleep = sleep;
}
private string name;
private int sleep;
public void EchoNumber(int number)
{
Thread.Sleep(this.sleep);
Console.WriteLine("{0}: {1:00}", this.name, number);
}
}
}