0

別のバンドルからの受信データを待機し、受信時にデータを処理するサービスをOSGiに実装しようとしています。受信するパケット数がわからないため、を使用しています。私のコードは次のようになります。LinkedBlockingQueue

public class MyClass {

protected static LinkedBlockingQueue<JSONObject> inputQueue = new LinkedBlockingQueue<JSONObject>();
private ExecutorService workerpool = Executors.newFixedThreadPool(4);

public void startMyBundle() {
    start();
}

protected void start() {
    new Thread(new Runnable() {
        public void run() {
            while(true){
                workerpool.execute(new Runnable() {
                    public void run() {
                        try {
                            process(inputQueue.take());
                        } catch (InterruptedException e) {
                            System.out.println("thread was interrupted.");
                        }
                    }
                });
            }
        }
    }).start();
}

public void transmitIn(JSONObject packet) {
    try {
        inputQueue.put(packet);
    } catch (InterruptedException e) {

    }
}

protected  void process(JSONObject packet) {
    //Some processing
}

これを実行していて、サービスに 1 つのパケットのみを送信すると、パケットは最初に処理されますが、プロセッサはその容量をすべて使用し、ほとんどの場合、次のOutOfMemoryErrorようになります。

java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "[Timer] - Periodical Task (Bundle 46) (Bundle 46)"

これの原因は何ですか?

4

3 に答える 3

1

次のコード行が原因で、メモリ不足の例外が発生しています。

while(true){
   workerpool.execute(new Runnable() {
   ...

これは永久にスピンして、新しいRunnableインスタンスを作成し、それらをスレッドプールのタスク キューに追加します。これらは無制限のキューに入り、すぐにメモリがいっぱいになります。

ループinputQueue.take()で呼び出している 4 つのスレッドが必要だと思います。while (true)

for (int i = 0; i < 4; i++) {
    workerpool.execute(new Runnable() {
        public void run() {
            while (!Thread.currentThread().isInterrupted()) {
                process(inputQueue.take());
            }
        }
    });
}
// remember to shut the queue down after you've submitted the last job
workerpool.shutdown();

Threadまた、タスクをスレッドプールに送信する必要はありません。これは非ブロッキング操作であるため、呼び出し元が直接実行できます。

于 2013-07-30T14:15:53.137 に答える
0

わかりました、少し衒学的ですが、これはOSGiタグ付きの質問なので...

  1. クリーンアップ — スレッドとエグゼキュータ サービスを作成していますが、これをクリーンアップすることはありません。一般に、メソッドのアクティブ化/非アクティブ化のペアが必要であり、非アクティブ化後に何も残さないでください。結束の観点から、これを 1 つのオブジェクトで確認し、これを管理するための中央ポイントを必要としません。Declarative Services は、このパターンに最適です。
  2. 共有 — 一般に、Executor を他のユーザーと共有したい場合は、サービス レジストリから Executor を取得することをお勧めします。これにより、デプロイヤはシステム内のすべてのバンドルの使用状況に基づいてスレッド数を調整できます。

もう 1 つ、Boris は正しい解決策を示しましたが、常に 4 つのスレッドと無制限の LinkedQueue を占有するため、あまり効率的ではありません。さらに悪いことに、コードはサービスのように歩き、サービスのように話しますが、サービスとして使用されているようには見えません。queue + executor は少し倍増しており、OSGi ではこれはサービスであるべきなので、もっとうまくやれると思います。

@Component
public class JSONPackageProcessor implement TransmitIn {
  Executor executor;

  @Reference void setExecutor(Executor e) { this.executor = e; }

  public void transmitIn( final JSONPacket packet ) {
    executor.execute(new Runnable() {
       public void run() { 
         try { process(packet); } 
         catch(Throwable t) { log(packet, t); }
       }
    }
  }

  void process( JSONPacket packet ) { ... }
}

process(...)これは、常に「すぐに」終了すると仮定して、クリーンアップを行う必要はありません。このモデルでは、プール内の (任意の?) 4 つのワーカー スレッドで行ったように、フローは調整されません。Executor の内部キューは、バッファリングに使用されます。これを次のように制限できます。

  Semaphore throttle= new Semaphore(4)

  public void transmitIn( final JSONPacket packet ) throws Exception {
    throttle.acquire();
    executor.execute(new Runnable() {
       public void run() { 
         try { process(packet); } 
         catch(Throwable t) { log(packet, t); }
         finally { throttle.release(); }
    }
  }

これは、Configuration Admin を使用して非常に簡単に構成することもできます。

 @Activate void configure( Map<String,Object> map) throws Exception {
   if ( map.containsKey("throttle") )
     throttle = new Semaphore( map.get("throttle"));
 }

このコードの優れた点は、ほとんどのエラー ケースがログに記録され、OSGi で得られる保証により前後の同時実行性が正しいことです。このコードは実際にはそのままで機能します (タイプミスの保証はなく、実際には実行していません)。

于 2013-07-31T07:09:10.437 に答える
0

次のコードが原因です。

protected void start() {
    new Thread(new Runnable() {
        public void run() {
            while(true){
                workerpool.execute(new Runnable() {
                    public void run() {
                        try {
                            process(inputQueue.take());
                        } catch (InterruptedException e) {
                            System.out.println("thread was interrupted.");
                        }
                    }
                });
            }
        }
    }).start();
}

それが行うことはRunnableExecutorService作業キューに無限の数を追加するバックグラウンド タスクを作成することです。これにより、最終的に OOME が発生します。

あなたがするつもりだったのは、次のことだったと思います。

protected void start() {
    for (int i = 0; i < 4; ++i) {
        workerpool.execute(new Runnable() {
            public void run() {
                while (true) {
                    try {
                        process(inputQueue.take());
                    } catch (InterruptedException e) {
                        //killed, exit.
                        return;
                    }
                }
            }
        });
    }
}

ExecutorServiceつまり、入力を待つ4 つのワーカーを実行します。

于 2013-07-30T14:18:20.303 に答える