2

Azure Service Bus のインスタンスの使用に関して、非常にイライラする問題が発生しています。具体的には、「コマンド」という名前のトピックをセットアップしています。これは、しばらくすると、サブスクリプションへのメッセージのプロキシを正しく停止します。私は、現在困惑している状態に私を残した次の動作を観察しました。

  1. これは、トピックがしばらくの間、通常は一晩存在した後にのみ発生します。夕方にはすべてが機能しますが、朝に戻ったときにトピックが機能しなくなっており、唯一の解決策はそれを削除して再作成することです.

  2. 下の画像に示すように、送信後に例外を受信せず、トピックの「Active Message Count」および「Size In Bytes」プロパティが増加するため、トピックはメッセージを受信して​​いるように見えます。サブスクリプションにメッセージを送信しないだけです。ここに画像の説明を入力

  3. サブスクリプションのフィルターが問題を引き起こしているのではないかと考えたので、これをテストするために、これらのサブスクリプションを削除し、エクスプローラーを使用して既定のフィルターを使用して新しいサブスクリプションを作成しました。これを行った後、トピックを介していくつかの新しいメッセージを送信しましたが、新しいサブスクリプションではまだ受信されませんでした.

  4. サービス バスで実行されている他のトピック (上の画像の「イベント」など) がありますが、同じ動作を示しているようには見えません。それらは同じ方法で構成されていますが、問題なく動作します。

このような奇妙な動作を引き起こしている可能性のあるものについて、私はどんな理論にもオープンです. この問題の解決に役立つ場合は、追加情報を提供させていただきます。

コードブロック:

トピックを作成:

private async Task<bool> CreateTopicAsync(NamespaceManager namespaceManager, string topicName, CancellationToken cancel, TimeSpan maxWaitTime)
        {
            var retVal = false;
            var maxTimeToCreateTopic = DateTime.UtcNow + maxWaitTime;

            while (!cancel.IsCancellationRequested && DateTime.UtcNow < maxTimeToCreateTopic)
            {
                try
                {
                    await namespaceManager.CreateTopicAsync(new TopicDescription(topicName)
                    {
                        EnableBatchedOperations = true,
                        EnableFilteringMessagesBeforePublishing = true
                    });
                    retVal = true;
                    break;
                }
                catch (Exception ex)
                {
                    LogError("Exception thrown when creating topic: {0}", ex);
                }

                if (!retVal)
                {
                    LogWarning("Topic still does not exist, pausing and then retrying creation.");
                    await Task.Delay(_delayMs);
                }
            }

            return retVal;
        }

サブスクリプションの作成:

private async Task<bool> CreateSubscriptionAsync(NamespaceManager namespaceManager, string topicName, string subscriptionName, string filter, CancellationToken cancel, TimeSpan maxWaitTime)
        {
            var retVal = false;
            var maxTimeToCreateSubscription = DateTime.UtcNow + maxWaitTime;

            while (!cancel.IsCancellationRequested && DateTime.UtcNow < maxTimeToCreateSubscription)
            {
                try
                {
                    if (string.IsNullOrEmpty(filter))
                    {
                        namespaceManager.CreateSubscription(topicName, subscriptionName);
                    }
                    else
                    {
                        namespaceManager.CreateSubscription(topicName, subscriptionName, new SqlFilter(filter));
                    }
                    retVal = true;
                    break;
                }
                catch (Exception ex)
                {
                    LogError("Exception thrown when creating subscription: {0}", ex);
                }

                LogWarning("Subscription still does not exist, pausing and then retrying creation.");
                await Task.Delay(_delayMs);
            }

            return retVal;
        }

トピックにメッセージを送信:

BrokeredMessage brokeredMessage = null;
                    try
                    {
                        var type = nextMessage.GetType().AssemblyQualifiedName;
                        var jsonString = JsonConvert.SerializeObject(nextMessage);
                        var jsonStream = jsonString.ToStream();

                        brokeredMessage = new BrokeredMessage(jsonStream, true);
                        brokeredMessage.Properties["__messageType__"] = type;
                        if (nextData.Properties != null && nextData.Properties.Count > 0)
                        {
                            foreach (var prop in nextData.Properties)
                            {
                                brokeredMessage.Properties.Add(prop.Key, prop.Value);
                            }
                        }
                    }
                    catch (Exception ex)
                    {
                        LogError("Exception thrown when creating brokered message: {0}", ex);

                        brokeredMessage = null;
                    }

                    if (brokeredMessage != null)
                    {
                        var messageSentSuccessfully = false;
                        try
                        {
                            await client.SendAsync(brokeredMessage);
                            numConsecutiveFailures = 0;
                            messageSentSuccessfully = true;
                        }
                        catch (Exception ex)
                        {
                            numConsecutiveFailures++;
                            LogError("Exception thrown from SendAsync: {0}. Fail count is {1}.", ex, numConsecutiveFailures);
                            await Task.Delay(_delayMs);
                        }
                    }

渡されたトピック クライアントは、TopicClient.CreateFromConnectionString メソッドで作成されます。

サブスクリプションからメッセージを受信:

private async Task ReceiveLoopAsync(SubscriptionClient client, CancellationToken cancel, TimeSpan maxReceiveWaitTime)
        {
            var numConsecutiveFailures = 0;
            var maxConsecutiveFailures = 5;

            while (!cancel.IsCancellationRequested && numConsecutiveFailures < maxConsecutiveFailures)
            {
                BrokeredMessage newMsg = null;

                try
                {
                    newMsg = await client.ReceiveAsync(maxReceiveWaitTime);
                    numConsecutiveFailures = 0;
                }
                catch (Exception ex)
                {
                    numConsecutiveFailures++;
                    LogError("Exception thrown from ReceiveAsync: {0}. Fail count is {1}.", ex, numConsecutiveFailures);
                    await Task.Delay(_delayMs);
                }

                // newMsg will be null if there were no messages to process after the allotted timeout expired.
                if (newMsg != null)
                {
                    // Just a function call.
                    _onMessageReceived?.Invoke(newMsg);
                }

                //LogDebug("Bottom of Receive");
            }

            //LogDebug("Exit Receive");
        }

サブスクリプション クライアントに渡されたものは、SubscriptionClient.CreateFromConnectionString メソッドで作成されるだけです。

4

0 に答える 0