2

Apache Camel 2.13.1 を使用して、30 万行を超えるデータベース テーブルをポーリングしています。Idempotent Consumer EIPを使用して、既に処理された行をフィルタリングしようとしています。

ただし、実装が本当にスケーラブルかどうかは疑問です。私のラクダのコンテキストは次のとおりです:-

<camelContext xmlns="http://camel.apache.org/schema/spring">
        <route id="main">
        <from
            uri="sql:select * from transactions?dataSource=myDataSource&amp;consumer.delay=10000&amp;consumer.useIterator=true" />
        <transacted ref="PROPAGATION_REQUIRED" />
        <enrich uri="direct:invokeIdempotentTransactions" />                
        <!-- Any processors here will be executed on all messages -->
    </route>

    <route id="idempotentTransactions">
        <from uri="direct:invokeIdempotentTransactions" />
        <idempotentConsumer
            messageIdRepositoryRef="jdbcIdempotentRepository">
            <ognl>#{request.body.ID}</ognl>
            <!-- Anything here will only be executed for non-duplicates -->
            <log message="non-duplicate" />
            <to uri="stream:out" />
        </idempotentConsumer>
    </route>            
</camelContext>

完全な 30 万行が 10 秒ごとに (consumer.delay パラメーターを介して) 処理されるように見えますが、これは非常に非効率的です。フィルターにフィードするクエリが既に処理された行のセットを利用できるように、パターンの一部としてある種のフィードバック ループを期待します。

ただし、CAMEL_MESSAGEPROCESSED テーブルの messageid 列には次のパターンがあります。

 {1908988=null}

ここで、1908988 は request.body.ID です。EIP をキーオンに設定したため、クエリに簡単に組み込むことができません。

CAMEL_MESSAGEPROCESSED テーブルを select ステートメントへのフィードバック ループとして使用して、SQL サーバーがほとんどの負荷を実行するためのより良い方法はありますか?

アップデート:

そのため、奇妙なメッセージID列の値を引き起こしているのは私のognlコードであることがわかりました。それをに変更する

<el>${in.body.ID}</el>

修正しました。これで、使用可能な messageId 列ができたので、'from' SQL クエリを

select * from transactions tr where tr.ID IN (select cmp.messageid from CAMEL_MESSAGEPROCESSED cmp where cmp.processor = 'transactionProcessor')

しかし、Idempotent Consumer EIP を破損していると思います。

他の誰かがこれをしますか?しない理由はありますか?

4

1 に答える 1

1

はい、そうです。ただし、すでに処理されたメッセージのセットを保持するために、スケーラブルなストレージを使用する必要があります。ソリューションに応じて、 Hazelcast - http://camel.apache.org/hazelcast-idempotent-repository-tutorial.htmlまたは Infinispan - http://java.dzone.com/articles/clustered-idempotent-consumerのいずれかを使用できます。はすでにスタックにあります。もちろん、JDBC リポジトリは機能しますが、選択したパフォーマンス基準を満たしている場合に限ります。

于 2014-10-01T14:43:45.187 に答える