17

cassandra クラスターに接続するために datastax Java ドライバー 3.1.0 を使用しています。cassandra クラスターのバージョンは 2.0.10 です。QUORUMの一貫性を保ちながら非同期に書いています。

  private final ExecutorService executorService = Executors.newFixedThreadPool(10);

  public void save(String process, int clientid, long deviceid) {
    String sql = "insert into storage (process, clientid, deviceid) values (?, ?, ?)";
    try {
      BoundStatement bs = CacheStatement.getInstance().getStatement(sql);
      bs.setConsistencyLevel(ConsistencyLevel.QUORUM);
      bs.setString(0, process);
      bs.setInt(1, clientid);
      bs.setLong(2, deviceid);

      ResultSetFuture future = session.executeAsync(bs);
      Futures.addCallback(future, new FutureCallback<ResultSet>() {
        @Override
        public void onSuccess(ResultSet result) {
          logger.logInfo("successfully written");
        }

        @Override
        public void onFailure(Throwable t) {
          logger.logError("error= ", t);
        }
      }, executorService);
    } catch (Exception ex) {
      logger.logError("error= ", ex);
    }
  }

上記の save メソッドは、複数のスレッドから非常に高速で呼び出されます。

質問:

executeAsyncCassandra に非同期で書き込むメソッドへのリクエストを抑制したいと考えています。私の Cassandra クラスターが処理できる速度よりも非常に高速で書き込みを行うと、エラーがスローされ始めます。すべての書き込みが損失なく cassandra に正常に行われるようにする必要があります。

この投稿を見ましたが、解決策はSemaphore固定数の許可で使用することです。しかし、それを実装するための最良の方法と方法がわかりません。私は以前にセマフォを使用したことがありません。これがロジックです。誰でも私のコードにセマフォベースの例を提供できますか、またはより良い方法/オプションがある場合は、私にも知らせてください。

