-1

マルチスレッドの概念を使用して Java で Producer Consumer の問題を実装するプログラムを作成しています。以下は、私がそれを行うことになっている方法の詳細です。

1) メイン スレッドは、コマンド ライン引数として指定された容量でバッファを作成する必要があります。プロデューサ スレッドとコンシューマ スレッドの数も、コマンド ライン引数として指定されます。各生産者スレッドと消費者スレッドに一意の番号を割り当てることになっています。プロデューサー スレッドとコンシューマー スレッドに一意の番号を割り当てるにはどうすればよいですか?

2) 生産者スレッドは無限ループで動作します。次の形式のデータ項目 (文字列) を生成します<producer number>_<data item number>。たとえば、スレッド番号 1 の最初のデータ項目は 1_1 になり、スレッド番号 3 の 2 番目のデータ項目は 3_2 になります。このような形式でデータ項目を作成するにはどうすればよいですか?

3) 次に、Producer スレッドがエントリを Producer ログ ファイルに書き込みます (< Producer number > "Generated" <data item>)。ログ エントリを書き込むと、バッファに挿入しようとします。挿入が成功すると、ログ ファイルにエントリが作成されます ( <producer number> <data item>「挿入成功」)。そのようなコードを書くにはどうすればよいですか?

以下は私が書いたJavaコードです。

import java.util.*;
import java.util.logging.*;

public class PC2
{
    public static void main(String args[])
    {
            ArrayList<Integer> queue = new ArrayList<Integer>();

            int size = Integer.parseInt(args[2]);
            Thread[] prod = new Thread[Integer.parseInt(args[0])];
            Thread[] cons = new Thread[Integer.parseInt(args[1])];

            for(int i=0; i<prod.length; i++)
            {
                    prod[i] = new Thread(new Producer(queue, size));
                    prod[i].start();
            }

            for(int i=0; i<cons.length; i++)
            {
                    cons[i] = new Thread(new Consumer(queue, size));
                    cons[i].start();
                }

    }
}

class Producer extends Thread
{
    private final ArrayList<Integer> queue;
    private final int size;

    public Producer(ArrayList<Integer> queue, int size)
    {
            this.queue = queue;
            this.size = size;
    }

    public void run()
    {
            while(true){
            for(int i=0; i<size; i++)
            {
                    System.out.println("Produced: "+i+" by id " +Thread.currentThread().getId());
try
                    {
                            produce(i);
                            Thread.sleep(3000);
                    }
                    catch(Exception e)
                    {
                            Logger.getLogger(Producer.class.getName()).log(Level.SEVERE, null, e);
                    }
            }}
    }


    public void produce(int i) throws InterruptedException
    {
            while(queue.size() == size)
            {
                    synchronized(queue)
                    {
                            System.out.println("Queue is full "+Thread.currentThread().getName() +" is waiting, size: "+queue.size());
                            queue.wait();
                       }
            }
            synchronized(queue)
            {
                    queue.add(i);
                    queue.notifyAll();
            }
    }
}

class Consumer extends Thread
{
    private final ArrayList<Integer> queue;
    private final int size;

    public Consumer(ArrayList<Integer> queue, int size)
    {
            this.queue = queue;
            this.size = size;
    }

    public void run()
    {
            while(true)
            {
                    try
                    {       System.out.println("Consumed: "+consume());
                            Thread.sleep(1000);
                    }
                    catch(Exception e)
                    {
                            Logger.getLogger(Consumer.class.getName()).log(Level.SEVERE, null, e);
                    }
            }
    }

    public int consume() throws InterruptedException
    {
            while(queue.isEmpty())
            {
                    synchronized(queue)
                    {
                            System.out.println("Queue is empty "+Thread.currentThread().getName()+" is waiting, size: "+queue.size());
                            queue.wait();
                        }
            }

            synchronized (queue)
            {
                    queue.notifyAll();
                    System.out.println("Consumed by id "+Thread.currentThread().getId());
                    return (Integer) queue.remove(0);

            }
    }
}

