2

directRabbitMQ サーバーで交換をセットアップしました。次のように、そのサーバーにメッセージを発行します。

val message = Json.toJson(myMessage).toString
subscriber.rabbitControl ! Message.exchange(message, routingKey = "MyMessage", exchange = "ipsmse")

サブスクライバーが次のメッセージを読み込もうとしています。

class SubscriberImpl @Inject()(actorSystem: ActorSystem, lifecycle: ApplicationLifecycle) extends Subscriber {

  override val rabbitControl = actorSystem.actorOf(Props[RabbitControl])

  import MyMessage._
  implicit val myMessageFormat = Json.format[MyMessage]
  implicit val recoveryStrategy = RecoveryStrategy.none
  val subscriptionRef = Subscription.run(rabbitControl) {
    import Directives._
    channel(qos = 3) {
      consume(Queue.passive("MyMessage")) {
        (body(as[MyMessage]) & routingKey) { (myMessage, key) =>
          println(s"""A message '${myMessage.toString}' was received over '$key'.""")
          ack
        }
      }
    }
  }

  lifecycle.addStopHook { () =>
    Future(subscriptionRef.close())
  }
}

play アプリケーションを実行すると、公開はうまくいっているようです:

rabbitmqadmin --vhost="ipsmsvh" get queue=MyMessage requeue=false -u ... -p ...     

+--------------------+----------+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------+---------------+------------------+-------------+
|    routing_key     | exchange | message_count |                                                                       payload                                                                        | payload_bytes | payload_encoding | redelivered |
+--------------------+----------+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------+---------------+------------------+-------------+
| MyMessage | ipsmse   | 8             | {"id":"1bef63f4-2854-41c1-9736-3fe913eec307","createdAt":1484161631674,"sMSMessage":{"from":"from1","to":"to1","text":"text1","subject":"subject1"}} | 148           | string           | True        |
+--------------------+----------+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------+---------------+------------------+-------------+    

しかし、サブスクライバーは、別のコンテンツ タイプを予期してエラーを生成します。

MyMessage - exception while processing message. Body={"id":"70a43b48-51b8-4adc-86d0-6a250c1d89a8","createdAt":1484161760382,"sMSMessage":{"from":"from1","to":"to1","text":"text1","subject":"subject1"}}. Envelope=Envelope(deliveryTag=1, redeliver=true, exchange=ipsmse, routingKey=MyMessage)
com.spingo.op_rabbit.MismatchedContentType: MismatchedContentType: expected 'application/json', received 'text/plain'
  at com.spingo.op_rabbit.PlayJsonSupport$$anon$2.unmarshall(PlayJsonSupport.scala:53)  

ここに私のマーシャラー/アンマーシャラーがあります:

object Marshaller {

  implicit val myMessageMarshaller =
    new RabbitMarshaller[MyMessage]
        with RabbitUnmarshaller[MyMessage] {
      val contentType = "application/json"
      val contentEncoding = Some("UTF-8")

      override def marshall(value: MyMessage) =
        value.toString.getBytes

      override def unmarshall(
          value: Array[Byte],
          contentType: Option[String],
          charset: Option[String]): MyMessage = {
        val json = Json.parse(new String(value))
        MyMessage.myMessageReads.reads(json).getOrElse(new MyMessage(null, null, null))
      }
    }

}

エラーの原因とその修正方法に関するアイデアはありますか? それは単に content-type を設定するだけの問題ですか? もしそうなら、どのように? ありがとう!

4

0 に答える 0