1

次の方法を検討してください。

public void add(final List<ReportingSTG> message) {
        if(stopRequested.get()) {
            synchronized (this) {
                if(stopRequested.get()) {
                    retryQueue.put(message);
                }
            }
        }
        messages.add(message);
        if(messages.size() >= batchSize && waitingThreads.get() == 0) {
            synchronized (this) {
                if(messages.size() >= batchSize && waitingThreads.get() == 0) {
                    final List<List<ReportingSTG>> clone = new ArrayList<List<ReportingSTG>>(messages);
                    messages.clear();
                    if(processors.size()>=numOfProcessors) {
                        waitingThreads.incrementAndGet();
                        waitForProcessor();
                        waitingThreads.decrementAndGet();
                    }                   
                    startProcessor(clone);
                }
            }

        }
    }

特にこれらの2行:

 1:   final List<List<ReportingSTG>> clone = new ArrayList<List<ReportingSTG>>(messages);
 2:   messages.clear();

スレッドAが同期ブロックに入り、現在のオブジェクトのロックを取得した場合、これは、このオブジェクトのインスタンスプロパティの状態を、同期ブロック外の他のスレッドが変更できないことを意味しますか(スレッドAが同期ブロックにある間)?

たとえば、スレッドAの実行行1->スレッドBがメソッドに入り、新しいリストエントリを追加しました(messages.add(message))->実行された行2->スレッドBによって追加されたエントリを(他のエントリと一緒に)削除しました)。このシナリオは可能ですか?または、スレッドBは、スレッドAによってロックが解放されるまで待機し、その後、リストエントリを削除します。

メッセージは非静的synchronizedListです

UPD:更新された方法、可能な解決策:

public void add(final List<ReportingSTG> message) {
    if(stopRequested.get()) {
        synchronized (this) {
            if(stopRequested.get()) {
                retryQueue.put(message);
            }
        }
    }
    while (addLock.get()){
        try {
            Thread.sleep(1);
        } catch (InterruptedException e) {}
    }

    messages.add(message);

    if(messages.size() >= batchSize && waitingThreads.get() == 0) {
        synchronized (this) {
            if(messages.size() >= batchSize && waitingThreads.get() == 0) {

                addLock.set(true);
                final List<List<ReportingSTG>> clone = new ArrayList<List<ReportingSTG>>(messages);
                messages.clear();
                addLock.set(false);

                if(processors.size()>=numOfProcessors) {
                    waitingThreads.incrementAndGet();
                    waitForProcessor();
                    waitingThreads.decrementAndGet();
                }                   
                startProcessor(clone);
            }
        }

    }
}

addLock-AtomicBoolean、デフォルトではfalse

4

2 に答える 2

3

説明されているシナリオは可能です。つまり、メッセージが失われる可能性があります。

このキーワードにより、セクションを同時にsynchronized実行する 2 つのスレッドが存在しないことが保証されます。ブロック内で操作されるオブジェクトの別のスレッドによるsynchronized変更は防止されません(この別のスレッドがオブジェクトにアクセスできるようになるとすぐに)。synchronized


これは、追加とクリアを同期するため、可能な解決策です。

private Object lock = new Object();

public void add(final List<ReportingSTG> message) {
    if(stopRequested.get()) {
        synchronized (this) {
            if(stopRequested.get()) {
                retryQueue.put(message);
            }
        }
    }
    synchronized(lock){
        messages.add(message);
        if(messages.size() >= batchSize && waitingThreads.get() == 0) {
                final List<List<ReportingSTG>> clone = new ArrayList<List<ReportingSTG>>(messages);
                messages.clear();
                if(processors.size()>=numOfProcessors) {
                    waitingThreads.incrementAndGet();
                    waitForProcessor();
                    waitingThreads.decrementAndGet();
                }                   
                startProcessor(clone);
            }
    }
}
于 2013-02-28T11:58:36.680 に答える
1

DoubleBufferedList最近、クラスをまとめました。おそらくそれを使用すると、問題を完全に回避できます。その名前が示すように、ダブル バッファリング アルゴリズムを実装しますが、リスト用です。