上記の手順を実行するにはどうすればよいですか?

4

5 に答える 5

1

プロデューサ コンシューマの問題の最適な解決策は、BlockingQueue です。私はいくつかのことをテストしていたので、同じ種類のプログラムを設計し、必要に応じて変更しました。

それが役立つかどうかを確認してください。

import java.util.concurrent.*;
public class ThreadingExample {

    public static void main(String args[]){
        BlockingQueue<Message> blockingQueue = new ArrayBlockingQueue<Message>(100);
        ExecutorService exec = Executors.newCachedThreadPool();
        exec.execute(new Producer(blockingQueue));
        exec.execute(new Consumer(blockingQueue));
    }

}
class Message{
    private static int count=0;
    int messageId;
    Message(){
        this.messageId=count++;
        System.out.print("message Id"+messageId+" Created ");
    }
}
class Producer implements Runnable{

    private BlockingQueue<Message> blockingQueue;
    Producer(BlockingQueue<Message> blockingQueue){
        this.blockingQueue=blockingQueue;
    }

    @Override
    public void run(){
        while(!Thread.interrupted()){
            System.out.print("Producer Started");
            try {
                blockingQueue.put(new Message());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("Producer Done");
        }
    }
}

class Consumer implements Runnable{
    private BlockingQueue<Message> blockingQueue;
    Consumer(BlockingQueue<Message> blockingQueue){
        this.blockingQueue=blockingQueue;
    }

    @Override
    public void run(){
        while(!Thread.interrupted()){
            System.out.print("Concumer Started");
            try{
                Message message  = blockingQueue.take();
                System.out.print("message Id"+message.messageId+" Consumed ");
            }
            catch(InterruptedException e){
                e.printStackTrace();
            }
            System.out.println("Concumer Done");
        }
    }
}
于 2013-10-31T16:11:53.400 に答える
1

各生産者スレッドと消費者スレッドに一意の番号を割り当てることになっています。プロデューサー スレッドとコンシューマー スレッドに一意の番号を割り当てるにはどうすればよいですか?

Producer/Consumer クラスにインスタンス (非静的) 変数を追加します。新しい Producer/Consumer オブジェクトを初期化するときは、一意の番号を渡します。int counterメインクラスの を使用して、現在の番号を追跡できます。

2) 生産者スレッドは無限ループで動作します。次の形式のデータ項目 (文字列) を生成します: < プロデューサー番号 >_< データ項目番号 > 。たとえば、スレッド番号 1 の最初のデータ項目は 1_1 になり、スレッド番号 3 の 2 番目のデータ項目は 3_2 になります。このような形式でデータ項目を作成するにはどうすればよいですか?

同期されたメソッドやアトミック変数を使用してください。Java Concurrencyを調べてください。

3) 次に、Producer スレッドがエントリを Producer ログ ファイルに書き込みます (< Producer Number > "Generated" < Data Item >)。ログ エントリを書き込むと、バッファに挿入しようとします。挿入が成功すると、ログ ファイルにエントリが作成されます (< プロデューサー番号 > < データ項目 > 「挿入成功」)。そのようなコードを書くにはどうすればよいですか?

私の答えは前の質問と同じです。Java の並行性について読んでください。同期、ロック、およびアトミック変数について 1 時間ほど読んでいただければ、プログラムを簡単に作成できることを保証します。

于 2013-10-31T01:49:19.280 に答える
0

以下のコードを参照してください。コマンドライン引数に基づいて定数値を変更できます。コードをテストしましたが、要件どおりに機能します。

import java.util.LinkedList;
import java.util.Queue;

public class ProducerConsumerProblem {
    public static int CAPACITY = 10; // At a time maximum of 10 tasks can be
                                        // produced.
    public static int PRODUCERS = 2;
    public static int CONSUMERS = 4;

    public static void main(String args[]) {
        Queue<String> mTasks = new LinkedList<String>();
        for (int i = 1; i <= PRODUCERS; i++) {
            Thread producer = new Thread(new Producer(mTasks));
            producer.setName("Producer " + i);
            producer.start();
        }
        for (int i = 1; i <= CONSUMERS; i++) {
            Thread consumer = new Thread(new Consumer(mTasks));
            consumer.setName("Consumer " + i);
            consumer.start();
        }

    }

}

