4

メッセージを継続的に受信するアプリケーションを開発しています。これらのメッセージをメモリ内のデータ構造(リストなど)に保存しています。これらのメッセージをファイルに書き込みたいのですが、リストサイズが100メッセージなどのしきい値に達した後でのみです(メッセージに対してバッチ処理を実行します)。1つの方法は、すべてのメッセージを受信した後にリストサイズを確認し、しきい値に達した場合にファイルにメッセージを書き込む関数を呼び出すことです。しかし、このアプローチの問題は次のとおりです。

  1. 呼び出し元の関数は、すべてのメッセージがファイルに書き込まれるまで無期限に待機する必要がある場合があります
  2. 受信メッセージは、処理中に失われるか、リストに保存されるのを待つ必要がある場合があります。

他の方法は、新しいスレッドを生成することです。これにより、メッセージがファイルに個別に書き込まれます。しかし、書き込み操作を実行するためにリスト(メッセージを含む)をスレッドに渡すと、継続的に着信する新しいメッセージで更新されます。その結果、新しく到着したメッセージも予期しないファイルに書き込まれます。

新しいメッセージを次のバッチで書き込むつもりなので、これは発生しないはずです。

誰かが私にこの要件の解決策、または私の問題を解決できる上記のアプローチの改善を提案できますか?

4

5 に答える 5

5

よりクリーンな解決策は、自動バッチ処理をサポートすることです。つまり、バッチのサイズが受信データのレートに応じて調整されます。

これを行うには、BlockingQueueを使用できます

// unbound queue will not block the producer.
final BlockingQueue<T> queue = new LinkedBlockingQueue<T>();

// to add an element.
queue.add(element);

// to get a batch of data
List<T> list = new ArrayList<T>(maxElements);
while(writing) {
    T t = queue.take(); // wait for at least one element.
    list.add(t);
    queue.drainTo(list, maxElements-1);
    // process list, e.g. write to a file.
    list.clear();
}

このアプローチの利点は、プロデューサーが非常に遅い場合、要素が不当に長く保持されることはないということですが、レートが上がると、バッチサイズはプロデューサーに追いつくものに自然に大きくなります。つまり、そうする必要はありません。使用するのに最適なバッチサイズを決定します。

于 2012-10-09T14:03:24.387 に答える
1

次のアプローチをお勧めします。

  1. 内のメッセージリストへの参照を保持しますAtomicReference
  2. リストが十分にいっぱいになったら、新しい空のリストに置き換えます。
  3. メッセージをファイルに保存するワーカースレッドに完全なリストを渡します。

単一のスレッドからリストに書き込む場合は、の代わりにプレーン参照を使用するだけで十分AtomicReferenceです。

于 2012-10-09T14:00:54.760 に答える
1

Javaでオブジェクトを渡すことは決してなく、参照(またはプリミティブ値)のみを渡すことを理解することが重要です。

オプション:

  • リストのコピーを作成し、そのコピーへの参照を新しいスレッドに渡します
  • プロデューサー/コンシューマーキューを使用すると、「プロデュース」スレッドはキューに値を追加するだけであり、コンシューマースレッドキューからアイテムを取得してディスクに書き込むだけです。もちろん、キューがそれ以上のエントリの受け入れを停止する前に、キューが潜在的にどれだけ大きくなるかを考えたいと思うでしょう。

java.util.concurrentパッケージ内のクラスを使用して実装する、後者のアプローチをお勧めします。特にBlockingQueue<E>実装。

于 2012-10-09T14:02:19.070 に答える
0

古いメッセージリストをファイル書き込みスレッドに渡したら、メインメッセージ受信プロセスに新しいメッセージリストを作成させてはどうでしょうか。

于 2012-10-09T14:02:58.867 に答える
0

sayオブジェクトを受け入れるConditionBoundedQueueを使用してカスタムを実装し、一度に書き込むことができます。100

BoundedQueueこれで、このクラスインスタンスをさまざまなスレッドと共有し てオブジェクトを配置でき、呼び出しwriteAll()たいまでメソッドを呼び出すスレッドができます。

BoundedBuffer boundedBuffer  = new BoundedBuffer();
boundedBuffer.put("test"); .......

スレッドを書くことから以下を行います

boundedBuffer.writeAll();

以下はサンプルコードです

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class BoundedBuffer {
final Lock lock = new ReentrantLock();
final Condition full = lock.newCondition();
final Condition empty = lock.newCondition();

final Object[] items = new Object[100];
int count;

public void put(Object x) throws InterruptedException {
    lock.lock();
    try {
        while (count == items.length) {
            empty.signal();
            full.await();
        }
        items[count] = x;
        ++count;
    } finally {
        lock.unlock();
    }
}

public void writeAll() throws InterruptedException {
    lock.lock();
    try {
        while (count < items.length)
            empty.await();
        // Write to file here After write finished signal full condition
        count = 0;
        full.signal();

    } finally {
        lock.unlock();
    }
}
}
于 2012-10-09T14:03:08.940 に答える