7

マルチスレッドの Web クローラーを作成しようとしています。

私のメインのエントリ クラスには、次のコードがあります。

ExecutorService exec = Executors.newFixedThreadPool(numberOfCrawlers);
while(true){
    URL url = frontier.get();
    if(url == null)
         return;
exec.execute(new URLCrawler(this, url));
}

URLCrawler は、指定された URL をフェッチし、HTML を解析してそこからリンクを抽出し、見えないリンクを最前線に戻すようにスケジュールします。

フロンティアは、クロールされていない URL のキューです。問題は get() メソッドの書き方です。キューが空の場合は、URLCrawlers が終了するまで待ってから再試行する必要があります。キューが空で、現在アクティブな URLCrawler がない場合にのみ null を返す必要があります。

私の最初のアイデアは、現在動作中の URLCrawler の数をカウントするために AtomicInteger を使用し、notifyAll()/wait() 呼び出しに補助オブジェクトを使用することでした。各クローラーは、開始時に現在動作中の URLCrawlers の数を増やし、終了時にそれを減らし、完了したことをオブジェクトに通知します。

しかし、notify()/notifyAll() と wait() は、スレッド通信を行うためのやや非推奨のメソッドであると読みました。

この作業パターンでは何を使用すればよいですか? M の生産者と N の消費者の場合と同様に、問題は生産者の疲弊にどう対処するかです。

4

6 に答える 6

3

私はあなたのデザインを理解しているかどうか確信が持てませんが、これはSemaphore

于 2010-08-04T05:50:53.897 に答える
3

1つのオプションは、「フロンティア」をブロッキングキューにすることです。そのため、そこから「取得」しようとするスレッドはブロックされます。他の URLCrawler がオブジェクトをそのキューに入れるとすぐに、他のすべてのスレッドに自動的に通知されます (オブジェクトがキューから取り出されます)。

于 2010-08-04T05:52:00.977 に答える
2

質問は少し古いですが、私はいくつかの簡単で実用的な解決策を見つけたと思います:

以下のようにThreadPoolExecutorクラスを拡張します。新しい機能は、アクティブなタスク数を維持しています(残念ながら、提供されgetActiveCount()ているものは信頼できません)。キューに入れられたタスクがこれ以上ない場合taskCount.get() == 0は、実行する必要がなく、エグゼキュータがシャットダウンすることを意味します。終了基準があります。また、エグゼキュータを作成したが、タスクの送信に失敗した場合、エグゼキュータはブロックされません。

public class CrawlingThreadPoolExecutor extends ThreadPoolExecutor {

    private final AtomicInteger taskCount = new AtomicInteger();

