暂时不使用MemoryStream,优化后重新使用,以及客户端库的原生Listener功能

This commit is contained in:
sin365 2024-03-29 17:49:19 +08:00
parent 19b8e57c28
commit f2e0279fd9
5 changed files with 200 additions and 142 deletions

View File

@ -141,17 +141,21 @@ namespace HaoYueNet.ClientNetwork
//LogOut("发送心跳包"); //LogOut("发送心跳包");
} }
object sendLock = new object();
/// <summary> /// <summary>
/// 发送数据并计数 /// 发送数据并计数
/// </summary> /// </summary>
/// <param name="data"></param> /// <param name="data"></param>
private void SendWithIndex(byte[] data) private void SendWithIndex(byte[] data)
{
lock (sendLock)
{ {
//增加发送计数 //增加发送计数
SendIndex = MaxSendIndexNum; SendIndex = MaxSendIndexNum;
//发送数据 //发送数据
client.Send(data); client.Send(data);
} }
}
////拼接头长度 ////拼接头长度
//private byte[] SendDataWithHead(byte[] message) //private byte[] SendDataWithHead(byte[] message)

View File

@ -7,7 +7,7 @@ namespace HaoYueNet.ClientNetwork.OtherMode
public class NetworkHelperCore_ListenerMode public class NetworkHelperCore_ListenerMode
{ {
private Socket serversocket; private Socket serversocket;
private Socket client; private Dictionary<nint,Socket> mDictHandleClient;
//响应倒计时计数最大值 //响应倒计时计数最大值
private static int MaxRevIndexNum = 50; private static int MaxRevIndexNum = 50;
@ -28,65 +28,109 @@ namespace HaoYueNet.ClientNetwork.OtherMode
public void Init(int port) public void Init(int port)
{ {
mDictHandleClient = new Dictionary<nint, Socket>();
LogOut("==>初始化NetworkHelperCore_ListenerMode"); LogOut("==>初始化NetworkHelperCore_ListenerMode");
serversocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); serversocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
IPEndPoint endPoint = new IPEndPoint(IPAddress.Any, port); IPEndPoint endPoint = new IPEndPoint(IPAddress.Any, port);
serversocket.Bind(endPoint); // 绑定 serversocket.Bind(endPoint); // 绑定
serversocket.Listen(1); serversocket.Listen(1);
client = serversocket.Accept(); // 接收客户端连接 //client = serversocket.Accept(); // 接收客户端连接
OnConnected?.Invoke(true); //OnConnected?.Invoke(true);
Console.WriteLine("客户端连接成功 信息: " + client.AddressFamily.ToString()); //Console.WriteLine("客户端连接成功 信息: " + client.AddressFamily.ToString());
Thread revThread = new Thread(Recive); //Thread revThread = new Thread(Recive);
revThread.Start(client); //revThread.Start(client);
Task task = new Task(() =>
{
while (true)
{
Socket newclient;
try
{
newclient = serversocket.Accept(); // 接收客户端连接
} }
catch
{
break;
}
AddDictSocket(newclient);
OnConnected?.Invoke(newclient);
Console.WriteLine("客户端连接成功 信息: " + newclient.AddressFamily.ToString());
Thread revThread = new Thread(Recive);
revThread.Start(newclient);
}
});
task.Start();
}
#region
/// <summary>
/// 追加Socket返回下标
/// </summary>
/// <param name="socket"></param>
/// <returns></returns>
public void AddDictSocket(Socket socket)
{
if (socket == null)
return;
lock (mDictHandleClient)
{
mDictHandleClient[socket.Handle] = socket;
}
}
public void RemoveDictSocket(Socket socket)
{
if (socket == null)
return;
lock (mDictHandleClient)
{
if (!mDictHandleClient.ContainsKey(socket.Handle))
return;
mDictHandleClient.Remove(socket.Handle);
}
}
#endregion
~NetworkHelperCore_ListenerMode() ~NetworkHelperCore_ListenerMode()
{ {
client.Close(); nint[] keys = mDictHandleClient.Keys.ToArray();
for (uint i = 0; i < keys.Length; i++)
{
mDictHandleClient[keys[i]].Close();
}
mDictHandleClient.Clear();
} }
private void SendToSocket(byte[] data) private void SendToSocket(Socket socket, byte[] data)
{ {
//已拼接包长度,这里不再需要拼接长度 //已拼接包长度,这里不再需要拼接长度
//data = SendDataWithHead(data); //data = SendDataWithHead(data);
try try
{ {
SendWithIndex(data); SendWithIndex(socket,data);
} }
catch (Exception ex) catch (Exception ex)
{ {
//连接断开 //连接断开
OnCloseReady(); OnCloseReady(socket);
return; return;
} }
//LogOut("发送消息,消息长度=> "+data.Length); //LogOut("发送消息,消息长度=> "+data.Length);
} }
private void SendHeartbeat()
{
try
{
SendWithIndex(HeartbeatData);
}
catch (Exception ex)
{
//连接断开
OnCloseReady();
return;
}
//LogOut("发送心跳包");
}
/// <summary> /// <summary>
/// 发送数据并计数 /// 发送数据并计数
/// </summary> /// </summary>
/// <param name="data"></param> /// <param name="data"></param>
private void SendWithIndex(byte[] data) private void SendWithIndex(Socket socket,byte[] data)
{ {
//增加发送计数 //增加发送计数
SendIndex = MaxSendIndexNum; SendIndex = MaxSendIndexNum;
//发送数据 //发送数据
client.Send(data); socket.Send(data);
} }
/// <summary> /// <summary>
@ -94,56 +138,60 @@ namespace HaoYueNet.ClientNetwork.OtherMode
/// </summary> /// </summary>
/// <param name="CMDID"></param> /// <param name="CMDID"></param>
/// <param name="data">序列化之后的数据</param> /// <param name="data">序列化之后的数据</param>
public void SendToClient(byte[] data) public void SendToClient(Socket socket, byte[] data)
{ {
//LogOut("准备数据 data=> "+data); //LogOut("准备数据 data=> "+data);
SendToSocket(data); SendToSocket(socket, data);
} }
#region #region
public delegate void OnReceiveDataHandler(byte[] data); public delegate void OnConnectedHandler(Socket socket);
public delegate void OnConnectedHandler(bool IsConnected);
public delegate void OnCloseHandler(); public delegate void OnReceiveDataHandler(Socket sk, byte[] data);
public delegate void OnLogOutHandler(string Msg);
public delegate void OnDisconnectHandler(Socket sk);
public delegate void OnNetLogHandler(string msg);
#endregion #endregion
public event OnConnectedHandler OnConnected; public event OnConnectedHandler OnConnected;
public event OnReceiveDataHandler OnReceiveData;
public event OnCloseHandler OnClose; public event OnReceiveDataHandler OnReceive;
/// <summary>
/// 网络库调试日志输出 public event OnDisconnectHandler OnDisconnected;
/// </summary>
public event OnLogOutHandler OnLogOut; public event OnNetLogHandler OnNetLog;
/// <summary> /// <summary>
/// 做好处理的连接管理 /// 做好处理的连接管理
/// </summary> /// </summary>
private void OnCloseReady() private void OnCloseReady(Socket socket)
{ {
LogOut("关闭连接"); LogOut("关闭连接");
//关闭Socket连接 //关闭Socket连接
client.Close(); socket.Close();
OnClose?.Invoke(); RemoveDictSocket(socket);
OnDisconnected?.Invoke(socket);
} }
/// <summary> /// <summary>
/// 主动关闭连接 /// 主动关闭连接
/// </summary> /// </summary>
public void CloseConntect() public void CloseConntect(Socket socket)
{ {
OnCloseReady(); OnCloseReady(socket);
} }
private void DataCallBackReady(byte[] data) private void DataCallBackReady(Socket socket,byte[] data)
{ {
//增加接收计数 //增加接收计数
RevIndex = MaxRevIndexNum; RevIndex = MaxRevIndexNum;
OnReceiveData(data); OnReceive(socket,data);
} }
MemoryStream memoryStream = new MemoryStream();//开辟一个内存流
private void Recive(object o) private void Recive(object o)
{ {
MemoryStream memoryStream = new MemoryStream();//开辟一个内存流
var client = o as Socket; var client = o as Socket;
//MemoryStream memoryStream = new MemoryStream();//开辟一个内存流 //MemoryStream memoryStream = new MemoryStream();//开辟一个内存流
@ -162,33 +210,26 @@ namespace HaoYueNet.ClientNetwork.OtherMode
catch (Exception ex) catch (Exception ex)
{ {
//远程主机强迫关闭了一个现有的连接 //远程主机强迫关闭了一个现有的连接
OnCloseReady(); OnCloseReady(client);
return; return;
//断开连接 //断开连接
} }
memoryStream.Write(buffer, 0, effective);//将接受到的数据写入内存流中
while (true)
{
if (effective > 0)//如果接受到的消息不为0不为空 if (effective > 0)//如果接受到的消息不为0不为空
{ {
DataCallBackReady(memoryStream.ToArray()); memoryStream.Write(buffer, 0, effective);//将接受到的数据写入内存流中
DataCallBackReady(client, memoryStream.ToArray());
//流复用的方式 不用重新new申请 //流复用的方式 不用重新new申请
memoryStream.Position = 0; memoryStream.Position = 0;
memoryStream.SetLength(0); memoryStream.SetLength(0);
} }
} }
} }
}
public void LogOut(string Msg) public void LogOut(string Msg)
{ {
//Console.WriteLine(Msg); //Console.WriteLine(Msg);
OnLogOut?.Invoke(Msg); OnNetLog?.Invoke(Msg);
} }
public Socket GetClientSocket()
{
return client;
}
} }
} }

