Apache Camel 2.13.1 を使用して、30 万行を超えるデータベース テーブルをポーリングしています。Idempotent Consumer EIPを使用して、既に処理された行をフィルタリングしようとしています。
ただし、実装が本当にスケーラブルかどうかは疑問です。私のラクダのコンテキストは次のとおりです:-
<camelContext xmlns="http://camel.apache.org/schema/spring">
<route id="main">
<from
uri="sql:select * from transactions?dataSource=myDataSource&consumer.delay=10000&consumer.useIterator=true" />
<transacted ref="PROPAGATION_REQUIRED" />
<enrich uri="direct:invokeIdempotentTransactions" />
<!-- Any processors here will be executed on all messages -->
</route>
<route id="idempotentTransactions">
<from uri="direct:invokeIdempotentTransactions" />
<idempotentConsumer
messageIdRepositoryRef="jdbcIdempotentRepository">
<ognl>#{request.body.ID}</ognl>
<!-- Anything here will only be executed for non-duplicates -->
<log message="non-duplicate" />
<to uri="stream:out" />
</idempotentConsumer>
</route>
</camelContext>
完全な 30 万行が 10 秒ごとに (consumer.delay パラメーターを介して) 処理されるように見えますが、これは非常に非効率的です。フィルターにフィードするクエリが既に処理された行のセットを利用できるように、パターンの一部としてある種のフィードバック ループを期待します。
ただし、CAMEL_MESSAGEPROCESSED テーブルの messageid 列には次のパターンがあります。
{1908988=null}
ここで、1908988 は request.body.ID です。EIP をキーオンに設定したため、クエリに簡単に組み込むことができません。
CAMEL_MESSAGEPROCESSED テーブルを select ステートメントへのフィードバック ループとして使用して、SQL サーバーがほとんどの負荷を実行するためのより良い方法はありますか?
アップデート:
そのため、奇妙なメッセージID列の値を引き起こしているのは私のognlコードであることがわかりました。それをに変更する
<el>${in.body.ID}</el>
修正しました。これで、使用可能な messageId 列ができたので、'from' SQL クエリを
select * from transactions tr where tr.ID IN (select cmp.messageid from CAMEL_MESSAGEPROCESSED cmp where cmp.processor = 'transactionProcessor')
しかし、Idempotent Consumer EIP を破損していると思います。
他の誰かがこれをしますか?しない理由はありますか?