2

ActiveMQ で、膨大な数のメッセージがトピックから脱落しないという問題があります。トピックは非持続性、非持続性に設定されています。Activemq.xml ファイルは

<beans>

  <broker xmlns="http://activemq.apache.org/schema/core" useJmx="false" persistent="false">

<!--
    <persistenceAdapter>
      <journaledJDBC journalLogFiles="5" dataDirectory="../data"/>
    </persistenceAdapter>
-->

        <transportConnectors>
            <transportConnector uri="vm://localhost"/>
        </transportConnectors>

  </broker>

</beans>

そして、messaging-config.xml のトピック定義は次のとおりです。

<destination id="traceChannel">

    <properties>

        <network>
        <session-timeout>10</session-timeout>
    </network>

        <server>
            <message-time-to-live>10000</message-time-to-live>
            <durable>false</durable>
            <durable-store-manager>flex.messaging.durability.FileStoreManager</durable-store-manager>
        </server>

        <jms>
            <destination-type>Topic</destination-type>
            <message-type>javax.jms.ObjectMessage</message-type>
            <connection-factory>ConnectionFactory</connection-factory>
            <destination-jndi-name>dynamicTopics/traceTopic</destination-jndi-name>
            <delivery-mode>NON_PERSISTENT</delivery-mode>
            <message-priority>DEFAULT_PRIORITY</message-priority>
            <acknowledge-mode>AUTO_ACKNOWLEDGE</acknowledge-mode>
            <transacted-sessions>false</transacted-sessions>
            <initial-context-environment>
                <property>
                    <name>Context.INITIAL_CONTEXT_FACTORY</name>
                    <value>org.apache.activemq.jndi.ActiveMQInitialContextFactory</value>
                </property>
                <property>
                    <name>Context.PROVIDER_URL</name>
                    <value>tcp://localhost:61616</value>
                </property>
            </initial-context-environment>
        </jms>
    </properties>

    <channels>
        <channel ref="rtmps" />
    </channels>

    <adapter ref="trace" />

</destination>

私が達成しようとしているのは、最後の 10 件のメッセージのみが一度にトピックにあるということです。これは、非常に少数しか保持されていないはずなのに、一晩実行したままにしておくと、トピックに関する 15 万件を超えるメッセージが発生するためです。

4

4 に答える 4

5

私の知る限り、サブスクライバーのない非永続トピックに送信されたメッセージはドロップする必要があります。現在登録されているコンシューマーのみがメッセージのコピーを取得します。

トピックにこれらの 150K メッセージが含まれていることをどのように確認しますか? JMX経由?

非永続トピックがこれらの 150K メッセージをキャッシュしてはならないという事実に関係なく、ブローカー ポリシーを使用して、コンシューマーごとに格納されるメッセージの量を制限できます。

<broker>
...    
  <pendingMessageLimitStrategy>
    <constantPendingMessageLimitStrategy limit="10"/>
  </pendingMessageLimitStrategy>
...
</broker>
于 2010-03-01T18:55:43.370 に答える
2

あなたが望むものをアーカイブできるかどうかわかりません。いくつかのことができます:

まず、メッセージに有効期限を設定できます。

public ITopicPublisher CreateTopicPublisher(string selector)
        {
            try
            {
                IMessageProducer producer = m_session.CreateProducer(m_topic);
                ActiveMQPublisher publisher = new ActiveMQPublisher(producer);

                // here we put a time to live to 1min for eg
                TimeSpan messageTTL = TimeSpan.FromMilliseconds(60000);
                publisher.TimeToLive = messageTTL;

                if (!String.IsNullOrEmpty(selector))
                {
                    publisher.IsSelector = true;
                    publisher.Selector = selector;
                }
                return publisher;
            }
            catch (Exception ex)
            {
                Logger.Exception(ex);
                throw;
            }
        }

朝にはまだ 150k メッセージを受け取りますが、1 つのコンシューマーがトピックに接続するとすぐに、期限切れのメッセージはなくなります。

エビクション戦略( Eviction Strategy )も参照してください。

編集:私は1つのことに気づきました。あなたは、「非常に少数しか保持されていないはずなのに、一晩実行したままにしておくと、トピックに関するメッセージが 150,000 件を超えてしまう」と言います。トピックには、ピックアップする必要があるメッセージの概念がありません。誰もトピックにサブスクライブしていない場合、そこに送信されたメッセージは破棄されます。それでも、「キューに入れられたメッセージ」は 1 つ増えます。「最後の 10 件のメッセージ」のみを送信したい場合は、トピックの代わりにキューを使用する必要がありますか?

于 2010-02-18T09:23:47.173 に答える
1

それが機能するかどうかはわかりませんが、まだテストする機会はありませんが、http://activemq.apache.org/slow-consumer-handling.htmlのconstantPendingMessageLimitStrategyをご覧ください。

頑張ってクラウディオ

于 2010-02-26T18:22:25.103 に答える
-1

この投稿からは明らかでないことがいくつかあります

  • これを追加しようとする私の解析と苦労に基づいて..もう少しコンテキストが必要なので、ActiveMQ google の苦労を経験したくない人のために以下に追加しました..
  • これは私のactivemq.xmlにあるもので、私はこの作業をまったく見ていないので、私を正しい方向に向けることができるかもしれない他の人からいくつかの洞察を得られることを望んでいました.

    <destinationPolicy>
      <policyMap>
        <policyEntries>
          <policyEntry topic=">"
                       topicPrefetch="10"/>
          <policyEntry topic=">"
                       producerFlowControl="false"/>
          <policyEntry topic=".>"
                       >
            <messageEvictionStrategy>
              <oldestMessageEvictionStrategy/>
            </messageEvictionStrategy>
    
            <pendingMessageLimitStrategy>
              <constantPendingMessageLimitStrategy limit="10"/>
            </pendingMessageLimitStrategy>
          </policyEntry>
        </policyEntries>
      </policyMap>
    </destinationPolicy>
    
于 2014-07-10T16:41:12.020 に答える