0

こんにちは、Spring Boot アプリケーションには、プロパティ ファイルを使用して r2dbc 接続プールが自動構成されています。

    spring.r2dbc.url=r2dbc:pool:postgres://localhost:5432/ecom
    spring.r2dbc.username=xxx
    spring.r2dbc.password=yyy</code></pre>

ここで、PostgresqlConnection インスタンスを取得する必要があります。これを次のように行います。

this.connection = Mono.from(connectionFactory.create()).cast(PostgresqlConnection.class).block();

しかし、これはプール構成であるため、ClassCastException と、必要な PostgresqlConnection をラップする次の PooledConnection オブジェクトを受け取ります。

PooledConnection[PostgresqlConnection{client=io.r2dbc.postgresql.client.ReactorNettyClient@14c93774, codecs=io.r2dbc.postgresql.codec.DefaultCodecs@62a68bcb}]

PostgresqlConnection にアクセスして、通知などのネイティブ機能を使用する必要があります。

PostgresqlConnection connection = …;
    Flux<Notification> listen = connection.createStatement("LISTEN mymessage")
    .execute()
    .flatMap(PostgresqlResult::getRowsUpdated)
    .thenMany(connection.getNotifications());

問題は、connectionFactory から PostgresqlConnection インスタンスを適切に取得する方法です。どんな助けでも大歓迎です。

4

1 に答える 1

1
  1. デフォルトの ConnectionFactory をオーバーライドします。

     @Bean
     @Primary
     public ConnectionFactory connectionFactory() {
         return new PostgresqlConnectionFactory(
                 PostgresqlConnectionConfiguration.builder()
                         .host("localhost")
                         .database("test")
                         .username("user")
                         .password("password")
                         .codecRegistrar(EnumCodec.builder().withEnum("post_status", Post.Status.class).build())
                         .build()
         );
     }
    
  2. listen/notify 用に別の接続ファクトリーを作成します。

        @Bean
     @Qualifier("pgConnectionFactory")
     public ConnectionFactory pgConnectionFactory() {
         return new PostgresqlConnectionFactory(
                 PostgresqlConnectionConfiguration.builder()
                         .host("localhost")
                         .database("test")
                         .username("user")
                         .password("password")
                         //.codecRegistrar(EnumCodec.builder().withEnum("post_status", Post.Status.class).build())
                         .build()
         );
     }
    

2 番目の方法の例を作成しました。ここを確認してください。

アプリケーションを起動し、次から hello を送信しますcurl

curl http://localhost:8080/hello

コンソールに、次のようなメッセージが表示されます。

2020-09-15 16:49:20.657  INFO 20216 --- [ctor-http-nio-4] sending notification::                   : onSubscribe(FluxFlatMap.FlatMapMain)
2020-09-15 16:49:20.658  INFO 20216 --- [ctor-http-nio-4] sending notification::                   : request(unbounded)
2020-09-15 16:49:20.666  INFO 20216 --- [actor-tcp-nio-2] reactor.Flux.ConcatMap.2                 : onNext(NotificationResponseWrapper{name=mymessageprocessId=753parameter=Hello world at 2020-09-15T16:49:20.656715600})
2020-09-15 16:49:20.667  INFO 20216 --- [actor-tcp-nio-2] com.example.demo.Listener                : notifications: NotificationResponseWrapper{name=mymessageprocessId=753parameter=Hello world at 2020-09-15T16:49:20.656715600}
于 2020-09-15T09:09:45.737 に答える