cassandra クラスターに接続するために datastax Java ドライバー 3.1.0 を使用しています。cassandra クラスターのバージョンは 2.0.10 です。QUORUMの一貫性を保ちながら非同期に書いています。
private final ExecutorService executorService = Executors.newFixedThreadPool(10);
public void save(String process, int clientid, long deviceid) {
String sql = "insert into storage (process, clientid, deviceid) values (?, ?, ?)";
try {
BoundStatement bs = CacheStatement.getInstance().getStatement(sql);
bs.setConsistencyLevel(ConsistencyLevel.QUORUM);
bs.setString(0, process);
bs.setInt(1, clientid);
bs.setLong(2, deviceid);
ResultSetFuture future = session.executeAsync(bs);
Futures.addCallback(future, new FutureCallback<ResultSet>() {
@Override
public void onSuccess(ResultSet result) {
logger.logInfo("successfully written");
}
@Override
public void onFailure(Throwable t) {
logger.logError("error= ", t);
}
}, executorService);
} catch (Exception ex) {
logger.logError("error= ", ex);
}
}
上記の save メソッドは、複数のスレッドから非常に高速で呼び出されます。
質問:
executeAsync
Cassandra に非同期で書き込むメソッドへのリクエストを抑制したいと考えています。私の Cassandra クラスターが処理できる速度よりも非常に高速で書き込みを行うと、エラーがスローされ始めます。すべての書き込みが損失なく cassandra に正常に行われるようにする必要があります。
この投稿を見ましたが、解決策はSemaphore
固定数の許可で使用することです。しかし、それを実装するための最良の方法と方法がわかりません。私は以前にセマフォを使用したことがありません。これがロジックです。誰でも私のコードにセマフォベースの例を提供できますか、またはより良い方法/オプションがある場合は、私にも知らせてください。
データローダ プログラムを作成するコンテキストでは、次のようなことができます。
- 物事をシンプルに保つには、セマフォまたは固定数の許可を持つその他の構造を使用します (これは、インフライト リクエストの最大数になります)。executeAsync を使用してクエリを送信するたびに、許可を取得します。セマフォからパーミットを取得してクエリを実行するスレッドは 1 つだけ必要です (ただし、これを実行する # cpu コア サイズのプールを導入する必要がある場合があります)。利用可能な許可が得られるまで、取得時にブロックされます。
- executeAsync から返される未来には、Futures.addCallback を使用します。コールバックは、onSuccess と onFailure の両方のケースで Sempahore.release() を呼び出す必要があります。パーミットを解放することで、ステップ 1 のスレッドが続行し、次のリクエストを送信できるようになります。
また、使用について話している他の投稿をいくつか見RingBuffer
ましGuava RateLimitter
たが、どちらが優れていて、使用する必要がありますか? 以下は私が考えることができるオプションです:
- セマフォの使用
- リングバッファの使用
- Guava レート リミッターの使用
リクエストを抑制したり、cassandra 書き込みのバックプレッシャーを取得したり、すべての書き込みが cassandra に正常に送信されるようにしたりする方法の例を教えてください。