1

あるプロセスがメッセージを発行し、別のプロセスがメッセージをサブスクライブできるように、単純なメッセージング モジュールを作成しています。プロセス外通信メカニズムとして EF/SqlServer を使用しています。「サーバー」は、パブリッシャー/サブスクライバーのペアに共通する名前です (「チャネル」と呼ばれることもありました)。

名前付きの「サーバー」を表す行をデータベースに追加する次のメソッドがあります

    public void AddServer(string name)
    {
        if (!context.Servers.Any(c => c.Name == name))
        {
            context.Servers.Add(new Server { Name = name });
        }
    }

私が抱えている問題は、同時に 2 つのクライアントを起動すると、1 つだけが新しいサーバー エントリを追加することになっているということですが、それはうまくいきません。私は実際には、同じ名前の 2 つのエントリの非常に間違った結果を得ており、これには Any() ガードでは不十分であることに気付きました。

サーバーのエンティティは int PK を使用し、おそらく私のリポジトリは名前フィールドの一意性を強制します。でも、これはうまくいかないと思い始めています。

public class Server
{
    public int Id { get; set; }
    public string Name { get; set; }
}

これを修正できると思う2つの方法は、どちらも理想的ではないようです。

  1. 文字列主キー
  2. 例外を無視する

これは並行性の問題ですよね?

2 つのクライアントに同じ名前のリポジトリを呼び出してもらいたいが、データベースでその名前の行が 1 つだけの結果を取得したい場合、どうすれば対処できますか?

ここに画像の説明を入力

更新:これがリポジトリコードです

namespace MyBus.Data
{
    public class Repository : IDisposable
    {
        private readonly Context context;
        private readonly bool autoSave;

        public delegate Chain Chain(Action<Repository> action);
        public static Chain Command(Action<Repository> action)
        {
            using (var repo = new Data.Repository(true))
            {
                action(repo);
            }
            return new Chain(next => Command(next));
        }

        public Repository(bool autoSave)
        {
            this.autoSave = autoSave;
            context = new Context();
        }

        public void Dispose()
        {
            if (autoSave)
                context.SaveChanges();
            context.Dispose();
        }

        public void AddServer(string name)
        {
            if (!context.Servers.Any(c => c.Name == name))
            {
                context.Servers.Add(new Server { Name = name });
            }
        }

        public void AddClient(string name, bool isPublisher)
        {
            if (!context.Clients.Any(c => c.Name == name))
            {
                context.Clients.Add(new Client
                {
                    Name = name,
                    ClientType = isPublisher ? ClientType.Publisher : ClientType.Subscriber
                });
            }
        }

        public void AddMessageType<T>()
        {
            var typeName = typeof(T).FullName;
            if (!context.MessageTypes.Any(c => c.Name == typeName))
            {
                context.MessageTypes.Add(new MessageType { Name = typeName });
            }
        }

        public void AddRegistration<T>(string serverName, string clientName)
        {
            var server = context.Servers.Single(c => c.Name == serverName);
            var client = context.Clients.Single(c => c.Name == clientName);
            var messageType = context.MessageTypes.Single(c => c.Name == typeof(T).FullName);
            if (!context.Registrations.Any(c =>
                    c.ServerId == server.Id &&
                    c.ClientId == client.Id &&
                    c.MessageTypeId == messageType.Id))
            {
                context.Registrations.Add(new Registration
                {
                    Client = client,
                    Server = server,
                    MessageType = messageType
                });
            }
        }

        public void AddMessage<T>(T item, out int messageId)
        {
            var messageType = context.MessageTypes.Single(c => c.Name == typeof(T).FullName);
            var serializer = new XmlSerializer(typeof(T));
            var sb = new StringBuilder();
            using (var sw = new StringWriter(sb))
            {
                serializer.Serialize(sw, item);
            }
            var message = new Message
            {
                MessageType = messageType,
                Created = DateTime.UtcNow,
                Data = sb.ToString()
            };
            context.Messages.Add(message);
            context.SaveChanges();
            messageId = message.Id;
        }

        public void CreateDeliveries<T>(int messageId, string serverName, string sendingClientName, T item)
        {
            var messageType = typeof(T).FullName;

            var query = from reg in context.Registrations
                        where reg.Server.Name == serverName &&
                              reg.Client.ClientType == ClientType.Subscriber &&
                              reg.MessageType.Name == messageType
                        select new
                        {
                            reg.ClientId
                        };

            var senderClientId = context.Clients.Single(c => c.Name == sendingClientName).Id;

            foreach (var reg in query)
            {
                context.Deliveries.Add(new Delivery
                {
                    SenderClientId = senderClientId,
                    ReceiverClientId = reg.ClientId,
                    MessageId = messageId,
                    Updated = DateTime.UtcNow,
                    DeliveryStatus = DeliveryStatus.Sent
                });
            }
        }

        public List<T> GetDeliveries<T>(string serverName, string clientName, out List<int> messageIds)
        {
            messageIds = new List<int>();
            var messages = new List<T>();
            var clientId = context.Clients.Single(c => c.Name == clientName).Id;
            var query = from del in context.Deliveries
                        where del.ReceiverClientId == clientId &&
                              del.DeliveryStatus == DeliveryStatus.Sent
                        select new
                        {
                            del.Id,
                            del.Message.Data
                        };
            foreach (var item in query)
            {
                var serializer = new XmlSerializer(typeof(T));
                using (var sr = new StringReader(item.Data))
                {
                    var t = (T)serializer.Deserialize(sr);
                    messages.Add(t);
                    messageIds.Add(item.Id);
                }
            }
            return messages;
        }

        public void ConfirmDelivery(int deliveryId)
        {
            using (var context = new Context())
            {
                context.Deliveries.First(c => c.Id == deliveryId).DeliveryStatus = DeliveryStatus.Received;
                context.SaveChanges();
            }
        }
    }
}
4

3 に答える 3

1

現在、このソリューションを使用しています:

    public void AddServer(string name)
    {
        if (!context.Servers.Any(c => c.Name == name))
        {
            context.Database.ExecuteSqlCommand(@"MERGE Servers WITH (HOLDLOCK) AS T
                                                 USING (SELECT {0} AS Name) AS S
                                                 ON T.Name = S.Name
                                                 WHEN NOT MATCHED THEN 
                                                 INSERT (Name) VALUES ({0});", name);
        }
    }
于 2013-07-08T23:38:56.947 に答える
1

int 主キーを保持できますが、列に一意のインデックスNameを定義することもできます。

このように、同時実行の状況では、最初の挿入のみが成功します。同じサーバー名を挿入しようとする後続のクライアントは、SqlException.

于 2013-07-08T20:24:29.053 に答える