データローダ プログラムを作成するコンテキストでは、次のようなことができます。

  • 物事をシンプルに保つには、セマフォまたは固定数の許可を持つその他の構造を使用します (これは、インフライト リクエストの最大数になります)。executeAsync を使用してクエリを送信するたびに、許可を取得します。セマフォからパーミットを取得してクエリを実行するスレッドは 1 つだけ必要です (ただし、これを実行する # cpu コア サイズのプールを導入する必要がある場合があります)。利用可能な許可が得られるまで、取得時にブロックされます。
  • executeAsync から返される未来には、Futures.addCallback を使用します。コールバックは、onSuccess と onFailure の両方のケースで Sempahore.release() を呼び出す必要があります。パーミットを解放することで、ステップ 1 のスレッドが続行し、次のリクエストを送信できるようになります。

また、使用について話している他の投稿をいくつか見RingBufferましGuava RateLimitterたが、どちらが優れていて、使用する必要がありますか? 以下は私が考えることができるオプションです:

  • セマフォの使用
  • リングバッファの使用
  • Guava レート リミッターの使用

リクエストを抑制したり、cassandra 書き込みのバックプレッシャーを取得したり、すべての書き込みが cassandra に正常に送信されるようにしたりする方法の例を教えてください。

4

2 に答える 2

9

正式な回答ではありませんが、参考になるかもしれません。まず、クエリをすぐに実行できない場合にどうするかを検討する必要があります。選択したレート制限に関係なく、Cassandra に書き込めるレートよりも高いレートでリクエストを受け取ると、最終的には待機中のリクエストでプロセスが詰まるでしょう。その時点で、クライアントにリクエストをしばらく保留するように伝える必要があります (「プッシュバック」)。たとえば、HTTP 経由で送信された場合、応答ステータスは 429 "Too Many Requests" になります。同じプロセスでリクエストを生成する場合は、許容できる最長のタイムアウトを決定します。とはいえ、Cassandra が追いつかない場合は、スケーリング (または調整) するときです。

おそらく、レート制限を実装する前に、saveメソッドを呼び出す前に (Thread.sleep(...) を使用して) 実験してスレッドに人為的な遅延を追加し、問題が解決するかどうか、または何か他のものが必要かどうかを確認する価値があります。

エラーを返すクエリは、Cassandra からのバックプレッシャーです。ただし、 RetryPolicyを選択または実装して、失敗したクエリをいつ再試行するかを決定できます。

また、接続プールのオプション(および特にプールの監視と調整) を確認することもできます。接続ごとの非同期リクエストの数を調整できます。ただし、ドキュメントによると、Cassandra 2.x の場合、このパラメーターは 128 に制限されており、変更すべきではありません (私はそれを試してみたいと思います:)

セマフォを使用した実装は次のようになります

/* Share it among all threads or associate with a thread for per-thread limits
   Number of permits is to be tuned depending on acceptable load.
*/
final Semaphore queryPermits = new Semaphore(20); 


public void save(String process, int clientid, long deviceid) {
  ....
  queryPermits.acquire(); // Blocks until a permit is available

  ResultSetFuture future = session.executeAsync(bs);
  Futures.addCallback(future, new FutureCallback<ResultSet>() {
    @Override
    public void onSuccess(ResultSet result) {
      queryPermits.release();
      logger.logInfo("successfully written");
    }
    @Override
    public void onFailure(Throwable t) {
      queryPermits.release(); // Permit should be released in all cases.
      logger.logError("error= ", t);
    }
  }, executorService);
  ....
}

(実際のコードでは、パーミットを解放してラップされたメソッドを呼び出すラッパー コールバックを作成します)

Guava の RateLimiter はセマフォに似ていますが、使用率の低い期間の後に一時的なバーストを許可し、タイミングに基づいてリクエストを制限します (アクティブなクエリの総数ではありません)。

ただし、リクエストはさまざまな理由で失敗する可能性があるため、(断続的なエラーの場合に) 再試行する方法を計画しておくことをお勧めします。

あなたの場合は適切ではないかもしれませんが、キューまたはバッファを使用してリクエストをキューに入れようとします(例java.util.concurrent.ArrayBlockingQueue)。「バッファがいっぱい」は、クライアントが要求を待つか放棄する必要があることを意味します。バッファーは、失敗した要求を再度キューに入れるためにも使用されます。ただし、より公平を期すために、失敗したリクエストはおそらくキューの前に配置して、最初に再試行する必要があります。また、キューがいっぱいで、同時に新しい失敗したリクエストがある場合の状況を何らかの方法で処理する必要があります。次に、シングルスレッド ワーカーがキューからリクエストを取得し、Cassandra に送信します。多くのことを行うべきではないため、ボトルネックになる可能性は低いです。このワーカーは、たとえば のタイミングに基づいて、独自のレート制限を適用することもできますcom.google.common.util.concurrent.RateLimiter

メッセージをできるだけ失わないようにしたい場合は、Cassandra の前に持続性を持つメッセージ ブローカー (Kafka など) を配置できます。このようにして、着信メッセージは、Cassandra が長時間停止しても存続できます。しかし、あなたの場合はやり過ぎだと思います。

于 2017-01-02T12:56:50.800 に答える
2

ブロッキング キューを使用するだけで問題ありません。フューチャーはスレッド化され、そこでのコールバック (成功と失敗) はコンシューマーとして機能し、save メソッドを呼び出す場所はどこからでもプロデューサーとして機能します。

さらに良い方法は、完全なリクエスト自体をキューに入れ、デキューごとに保存して1つずつデキューすることです。

private final ExecutorService executorService = Executors.newFixedThreadPool(10);

public void save(String process, int clientid, long deviceid, BlockingQueue<Object> queue) {
    String sql = "insert into storage (process, clientid, deviceid) values (?, ?, ?)";
    try {
      BoundStatement bs = CacheStatement.getInstance().getStatement(sql);
      bs.setConsistencyLevel(ConsistencyLevel.QUORUM);
      bs.setString(0, process);
      bs.setInt(1, clientid);
      bs.setLong(2, deviceid);

      ResultSetFuture future = session.executeAsync(bs);
      Futures.addCallback(future, new FutureCallback<ResultSet>() {
        @Override
        public void onSuccess(ResultSet result) {
          logger.logInfo("successfully written");
          queue.take();
        }

        @Override
        public void onFailure(Throwable t) {
          logger.logError("error= ", t);
          queue.take();
        }
      }, executorService);
    } catch (Exception ex) {
      logger.logError("error= ", ex);
    }
}

public void invokeSaveInLoop(){
    Object dummyObj = new Object();
    BlockingQueue<Object> queue = new ArrayBlockingQueue<>(20);;
    for(int i=0; i< 1000; i++){
        save("process", clientid, deviceid, queue);
        queue.put(dummyObj);
    }
}

さらに進んで、途中でクラスターの負荷を確認したい場合

public static String getCurrentState(){    
StringBuilder response = new StringBuilder();
            response.append("Current Database Connection Status <br>\n ---------------------------------------------<br>\n");
            final LoadBalancingPolicy loadBalancingPolicy =
                    cluster.getConfiguration().getPolicies().getLoadBalancingPolicy();
            final PoolingOptions poolingOptions =
                    cluster.getConfiguration().getPoolingOptions();
            Session.State state = session.getState();
            for (Host host : state.getConnectedHosts()) {
                HostDistance distance = loadBalancingPolicy.distance(host);
                int connections = state.getOpenConnections(host);
                int inFlightQueries = state.getInFlightQueries(host);
                response.append(String.format("%s current connections=%d, max allowed connections=%d, current load=%d, max load=%d%n",
                                host, connections, poolingOptions.getMaxConnectionsPerHost(distance), inFlightQueries,
                                connections *
                                        poolingOptions.getMaxRequestsPerConnection(distance)))
                        .append("<br>\n");
            }
            return response.toString();
}
于 2017-01-08T18:52:20.317 に答える