0

私はJavaの生産者/消費者問題の変種に取り組んでいます。オブジェクトを作成するプロデューサースレッドがあります。オブジェクトは優先度ブロックキューに入れられ、メインコンテナーであるコントローラー(制限付きバッファー)に渡されます。

キューの理由は、メインコンテナにオブジェクトAの特定の割合がある場合、タイプBのオブジェクト、および確認を求められた他のいくつかのシナリオのみを受け入れるためです。コードの何が問題になっているのかを理解するのに問題があります。デバッガーは、InQueueのin.offerとProducerのin.pushからジャンプしています。任意の方向性やアドバイスをいただければ幸いです。

    import java.util.concurrent.PriorityBlockingQueue;

        public class InQueue implements Runnable {

        Controller c;
        private PriorityBlockingQueue in;

        public InQueue(Controller c) {
            this.c = c;
            in = new PriorityBlockingQueue();
        }

        public void push(C c) {

            in.offer(c);
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }

        public void run() {
            while (true) {
                try {
                    C temp = (C) in.take(); //will block if empty
                    c.arrive(temp);
                } catch (InterruptedException e) {} // TODO
            }
        }
    }

public class Controller {

    private BoundedBuffer buffer;
    private int used;


    Controller(int capacity) {
        this.buffer = new BoundedBuffer(capacity);
        used = 0;
    }


    public void arrive(C c) {
        try {
            buffer.put(c);
            used++;
        } catch (InterruptedException e) { } //TODO
    }

    public C depart() {
        C temp = null; //BAD IDEA?
        try {
            temp = (C)buffer.take();
            used--;
        } catch (InterruptedException e) { } //TODO
        return temp; //could be null
    }
}
4

1 に答える 1

0

ジェネリックの使用方法が間違っているため、コードがコンパイルされていません。もう 1 つのことは、BoundedBuffer のデフォルトの実装がないことです。以下では、キューをブロックすることで、以下の生産者 - 消費者の問題の実用的な実装を作成しました。よく見て間違いを正してください。

package concurrency;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class Producer<T> {
    private final BlockingQueue<T> queue;
    private final Consumer consumer;
    private static volatile boolean isShutdown;
    private final static Object lock = new Object();

    public Producer() {
        this.queue = new LinkedBlockingQueue<T>();
        this.consumer = new Consumer();
    }

    public void start() {
        consumer.start();
    }

    public void stop() {
        synchronized (lock) {
            isShutdown = true;
        }
        consumer.interrupt();
    }

    public void put(T obj) throws InterruptedException {
        synchronized (lock) {
            if (isShutdown)
                throw new IllegalStateException("Consumer Thread is not active");
        }
        queue.put(obj);
    }

    private class Consumer extends Thread {

        public void run() {
            while (true) {
                synchronized (lock) {
                    if (isShutdown)
                        break;
                }

                T t = takeItem();
                // do something with 't'
                if(t!=null)
                printItem(t);
            }
        }

        private void printItem(T t) {
            System.out.println(t);
        }

        private T takeItem() {
            try {
                return queue.take();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return null;
        }
    }

    public static void main(String[] args) throws InterruptedException {

        Producer<Integer> producer = new Producer<Integer>();
        producer.start();
        for (int i = 0; i <20; i++) {
            producer.put(i);
            if (i >= 7)
                Thread.sleep(500);
        }
        producer.stop();
    }
}
于 2012-12-12T10:36:07.943 に答える