0

HDFS にログ イベントをロードする単純な春の XD アプリケーションを試しています。spring-ampq/rabbit log4j appender(クラス) を使用してターゲット アプリケーションを構成org.springframework.amqp.rabbit.log4j.AmqpAppenderし、事前に構成された交換にログ メッセージを送り込みました。これらのメッセージを HDFS からプルし、HDFS にプッシュするように次のストリームを設定します。ここでは、ソース モジュールとシンク モジュールの両方が既製の XD モジュールです。

ストリーム定義、

xd:>stream create --name demoQ1 --definition "rabbit | hdfs --rollover=15 --directory=/user/root" --deploy

新しいストリーム「demoQ1」を作成してデプロイしました

xd:>stream list
  Stream Name  Stream Definition                                   Status
  -----------  --------------------------------------------------  --------
  demoQ1       rabbit | hdfs --rollover=15 --directory=/user/root  deployed

AMQP アペンダーは、メッセージを exchange にパブリッシュし、demoQ1 キューにルーティングします。ここで、rabbit ソースが最初のメッセージを取得し、メッセージを確認しないためスタックします。その理由は何ですか?

4

2 に答える 2

0

コンテナ ログに「メッセージ ペイロードを HDFS に書き込めませんでした」というメッセージが表示されますか?

その場合は、モジュール間の型変換を使用する必要があります。ウサギのソースから hdfs シンクへのメッセージは、単純にバイト配列になります。

あなたのストリーム定義は、

stream create --name demoQ1 --definition "rabbit --outputType=text/plain | hdfs --rollover=15 --directory=/user/root" --deploy

また、

stream create --name demoQ1 --definition "rabbit | hdfs --inputType=text/plain --rollover=15 --directory=/user/root" --deploy

それぞれソース/シンクの outputType または inputType オプションに注意してください。この場合、hdfs シンクの HdfsStoreMessageHandler は、ペイロードが String 型であると想定しています。

型変換の詳細については、 https ://github.com/spring-projects/spring-xd/wiki/Type-Conversion を確認してください。

于 2014-04-21T13:03:09.943 に答える
-1

ウサギモジュールを実行しているSpring XDコンテナーでデバッグログを有効にしました。最初のメッセージに対して次の例外が繰り返し発生し、メッセージが再キューイングされたため、メッセージは未確認の状態のままになり、ウサギソースはそれ以上のメッセージを処理できません..

この問題を解決するために、log4j Appender プロパティからこのプロパティを削除しましたlog4j.appender.amqp.contentEncoding=null。このプロパティは、エンコーダーの名前を「null」として明示的に指定しますが、これはバグのようです。null はエンコーダーが指定されていないことを意味すると予想していました:)

ログの例外。メッセージが拒否され、再びキューに入れられると、継続的に繰り返されます。

19:29:17,713 DEBUG SimpleAsyncTaskExecutor-1 listener.BlockingQueueConsumer:268 - Received message: (Body:'Hello'MessageProperties [headers={categoryName=org.apache.hadoop.yarn.server.nodemanager.NodeManager, level=INFO}, timestamp=Sat Apr 19 19:21:52 PDT 2014, messageId=null, userId=null, appId=NodeManager, clusterId=null, type=null, correlationId=null, replyTo=null, contentType=text/plain, contentEncoding=null, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=true, receivedExchange=test-exch, receivedRoutingKey=rk1, deliveryTag=184015, messageCount=0]) 19:29:17,715 WARN SimpleAsyncTaskExecutor-1 listener.SimpleMessageListenerContainer:530 - Execution of Rabbit message listener failed, and no ErrorHandler has been set. org.springframework.amqp.rabbit.listener.ListenerExecutionFailedException: Listener threw exception at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:751) at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:690) at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:583) at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$001(SimpleMessageListenerContainer.java:75) at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$1.invokeListener(SimpleMessageListenerContainer.java:154) at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:1111) at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:556) at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:904) at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:888) at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$500(SimpleMessageListenerContainer.java:75) at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:989) at java.lang.Thread.run(Thread.java:722) Caused by: org.springframework.amqp.support.converter.MessageConversionException: failed to convert text-based Message content at org.springframework.amqp.support.converter.SimpleMessageConverter.fromMessage(SimpleMessageConverter.java:100) at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$1.onMessage(AmqpInboundChannelAdapter.java:73) at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:688) ... 10 more Caused by: java.io.UnsupportedEncodingException: null at java.lang.StringCoding.decode(StringCoding.java:190) at java.lang.String.(String.java:416) at java.lang.String.(String.java:481) at org.springframework.amqp.support.converter.SimpleMessageConverter.fromMessage(SimpleMessageConverter.java:97) ... 12 more 19:29:17,715 DEBUG SimpleAsyncTaskExecutor-1 listener.BlockingQueueConsumer:657 - Rejecting messages (requeue=true)
于 2014-05-11T01:08:11.473 に答える