TCPソケットからのメッセージをリッスンし、本文をデータベースに送信する次のキャメルルートがあります。
from("netty:tcp://localhost:5150?sync=false&keepAlive=true")
.transform().simple("insert into mytable (DATA) values (\"${in.body}\");")
.to("jdbc:mydb");
そして、最初にメッセージを送信する以下:
from("direct:input").to("netty:tcp://localhost:5150?sync=false&keepAlive=true");
次のようなJUnitでテストしました:
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:applicationContext.xml")
public class OutputRoutesTest {
@Autowired
protected CamelContext camelContext;
@EndpointInject(uri = "direct:input")
private ProducerTemplate template;
@Test
@DirtiesContext
public void testTCPSend() throws Exception {
String body = "foo?";
NotifyBuilder notify = new NotifyBuilder(camelContext).whenDone(1).create();
try {
template.sendBody(body);
} finally {
template.stop();
}
boolean matches = notify.matches(5, TimeUnit.SECONDS);
assertTrue(matches);
}
}
テストを開始すると、次のスタックトレースがあります。
12:05:36.502 [Camel (camel-1) thread #22 - NettyOrderedWorker] ERROR o.a.c.processor.DefaultErrorHandler - Failed delivery for (MessageId: ID-M249-52364-1373537133834-0-3 on ExchangeId: ID-M249-52364-1373537133834-0-4). Exhausted after delivery attempt: 3 caught: java.sql.SQLException: Data source is closed
java.sql.SQLException: Data source is closed
at org.apache.commons.dbcp.BasicDataSource.createDataSource(BasicDataSource.java:1362) ~[commons-dbcp-1.4.jar:1.4]
at org.apache.commons.dbcp.BasicDataSource.getConnection(BasicDataSource.java:1044) ~[commons-dbcp-1.4.jar:1.4]
at org.apache.camel.component.jdbc.JdbcProducer.processingSqlBySettingAutoCommit(JdbcProducer.java:76) ~[camel-jdbc-2.11.0.jar:2.11.0]
at org.apache.camel.component.jdbc.JdbcProducer.process(JdbcProducer.java:63) ~[camel-jdbc-2.11.0.jar:2.11.0]
at org.apache.camel.util.AsyncProcessorConverterHelper$ProcessorToAsyncProcessorBridge.process(AsyncProcessorConverterHelper.java:61) ~[camel-core-2.11.0.jar:2.11.0]
at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73) [camel-core-2.11.0.jar:2.11.0]
at org.apache.camel.impl.InterceptSendToEndpoint$1.process(InterceptSendToEndpoint.java:164) ~[camel-core-2.11.0.jar:2.11.0]
at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73) [camel-core-2.11.0.jar:2.11.0]
at org.apache.camel.processor.SendProcessor$2.doInAsyncProducer(SendProcessor.java:122) ~[camel-core-2.11.0.jar:2.11.0]
at org.apache.camel.impl.ProducerCache.doInAsyncProducer(ProducerCache.java:298) ~[camel-core-2.11.0.jar:2.11.0]
at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:117) ~[camel-core-2.11.0.jar:2.11.0]
at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73) [camel-core-2.11.0.jar:2.11.0]
at org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99) [camel-core-2.11.0.jar:2.11.0]
at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90) [camel-core-2.11.0.jar:2.11.0]
at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:72) [camel-core-2.11.0.jar:2.11.0]
at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73) [camel-core-2.11.0.jar:2.11.0]
at org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99) [camel-core-2.11.0.jar:2.11.0]
at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90) [camel-core-2.11.0.jar:2.11.0]
at org.apache.camel.processor.interceptor.BacklogTracerInterceptor.process(BacklogTracerInterceptor.java:84) ~[camel-core-2.11.0.jar:2.11.0]
at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73) [camel-core-2.11.0.jar:2.11.0]
at org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99) [camel-core-2.11.0.jar:2.11.0]
at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90) [camel-core-2.11.0.jar:2.11.0]
at org.apache.camel.processor.interceptor.TraceInterceptor.process(TraceInterceptor.java:91) ~[camel-core-2.11.0.jar:2.11.0]
at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73) [camel-core-2.11.0.jar:2.11.0]
at org.apache.camel.processor.RedeliveryErrorHandler.processErrorHandler(RedeliveryErrorHandler.java:390) [camel-core-2.11.0.jar:2.11.0]
at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:273) [camel-core-2.11.0.jar:2.11.0]
at org.apache.camel.processor.RouteContextProcessor.processNext(RouteContextProcessor.java:46) [camel-core-2.11.0.jar:2.11.0]
at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90) [camel-core-2.11.0.jar:2.11.0]
at org.apache.camel.processor.interceptor.DefaultChannel.process(DefaultChannel.java:335) [camel-core-2.11.0.jar:2.11.0]
at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73) [camel-core-2.11.0.jar:2.11.0]
at org.apache.camel.processor.Pipeline.process(Pipeline.java:117) [camel-core-2.11.0.jar:2.11.0]
at org.apache.camel.processor.Pipeline.process(Pipeline.java:80) [camel-core-2.11.0.jar:2.11.0]
at org.apache.camel.processor.RouteContextProcessor.processNext(RouteContextProcessor.java:46) [camel-core-2.11.0.jar:2.11.0]
at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90) [camel-core-2.11.0.jar:2.11.0]
at org.apache.camel.processor.UnitOfWorkProcessor.processAsync(UnitOfWorkProcessor.java:150) [camel-core-2.11.0.jar:2.11.0]
at org.apache.camel.processor.UnitOfWorkProcessor.process(UnitOfWorkProcessor.java:117) [camel-core-2.11.0.jar:2.11.0]
at org.apache.camel.processor.RouteInflightRepositoryProcessor.processNext(RouteInflightRepositoryProcessor.java:48) [camel-core-2.11.0.jar:2.11.0]
at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90) [camel-core-2.11.0.jar:2.11.0]
at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73) [camel-core-2.11.0.jar:2.11.0]
at org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99) [camel-core-2.11.0.jar:2.11.0]
at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90) [camel-core-2.11.0.jar:2.11.0]
at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:72) [camel-core-2.11.0.jar:2.11.0]
at org.apache.camel.component.netty.handlers.ServerChannelHandler.processAsynchronously(ServerChannelHandler.java:118) [camel-netty-2.11.0.jar:2.11.0]
at org.apache.camel.component.netty.handlers.ServerChannelHandler.messageReceived(ServerChannelHandler.java:102) [camel-netty-2.11.0.jar:2.11.0]
at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70) [netty-3.6.5.Final.jar:na]
at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564) [netty-3.6.5.Final.jar:na]
at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791) [netty-3.6.5.Final.jar:na]
at org.jboss.netty.handler.execution.ChannelUpstreamEventRunnable.doRun(ChannelUpstreamEventRunnable.java:43) [netty-3.6.5.Final.jar:na]
at org.jboss.netty.handler.execution.ChannelEventRunnable.run(ChannelEventRunnable.java:67) [netty-3.6.5.Final.jar:na]
at org.jboss.netty.handler.execution.OrderedMemoryAwareThreadPoolExecutor$ChildExecutor.run(OrderedMemoryAwareThreadPoolExecutor.java:314) [netty-3.6.5.Final.jar:na]
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:885) [na:1.6.0_05]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:907) [na:1.6.0_05]
at java.lang.Thread.run(Thread.java:619) [na:1.6.0_05]
12:05:36.503 [Camel (camel-1) thread #22 - NettyOrderedWorker] DEBUG org.apache.camel.processor.Pipeline - Message exchange has failed: so breaking out of pipeline for exchange: Exchange[Message: insert into mytable (DATA) values ("foo?");] Exception: java.sql.SQLException: Data source is closed
12:05:37.473 [Camel (camel-1) thread #23 - ShutdownTask] DEBUG o.a.c.impl.DefaultShutdownStrategy - Route: route2 preparing to shutdown complete.
メッセージは camel ルートで受信されますが、データベースに送信できません。データベースとの接続の問題のようですが、TCPエンドポイントを「direct:output」に置き換えてみたところbaseにデータが挿入されました。
どこが間違っていますか?netty:tcp コンポーネントの使い方を誤解している可能性があります。ご協力いただきありがとうございます。
Camel 2.11 と Spring 3.1.2 を使用しています