class Producer implements Runnable {

    Queue<String> mSharedTasks;
    int taskCount = 1;

    public Producer(Queue<String> mSharedTasks) {
        super();
        this.mSharedTasks = mSharedTasks;
    }

    @Override
    public void run() {
        while (true) {
            synchronized (mSharedTasks) {
                try {
                    if (mSharedTasks.size() == ProducerConsumerProblem.CAPACITY) {
                        System.out.println("Producer Waiting!!");
                        mSharedTasks.wait();
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

            while (mSharedTasks.size() != ProducerConsumerProblem.CAPACITY) {

                try {
                    Thread.sleep(50);
                } catch (InterruptedException e) {
                }

                String produceHere = Thread.currentThread().getName()
                        + "_Item number_" + taskCount++;

                synchronized (mSharedTasks) {
                    mSharedTasks.add(produceHere);
                    System.out.println(produceHere);
                    if (mSharedTasks.size() == 1) {
                        mSharedTasks.notifyAll(); // Informs consumer that there
                                                    // is something to consume.
                    }
                }
            }

        }
    }
}

class Consumer implements Runnable {
    Queue<String> mSharedTasks;

    public Consumer(Queue<String> mSharedTasks) {
        super();
        this.mSharedTasks = mSharedTasks;
    }

    @Override
    public void run() {
        while (true) {
            synchronized (mSharedTasks) {
                if (mSharedTasks.isEmpty()) { // Checks whether there is no task
                                                // to consume.
                    try {
                        mSharedTasks.wait(); // Waits for producer to produce!
                    } catch (InterruptedException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                }

            }
            while (!mSharedTasks.isEmpty()) { // Consumes till task list is
                                                // empty
                try {
                    // Consumer consumes late hence producer has to wait...!
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
                synchronized (mSharedTasks) {

                    System.out.println(Thread.currentThread().getName()
                            + " consumed " + mSharedTasks.poll());
                    if (mSharedTasks.size() == ProducerConsumerProblem.CAPACITY - 1)
                        mSharedTasks.notifyAll();
                }

            }

        }
    }

}
于 2015-07-03T12:06:36.610 に答える
-1
public class ProducerConsumerTest {

    public static void main(String[] args) {
        CubbyHole c = new CubbyHole();
        Producer p1 = new Producer(c, 1);
        Consumer c1 = new Consumer(c, 1);
        p1.start();
        c1.start();
    }
}

class CubbyHole {

    private int contents;
    private boolean available = false;

    public synchronized int get() {
        while (available == false) {
            try {
                wait();
            } catch (InterruptedException e) {
            }
        }
        available = false;
        notifyAll();
        return contents;
    }

    public synchronized void put(int value) {
        while (available == true) {
            try {
                wait();
            } catch (InterruptedException e) {
            }
        }
        contents = value;
        available = true;
        notifyAll();
    }
}

class Consumer extends Thread {

    private CubbyHole cubbyhole;
    private int number;

    public Consumer(CubbyHole c, int number) {
        cubbyhole = c;
        this.number = number;
    }

    public void run() {
        int value = 0;
        for (int i = 0; i < 10; i++) {
            value = cubbyhole.get();
            System.out.println("Consumer #"
                    + this.number
                    + " got: " + value);
        }
    }
}

class Producer extends Thread {

    private CubbyHole cubbyhole;
    private int number;

    public Producer(CubbyHole c, int number) {
        cubbyhole = c;
        this.number = number;
    }

    public void run() {
        for (int i = 0; i < 10; i++) {
            cubbyhole.put(i);
            System.out.println("Producer #" + this.number
                    + " put: " + i);
            try {
                sleep((int) (Math.random() * 100));
            } catch (InterruptedException e) {
            }
        }
    }
}
于 2015-08-03T06:04:47.270 に答える