このクラスを使用すると、多数のプロデューサー スレッドと多数のコンシューマー スレッドを使用できます。各プロデューサー スレッドは、現在のリストに追加できます。各コンシューマ スレッドは、処理のために現在のリスト全体を取得します。

これもロックを使用せず、アトミックのみを使用するため、効率的に実行する必要があります。

これの多くはテスト コードであることに注意してください。コメントの後のすべてを削除することはでき// TESTINGますが、テストの厳しさに安心感を覚えるかもしれません。

public class DoubleBufferedList<T> {
  // Atomic reference so I can atomically swap it through.
  // Mark = true means I am adding to it so unavailable for iteration.
  private AtomicMarkableReference<List<T>> list = new AtomicMarkableReference<>(newList(), false);

  // Factory method to create a new list - may be best to abstract this.
  protected List<T> newList() {
    return new ArrayList<>();
  }

  // Get and replace the current list.
  public List<T> get() {
    // Atomically grab and replace the list with an empty one.
    List<T> empty = newList();
    List<T> it;
    // Replace an unmarked list with an empty one.
    if (!list.compareAndSet(it = list.getReference(), empty, false, false)) {
      // Failed to replace! 
      // It is probably marked as being appended to but may have been replaced by another thread.
      // Return empty and come back again soon.
      return Collections.EMPTY_LIST;
    }
    // Successfull replaced an unmarked list with an empty list!
    return it;
  }

  // Grab and lock the list in preparation for append.
  private List<T> grab() {
    List<T> it;
    // We cannot fail so spin on get and mark.
    while (!list.compareAndSet(it = list.getReference(), it, false, true)) {
      // Spin on mark.
    }
    return it;
  }

  // Release the list.
  private void release(List<T> it) {
    // Unmark it. Should never fail because once marked it will not be replaced.
    if (!list.attemptMark(it, false)) {
      throw new IllegalMonitorStateException("it changed while we were adding to it!");
    }
  }

  // Add an entry to the list.
  public void add(T entry) {
    List<T> it = grab();
    try {
      // Successfully marked! Add my new entry.
      it.add(entry);
    } finally {
      // Always release after a grab.
      release(it);
    }
  }

  // Add many entries to the list.
  public void add(List<T> entries) {
    List<T> it = grab();
    try {
      // Successfully marked! Add my new entries.
      it.addAll(entries);
    } finally {
      // Always release after a grab.
      release(it);
    }
  }

  // Add a number of entries.
  public void add(T... entries) {
    // Make a list of them.
    add(Arrays.asList(entries));
  }
  // TESTING.
  // How many testers to run.
  static final int N = 10;
  // The next one we're waiting for.
  static final AtomicInteger[] seen = new AtomicInteger[N];
  // The ones that arrived out of order.
  static final Set<Widget>[] queued = new ConcurrentSkipListSet[N];

  static {
    // Populate the arrays.
    for (int i = 0; i < N; i++) {
      seen[i] = new AtomicInteger();
      queued[i] = new ConcurrentSkipListSet();
    }
  }

  // Thing that is produced and consumed.
  private static class Widget implements Comparable<Widget> {
    // Who produced it.
    public final int producer;
    // Its sequence number.
    public final int sequence;

    public Widget(int producer, int sequence) {
      this.producer = producer;
      this.sequence = sequence;
    }

    @Override
    public String toString() {
      return producer + "\t" + sequence;
    }

    @Override
    public int compareTo(Widget o) {
      // Sort on producer
      int diff = Integer.compare(producer, o.producer);
      if (diff == 0) {
        // And then sequence
        diff = Integer.compare(sequence, o.sequence);
      }
      return diff;
    }
  }

  // Produces Widgets and feeds them to the supplied DoubleBufferedList.
  private static class TestProducer implements Runnable {
    // The list to feed.
    final DoubleBufferedList<Widget> list;
    // My ID
    final int id;
    // The sequence we're at
    int sequence = 0;
    // Set this at true to stop me.
    public volatile boolean stop = false;

