direct
RabbitMQ サーバーで交換をセットアップしました。次のように、そのサーバーにメッセージを発行します。
val message = Json.toJson(myMessage).toString
subscriber.rabbitControl ! Message.exchange(message, routingKey = "MyMessage", exchange = "ipsmse")
サブスクライバーが次のメッセージを読み込もうとしています。
class SubscriberImpl @Inject()(actorSystem: ActorSystem, lifecycle: ApplicationLifecycle) extends Subscriber {
override val rabbitControl = actorSystem.actorOf(Props[RabbitControl])
import MyMessage._
implicit val myMessageFormat = Json.format[MyMessage]
implicit val recoveryStrategy = RecoveryStrategy.none
val subscriptionRef = Subscription.run(rabbitControl) {
import Directives._
channel(qos = 3) {
consume(Queue.passive("MyMessage")) {
(body(as[MyMessage]) & routingKey) { (myMessage, key) =>
println(s"""A message '${myMessage.toString}' was received over '$key'.""")
ack
}
}
}
}
lifecycle.addStopHook { () =>
Future(subscriptionRef.close())
}
}
play アプリケーションを実行すると、公開はうまくいっているようです:
rabbitmqadmin --vhost="ipsmsvh" get queue=MyMessage requeue=false -u ... -p ...
+--------------------+----------+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------+---------------+------------------+-------------+
| routing_key | exchange | message_count | payload | payload_bytes | payload_encoding | redelivered |
+--------------------+----------+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------+---------------+------------------+-------------+
| MyMessage | ipsmse | 8 | {"id":"1bef63f4-2854-41c1-9736-3fe913eec307","createdAt":1484161631674,"sMSMessage":{"from":"from1","to":"to1","text":"text1","subject":"subject1"}} | 148 | string | True |
+--------------------+----------+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------+---------------+------------------+-------------+
しかし、サブスクライバーは、別のコンテンツ タイプを予期してエラーを生成します。
MyMessage - exception while processing message. Body={"id":"70a43b48-51b8-4adc-86d0-6a250c1d89a8","createdAt":1484161760382,"sMSMessage":{"from":"from1","to":"to1","text":"text1","subject":"subject1"}}. Envelope=Envelope(deliveryTag=1, redeliver=true, exchange=ipsmse, routingKey=MyMessage)
com.spingo.op_rabbit.MismatchedContentType: MismatchedContentType: expected 'application/json', received 'text/plain'
at com.spingo.op_rabbit.PlayJsonSupport$$anon$2.unmarshall(PlayJsonSupport.scala:53)
ここに私のマーシャラー/アンマーシャラーがあります:
object Marshaller {
implicit val myMessageMarshaller =
new RabbitMarshaller[MyMessage]
with RabbitUnmarshaller[MyMessage] {
val contentType = "application/json"
val contentEncoding = Some("UTF-8")
override def marshall(value: MyMessage) =
value.toString.getBytes
override def unmarshall(
value: Array[Byte],
contentType: Option[String],
charset: Option[String]): MyMessage = {
val json = Json.parse(new String(value))
MyMessage.myMessageReads.reads(json).getOrElse(new MyMessage(null, null, null))
}
}
}
エラーの原因とその修正方法に関するアイデアはありますか? それは単に content-type を設定するだけの問題ですか? もしそうなら、どのように? ありがとう!