MessageReceiver

时间:2023-03-09 16:01:48
MessageReceiver
/// <summary>
/// Receives length-prefixed messages from the message dispatch server over TCP on a
/// background thread and hands each decoded <see cref="MessageCollection"/> to a relay engine.
/// The worker reconnects whenever connecting or reading fails, until <see cref="Stop"/> is called.
/// </summary>
class MessageReceiver
{
    /// <summary>Size of the reusable receive buffer; longer messages get a one-off buffer.</summary>
    private const int DefaultBufferLength = 1024 * 64;

    /// <summary>How long to wait after a failed connect before trying again.</summary>
    private const int ReconnectDelayMilliseconds = 1000;

    /// <summary>Granularity at which the reconnect wait re-checks the stop flag.</summary>
    private const int ReconnectPollMilliseconds = 100;

    private RelayEngine<MessageCollection> _MessageRelayEngine;
    private string _Hostname;
    private int _MessageDispatchServerPort;
    private string _SessionId;
    private TcpClient _Client;

    // Written by Start/Stop and read by the worker thread, hence volatile.
    private volatile bool _Stopped = false;
    private Thread _Worker = null;
    private ulong _LastMessageSequence = 0;

    /// <summary>Creates a receiver that forwards every received message to <paramref name="messageRelayEngine"/>.</summary>
    /// <param name="messageRelayEngine">Engine whose <c>AddItem</c> is called for each received message.</param>
    internal MessageReceiver(RelayEngine<MessageCollection> messageRelayEngine)
    {
        this._MessageRelayEngine = messageRelayEngine;
    }

    /// <summary>
    /// Stops any running worker, stores the connection settings and starts a new
    /// background worker thread that connects and receives messages.
    /// </summary>
    /// <param name="hostname">Host name of the message dispatch server.</param>
    /// <param name="messageDispatchServerPort">TCP port of the message dispatch server.</param>
    /// <param name="sessionId">Session id sent to the server right after connecting.</param>
    internal void Start(string hostname, int messageDispatchServerPort, string sessionId)
    {
        this.Stop();
        this._Stopped = false;

        this._Hostname = hostname;
        this._MessageDispatchServerPort = messageDispatchServerPort;
        this._SessionId = sessionId;

        this._Worker = new Thread(ConnectServerAndReceiveMessage);
        this._Worker.IsBackground = true;
        this._Worker.Start();
    }

    /// <summary>Replaces the session id that is sent on the next (re)connect.</summary>
    /// <param name="newSessionId">The new session id.</param>
    internal void SetSessionId(string newSessionId)
    {
        this._SessionId = newSessionId;
    }

    /// <summary>
    /// Worker thread body: connects, sends the session id, then reads messages framed by a
    /// 4-byte big-endian length header until a read fails, at which point it reconnects.
    /// </summary>
    /// <param name="state">Unused; present so the method can be used as a thread start routine.</param>
    private void ConnectServerAndReceiveMessage(object state)
    {
        byte[] header = new byte[4];
        byte[] defaultBuffer = new byte[DefaultBufferLength];

        while (!this._Stopped)
        {
            this._Client = null;
            NetworkStream stream = null;

            try
            {
                Logger.TraceEvent(TraceEventType.Information, "MessageReceiver.ConnectServerAndReceiveMessage to connect {0}:{1}", _Hostname, _MessageDispatchServerPort);

                this._Client = new TcpClient();
                this._Client.Connect(_Hostname, _MessageDispatchServerPort);
                stream = this._Client.GetStream();

                // Handshake: 2-byte big-endian length followed by the ASCII session id.
                byte[] sessionData = ASCIIEncoding.ASCII.GetBytes(this._SessionId);
                byte[] sessionDataLen = new byte[2] { (byte)(sessionData.Length >> 8), (byte)sessionData.Length };
                stream.Write(sessionDataLen, 0, sessionDataLen.Length);
                stream.Write(sessionData, 0, sessionData.Length);
            }
            catch (Exception ex)
            {
                Logger.TraceEvent(TraceEventType.Error, "MessageReceiver.ConnectServerAndReceiveMessage error:\r\n{0}", ex);
                this.CloseConectionSilently();

                // Back off so an unreachable server does not cause a tight retry loop.
                this.WaitBeforeReconnect();
                continue;
            }

            while (true)
            {
                if (!this.ReadAll(stream, header))
                {
                    Logger.TraceEvent(TraceEventType.Information, "MessageReceiver.ConnectServerAndReceiveMessage read header data failed");
                    this.CloseConectionSilently();
                    break; // reconnect
                }

                int dataLength = ((int)header[0] << 24) + ((int)header[1] << 16) + ((int)header[2] << 8) + header[3];
                if (dataLength < 0)
                {
                    // A negative length means the framing is corrupt; the stream cannot be
                    // resynchronised, so drop the connection and start over.
                    Logger.TraceEvent(TraceEventType.Warning, "MessageReceiver.ConnectServerAndReceiveMessage got invalid message length {0}", dataLength);
                    this.CloseConectionSilently();
                    break; // reconnect
                }

                byte[] buffer = dataLength <= DefaultBufferLength ? defaultBuffer : new byte[dataLength];
                if (!this.ReadAll(stream, buffer, dataLength))
                {
                    Logger.TraceEvent(TraceEventType.Information, "MessageReceiver.ConnectServerAndReceiveMessage read message data failed");
                    this.CloseConectionSilently();
                    break; // reconnect
                }

                try
                {
                    MessageCollection message = CompressHelper.FromByteArray<MessageCollection>(buffer, dataLength);
                    if (message.Sequence != this._LastMessageSequence)
                    {
                        Logger.TraceEvent(TraceEventType.Warning, "MessageReceiver.ConnectServerAndReceiveMessage got message with worng sequence {0}, excepted sequece is {1}",
                            message.Sequence, this._LastMessageSequence);
                    }
                    this._LastMessageSequence++;

                    this._MessageRelayEngine.AddItem(message);
                    ConsoleClient.Instance.RefreshLastMsgTime();
                }
                catch (Exception ex)
                {
                    // A message that cannot be decoded or enqueued is logged and skipped;
                    // the connection stays open.
                    Logger.TraceEvent(TraceEventType.Error, "MessageReceiver.ConnectServerAndReceiveMessage add message to engine error:\r\n{0}", ex);
                }
            }
        }
    }

