0

終了したスレッドを返す Java 印刷結果でマルチスレッド プログラムを作成しようとしています。問題は、このコードを実行すると、キューにあった 2 番目の値でスタックすることです。

        System.out.println("[!] Creaing pool");
        int max_threads     = 50;
        ExecutorService threadPool  = Executors.newFixedThreadPool(max_threads);
        CompletionService<String> taskCompletionService =
        new ExecutorCompletionService<String>(threadPool);
        String url;

        while(our_file.hasNext()){

            url = our_file.next();
            if (url.length()>0){

                futures.add(
                    taskCompletionService.submit(
                    new GoGo(url)
                    )
                    );
                    }



            int total_tasks = futures.size();

            while(total_tasks>0){
            for (int i=0; i<futures.size(); i++){

                try{    

                    Future result = taskCompletionService.poll();
                    if(result!=null && result.isDone()){
                        System.out.println(result.get());
                        total_tasks--;

                    }
                }
                catch (InterruptedException e) {
                    // Something went wrong with a task submitted
                    System.out.println("Error Interrupted exception");
                    e.printStackTrace();
                } catch (ExecutionException e) {
                    // Something went wrong with the result
                    e.printStackTrace();
                    System.out.println("Error get() threw exception");
                }

                }


            }

                 }


        threadPool.shutdown();
        try {
        threadPool.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
        }
        catch (InterruptedException e ) {
        }



...



     class GoGo implements Callable{

        private String url;

        public GoGo(String received_url){
            this.url = received_url;
        }

        public String call(){

            String URL = this.url;
            return url;

        }
    }

出力は次のようになります。

[!] Creaing pool
http://www.www1.com/
http://www.www2.ch/

この時点で、プログラムは動かなくなります。スレッドを送信するメイン ループから先物配列を反復処理するループを移動しようとしましたが、正常に機能しましたが、非常に大きなファイルを処理する場合に備えて、リアルタイムの出力が必要です。CompletionService のノンブロッキング poll() メソッドを使用する適切なコードが見つかりませんでした。回答または参考にしていただきありがとうございます。

4

2 に答える 2

1

問題は、1 つのスレッドで 2 つのこと (作業の送信と作業結果の読み取り) を同時に実行しようとしていることです。

それは意味がありません-同時タスクの場合、複数のスレッドが必要です。

そのため、別のスレッドを作成して結果を読み取ります。または、タスクを送信する別のスレッド。どちらの方法でもかまいません。いずれにせよ、スレッドは 1 つではなく 2 つになります。

于 2013-10-27T15:31:51.537 に答える
0

Robin Green のアドバイスに感謝します。将来のハーベスター クラスを別のスレッドに配置することで問題が解決しました。したがって、poll() で引数をポップする無限ループ スレッドを開始するだけで、ポップされた将来のオブジェクトがスレッド isDone() を示しているかどうかを確認し、出力を書き込みます。そして、fixedThreadPool をシャットダウンした後、出力ライター クラスを停止します。コードは次のとおりです (GoGo クラスを除く)。

    public class headScanner {


        public static List<Future<String>> gloabal_futures   = new ArrayList<Future<String>>();


        public static void main(String args[]){

            Scanner our_file                 = null;
            ArrayList<String> our_urls       = new ArrayList<String>();
            List<Future<String>> futures     = new ArrayList<Future<String>>();
            ArrayList<String> urls_buffer    = new ArrayList<String>();


            try {
            our_file = new Scanner (new File ("list.txt"));
            }
            catch(IOException e){
                System.out.println("[-] Cant open the file!");
                System.exit(0);
            }

            System.out.println("[!] Creaing pool");
            int max_threads     = 50;
            ExecutorService threadPool  = Executors.newFixedThreadPool(max_threads);
            CompletionService<String> taskCompletionService =
            new ExecutorCompletionService<String>(threadPool);
            String url;



            Thread result_thread = new Thread(new ResultHarvester(futures.size(), taskCompletionService));
            result_thread.start();  

            while(our_file.hasNext()){

                url = our_file.next();
                if (url.length()>0){

                    futures.add(
                        taskCompletionService.submit(
                        new GoGo(url)
                        )
                        );
                        }

                }



            threadPool.shutdown();
            try {
            threadPool.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
            }
            catch (InterruptedException e ) {
            }

            result_thread.stop();

        }
    }


 class ResultHarvester implements Runnable {

     private int size;
     private CompletionService<String> all_service;

     public ResultHarvester (int size, CompletionService<String> service){
        this.size = size;
        this.all_service = service;
     }

     public void run(){
            int future_size = 1;
            CompletionService<String> this_service = this.all_service; 
                while(true){
                    Future result = this_service.poll();
                    try {
                            if(result!=null && result.isDone()){
                                String output = result.get().toString();
                                if(output.length()>1){
                                    System.out.println(output);
                                }

                            }
                        }

                    catch (InterruptedException e) {
                        // Something went wrong with a task submitted
                        System.out.println("Error Interrupted exception");
                        e.printStackTrace();
                    } catch (ExecutionException e) {
                        // Something went wrong with the result
                        e.printStackTrace();
                        System.out.println("Error get() threw exception");
                    }


                    }
                }

        }
于 2013-10-27T17:40:04.330 に答える