    public CrawlingThreadPoolExecutor() {
        super(8, 8, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {

        super.beforeExecute(t, r);
        taskCount.incrementAndGet();
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {

        super.afterExecute(r, t);
        taskCount.decrementAndGet();
        if (getQueue().isEmpty() && taskCount.get() == 0) {
            shutdown();
        }
    }
}

あなたがしなければならないもう一つのことは、新しいタスクを提出できるようにするためにあなたが使用していることへRunnableの参照を維持する方法であなたを実装することです。Executorこれがモックです:

public class MockFetcher implements Runnable {

    private final String url;
    private final Executor e;

    public MockFetcher(final Executor e, final String url) {
        this.e = e;
        this.url = url;
    }

    @Override
    public void run() {
        final List<String> newUrls = new ArrayList<>();
        // Parse doc and build url list, and then:
        for (final String newUrl : newUrls) {
            e.execute(new MockFetcher(this.e, newUrl));
        }
    }
}
于 2012-12-07T09:13:25.863 に答える
2

あなたのユースケースの基本的な構成要素は、CountDownLatch に似た「ラッチ」だと思いますが、CountDownLatch とは異なり、カウントをインクリメントすることもできます。

このようなラッチのインターフェイスは次のようになります。

public interface Latch {
    public void countDown();
    public void countUp();
    public void await() throws InterruptedException;
    public int getCount();
}

カウントの有効な値は 0 以上です。await() メソッドを使用すると、カウントがゼロになるまでブロックできます。

このようなラッチがあれば、ユース ケースはかなり簡単に記述できます。また、このソリューションではキュー (フロンティア) を排除できるのではないかと考えています (エグゼキューターが提供するので、多少冗長です)。メインルーチンを次のように書き直します

ExecutorService executor = Executors.newFixedThreadPool(numberOfCrawlers);
Latch latch = ...; // instantiate a latch
URL[] initialUrls = ...;
for (URL url: initialUrls) {
    executor.execute(new URLCrawler(this, url, latch));
}
// now wait for all crawling tasks to finish
latch.await();

URLCrawler は次のようにラッチを使用します。

public class URLCrawler implements Runnable {
    private final Latch latch;

    public URLCrawler(..., Latch l) {
        ...
        latch = l;
        latch.countUp(); // increment the count as early as possible
    }

    public void run() {
        try {
            List<URL> secondaryUrls = crawl();
            for (URL url: secondaryUrls) {
                // submit new tasks directly
                executor.execute(new URLCrawler(..., latch));
            }
        } finally {
            // as a last step, decrement the count
            latch.countDown();
        }
    }
}

ラッチの実装に関しては、wait() と notifyAll() に基づく実装、Lock と Condition を使用する実装から、AbstractQueuedSynchronizer を使用する実装まで、さまざまな実装が考えられます。これらの実装はすべて非常に簡単だと思います。wait()-notifyAll() バージョンと Lock-Condition バージョンは相互排除に基づいているのに対し、AQS バージョンは CAS (compare-and-swap) を利用するため、特定の状況ではより適切にスケーリングされる可能性があることに注意してください。

于 2010-08-05T00:32:38.490 に答える
2

この場合、wait/notify の使用は正当化されると思います。juc を使用してこれを行う簡単な方法を考えることはできません
クラスで、Coordinator を呼び出しましょう:

private final int numOfCrawlers;
private int waiting;

public boolean shouldTryAgain(){
    synchronized(this){
        waiting++;
        if(waiting>=numOfCrawlers){
            //Everybody is waiting, terminate
            return false;
        }else{
            wait();//spurious wake up is okay
            //waked up for whatever reason. Try again
            waiting--;
            return true;
        }
    }

public void hasEnqueued(){
    synchronized(this){
        notifyAll();
    }
} 

それから、

ExecutorService exec = Executors.newFixedThreadPool(numberOfCrawlers);
while(true){
    URL url = frontier.get();
    if(url == null){
        if(!coordinator.shouldTryAgain()){
            //all threads are waiting. No possibility of new jobs.
            return;
        }else{
            //Possible that there are other jobs. Try again
            continue;
        }
    }
    exec.execute(new URLCrawler(this, url));
}//while(true)
于 2010-08-04T06:49:27.157 に答える
0

AdaptiveExecuter を提案したいと思います。特性値に基づいて、スレッドをシリアル化または並列化して実行することを選択できます。以下のサンプルでは、​​PUID は、その決定を行うために使用したかった文字列/オブジェクトです。コードに合わせてロジックを変更できます。コードの一部は、さらに実験できるようにコメント化されています。

クラス AdaptiveExecutor は Executor を実装します { 最終キュー タスク = new LinkedBlockingQueue(); 実行可能アクティブ ; //ExecutorService threadExecutor=Executors.newCachedThreadPool(); static ExecutorService threadExecutor=Executors.newFixedThreadPool(4);

AdaptiveExecutor() {
    System.out.println("Initial Queue Size=" + tasks.size());
}

public void execute(final Runnable r) {
    /* if immediate start is needed do either of below two
    new Thread(r).start();

    try {
        threadExecutor.execute(r);
    } catch(RejectedExecutionException rEE ) {
        System.out.println("Thread Rejected " + new Thread(r).getName());
    }

    */


    tasks.offer(r); // otherwise, queue them up
    scheduleNext(new Thread(r)); // and kick next thread either serial or parallel.
    /*
    tasks.offer(new Runnable() {
        public void run() {
            try {
                r.run();
            } finally {
                scheduleNext();
            }
        }
    });
    */
    if ((active == null)&& !tasks.isEmpty()) {
        active = tasks.poll();
        try {
            threadExecutor.submit(active);
        } catch (RejectedExecutionException rEE) {
            System.out.println("Thread Rejected " + new Thread(r).getName());
        }
    }

    /*
    if ((active == null)&& !tasks.isEmpty()) {
        scheduleNext();
    } else tasks.offer(r);
    */
    //tasks.offer(r);

    //System.out.println("Queue Size=" + tasks.size());

}

private void serialize(Thread th) {
    try {
        Thread activeThread = new Thread(active);

        th.wait(200);
        threadExecutor.submit(th);
    } catch (InterruptedException iEx) {

    }
    /*
    active=tasks.poll();
    System.out.println("active thread is " +  active.toString() );
    threadExecutor.execute(active);
    */
}

private void parallalize() {
    if(null!=active)
        threadExecutor.submit(active);
}

protected void scheduleNext(Thread r) {
    //System.out.println("scheduleNext called") ;
    if(false==compareKeys(r,new Thread(active)))
        parallalize();
    else serialize(r);
}

private boolean compareKeys(Thread r, Thread active) {
    // TODO: obtain names of threads. If they contain same PUID, serialize them.
    if(null==active)
        return true; // first thread should be serialized
    else return false;  //rest all go parallel, unless logic controlls it
}

}

于 2011-02-28T21:35:28.583 に答える