    /// <summary>
    /// Waits up to <see cref="ReconnectDelayMilliseconds"/> before the next connect attempt,
    /// returning early once the receiver has been stopped.
    /// </summary>
    private void WaitBeforeReconnect()
    {
        for (int waited = 0; waited < ReconnectDelayMilliseconds && !this._Stopped; waited += ReconnectPollMilliseconds)
        {
            Thread.Sleep(ReconnectPollMilliseconds);
        }
    }

    /// <summary>Closes the current connection, if any; failures are logged and not rethrown.</summary>
    private void CloseConectionSilently()
    {
        try
        {
            // Snapshot the field: Stop() and the worker thread may both call this method.
            TcpClient client = this._Client;
            this._Client = null;
            if (client != null)
            {
                client.Close();
            }
        }
        catch (Exception ex)
        {
            Logger.TraceEvent(TraceEventType.Error, "MessageReceiver CloseConectionSilently error:\r\n{0}", ex);
        }
    }

    /// <summary>
    /// Reads exactly the requested number of bytes from <paramref name="stream"/> into the
    /// start of <paramref name="buffer"/>, blocking until they arrive.
    /// </summary>
    /// <param name="stream">Stream to read from.</param>
    /// <param name="buffer">Destination buffer, filled from index 0.</param>
    /// <param name="dataLength">Number of bytes to read; the whole buffer length when omitted.</param>
    /// <returns>
    /// <c>true</c> when all bytes were read; <c>false</c> when the stream ended first or the
    /// read threw (the exception is logged).
    /// </returns>
    private bool ReadAll(NetworkStream stream, byte[] buffer, int? dataLength = null)
    {
        try
        {
            int offset = 0;
            int remaining = dataLength.HasValue ? dataLength.Value : buffer.Length;

            while (remaining > 0)
            {
                // Blocking read: a return value of 0 signals that the remote side closed
                // the connection, which polling DataAvailable would never observe.
                int readLength = stream.Read(buffer, offset, remaining);
                if (readLength == 0)
                {
                    return false;
                }

                offset += readLength;
                remaining -= readLength; // subtract this chunk only, not the running offset
            }
            return true;
        }
        catch (Exception ex)
        {
            Logger.TraceEvent(TraceEventType.Warning, "MessageReceiver.ReadAll error:\r\n{0}", ex.ToString());
            return false;
        }
    }

    /// <summary>
    /// Signals the worker to stop, resets the expected message sequence to 0, closes the
    /// connection and waits up to one second for the worker thread to finish.
    /// </summary>
    internal void Stop()
    {
        this._Stopped = true;
        this._LastMessageSequence = 0;

        // Closing the client makes a read that is in progress on the worker fail,
        // which lets the worker observe the stop flag.
        this.CloseConectionSilently();

        if (this._Worker != null)
        {
            this._Worker.Join(1000);
            this._Worker = null;
        }
    }
}