1

私はキャメルでいくつかの小さなプロジェクトを実行しましたが、キャメルルートで消費するときにビッグデータ (メモリに収まらない) を処理する方法を理解するのに苦労しています。

camel を使用してルーティングしたい数 GB 相当のデータを含むデータベースがあります。明らかに、すべてのデータをメモリに読み込むことはオプションではありません。

これをスタンドアロン アプリとして実行する場合は、データをページングしてチャンクを JMS エンドポイントに送信するコードを作成します。素敵なパターンを提供するので、キャメルを使用したいと思います。ファイルから消費する場合は、streaming() 呼び出しを使用できます。

また、camel-sql/camel-jdbc/camel-jpa を使用するか、Bean を使用してデータベースから読み取る必要があります。

みんながまだ私と一緒にいることを願っています。私はJava DSLに精通していますが、人々が提供できるヘルプ/提案をいただければ幸いです。

更新 : 2012 年 5 月 2 日

それで、私はこれをいじる時間がありました。私が実際に行っているのは、ルートで使用できるようにプロデューサーの概念を悪用していると思います。

public class MyCustomRouteBuilder extends RouteBuilder {

    public void configure(){
         from("timer:foo?period=60s").to("mycustomcomponent:TEST");

         from("direct:msg").process(new Processor() {
               public void process(Exchange ex) throws Exception{
                   System.out.println("Receiving value" : + ex.getIn().getBody() );
               }
         }
    }

}

私のプロデューサーは次のようになります。わかりやすくするために、CustomEndpoint または CustomComponent は含まれていません。これは単なるラッパーのように見えるためです。

public class MyCustomProducer extends DefaultProducer{ 

    Endpoint e;
    CamelContext c;

    public MyCustomProducer(Endpoint epoint){
          super(endpoint)   
          this.e = epoint;
          this.c = e.getCamelContext();
    }

    public void process(Exchange ex) throws Exceptions{

        Endpoint directEndpoint = c.getEndpoint("direct:msg");
        ProducerTemplate t = new DefaultProducerTemplate(c);

        // Simulate streaming operation / chunking of BIG data.
        for (int i=0; i <20 ; i++){
           t.start();
           String s ="Value " + i ;                  
           t.sendBody(directEndpoint, value)
           t.stop();         
        }
    }
} 

まず、上記はあまりきれいではないようです。これを実行する最もクリーンな方法は、キャメル ルートが消費するスケジュールされたクォーツ ジョブを介して (direct:msg の代わりに) jms キューにデータを入力することです。これにより、キャメル内で受信したメッセージ サイズをより柔軟に調整できます。パイプライン。しかし、Route の一部として時間ベースのアクティベーションを設定するセマンティクスは非常に気に入りました。

これを行う最善の方法について考えている人はいますか。

4

1 に答える 1

1

私の理解では、あなたがする必要があるのは次のことだけです:

from("jpa:SomeEntity" + 
    "?consumer.query=select e from SomeEntity e where e.processed = false" +
    "&maximumResults=150" +
    "&consumeDelete=false")
.to("jms:queue:entities");

maximumResultsクエリごとに取得するエンティティ数の制限を定義します。

エンティティ インスタンスの処理が終了したら、エンティティが再度処理されないように設定する必要がありe.processed = true;ますpersist()

これを行う 1 つの方法は、@Consumed注釈を使用することです。

class SomeEntity {
    @Consumed
    public void markAsProcessed() {
        setProcessed(true);
    }
}

もう 1 つ注意する必要があるのは、エンティティをキューに送信する前にシリアル化する方法です。from と to の間にエンリッチャーを使用する必要がある場合があります。

于 2012-04-24T12:21:23.823 に答える