TCPClient セッションに問題があります。
背景:
- Bulk SMS サービスを実行していますが、最近、ネットワーク モバイル オペレーター (MNO) からの新しいメッセージをプルからリッスンに切り替えました。
- 新しいメッセージをリッスンするクラスを作成しました(以下のコードを参照)
- MNO で所有しているアカウントごとにスレッドを実行する必要があるため、クラスを 15 回呼び出します。
- 各スレッドは、特定のアカウントのサブスクリプション情報を送信し、メッセージを待機します
- コードを実行すると、15 個のスレッドが呼び出されて作成され、すべてのスレッドが接続されていることが示されます
- 60 分ごとに 15 個のスレッドを停止して開始 (再作成) します。更新するたびに、ログに各スレッドが正しく接続されていることが示されます
- 24 時間年中無休のリスニング サービスが必要です
問題:
- スレッドは接続されているように見えますが、サービスをテストしたところ、リスニング サービスからメッセージが届かない場合があります。MNOはすべてのスレッドが接続されていることを示していますが、スレッドがスリープ状態になっているようです
- 受信メッセージを常に受信するにはどうすればよいですか?
リスニング サービスのコードは次のとおりです。
using System;
using System.Collections.Generic;
using System.IO;
using System.Net.Sockets;
using System.Text;
using System.Threading;
using System.Windows.Forms;
using log4net;
using MessageManagerWinService.Util;
using MessageManagerWinService.DAL;
using MessageManagerWinService.Classes;
using System.Linq;
namespace MessageManagerWinService.MTNListener
{
public class Listener {
internal static readonly ILog log = LogManager.GetLogger(typeof(Listener));
private ManualResetEvent _stopEvent;
private Thread _listenerThread;
private bool _running;
private bool _connected;
private DateTime _lastSubscribe;
private TcpClient _client;
private NetworkStream _stream;
public string _listenType { get; set; }
public string _username { get; set; }
public string _password { get; set; }
public void Start()
{
_stopEvent = new ManualResetEvent(false);
//_listenType = listenType.ToUpper();
_listenerThread = new Thread(ListenerThreadProc);
_running = true;
_listenerThread.Start();
}
public void Stop() {
_running = false;
try
{
_client.Close();
log.Info(_listenType + " Client Closed - " + " - " + DateTime.Now);
}
catch (Exception)
{
log.Info("Tried Closing Client - Client not open - " + _listenType + " - " + DateTime.Now);
}
Thread.Sleep(1000); // or more
if (_listenerThread.IsAlive)
{
log.Info("Aborting thread - " + _listenType + " - " + DateTime.Now);
_listenerThread.Abort();
}
}
private void Reconnect()
{
int srvInd = 0;
while (_running && !_connected)
{
string addr = "";
if (_listenType == "USSD")
{
addr = AppSettings.Instance.ReceiveGatewayUSSD[srvInd];
}
else if (_listenType == "ACCOUNTS")
{
addr = AppSettings.Instance.ReceiveGatewayAccounts[srvInd];
}
log.Info("Connecting to " + _listenType +" IP Address : " + addr + "...");
var ipAddr = addr.Split(':')[0];
var port = Int32.Parse(addr.Split(':')[1]);
_client = new TcpClient();
IAsyncResult result = _client.BeginConnect(ipAddr, port, null, null);
WaitHandle timeoutHandler = result.AsyncWaitHandle;
try
{
if (!result.AsyncWaitHandle.WaitOne(TimeSpan.FromSeconds(AppSettings.Instance.ConnectTimeout), false))
{
_client.Close();
_connected = false;
log.Info("Timeout connecting to " + _listenType);
} else
{
//log.Info("Connected to " + _listenType);
_client.EndConnect(result);
_stream = _client.GetStream();
//_stream.ReadTimeout = AppSettings.Instance.ReadTimeout*1000;
_connected = true;
}
}
catch (Exception ex)
{
_connected = false;
log.Error("Error connecting to " + _listenType + ". Error: " + ex.Message, ex);
}
finally
{
timeoutHandler.Close();
}
if (!_connected)
{
srvInd++;
if (srvInd >= addr.Length)
{
srvInd = 0;
}
}
}
if (_running && _connected)
{
Subscribe();
}
}
protected void SendString(string str)
{
var buf = Encoding.ASCII.GetBytes(str + "\r\n");
_stream.Write(buf, 0, buf.Length);
}
private string FixPattern(string code)
{
return code.Replace("*", "\\*");
}
private void Subscribe() {
_lastSubscribe = DateTime.Now;
if (_listenType == "USSD")
{
//Subscribe to USSD service
SendString("<usereq USERNAME='" + _username + "' PASSWORD ='" + _password + "' VERBOSITY='0'>");
SendString("<subscribe NODE='.*' TRANSFORM='USSD' PATTERN='\\*'/>");
SendString("</usereq>END");
log.Info("Subscription to USSD - Connected - " + DateTime.Now.ToString());
}
else if (_listenType == "ACCOUNTS")
{
SendString("<usereq USERNAME='" + _username + "' PASSWORD ='" + _password + "'>");
//MTN_Accounts_SendString("<subscribe NODE='pollSmppTopic' TRANSFORM='DELIVER_SM'>");
SendString("<subscribe NODE='" + _username + "' TRANSFORM='DELIVER_SM'>");
SendString("<pattern><![CDATA[$short_message~='.*']]></pattern>");
SendString("</subscribe>");
SendString("</usereq>END");
log.Info("Subscription to Accounts (" + _username + ") - Connected - " + DateTime.Now.ToString());
}
}
private bool Eol(byte[] buf) {
if (buf.Length < 1) {
return false;
}
return buf[buf.Length - 1] == 0x0a;
}
protected string ReadString()
{
var ms = new MemoryStream();
while (!Eol(ms.ToArray()))
{
try
{
byte[] buf = new byte[1];
int len = _stream.Read(buf, 0, 1);
if (len > 0)
{
ms.WriteByte(buf[0]);
}
else
{
break;
}
}
catch(IOException)
{
break;
}
}
if (ms.Length==0)
{
return null;
}
return Encoding.ASCII.GetString(ms.ToArray()).TrimEnd(new char[] { '\r', '\n' });
}
private void ListenerThreadProc()
{
_connected = false;
while (_running)
{
try
{
if (!_connected)
{
//log.Info("_listenType + ("+ _username + ") - Disconnected. Attempting to reconnect. DateTime: " + DateTime.Now);
Reconnect();
}
//log.Info(_listenType +" ("+ _username + ") - Total minutes elapsed since last refresh : " +DateTime.Now.Subtract(_lastSubscribe).TotalMinutes);
if (DateTime.Now.Subtract(_lastSubscribe).TotalMinutes >= AppSettings.Instance.SubscribeInterval)
{
//log.Info(_listenType + " (" + _username + ") - Refreshing Thread : " + DateTime.Now.Subtract(_lastSubscribe).TotalMinutes);
EmailService sendMail = new EmailService();
sendMail.SendEmail("ADMIN", "ALERT - Message Manager : Re-subscribing to " + _listenType + " listening service", "", null, "");
Subscribe();
}
string line;
string message = "";
while ((line = ReadString()) != null)
{
using (var fs = new StreamWriter(Path.Combine(Path.GetDirectoryName(Application.ExecutablePath), AppSettings.Instance.MessageFilename), true))
{
fs.WriteLine(line);
}
log.Debug("Received: " + line);
if (line.Contains("<usarsp"))
{
message = "";
message = @"<?xml version=""1.0"" encoding=""UTF-8""?>";
message = message + line;
}
else if (line.Contains("</usarsp>"))
{
message = message + line;
ProcessMTNMessage processMessage = new ProcessMTNMessage();
processMessage.processMessage(_listenType, message);
}
else
{
message = message + line;
}
}
if(!_client.Connected)
{
_connected = false;
try
{
_stream.Close();
}
catch (Exception ex)
{
log.Error("Exception Error: " + ex.Message, ex);
}
}
Thread.Sleep(100);
}
catch(IOException)
{
log.Info("Disconnected.");
_connected = false;
try
{
_stream.Close();
}
catch (Exception ex)
{
log.Error("Exception Error: " + ex.Message, ex);
}
try
{
_client.Close();
}
catch (Exception ex)
{
log.Error("Exception Error: " + ex.Message, ex);
}
}
catch(ThreadInterruptedException ex)
{
log.Error("Thread Interrupted Error: " + ex.Message, ex);
}
catch(Exception ex)
{
log.Error("Exception Error: " + ex.Message, ex);
}
}
if(_connected)
{
try
{
_stream.Close();
}
catch(Exception ex)
{
log.Error("Exception Error: " + ex.Message, ex);
}
try
{
_client.Close();
}
catch (Exception ex)
{
log.Error("Exception Error: " + ex.Message, ex);
}
}
}
}
}