0

私は次のデザインを持っています:

拡張するタスクがあり、TimerTask毎分実行するようにスケジュールされています。このタスクは、(単一のコンシューマーとして)中央キューからアイテムを取得し、それらの表現をファイルに書き込もうとします。

さらに、時々アイテムを中央キューに入れる複数のプロデューサーがあります。

タスクが実行されるたびに( )、アイテムがある場合はキューからすべてrun() method executedのアイテムが抽出され、アイテムがない場合は何もしません。

キューがいっぱいの場合、プロデューサーはキューでスリープする必要があります。

この問題の私の解決策は次のとおりです。

TimerTaskを拡張するExtractTaskを作成します。ExtractTaskにはBlockingQueueが含まれます。各プロデューサーは、メソッドgetQueue()を実行することにより、キューインスタンスへの参照を受け取ります。プロデューサーはBlockingQueue.put()メソッドを実行します。コンシューマーは、run()内でBlockingQueue.poll()メソッドを実行します。

より良いデザインを提案できますか?私のデザインには問題のあるシナリオケースが含まれていますか?このデザインで発生する可能性のある同期の問題はありますか?

4

5 に答える 5

2

私は...するだろう:

  • デザイン内のタスクからキューを分離しておく、
  • ルックアップを行う代わりにキューを挿入し、
  • TimerTaskの代わりにSchedulerServiceを使用する

それ以外はあなたはそれを持っています。

Springに依存するリスクを冒したい場合は、SpringIntegrationを調べる必要があります。あなたが説明するすべてのコンポーネントはそこにあります。CamelやAkkaなど、他の多くのフレームワークを使用して問題を解決することもできます。ここでの私の主なポイントは、絶対に必要がない場合は、このコードを自分で保守しないことです。

免責事項:私はSpringIntegrationについていくらか偏見があります

于 2012-10-22T15:09:27.643 に答える
1

デザインは良さそうです。ここには詳細があまりないので、確認するのは難しいです。すべての依存関係をタイマータスクに挿入することをお勧めします。

また、多くのカスタムコードがなくても、ApacheCamelでこれを実現できる可能性があります。https://camel.apache.org/timer.htmlを参照してください

于 2012-10-22T15:07:32.613 に答える
1

あなたがデザインについて尋ねたので、私はいくつかのことを提案します:

  • 私は個人的にタイマータスクよりもエグゼキューターサービスに行きます。こちらをご覧ください。executorを使用すると、要件が変更された場合に、将来、別々のスレッドでタスクを実行できるようになります。
  • キューをタスクオブジェクトから分離してみてください。
  • 通常、コードでDIを使用して、テスト可能にします。
  • コンストラクターでキューを受け取るようなプロデューサーが必要です。
于 2012-10-22T15:20:43.423 に答える
1

あなたのデザインに基づいて、私は以下のようなものを考えることができます。ConsumerTaskはジェネリックスを利用できますが、Producerスレッドで同じことを行う方法を理解するのに苦労しました。生産者と消費者の両方が、生産/消費されるアイテムの数に制限があります。TimerTaskロジックは、TimerTask自体のrun()メソッドでタイマーをキャンセルして停止するために不可欠です。この場合、シャットダウンに使用できるのはPOISONPILLアプローチのみです。Executors.newSingleThreadExecutor()またはscheduledThreadPoolExecutor()を使用する場合、shutdown()メソッドとshutdownNow()メソッドを使用して、プロデューサーまたはコンシューマーのいずれかを停止できます。TimerTaskは、ConcurrentQueueの動作を確認するための良い例ですが、実動システムでは使用されません。

編集 プロデューサースレッドに汎用機能を追加します。コンストラクターは、キューにアイテムを追加するメソッドを実装するテンプレートクラスを取得するようになりました。プロデューサーがキューにアイテムを追加するたびに呼び出されるaddItem()メソッドを含む抽象クラスPASSWORDを定義しました。

import java.util.Date;
import java.util.Random;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

public class ConsumerTask<T> extends TimerTask {
    Timer timer;
    ConcurrentLinkedQueue<T> itemQueue;
    AtomicLong count = new AtomicLong(0);
    final long limit;

    public ConsumerTask(ConcurrentLinkedQueue<T> itemQ, long lim, int seconds) {
        limit = lim;
        timer = new Timer();
        timer.scheduleAtFixedRate(this, new Date(), seconds * 1000);
        itemQueue = itemQ;
    }

    public void run() {
        T item = itemQueue.peek();
        if (item != null) {
            if (count.incrementAndGet() <= limit) {
                System.out.println("Extracting Item : " + itemQueue.poll());
            } else {
                System.out
                        .println("Consumed : " + (count.get() - 1) + " items");
                timer.cancel();
            }

        }
    }

    public static void main(String args[]) throws InterruptedException {
        ConcurrentLinkedQueue<Integer> itemQ = new ConcurrentLinkedQueue<Integer>();
        ConsumerTask<Integer> ct = new ConsumerTask<Integer>(itemQ, 10, 1);

        new Thread(new Producer<Integer>(itemQ, new IntegerAddItem(itemQ), 20))
                .start();
        new Thread(ct).start();

    }
}

abstract class AddItem<T> {
    ConcurrentLinkedQueue<T> itemQ;
    T t;

    public AddItem(ConcurrentLinkedQueue<T> itemQ) {
        this.itemQ = itemQ;
    }

    abstract boolean addItem();

    public boolean addItem(T t) {
        return itemQ.add(t);
    }
}

class IntegerAddItem extends AddItem<Integer> {
    public IntegerAddItem(ConcurrentLinkedQueue<Integer> itemQ) {
        super(itemQ);
    }

    AtomicInteger item = new AtomicInteger(0);

    @Override
    boolean addItem() {
        return addItem(item.incrementAndGet());
    }

}

class Producer<T> implements Runnable {
    private final ConcurrentLinkedQueue<T> itemQueue;
    AtomicInteger item = new AtomicInteger(0);
    AtomicLong count = new AtomicLong(0);
    AddItem<T> addMethod;
    final long limit;

    public Producer(ConcurrentLinkedQueue<T> itemQ, AddItem<T> addMethod,
            long limit) {
        itemQueue = itemQ;
        this.limit = limit;
        this.addMethod = addMethod;
    }

    public void run() {
        while (count.getAndIncrement() < limit) {
            addMethod.addItem();
            try {
                Thread.sleep(new Random().nextInt(5000));
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                Thread.currentThread().interrupt();
            }

        }
    }
}
于 2012-10-22T16:18:38.370 に答える
0

タイマーが実行されると、コンシューマーがすべてのアイテムを抽出すると言いました。

キューからすべてのアイテムを抽出する操作はブロック操作ではなく、poll()ブロックメソッド呼び出しの繰り返しであることに注意する必要があります。これは、アイテムの抽出時にプロデューサーがキューにアイテムを追加できることを意味します。

于 2012-10-22T15:17:36.550 に答える