    public TestProducer(DoubleBufferedList<Widget> list, int id) {
      this.list = list;
      this.id = id;
    }

    @Override
    public void run() {
      // Just pump the list.
      while (!stop) {
        list.add(new Widget(id, sequence++));
      }
    }
  }

  // Consumes Widgets from the suplied DoubleBufferedList
  private static class TestConsumer implements Runnable {
    // The list to bleed.
    final DoubleBufferedList<Widget> list;
    // My ID
    final int id;
    // Set this at true to stop me.
    public volatile boolean stop = false;

    public TestConsumer(DoubleBufferedList<Widget> list, int id) {
      this.list = list;
      this.id = id;
    }

    @Override
    public void run() {
      // The list I am working on.
      List<Widget> l = list.get();
      // Stop when stop == true && list is empty
      while (!(stop && l.isEmpty())) {
        // Record all items in list as arrived.
        arrived(l);
        // Grab another list.
        l = list.get();
      }
    }

    private void arrived(List<Widget> l) {
      for (Widget w : l) {
        // Mark each one as arrived.
        arrived(w);
      }
    }

    // A Widget has arrived.
    private static void arrived(Widget w) {
      // Which one is it?
      AtomicInteger n = seen[w.producer];
      // Don't allow multi-access to the same producer data or we'll end up confused.
      synchronized (n) {
        // Is it the next to be seen?
        if (n.compareAndSet(w.sequence, w.sequence + 1)) {
          // It was the one we were waiting for! See if any of the ones in the queue can now be consumed.
          for (Iterator<Widget> i = queued[w.producer].iterator(); i.hasNext();) {
            Widget it = i.next();
            // Is it in sequence?
            if (n.compareAndSet(it.sequence, it.sequence + 1)) {
              // Done with that one too now!
              i.remove();
            } else {
              // Found a gap! Stop now.
              break;
            }
          }
        } else {
          // Out of sequence - Queue it.
          queued[w.producer].add(w);
        }
      }
    }
  }

  // Main tester
  public static void main(String args[]) {
    try {
      System.out.println("DoubleBufferedList:Test");
      // Create my test buffer.
      DoubleBufferedList<Widget> list = new DoubleBufferedList<>();
      // All threads running - Producers then Consumers.
      List<Thread> running = new LinkedList<>();
      // Start some producer tests.
      List<TestProducer> producers = new ArrayList<>();
      for (int i = 0; i < N; i++) {
        TestProducer producer = new TestProducer(list, i);
        Thread t = new Thread(producer);
        t.setName("Producer " + i);
        t.start();
        producers.add(producer);
        running.add(t);
      }

      // Start the same number of consumers.
      List<TestConsumer> consumers = new ArrayList<>();
      for (int i = 0; i < N; i++) {
        TestConsumer consumer = new TestConsumer(list, i);
        Thread t = new Thread(consumer);
        t.setName("Consumer " + i);
        t.start();
        consumers.add(consumer);
        running.add(t);
      }
      // Wait for a while.
      Thread.sleep(5000);
      // Close down all.
      for (TestProducer p : producers) {
        p.stop = true;
      }
      for (TestConsumer c : consumers) {
        c.stop = true;
      }
      // Wait for all to stop.
      for (Thread t : running) {
        System.out.println("Joining " + t.getName());
        t.join();
      }
      // What results did we get?
      for (int i = 0; i < N; i++) {
        // How far did the producer get?
        int gotTo = producers.get(i).sequence;
        // The consumer's state
        int seenTo = seen[i].get();
        Set<Widget> queue = queued[i];
        if (seenTo == gotTo && queue.isEmpty()) {
          System.out.println("Producer " + i + " ok.");
        } else {
          // Different set consumed as produced!
          System.out.println("Producer " + i + " Failed: gotTo=" + gotTo + " seenTo=" + seenTo + " queued=" + queue);
        }
      }
    } catch (InterruptedException ex) {
      ex.printStackTrace();
    }
  }
}
于 2013-02-28T12:14:22.317 に答える