View File

@ -33,14 +33,14 @@ namespace HaoYueNet.ServerNetwork
/// <summary> /// <summary>
/// 数据缓存区 /// 数据缓存区
/// </summary> /// </summary>
//public List<byte> Buffer { get; set; } public List<byte> Buffer { get; set; }
public MemoryStream memoryStream { get; set; } //public MemoryStream memoryStream { get; set; }
public AsyncUserToken() public AsyncUserToken()
{ {
//this.Buffer = new List<byte>(); this.Buffer = new List<byte>();
this.memoryStream = new MemoryStream(); //this.memoryStream = new MemoryStream();
} }
/// <summary> /// <summary>
/// 响应倒计时计数 /// 响应倒计时计数

View File

@ -378,22 +378,35 @@ namespace HaoYueNet.ServerNetwork
if (e.BytesTransferred > 0 && e.SocketError == SocketError.Success) if (e.BytesTransferred > 0 && e.SocketError == SocketError.Success)
{ {
//读取数据 //读取数据
lock(token.memoryStream)
//读取数据
byte[] data = new byte[e.BytesTransferred];
Array.Copy(e.Buffer, e.Offset, data, 0, e.BytesTransferred);
lock (token.Buffer)
//lock(token.memoryStream)
{ {
//token.Buffer.AddRange(data); token.Buffer.AddRange(data);
token.memoryStream.Write(e.Buffer, e.Offset, e.BytesTransferred); //token.memoryStream.Write(e.Buffer, e.Offset, e.BytesTransferred);
}
do do
{ {
DataCallBackReady(token,token.memoryStream.ToArray()); DataCallBackReady(token, data);
//流复用的方式 不用重新new申请 //从数据池中移除这组数据
token.memoryStream.Position = 0; lock (token.Buffer)
token.memoryStream.SetLength(0); {
token.Buffer.Clear();
}
//DataCallBackReady(token, token.memoryStream.ToArray());
////流复用的方式 不用重新new申请
//token.memoryStream.Position = 0;
//token.memoryStream.SetLength(0);
//这里API处理完后,并没有返回结果,当然结果是要返回的,却不是在这里, 这里的代码只管接收. //这里API处理完后,并没有返回结果,当然结果是要返回的,却不是在这里, 这里的代码只管接收.
//若要返回结果,可在API处理中调用此类对象的SendMessage方法,统一打包发送.不要被微软的示例给迷惑了. //若要返回结果,可在API处理中调用此类对象的SendMessage方法,统一打包发送.不要被微软的示例给迷惑了.
} while (token.memoryStream.Length > 0); //} while (token.memoryStream.Length > 0);
} while (token.Buffer.Count > 4);
}
//继续接收. 为什么要这么写,请看Socket.ReceiveAsync方法的说明 //继续接收. 为什么要这么写,请看Socket.ReceiveAsync方法的说明
if (!token.Socket.ReceiveAsync(e)) if (!token.Socket.ReceiveAsync(e))

View File

@ -378,68 +378,68 @@ namespace HaoYueNet.ServerNetwork
if (e.BytesTransferred > 0 && e.SocketError == SocketError.Success) if (e.BytesTransferred > 0 && e.SocketError == SocketError.Success)
{ {
//读取数据 //读取数据
//byte[] data = new byte[e.BytesTransferred]; byte[] data = new byte[e.BytesTransferred];
//Array.Copy(e.Buffer, e.Offset, data, 0, e.BytesTransferred); Array.Copy(e.Buffer, e.Offset, data, 0, e.BytesTransferred);
//lock (token.Buffer) lock (token.Buffer)
lock(token.memoryStream) //lock(token.memoryStream)
{ {
//token.Buffer.AddRange(data); token.Buffer.AddRange(data);
token.memoryStream.Write(e.Buffer, e.Offset, e.BytesTransferred); //token.memoryStream.Write(e.Buffer, e.Offset, e.BytesTransferred);
}
do do
{ {
//如果包头不完整 //如果包头不完整
//if (token.Buffer.Count < 4) if (token.Buffer.Count < 4)
if (token.memoryStream.Length < 4) //if (token.memoryStream.Length < 4)
break; break;
//判断包的长度 //判断包的长度
//byte[] lenBytes = token.Buffer.GetRange(0, 4).ToArray(); byte[] lenBytes = token.Buffer.GetRange(0, 4).ToArray();
//int packageLen = BitConverter.ToInt32(lenBytes, 0) - 4;
//if (packageLen > token.Buffer.Count - 4)
//{ //长度不够时,退出循环,让程序继续接收
// break;
//}
long FristBeginPos = token.memoryStream.Position;
byte[] lenBytes = new byte[4];
token.memoryStream.Seek(0, SeekOrigin.Begin);
token.memoryStream.Read(lenBytes,0,4);
int packageLen = BitConverter.ToInt32(lenBytes, 0) - 4; int packageLen = BitConverter.ToInt32(lenBytes, 0) - 4;
if (packageLen > token.memoryStream.Length - 4) if (packageLen > token.Buffer.Count - 4)
{ //长度不够时,退出循环,让程序继续接收 { //长度不够时,退出循环,让程序继续接收
break; break;
} }
//包够长时,则提取出来,交给后面的程序去处理 //long FristBeginPos = token.memoryStream.Position;
//byte[] rev = token.Buffer.GetRange(4, packageLen).ToArray(); //byte[] lenBytes = new byte[4];
//token.memoryStream.Seek(0, SeekOrigin.Begin);
byte[] rev = new byte[packageLen]; //token.memoryStream.Read(lenBytes, 0, 4);
token.memoryStream.Seek(4, SeekOrigin.Begin); //int packageLen = BitConverter.ToInt32(lenBytes, 0) - 4;
token.memoryStream.Read(rev, 0, packageLen); //if (packageLen > token.memoryStream.Length - 4)
//{ //长度不够时,退出循环,让程序继续接收
////从数据池中移除这组数据 // break;
//lock (token.Buffer)
//{
// token.Buffer.RemoveRange(0, packageLen + 4);
//} //}
token.memoryStream.Seek(FristBeginPos, SeekOrigin.Begin); //包够长时,则提取出来,交给后面的程序去处理
byte[] rev = token.Buffer.GetRange(4, packageLen).ToArray();
//byte[] rev = new byte[packageLen];
//token.memoryStream.Seek(4, SeekOrigin.Begin);
//token.memoryStream.Read(rev, 0, packageLen);
//从数据池中移除这组数据 //从数据池中移除这组数据
lock (token.memoryStream) lock (token.Buffer)
{ {
int numberOfBytesToRemove = packageLen + 4; token.Buffer.RemoveRange(0, packageLen + 4);
byte[] buf = token.memoryStream.GetBuffer();
Buffer.BlockCopy(buf, numberOfBytesToRemove, buf, 0, (int)token.memoryStream.Length - numberOfBytesToRemove);
token.memoryStream.SetLength(token.memoryStream.Length - numberOfBytesToRemove);
} }
//token.memoryStream.Seek(FristBeginPos, SeekOrigin.Begin);
////从数据池中移除这组数据
//lock (token.memoryStream)
//{
// int numberOfBytesToRemove = packageLen + 4;
// byte[] buf = token.memoryStream.GetBuffer();
// Buffer.BlockCopy(buf, numberOfBytesToRemove, buf, 0, (int)token.memoryStream.Length - numberOfBytesToRemove);
// token.memoryStream.SetLength(token.memoryStream.Length - numberOfBytesToRemove);
//}
DataCallBackReady(token, rev); DataCallBackReady(token, rev);
//这里API处理完后,并没有返回结果,当然结果是要返回的,却不是在这里, 这里的代码只管接收. //这里API处理完后,并没有返回结果,当然结果是要返回的,却不是在这里, 这里的代码只管接收.
//若要返回结果,可在API处理中调用此类对象的SendMessage方法,统一打包发送.不要被微软的示例给迷惑了. //若要返回结果,可在API处理中调用此类对象的SendMessage方法,统一打包发送.不要被微软的示例给迷惑了.
//} while (token.Buffer.Count > 4); } while (token.Buffer.Count > 4);
} while (token.memoryStream.Length > 4); //} while (token.memoryStream.Length > 4);
}
//继续接收. 为什么要这么写,请看Socket.ReceiveAsync方法的说明 //继续接收. 为什么要这么写,请看Socket.ReceiveAsync方法的说明
if (!token.Socket.ReceiveAsync(e)) if (!token.Socket.ReceiveAsync(e))