リストから名前を読み取り、外部サーバーを呼び出してtrue / falseステータスチェックを行い、trueステータスのユーザーにアクションを実行するシステムがあります。外部サーバーへの呼び出しには時間がかかるため、すべてを1つのスレッドで実行するのはあまり効率的ではありません。
私は現在、多くのコンシューマースレッドがリストから名前を読み取り、外部サーバーを呼び出し、有効な名前をブロックキューに入れ、単一のコンシューマーがキューからアイテムを選択してアクションを実行するプロデューサー/コンシューマーシステムとして実装しようとしています。 。ただし、残念ながら、システムが完了するまで実行されることもあれば、無期限にハングすることもあります。テストコードは次のとおりです
public class SubscriberTest {
static Queue<String> subscribed = new ConcurrentLinkedQueue<String>();
static BlockingQueue<String> valid = new LinkedBlockingQueue<String>(100);
Random rand = new Random();
public SubscriberTest(int i) {
for (int j = 0; j < i; j++) {
subscribed.add("I love:" + j);
}
}
public SubscriberTest(Queue<String> subs) {
subscribed = subs;
}
public static void main(String[] args) {
SubscriberTest fun = new SubscriberTest(10000);
System.out.println(subscribed.size());
ExecutorService producers = Executors.newCachedThreadPool();
ExecutorService consumers = Executors.newSingleThreadExecutor();
Consumer consumer = fun.new Consumer();
Producer producer = fun.new Producer();
while (!subscribed.isEmpty()) {
producers.execute(producer);
consumers.execute(consumer);
}
producers.shutdown();
consumers.shutdown();
System.out.println("finally");
}
// take names from subscribed and get status
class Producer implements Runnable {
public void run() {
String x = subscribed.poll();
System.out.println("Producer: " + x + " " + Thread.currentThread().getName());
try {
if (getStatus(x)) {
valid.put(x);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// this is a call to an external server
private boolean getStatus(String x) {
return rand.nextBoolean();
}
}
// takes names from valid queue and save them
class Consumer implements Runnable {
public void run() {
try {
System.out.println("Consumer: " + valid.take() + " " + Thread.currentThread().getName());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
どこが悪いのか教えてください。