TCP ソケットからデータを消費するように一連のスプリング統合コンポーネントを構成しようとしています。基本的なプロトコルは、接続を開くと、ユーザー名とパスワードの入力を求められ、認証が成功すると、データが利用可能になるとストリーミングされます。ping メッセージが 30 秒ごとに送信されるので、データがストリーミングされていない静かな時間帯に接続が有効であることを確認できます。
spring-integration docs に従って、TCP ゲートウェイをセットアップしました。 http://docs.spring.io/spring-integration/reference/html/ip.html#tcp-gateways
<bean id="authInterceptorFactory"
class="org.springframework.integration.ip.tcp.connection.TcpConnectionInterceptorFactoryChain">
<property name="interceptors">
<array>
<bean class="com.socketfetching.AuthConnectionInterceptorFactory">
<constructor-arg value="Login Username:"/>
<constructor-arg value="${socket.username}"/>
<constructor-arg value="Password:"/>
<constructor-arg value="${socket.password}"/>
</bean>
</array>
</property>
</bean>
<bean id="lfSeserializer" class="org.springframework.integration.ip.tcp.serializer.ByteArrayLfSerializer"/>
<ip:tcp-connection-factory id="connectionFactory"
type="client"
host="${socket.url}"
port="${socket.port}"
single-use="false"
so-keep-alive="true"
interceptor-factory-chain="authInterceptorFactory"
deserializer="lfSeserializer"
serializer="lfSeserializer"
/>
<int:channel id="socketInitChannel"/>
<ip:tcp-inbound-gateway id="inGateway"
request-channel="clientBytes2StringChannel"
reply-channel="socketInitChannel"
connection-factory="connectionFactory"
reply-timeout="10000"
retry-interval="5000"
auto-startup="true"
client-mode="true"/>
InterceptorFactory は、接続が開かれたときに発生するハンドシェイクを処理し、予想されるプロンプトと目的の応答をパラメーターとして受け取ります。このハンドシェイクは完全に機能し、アプリケーションはサーバーから定期的な ping を受信しています。
client-mode=true を指定すると、ゲートウェイは起動時にすぐに接続を開き、ユーザー名のプロンプトを待ちます。
私の問題は、接続が失われたときの回復にあります。ネットワーク接続を切断すると、明らかに ping が来なくなり、ゲートウェイがこれを検出して、定期的に再接続を試行するようにしたいと考えています。ネットワーク接続が復元されると、ゲートウェイは正常に再接続されるはずです。
再試行間隔でこれを処理できると思いましたが、効果がないようです。ドキュメントは、この目的のために TaskScheduler を使用することを示唆しています...しかし、それをサーバーからのpingメッセージと統合する方法が正確にはわかりません。
何かアドバイス?
編集:理想的かどうかはわかりませんが、機能するソリューションを見つけました。私のゲートウェイの再試行間隔は、5 秒ごとに接続がテストされ、必要に応じて再作成されることを意味します。これは、AuthConnectionInterceptor で isOpen() を呼び出すことによって行われます。そのため、このメソッドをオーバーライドして、現在の時刻とインターセプターを介して送信された最後のメッセージの間のデルタを確認することができました。タイム ギャップが長すぎる場合は、手動で接続を切断し、再接続をトリガーします。
これらのクラスの完全なソースは次のとおりです... InterceptorFactory: package com.socketfetching;
import org.apache.log4j.Logger;
import org.springframework.integration.ip.tcp.connection.TcpConnectionInterceptor;
import org.springframework.integration.ip.tcp.connection.TcpConnectionInterceptorFactory;
/**
* AuthConnectionInterceptorFactory
* Created by: Seth Kelly
* Date: 10/3/13
*/
public class AuthConnectionInterceptorFactory implements TcpConnectionInterceptorFactory {
private static Logger logger = Logger.getLogger(AuthConnectionInterceptorFactory.class);
private String usernamePrompt;
private String username;
private String passwordPrompt;
private String password;
public AuthConnectionInterceptorFactory(String usernamePrompt, String username, String passwordPrompt, String password) {
this.usernamePrompt = usernamePrompt;
this.username = username;
this.passwordPrompt = passwordPrompt;
this.password = password;
}
@Override
public TcpConnectionInterceptor getInterceptor() {
return new AuthConnectionInterceptor(usernamePrompt, username, passwordPrompt, password);
}
}
インターセプター:
package com.socketfetching;
import org.apache.log4j.Logger;
import org.joda.time.DateTime;
import org.springframework.integration.Message;
import org.springframework.integration.MessagingException;
import org.springframework.integration.ip.tcp.connection.AbstractTcpConnectionInterceptor;
import org.springframework.integration.support.MessageBuilder;
/**
* AuthConnectionInterceptor
* Created by: Seth Kelly
* Date: 10/3/13
*
* Handles username/password authentication when opening a new TCP connection.
*/
public class AuthConnectionInterceptor extends AbstractTcpConnectionInterceptor {
private static Logger logger = Logger.getLogger(AuthConnectionInterceptor.class);
private String usernamePrompt;
private String username;
private String passwordPrompt;
private String password;
private Boolean usernameSent = false;
private Boolean passwordSent = false;
private static final String PING_PREFIX = "Ping";
private DateTime lastMsgReceived;
private Integer inactivityTimeout = 35000;
public AuthConnectionInterceptor(String usernamePrompt, String username, String passwordPrompt, String password) {
this.usernamePrompt = usernamePrompt;
this.username = username;
this.passwordPrompt = passwordPrompt;
this.password = password;
}
@Override
public boolean onMessage(Message<?> message) {
lastMsgReceived = new DateTime();
Boolean forwardMessage = true;
if(!this.isServer()) {
String payload = new String((byte[])message.getPayload());
if(!usernameSent) {
if(payload.equals(usernamePrompt)) {
try {
logger.debug("Sending username=" + username + "to authenticate socket.");
super.send(MessageBuilder.withPayload(username).build());
usernameSent = true;
forwardMessage = false;
} catch (Exception e) {
throw new MessagingException("Negotiation error", e);
}
}
else {
throw new MessagingException("Negotiation error. expected message=" + usernamePrompt +
" actual message=" + payload);
}
}
else if(!passwordSent) {
if(payload.equals(passwordPrompt)) {
try {
logger.debug("Sending password to authenticate socket.");
super.send(MessageBuilder.withPayload(password).build());
passwordSent = true;
forwardMessage = false;
} catch (Exception e) {
throw new MessagingException("Negotiation error", e);
}
}
else {
throw new MessagingException("Negotiation error. expected message=" + passwordPrompt +
" actual message=" + payload);
}
}
else if(payload.startsWith(PING_PREFIX)) {
//Just record that we received the ping.
forwardMessage = false;
}
}
if(forwardMessage)
return super.onMessage(message);
else
return true;
}
@Override
public boolean isOpen() {
DateTime now = new DateTime();
if((lastMsgReceived == null) ||
((now.getMillis() - lastMsgReceived.getMillis()) < inactivityTimeout)) {
return super.isOpen();
}
else
{
if(super.isOpen()) {
super.close();
}
return false;
}
}
public Integer getInactivityTimeout() {
return inactivityTimeout;
}
public void setInactivityTimeout(Integer inactivityTimeout) {
this.inactivityTimeout = inactivityTimeout;
}
}