2

2 つのクラスで名前が付けられたメソッドがありprocessますCLASS-A and CLASS-B。以下のループではprocess method、両方のクラスを順番に呼び出していますが、それは正常に機能しますが、それは私が探している方法ではありません。

for (ModuleRegistration.ModulesHolderEntry entry : ModuleRegistration.getInstance()) {
    final Map<String, String> response = entry.getPlugin().process(outputs);

    // write to database
    System.out.println(response);
}

方法はありますか。両方のクラスのプロセス メソッドをマルチスレッドで呼び出すことができます。つまり、1 つのスレッドが CLASS-A のプロセス メソッドを呼び出し、2 つ目のスレッドが CLASS-B のプロセス メソッドを呼び出します。

その後、processメソッドによって返されたデータをデータベースに書き込むことを考えていました。したがって、データベースに書き込むためのスレッドをもう 1 つ持つことができます。

以下は、マルチスレッドの方法で思いついたコードですが、どういうわけかまったく実行されていません。

public void writeEvents(final Map<String, Object> data) {

    // Three threads: one thread for the database writer, two threads for the plugin processors
    final ExecutorService executor = Executors.newFixedThreadPool(3);

    final BlockingQueue<Map<String, String>> queue = new LinkedBlockingQueue<Map<String, String>>();

    @SuppressWarnings("unchecked")
    final Map<String, String> outputs = (Map<String, String>)data.get(ModelConstants.EVENT_HOLDER);

    for (final ModuleRegistration.ModulesHolderEntry entry : ModuleRegistration.getInstance()) {
        executor.submit(new Runnable () {
            public void run() {
                final Map<String, String> response = entry.getPlugin().process(outputs);
                // put the response map in the queue for the database to read
                queue.offer(response);
            }
        });
    }

    Future<?> future = executor.submit(new Runnable () {
        public void run() {
            Map<String, String> map;
            try {
                while(true) {
                    // blocks until a map is available in the queue, or until interrupted
                    map = queue.take();
                    // write map to database
                    System.out.println(map);
                }
            } catch (InterruptedException ex) {
                // IF we're catching InterruptedException then this means that future.cancel(true)
                // was called, which means that the plugin processors are finished;
                // process the rest of the queue and then exit
                while((map = queue.poll()) != null) {
                    // write map to database
                    System.out.println(map);
                }
            }
        }
    });

    // this interrupts the database thread, which sends it into its catch block
    // where it processes the rest of the queue and exits
    future.cancel(true); // interrupt database thread

    // wait for the threads to finish
    try {
        executor.awaitTermination(5, TimeUnit.MINUTES);
    } catch (InterruptedException e) {
        //log error here
    }
}

しかし、最後の行を削除すると、正常にexecutor.awaitTermination(5, TimeUnit.MINUTES);動作し始め、しばらくすると、常に次のようなエラーが発生します-

JVMDUMP006I Processing dump event "systhrow", detail "java/lang/OutOfMemoryError" - please wait.
JVMDUMP032I JVM requested Heap dump using 'S:\GitViews\Stream\goldseye\heapdump.20130827.142415.16456.0001.phd' in response to an event
JVMDUMP010I Heap dump written to S:\GitViews\Stream\goldseye\heapdump.20130827.142415.16456.0001.phd
JVMDUMP006I Processing dump event "systhrow", detail "java/lang/OutOfMemoryError" - please wait.

上記のコードで何が問題で、何が間違っているのかを理解するのを手伝ってくれる人はいますか? 順次実行している場合、エラーは発生せず、正常に動作します。

また、私がやっている方法と比較して、これを行うより良い方法はありますか? 将来的には、プラグイン プロセッサを 2 つではなく複数持つことができるからです。

私がやろうとしているのは、両方のクラスのプロセス メソッドをマルチスレッドの方法で呼び出してから、データベース bcoz に書き込むことです。私のプロセス メソッドはマップを返します。

これについて何か助けていただければ幸いです。可能であれば、これに関する実行可能な例を探しています。助けてくれてありがとう、

4

2 に答える 2

1

貼り付けたコード スニペットには問題がほとんどありません。問題を修正すれば、問題なく動作するはずです。
1. ブロッキング キューから要素を取得するために無限ループを使用しており、future を使用してこれを中断しようとしています。これは間違いなく良いアプローチではありません。このアプローチの問題は、データベース スレッドが実行されない可能性があることです。これは、実行前であっても、呼び出し元スレッドで実行される将来のタスクによってキャンセルされる可能性があるためです。これはエラーが発生しやすいです。
- while ループを決まった回数実行する必要があります (プロデューサーの数や、応答を取得する回数は既にわかっています)。

  1. また、エグゼキュータ サービスに送信されるタスクは独立したタスクである必要があります...ここでは、データベース タスクは他のタスクの実行に依存しています..これは、実行ポリシーが変更された場合にもデッドロックにつながる可能性があります..たとえば、単一のスレッド プール エグゼキュータを使用する場合データベース スレッドがスケジュールされている場合、プロデューサーがデータをキューに追加するのを待つだけでブロックされます。

    • 良い方法は、同じスレッドでデータを取得してデータベースを更新するタスクを作成することです。
    • または、最初にすべての応答を取得してから、データベース操作を並行して実行します

    public void writeEvents(最終地図データ) {

    final ExecutorService executor = Executors.newFixedThreadPool(3);       
    @SuppressWarnings("unchecked")
    final Map<String, String> outputs = (Map<String, String>)data.get(ModelConstants.EVENT_HOLDER);
    
    for (final ModuleRegistration.ModulesHolderEntry entry : ModuleRegistration.getInstance()) {
        executor.submit(new Runnable () {
            public void run() {
                try {
                    final Map<String, String> response = entry.getPlugin().process(outputs);
                    //process the response and update database.
                    System.out.println(map);
                } catch (Throwable e) {
                    //handle execption
                } finally {
                    //clean up resources
                }               
            }
        });
    }
    

    // これは実行中のスレッドが完了するのを待ちます..これは正常なシャットダウンです。executor.shutdown(); }

于 2013-08-28T05:48:56.320 に答える
0

OK、これが上で提案したコメントのコードです。免責事項:それが機能するのか、コンパイルできるのか、問題が解決するのかはわかりません。future.cancelしかし、アイデアは、私が問題を引き起こす可能性があることに依存するのではなく、キャンセルプロセスを制御することです.

class CheckQueue implements Runnable {
    private volatile boolean cancelled = false;
    public void cancel() { cancelled = true; }
    public void run() {
        Map<String, String> map;
        try {
            while(!cancelled) {
                // blocks until a map is available in the queue, or until interrupted
                map = queue.take();
                if (cancelled) break;
                // write map to database
                System.out.println(map);
        } catch (InterruptedException e) {
        }
        while((map = queue.poll()) != null) {
            // write map to database
            System.out.println(map);
        }
    }
}

CheckQueue queueChecker = new CheckQueue ();
Future<?> future = executor.submit(queueChecker);

// this interrupts the database thread, which sends it into its catch block
// where it processes the rest of the queue and exits
queueChecker.cancel();
于 2013-08-27T22:57:47.867 に答える