1

NATS Streaming Server を Java クライアントと組み合わせて使用​​した経験のある人はいますか? 具体的には、加入者がオフラインのときに送信される Java クライアントを使用してメッセージを取得する方法がわかりません。

Go クライアントを使用して、メッセージをパブリッシュし、後でサブスクリプションを追加して、パブリッシュされたすべてのメッセージを取得できることを確認できます。これは NATS Streaming Getting Startedドキュメントにあり、宣伝どおりに機能します。

いくつかのメッセージを公開します。パブリケーションごとに結果を取得する必要があります。

$ cd $GOPATH/src/github.com/nats-io/go-nats-streaming/examples
go run stan-pub.go foo "msg one"
Published [foo] : 'msg one'
$ go run stan-pub.go foo "msg two"
Published [foo] : 'msg two'
$ go run stan-pub.go foo "msg three"
Published [foo] : 'msg three'

サブスクライバー クライアントを実行します。--all フラグを使用して、発行されたすべてのメッセージを受信します。

$ go run stan-sub.go --all -c test-cluster -id myID foo
Connected to nats://localhost:4222 clusterID: [test-cluster] clientID: [myID]
subscribing with DeliverAllAvailable
Listening on [foo], clientID=[myID], qgroup=[] durable=[]
[#1] Received on [foo]: 'sequence:1 subject:"foo" data:"msg one" timestamp:1465962202884478817 '
[#2] Received on [foo]: 'sequence:2 subject:"foo" data:"msg two" timestamp:1465962208545003897 '
[#3] Received on [foo]: 'sequence:3 subject:"foo" data:"msg three" timestamp:1465962215567601196

NATS Java clientを使用してこれを実行しようとしています。類似のメソッド呼び出しが見つからないだけなのか、それともその機能が Java クライアントに存在しないのか、私にはわかりません。

これが私が試したことです

    import io.nats.client.Connection;
import io.nats.client.ConnectionFactory;
import io.nats.client.Constants;
import io.nats.client.Message;
import io.nats.client.SyncSubscription;

import java.io.IOException;
import java.security.SecureRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class NatsTest2 {

  private static final SecureRandom random = new SecureRandom();

  public static void main(String... args) throws Exception {
    final ConnectionFactory factory = new ConnectionFactory(Constants.DEFAULT_URL);
    try (final Connection conn = factory.createConnection()) {
      // Simple Async Subscriber
      final String expectMessage = "Yum, cookies " + System.currentTimeMillis();
      works(conn, expectMessage);
      broken(conn, expectMessage);
    }
  }

  private static void works(Connection conn, String expectMessage) throws IOException, TimeoutException {
    final String queue = Long.toString(random.nextLong());
    System.out.print(queue + "=>");
    try (final SyncSubscription subscription = conn.subscribeSync(queue)) {
      conn.publish(queue, expectMessage.getBytes());
      subscribe(subscription);
    }
  }

  private static void broken(Connection conn, String expectMessage) throws IOException, TimeoutException {
    final String queue = Long.toString(random.nextLong());
    System.out.print(queue + "=>");
    conn.publish(queue, expectMessage.getBytes());
    try (final SyncSubscription subscription = conn.subscribeSync(queue)) {
      subscribe(subscription);
    }
  }

  private static void subscribe(SyncSubscription subscription) throws IOException, TimeoutException {
    final Message message = subscription.nextMessage(1, TimeUnit.SECONDS);
    System.out.println(new String(message.getData()));
  }
}

これにより、出力が得られます

-8522002637987832314=>Yum, cookies 1473462495040
-3024385525006291780=>Exception in thread "main" java.util.concurrent.TimeoutException: Channel timed out waiting for items
4

1 に答える 1

1

を使用している場合は、 java-nats-streamingクライアントnats-streaming-serverを使用する必要があります。お探しの機能 (履歴メッセージへのサブスクリプション) は、そのクライアントにのみ存在します。

とにかく、jnatsクライアントで行ったことを確認した理由は次のとおりです。

nats-streaming-serverには現在 NATS サーバー ( gnatsd) が組み込まれているため、通常の NATS クライアントに標準の NATS 機能が許可されています。

サンプル コードでworks()は、メッセージを発行する前にサブスクリプションが既に作成されているため、たまたま機能します (つまり、try-with-resourcesブロックは、他のことが起こる前にサブスクリプションが既にアクティブであることを保証します)。したがって、過去に発行されたメッセージを実際に受け取っているわけではありません。サブスクリプションの開始後に発行されたメッセージを受信して​​います。

このbroken()例は、サブスクリプションが作成される前に実際にメッセージをパブリッシュしているため機能しません。メッセージは (まだ) 関心がないため、サーバーによって破棄されます。

于 2016-09-12T18:58:00.740 に答える