4

良い一日。

Web クローラー プロジェクトにブロッカーの問題があります。ロジックは単純です。最初に を作成しRunnable、html ドキュメントをダウンロードし、すべてのリンクをスキャンしてから、資金提供されたすべてのリンクで新しいRunnableオブジェクトを作成します。次に作成された各 new は、各リンクに対して Runnable新しいオブジェクトを作成し、それらを実行します。Runnable

問題は、ExecutorService決して止まらないことです。

CrawlerTest.java

public class CrawlerTest {

    public static void main(String[] args) throws InterruptedException {
        new CrawlerService().crawlInternetResource("https://jsoup.org/");
    }
}

CrawlerService.java

import java.io.IOException;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.jsoup.Jsoup;
import org.jsoup.nodes.Document;
import org.jsoup.nodes.Element;
import org.jsoup.select.Elements;

public class CrawlerService {

    private Set<String> uniqueUrls = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>(10000));
    private ExecutorService executorService = Executors.newFixedThreadPool(8);
    private String baseDomainUrl;

    public void crawlInternetResource(String baseDomainUrl) throws InterruptedException {
        this.baseDomainUrl = baseDomainUrl;
        System.out.println("Start");
        executorService.execute(new Crawler(baseDomainUrl)); //Run first thread and scan main domain page. This thread produce new threads.
        executorService.awaitTermination(10, TimeUnit.MINUTES);
        System.out.println("End");
    }

    private class Crawler implements Runnable { // Inner class that encapsulates thread and scan for links

        private String urlToCrawl;

        public Crawler(String urlToCrawl) {
            this.urlToCrawl = urlToCrawl;
        }

        public void run() {
            try {
                findAllLinks();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        private void findAllLinks() throws InterruptedException {
            /*Try to add new url in collection, if url is unique adds it to collection, 
             * scan document and start new thread for finded links*/
            if (uniqueUrls.add(urlToCrawl)) { 
                System.out.println(urlToCrawl);

                Document htmlDocument = loadHtmlDocument(urlToCrawl);
                Elements findedLinks = htmlDocument.select("a[href]");

                for (Element link : findedLinks) {
                    String absLink = link.attr("abs:href");
                    if (absLink.contains(baseDomainUrl) && !absLink.contains("#")) { //Check that we are don't go out of domain
                        executorService.execute(new Crawler(absLink)); //Start new thread for each funded link
                    }
                }
            }
        }

        private Document loadHtmlDocument(String internetResourceUrl) {
            Document document = null;
            try {
                document = Jsoup.connect(internetResourceUrl).ignoreHttpErrors(true).ignoreContentType(true)
                        .userAgent("Mozilla/5.0 (Windows NT 6.1; WOW64; rv:48.0) Gecko/20100101 Firefox/48.0")
                        .timeout(10000).get();
            } catch (IOException e) {
                System.out.println("Page load error");
                e.printStackTrace();
            }
            return document;
        }
    }
}

このアプリは、jsoup.org をスキャンしてすべての一意のリンクを探すのに約 20 秒かかります。しかし、10分待つだけでexecutorService.awaitTermination(10, TimeUnit.MINUTES); 、メインスレッドが死んでいて、まだ実行中のエグゼキュータが表示されます。

スレッド

強制ExecutorService的に正しく動作させるには?

問題は、メインスレッドではなく別のタスク内で executorService.execute を呼び出すことだと思います。

4

4 に答える 4

3

を悪用してawaitTerminationいます。javadoc によると、shutdown最初に呼び出す必要があります。

シャットダウン要求の後、すべてのタスクの実行が完了するか、タイムアウトが発生するか、現在のスレッドが中断されるかのいずれかが最初に発生するまでブロックします。

あなたの目標を達成するために、安全に実行できるようにタスクが残っていない正確な瞬間を判断するために(またはこのCountDownLatchような増分をサポートするラッチを)使用することをお勧めします。shutdown

于 2016-08-12T10:27:05.990 に答える
2

私は以前からあなたのコメントを見ます:

リソースから収集する一意のリンクの数が事前にわからないため、CountDownLatch を使用できません。

まず、vsminkov はawaitTermniation、10 分間座って待機する理由について、的確に答えています。代替ソリューションを提供します。

代わりにPhaserCountDownLatchを使用します。新しいタスクごとに、登録して完了を待つことができます。

registeraexecute.submitが呼び出されるarriveたび、および aが完了するたびに、単一の Phaser を作成しますRunnable

public void crawlInternetResource(String baseDomainUrl) {
    this.baseDomainUrl = baseDomainUrl;

    Phaser phaser = new Phaser();
    executorService.execute(new Crawler(phaser, baseDomainUrl)); 
    int phase = phaser.getPhase();
    phase.awaitAdvance(phase);
}

private class Crawler implements Runnable { 

    private final Phaser phaser;
    private String urlToCrawl;

    public Crawler(Phaser phaser, String urlToCrawl) {
        this.urlToCrawl = urlToCrawl;
        this.phaser = phaser;
        phaser.register(); // register new task
    }

    public void run(){
       ...
       phaser.arrive(); //may want to surround this in try/finally
    }
于 2016-08-12T18:52:59.407 に答える
0

シャットダウンを呼び出していません。

これは機能する可能性があります - CrawlerService の AtomicLong 変数。すべての新しいサブタスクが executor サービスに送信される前に増分します。

run() メソッドを変更してこのカウンターをデクリメントし、0 の場合はエグゼキューター サービスをシャットダウンします。

public void run() {
    try {
        findAllLinks();
    } catch (InterruptedException e) {
        e.printStackTrace();
    } finally {
        //decrements counter
        //If 0, shutdown executor from here or just notify CrawlerService who would be doing wait().
    }
}

「最終的に」では、カウンターを減らし、カウンターがゼロの場合は、executor をシャットダウンするか、単に CrawlerService に通知します。0 は、これが最後のものであり、他に実行されておらず、キューに保留されているものがないことを意味します。新しいサブタスクをサブミットするタスクはありません。

于 2016-08-12T12:42:00.387 に答える
0

ExecutorService を強制的に正しく動作させる方法は?

問題は、メインスレッドではなく別のタスク内で executorService.execute を呼び出すことだと思います。

いいえ。問題は ExecutorService にはありません。API を間違った方法で使用しているため、正しい結果が得られません。

正しい結果を得るには、3 つの API を特定の順序で使用する必要があります。

1. shutdown
2. awaitTermination
3. shutdownNow

ExecutorServiceのOracleドキュメントページからの推奨される方法:

 void shutdownAndAwaitTermination(ExecutorService pool) {
   pool.shutdown(); // Disable new tasks from being submitted
   try {
     // Wait a while for existing tasks to terminate
     if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
       pool.shutdownNow(); // Cancel currently executing tasks
       // Wait a while for tasks to respond to being cancelled
       if (!pool.awaitTermination(60, TimeUnit.SECONDS))
           System.err.println("Pool did not terminate");
     }
   } catch (InterruptedException ie) {
     // (Re-)Cancel if current thread also interrupted
     pool.shutdownNow();
     // Preserve interrupt status
     Thread.currentThread().interrupt();
   }

shutdown(): 以前に送信されたタスクが実行される順序どおりのシャットダウンを開始しますが、新しいタスクは受け入れられません。

shutdownNow():アクティブに実行中のすべてのタスクの停止を試み、待機中のタスクの処理を停止し、実行を待機していたタスクのリストを返します。

awaitTermination():シャットダウン要求の後、すべてのタスクの実行が完了するか、タイムアウトが発生するか、または現在のスレッドが中断されるかのいずれかが最初に発生するまでブロックします。

別のメモ: すべてのタスクが完了するまで待ちたい場合は、関連する SE の質問を参照してください。

すべてのスレッドが Java での作業を完了するまで待ちます

ユースケースに最適なinvokeAll()またはを使用することをお勧めします。ForkJoinPool()

于 2016-08-13T11:20:50.493 に答える