From 25ad38edc5eaf527451f829509a198db0bdd8b2c Mon Sep 17 00:00:00 2001 From: sin365 <353374337@qq.com> Date: Tue, 18 Jun 2024 10:16:33 +0800 Subject: [PATCH] =?UTF-8?q?=E6=89=A9=E5=B1=95IOCP=E7=9A=84=E5=AE=A2?= =?UTF-8?q?=E6=88=B7=E7=AB=AF=E6=A8=A1=E5=BC=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- HaoYueNet.sln | 7 + .../IOCPMode/AsyncUserToken.cs | 54 ++ .../IOCPMode/BufferManager.cs | 60 ++ .../IOCPMode/MemoryStreamPool.cs | 41 + .../IOCPMode/SocketEventPool.cs | 45 ++ .../IOCPMode/TcpSaeaClient.cs | 726 ++++++++++++++++++ .../IOCPMode/TokenMsgPool.cs | 130 ++++ Simple/ClientSaeaTest/ClientSaeaTest.csproj | 14 + Simple/ClientSaeaTest/Program.cs | 19 + Simple/ClientSaeaTest/SaeaClient.cs | 69 ++ 10 files changed, 1165 insertions(+) create mode 100644 NetLib/HaoYueNet.ClientNetwork/IOCPMode/AsyncUserToken.cs create mode 100644 NetLib/HaoYueNet.ClientNetwork/IOCPMode/BufferManager.cs create mode 100644 NetLib/HaoYueNet.ClientNetwork/IOCPMode/MemoryStreamPool.cs create mode 100644 NetLib/HaoYueNet.ClientNetwork/IOCPMode/SocketEventPool.cs create mode 100644 NetLib/HaoYueNet.ClientNetwork/IOCPMode/TcpSaeaClient.cs create mode 100644 NetLib/HaoYueNet.ClientNetwork/IOCPMode/TokenMsgPool.cs create mode 100644 Simple/ClientSaeaTest/ClientSaeaTest.csproj create mode 100644 Simple/ClientSaeaTest/Program.cs create mode 100644 Simple/ClientSaeaTest/SaeaClient.cs diff --git a/HaoYueNet.sln b/HaoYueNet.sln index fb882b9..396c5a4 100644 --- a/HaoYueNet.sln +++ b/HaoYueNet.sln @@ -27,6 +27,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "HaoYueNet.ClientNetworkNet. EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "HaoYueNet.ServerNetwork.Standard2", "NetLib_Standard2\HaoYueNet.ServerNetwork.Standard2\HaoYueNet.ServerNetwork.Standard2.csproj", "{6BACBAAB-3777-4165-A2F7-7F9B517286B4}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "ClientSaeaTest", "Simple\ClientSaeaTest\ClientSaeaTest.csproj", "{AE22B541-A21E-48F1-9913-C1ACEBF21874}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -69,6 +71,10 @@ Global {6BACBAAB-3777-4165-A2F7-7F9B517286B4}.Debug|Any CPU.Build.0 = Debug|Any CPU {6BACBAAB-3777-4165-A2F7-7F9B517286B4}.Release|Any CPU.ActiveCfg = Release|Any CPU {6BACBAAB-3777-4165-A2F7-7F9B517286B4}.Release|Any CPU.Build.0 = Release|Any CPU + {AE22B541-A21E-48F1-9913-C1ACEBF21874}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {AE22B541-A21E-48F1-9913-C1ACEBF21874}.Debug|Any CPU.Build.0 = Debug|Any CPU + {AE22B541-A21E-48F1-9913-C1ACEBF21874}.Release|Any CPU.ActiveCfg = Release|Any CPU + {AE22B541-A21E-48F1-9913-C1ACEBF21874}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -83,6 +89,7 @@ Global {45FBC25D-9EC5-4C8E-A979-005F04CE76AB} = {A2CAD164-0816-4D1D-9793-1B1F398C9D29} {16AF64F5-6BED-4BD5-AD41-39816AD56769} = {F4C45C48-8011-4782-B0B3-99164D611A6C} {6BACBAAB-3777-4165-A2F7-7F9B517286B4} = {F4C45C48-8011-4782-B0B3-99164D611A6C} + {AE22B541-A21E-48F1-9913-C1ACEBF21874} = {A2CAD164-0816-4D1D-9793-1B1F398C9D29} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {22107F03-013F-4871-AC8E-F082694E2679} diff --git a/NetLib/HaoYueNet.ClientNetwork/IOCPMode/AsyncUserToken.cs b/NetLib/HaoYueNet.ClientNetwork/IOCPMode/AsyncUserToken.cs new file mode 100644 index 0000000..2ef4f5b --- /dev/null +++ b/NetLib/HaoYueNet.ClientNetwork/IOCPMode/AsyncUserToken.cs @@ -0,0 +1,54 @@ +using System.Net; +using System.Net.Sockets; + +namespace HaoYueNet.ClientNetwork.IOCPMode +{ + public class AsyncUserToken + { + /// + /// 客户端IP地址 + /// + public IPAddress IPAddress { get; set; } + + /// + /// 远程地址 + /// + public EndPoint Remote { get; set; } + + /// + /// 通信SOKET + /// + public Socket Socket { get; set; } + + /// + /// 连接时间 + /// + public DateTime ConnectTime { get; set; } + + /// + /// 所属用户信息 + /// + public object UserInfo { get; set; } + + /// + /// 数据缓存区 + /// + //public List Buffer { get; set; } + + public MemoryStream memoryStream { get; set; } + + public AsyncUserToken() + { + //this.Buffer = new List(); + memoryStream = new MemoryStream(); + } + /// + /// 响应倒计时计数 + /// + public int RevIndex { get; set; } = 0; + /// + /// 发送倒计时计数 + /// + public int SendIndex { get; set; } = 0; + } +} diff --git a/NetLib/HaoYueNet.ClientNetwork/IOCPMode/BufferManager.cs b/NetLib/HaoYueNet.ClientNetwork/IOCPMode/BufferManager.cs new file mode 100644 index 0000000..c76a19a --- /dev/null +++ b/NetLib/HaoYueNet.ClientNetwork/IOCPMode/BufferManager.cs @@ -0,0 +1,60 @@ +using System.Net.Sockets; + +namespace HaoYueNet.ClientNetwork.IOCPMode +{ + public class BufferManager + { + int m_numBytes; // the total number of bytes controlled by the buffer pool + byte[] m_buffer; // the underlying byte array maintained by the Buffer Manager + Stack m_freeIndexPool; // + int m_currentIndex; + int m_bufferSize; + + public BufferManager(int totalBytes, int bufferSize) + { + m_numBytes = totalBytes; + m_currentIndex = 0; + m_bufferSize = bufferSize; + m_freeIndexPool = new Stack(); + } + + // Allocates buffer space used by the buffer pool + public void InitBuffer() + { + // create one big large buffer and divide that + // out to each SocketAsyncEventArg object + m_buffer = new byte[m_numBytes]; + } + + // Assigns a buffer from the buffer pool to the + // specified SocketAsyncEventArgs object + // + // true if the buffer was successfully set, else false + public bool SetBuffer(SocketAsyncEventArgs args) + { + + if (m_freeIndexPool.Count > 0) + { + args.SetBuffer(m_buffer, m_freeIndexPool.Pop(), m_bufferSize); + } + else + { + if (m_numBytes - m_bufferSize < m_currentIndex) + { + return false; + } + args.SetBuffer(m_buffer, m_currentIndex, m_bufferSize); + m_currentIndex += m_bufferSize; + } + return true; + } + + // Removes the buffer from a SocketAsyncEventArg object. + // This frees the buffer back to the buffer pool + public void FreeBuffer(SocketAsyncEventArgs args) + { + m_freeIndexPool.Push(args.Offset); + args.SetBuffer(null, 0, 0); + } + } +} diff --git a/NetLib/HaoYueNet.ClientNetwork/IOCPMode/MemoryStreamPool.cs b/NetLib/HaoYueNet.ClientNetwork/IOCPMode/MemoryStreamPool.cs new file mode 100644 index 0000000..2500e0a --- /dev/null +++ b/NetLib/HaoYueNet.ClientNetwork/IOCPMode/MemoryStreamPool.cs @@ -0,0 +1,41 @@ +using System.Net.Sockets; + +namespace HaoYueNet.ClientNetwork.IOCPMode +{ + + public class MemoryStreamPool + { + Stack m_pool; + + public MemoryStreamPool(int capacity) + { + m_pool = new Stack(capacity); + } + + public void Push(MemoryStream item) + { + if (item == null) { throw new ArgumentNullException("Items added to a MemoryStream cannot be null"); } + lock (m_pool) + { + m_pool.Push(item); + } + } + public MemoryStream Pop() + { + lock (m_pool) + { + return m_pool.Pop(); + } + } + + public int Count + { + get { return m_pool.Count; } + } + + public void Clear() + { + m_pool.Clear(); + } + } +} diff --git a/NetLib/HaoYueNet.ClientNetwork/IOCPMode/SocketEventPool.cs b/NetLib/HaoYueNet.ClientNetwork/IOCPMode/SocketEventPool.cs new file mode 100644 index 0000000..1e8bdca --- /dev/null +++ b/NetLib/HaoYueNet.ClientNetwork/IOCPMode/SocketEventPool.cs @@ -0,0 +1,45 @@ +using System.Net.Sockets; + +namespace HaoYueNet.ClientNetwork.IOCPMode +{ + + public class SocketEventPool + { + Stack m_pool; + + public SocketEventPool(int capacity) + { + m_pool = new Stack(capacity); + } + + public void Push(SocketAsyncEventArgs item) + { + if (item == null) { throw new ArgumentNullException("Items added to a SocketAsyncEventArgsPool cannot be null"); } + lock (m_pool) + { + m_pool.Push(item); + } + } + + // Removes a SocketAsyncEventArgs instance from the pool + // and returns the object removed from the pool + public SocketAsyncEventArgs Pop() + { + lock (m_pool) + { + return m_pool.Pop(); + } + } + + // The number of SocketAsyncEventArgs instances in the pool + public int Count + { + get { return m_pool.Count; } + } + + public void Clear() + { + m_pool.Clear(); + } + } +} diff --git a/NetLib/HaoYueNet.ClientNetwork/IOCPMode/TcpSaeaClient.cs b/NetLib/HaoYueNet.ClientNetwork/IOCPMode/TcpSaeaClient.cs new file mode 100644 index 0000000..0d6f88d --- /dev/null +++ b/NetLib/HaoYueNet.ClientNetwork/IOCPMode/TcpSaeaClient.cs @@ -0,0 +1,726 @@ +//using HunterProtobufCore; +using System.IO; +using System.Net; +using System.Net.Sockets; +using static HaoYueNet.ClientNetwork.BaseData; + +namespace HaoYueNet.ClientNetwork.IOCPMode +{ + public class TcpSaeaClient + { + #region 定义属性 + protected int MaxRevIndexNum = 50;//响应倒计时计数最大值 + protected int MaxSendIndexNum = 3;//发送倒计时计数最大值 + protected static int TimerInterval = 3000;//计时器间隔 + protected System.Timers.Timer _heartTimer;//心跳包计数器 + public int m_maxConnectNum; //最大连接数 + public int m_revBufferSize; //最大接收字节数 + protected BufferManager m_bufferManager; + protected const int opsToAlloc = 2; + protected SocketEventPool m_Receivepool; + protected SocketEventPool m_Sendpool; + protected TokenMsgPool msg_pool; + protected int m_clientCount; //连接的客户端数量 + protected Semaphore m_maxNumberAcceptedClients;//信号量 + protected Dictionary _DictSocketAsyncUserToken = new Dictionary(); + List m_clients; //客户端列表 + public List ClientList { private set { m_clients = value; } get { return m_clients; } } //获取客户端列表 + #endregion + + #region 定义委托 + /// + /// 客户端连接数量变化时触发 + /// + /// 当前增加客户的个数(用户退出时为负数,增加时为正数,一般为1) + /// 增加用户的信息 + public delegate void OnClientNumberChangeHandler(int num, AsyncUserToken token); + /// + /// 接收到客户端的数据 + /// + /// 客户端 + /// 客户端数据 + public delegate void OnReceiveDataHandler(AsyncUserToken sk, int CMDID, int ErrID, byte[] data); + /// + /// 断开连接 + /// + /// + public delegate void OnDisconnectHandler(AsyncUserToken sk); + /// + /// 日志 + /// + /// + public delegate void OnNetLogHandler(string msg); + #endregion + + #region 定义事件 + /// + /// 客户端连接数量变化事件 + /// + public event OnClientNumberChangeHandler OnClientNumberChange; + /// + /// 接收到客户端的数据事件 + /// + public event OnReceiveDataHandler OnReceive; + /// + /// 接收到客户端的断开连接 + /// + public event OnDisconnectHandler OnDisconnected; + /// + /// 网络库内部输出 + /// + public event OnNetLogHandler OnNetLog; + #endregion + + /// + /// 构造函数 + /// + /// 最大连接数 + /// 缓存区大小 + public TcpSaeaClient(int numConnections, int receiveBufferSize) + { + m_clientCount = 0; + m_maxConnectNum = numConnections; + m_revBufferSize = receiveBufferSize; + // allocate buffers such that the maximum number of sockets can have one outstanding read and + //write posted to the socket simultaneously + m_bufferManager = new BufferManager(receiveBufferSize * numConnections * opsToAlloc, receiveBufferSize); + + m_Receivepool = new SocketEventPool(numConnections); + m_Sendpool = new SocketEventPool(numConnections); + + msg_pool = new TokenMsgPool(numConnections); + + m_maxNumberAcceptedClients = new Semaphore(numConnections, numConnections); + } + + #region Client操作 + /// + /// 初始化 + /// + public void Init() + { + // Allocates one large byte buffer which all I/O operations use a piece of. This gaurds + // against memory fragmentation + m_bufferManager.InitBuffer(); + m_clients = new List(); + // preallocate pool of SocketAsyncEventArgs objects + SocketAsyncEventArgs readWriteEventArg; + + for (int i = 0; i < m_maxConnectNum; i++) + { + readWriteEventArg = new SocketAsyncEventArgs(); + readWriteEventArg.Completed += new EventHandler(IO_Completed); + readWriteEventArg.UserToken = new AsyncUserToken(); + // assign a byte buffer from the buffer pool to the SocketAsyncEventArg object + m_bufferManager.SetBuffer(readWriteEventArg); + // add SocketAsyncEventArg to the pool + m_Receivepool.Push(readWriteEventArg); + } + + for (int i = 0; i < m_maxConnectNum; i++) + { + readWriteEventArg = new SocketAsyncEventArgs(); + readWriteEventArg.Completed += new EventHandler(IO_Completed); + readWriteEventArg.UserToken = new AsyncUserToken(); + + //发送是否需要如此设置 TODO + m_bufferManager.SetBuffer(readWriteEventArg); + + m_Sendpool.Push(readWriteEventArg); + } + OutNetLog("初始化完毕"); + } + /// + /// 启动服务 + /// + /// + /// 是否端口重用 + /// + public bool Start() + { + try + { + ClearUserToken(); + + _heartTimer = new System.Timers.Timer(); + _heartTimer.Interval = TimerInterval; + _heartTimer.Elapsed += CheckUpdatetimer_Elapsed; + _heartTimer.AutoReset = true; + _heartTimer.Enabled = true; + OutNetLog("开启定时心跳包"); + + return true; + } + catch (Exception) + { + return false; + } + } + + /// + /// 停止服务 + /// + public void Stop() + { + foreach (AsyncUserToken token in m_clients) + { + try + { + token.Socket.Shutdown(SocketShutdown.Both); + } + catch (Exception) { } + } + try + { + for (int i = 0; i < m_clients.Count; i++) + { + m_clients[i].Socket.Shutdown(SocketShutdown.Both); + } + //listenSocket.Shutdown(SocketShutdown.Both); + + for (int i = 0; i < m_clients.Count; i++) + { + m_clients[i].Socket.Close(); + } + } + catch (Exception) { } + + //listenSocket.Close(); + int c_count = m_clients.Count; + ClearUserToken(); + + if (OnClientNumberChange != null) + OnClientNumberChange(-c_count, null); + } + public void CloseClient(AsyncUserToken token) + { + try { token.Socket.Shutdown(SocketShutdown.Both); } + catch (Exception) { } + } + /// + /// 关闭客户端连接 + /// + /// + void CloseClientSocket(SocketAsyncEventArgs e) + { + AsyncUserToken token = e.UserToken as AsyncUserToken; + CloseReady(token); + // 释放SocketAsyncEventArg,以便其他客户端可以重用它们 + ReleaseSocketAsyncEventArgs(e); + } + void CloseReady(AsyncUserToken token) + { + OnDisconnected?.Invoke(token); + RemoveUserToken(token); + //如果有事件,则调用事件,发送客户端数量变化通知 + OnClientNumberChange?.Invoke(-1, token); + // 关闭与客户端关联的套接字 + try { token.Socket.Shutdown(SocketShutdown.Send); } catch (Exception) { } + token.Socket.Close(); + // 递减计数器以跟踪连接到服务器的客户端总数 + Interlocked.Decrement(ref m_clientCount); + m_maxNumberAcceptedClients.Release(); + } + #endregion + + #region Token管理 + public AsyncUserToken GetAsyncUserTokenForSocket(Socket sk) + { + return _DictSocketAsyncUserToken.ContainsKey(sk) ? _DictSocketAsyncUserToken[sk] : null; + } + void AddUserToken(AsyncUserToken userToken) + { + lock (_DictSocketAsyncUserToken) + { + m_clients.Add(userToken); + _DictSocketAsyncUserToken.Add(userToken.Socket, userToken); + } + } + void RemoveUserToken(AsyncUserToken userToken) + { + lock (_DictSocketAsyncUserToken) + { + m_clients.Remove(userToken); + _DictSocketAsyncUserToken.Remove(userToken.Socket); + } + } + void ClearUserToken() + { + lock (_DictSocketAsyncUserToken) + { + m_clients.Clear(); + _DictSocketAsyncUserToken.Clear(); + } + } + /// + /// 回收SocketAsyncEventArgs + /// + /// + /// + void ReleaseSocketAsyncEventArgs(SocketAsyncEventArgs saea) + { + //saea.UserToken = null;//TODO + //saea.SetBuffer(null, 0, 0); + //saea.Dispose(); + //↑ 这里不要自作主张去清东西,否则回收回去不可用 + + switch (saea.LastOperation) + { + case SocketAsyncOperation.Receive: + m_Receivepool.Push(saea); + break; + case SocketAsyncOperation.Send: + m_Sendpool.Push(saea); + break; + default: + throw new ArgumentException("ReleaseSocketAsyncEventArgs > The last operation completed on the socket was not a receive or send"); + } + + } + #endregion + + + + #region 连接 + + public void StartConnect(string ip, int port, Socket socket, SocketAsyncEventArgs connectEventArg = null) + { + if (connectEventArg == null) + { + connectEventArg = new SocketAsyncEventArgs(); + connectEventArg.Completed += new EventHandler(IO_Completed); + } + else + { + // socket must be cleared since the context object is being reused + connectEventArg.AcceptSocket = null; + //TODO ?? + //这里其实AcceptSocket是服务端用的,非监听的客户端用,本来就为空,清不清理没什么卵用,但是connectEventArg.ConnectSocket是只读的的 + } + + connectEventArg.RemoteEndPoint = new DnsEndPoint(ip, port); + + m_maxNumberAcceptedClients.WaitOne(); + + if (!socket.ConnectAsync(connectEventArg)) + { + ProcessConnect(connectEventArg); + } + } + + private void ProcessConnect(SocketAsyncEventArgs e) + { + try + { + Interlocked.Increment(ref m_clientCount); + + //确保监听结束时,有连接才抛给数据接收 + if (e.ConnectSocket.RemoteEndPoint != null) + { + // Get the socket for the accepted client connection and put it into the + //ReadEventArg object user token + SocketAsyncEventArgs readEventArgs = m_Receivepool.Pop(); + //TODO readEventArgs.UserToken这里的 UserToken 有可能是空 + AsyncUserToken userToken; + if (readEventArgs.UserToken == null) + readEventArgs.UserToken = new AsyncUserToken(); + + userToken = (AsyncUserToken)readEventArgs.UserToken; + userToken.Socket = e.ConnectSocket; + userToken.ConnectTime = DateTime.Now; + userToken.Remote = e.ConnectSocket.RemoteEndPoint; + userToken.IPAddress = ((IPEndPoint)e.ConnectSocket.RemoteEndPoint).Address; + + userToken.RevIndex = MaxRevIndexNum; + userToken.SendIndex = MaxSendIndexNum; + + AddUserToken(userToken); + + OnClientNumberChange?.Invoke(1, userToken); + if (!e.ConnectSocket.ReceiveAsync(readEventArgs)) + { + ProcessReceive(readEventArgs); + } + } + } + catch (Exception me) + { + //RuncomLib.Log.LogUtils.Info(me.Message + "\r\n" + me.StackTrace); + } + + // Accept the next connection request + if (e.SocketError == SocketError.OperationAborted) return; + //StartAccept(e); + } + #endregion + + #region 收发IOCP循环 + /// 当异步接收操作完成时,会调用此方法。 + /// 如果远程主机关闭了连接,则套接字关闭。 + /// 如果接收到数据,则将数据回显到客户端。 + /// + /// + private void ProcessReceive(SocketAsyncEventArgs e) + { + try + { + // check if the remote host closed the connection + AsyncUserToken token = (AsyncUserToken)e.UserToken; + if (e.BytesTransferred > 0 && e.SocketError == SocketError.Success) + { + //读取数据 + //byte[] data = new byte[e.BytesTransferred]; + //Array.Copy(e.Buffer, e.Offset, data, 0, e.BytesTransferred); + //lock (token.Buffer) + lock (token.memoryStream) + { + //token.Buffer.AddRange(data); + token.memoryStream.Write(e.Buffer, e.Offset, e.BytesTransferred); + do + { + //如果包头不完整 + //if (token.Buffer.Count < 4) + if (token.memoryStream.Length < 4) + break; + + ////判断包的长度 + //byte[] lenBytes = token.Buffer.GetRange(0, 4).ToArray(); + //int packageLen = BitConverter.ToInt32(lenBytes, 0) - 4; + //if (packageLen > token.Buffer.Count - 4) + //{ //长度不够时,退出循环,让程序继续接收 + // break; + //} + + long FristBeginPos = token.memoryStream.Position; + byte[] lenBytes = new byte[4]; + token.memoryStream.Seek(0, SeekOrigin.Begin); + token.memoryStream.Read(lenBytes, 0, 4); + int packageLen = BitConverter.ToInt32(lenBytes, 0) - 4; + if (packageLen > token.memoryStream.Length - 4) + { + token.memoryStream.Seek(FristBeginPos, SeekOrigin.Begin); + //长度不够时,退出循环,让程序继续接收 + break; + } + + ////包够长时,则提取出来,交给后面的程序去处理 + //byte[] rev = token.Buffer.GetRange(4, packageLen).ToArray(); + + byte[] rev = new byte[packageLen]; + token.memoryStream.Seek(4, SeekOrigin.Begin); + token.memoryStream.Read(rev, 0, packageLen); + + //从数据池中移除这组数据 + //lock (token.Buffer) + //{ + // token.Buffer.RemoveRange(0, packageLen + 4); + //} + + token.memoryStream.Seek(FristBeginPos, SeekOrigin.Begin); + //从数据池中移除这组数据 + lock (token.memoryStream) + { + //token.memoryStream.Position = 0; + //token.memoryStream.SetLength(0); + int numberOfBytesToRemove = packageLen + 4; + byte[] buf = token.memoryStream.GetBuffer(); + Buffer.BlockCopy(buf, numberOfBytesToRemove, buf, 0, (int)token.memoryStream.Length - numberOfBytesToRemove); + token.memoryStream.SetLength(token.memoryStream.Length - numberOfBytesToRemove); + } + + DataCallBackReady(token, rev); + + //这里API处理完后,并没有返回结果,当然结果是要返回的,却不是在这里, 这里的代码只管接收. + //若要返回结果,可在API处理中调用此类对象的SendMessage方法,统一打包发送.不要被微软的示例给迷惑了. + //} while (token.Buffer.Count > 4); + } while (token.memoryStream.Length > 4); + } + + //继续接收. 为什么要这么写,请看Socket.ReceiveAsync方法的说明 + if (!token.Socket.ReceiveAsync(e)) + { + ProcessReceive(e); + } + } + else + { + //尝试性,清理数据 + token.memoryStream.SetLength(0); + token.memoryStream.Seek(0, SeekOrigin.Begin); + + CloseClientSocket(e); + } + } + catch (Exception xe) + { + //RuncomLib.Log.LogUtils.Info(xe.Message + "\r\n" + xe.StackTrace); + } + } + private void ProcessSend(SocketAsyncEventArgs e) + { + if (e.SocketError == SocketError.Success) + { + //TODO + } + else + { + CloseClientSocket(e); + return; + } + ReleaseSocketAsyncEventArgs(e); + SendForMsgPool(); + } + void IO_Completed(object sender, SocketAsyncEventArgs e) + { + // determine which type of operation just completed and call the associated handler + + switch (e.LastOperation) + { + case SocketAsyncOperation.Connect: + ProcessConnect(e); + break; + case SocketAsyncOperation.Receive: + ProcessReceive(e); + break; + case SocketAsyncOperation.Send: + ProcessSend(e); + break; + default: + throw new ArgumentException("The last operation completed on the socket was not a receive or send"); + } + } + #endregion + + #region 发送 + int sendrun = 0; + /// + /// 对外暴露的发送消息 + /// + /// + /// 序列化之后的数据 + public void SendToSocket(Socket sk, int CMDID, byte[] data) + { + AsyncUserToken token = GetAsyncUserTokenForSocket(sk); + SendWithIndex(token, (ushort)CMDID, data); + } + + void SendForMsgPool() + { + //if (flag_SendForMsgPool) return; + try + { + if (sendrun < msg_pool.Count || msg_pool.Count < 1) + return; + + sendrun++; + while (msg_pool.Count > 0) + { + try + { + TokenWithMsg msg = msg_pool.Dequeue(); + //OutNetLog("从信息池取出发送"); + //是心跳包 + if (msg.bHeartbeat) + { + SendHeartbeatMessage(msg.token); + } + else + { + SendMessage(msg.token, msg.CMDID, msg.data); + } + msg = null; + } + catch + { + OutNetLog("==============================================>"); + } + } + sendrun--; + OutNetLog("!!!!!!!!!!!!!!!!!!!!!!!!!!"); + } + catch (Exception ex) + { + OutNetLog(ex.ToString()); + } + + } + + /// + /// 发送心跳包 + /// + /// + void SendHeartbeatMessage(AsyncUserToken token) + { + if (token == null || token.Socket == null || !token.Socket.Connected) + return; + try + { + if (m_Sendpool.Count > 0) + { + SocketAsyncEventArgs myreadEventArgs = m_Sendpool.Pop(); + myreadEventArgs.UserToken = token; + //TODO ????客户端用这里是否应该直接就不设置 + //myreadEventArgs.AcceptSocket = token.Socket; + + //直接写入SocketAsyncEventArgs的Buff + HunterNet_Heartbeat.SetDataToSocketAsyncEventArgs(myreadEventArgs); + + //若不需要等待 + if (!token.Socket.SendAsync(myreadEventArgs)) + { + m_Sendpool.Push(myreadEventArgs); + } + return; + } + else + { + //先压入队列,等待m_Sendpool回收 + msg_pool.Enqueue(new TokenWithMsg() { token = token, bHeartbeat = true }); + //OutNetLog("!!!!压入消息发送队列MSG_Pool"); + return; + } + } + catch (Exception e) + { + OutNetLog(e.ToString()); + } + } + + /// + /// 发送数据并计数 + /// + /// + void SendWithIndex(AsyncUserToken token, ushort CmdID, byte[] data) + { + try + { + //发送数据 + SendMessage(token, CmdID, data); + token.SendIndex = MaxSendIndexNum; + } + catch + { + CloseReady(token); + } + } + + void SendMessage(AsyncUserToken token, ushort CmdID, byte[] data) + { + if (token == null || token.Socket == null || !token.Socket.Connected) + return; + try + { + if (m_Sendpool.Count > 0) + { + SocketAsyncEventArgs myreadEventArgs = m_Sendpool.Pop(); + myreadEventArgs.UserToken = token; + myreadEventArgs.AcceptSocket = token.Socket; + //myreadEventArgs.SetBuffer(message, 0, message.Length); //将数据放置进去. + //更换为CMDID和Data直接写入SocketAsyncEventArgs的Buff + HunterNet_C2S.SetDataToSocketAsyncEventArgs(myreadEventArgs, CmdID, data); + + //若不需要等待 + if (!token.Socket.SendAsync(myreadEventArgs)) + { + m_Sendpool.Push(myreadEventArgs); + } + return; + } + else + { + //先压入队列,等待m_Sendpool回收 + msg_pool.Enqueue(new TokenWithMsg() { token = token, CMDID = CmdID, data = data }); + //OutNetLog("!!!!压入消息发送队列MSG_Pool"); + return; + } + } + catch (Exception e) + { + OutNetLog(e.ToString()); + } + } + #endregion + + #region 处理前预备 + private void DataCallBackReady(AsyncUserToken sk, byte[] data) + { + //增加接收计数 + sk.RevIndex = MaxRevIndexNum; + + if (data.Length == 1 && data[0] == 0x00)//心跳包 + { + //OutNetLog("收到心跳包"); + //无处理 + } + else + { + try + { + //将数据包交给后台处理,这里你也可以新开个线程来处理.加快速度. + HunterNet_S2C.AnalysisPkgData(data, out ushort CmdID, out ushort Error, out byte[] resultdata); + OnReceive?.Invoke(sk, CmdID, Error, resultdata); + } + catch (Exception ex) + { + OutNetLog("数据解析错误"); + } + } + } + private void OutNetLog(string msg) + { + OnNetLog?.Invoke(msg); + } + #endregion + + #region 心跳包 + /// + /// 发送心跳包 + /// + /// + /// + private void SendHeartbeatWithIndex(AsyncUserToken token) + { + if (token == null || token.Socket == null || !token.Socket.Connected) + return; + try + { + //OutNetLog(DateTime.Now.ToString() + "发送心跳包"); + token.SendIndex = MaxSendIndexNum; + SendHeartbeatMessage(token); + } + catch (Exception e) + { + CloseReady(token); + } + } + /// + /// 心跳包时钟事件 + /// + /// + /// + private void CheckUpdatetimer_Elapsed(object sender, System.Timers.ElapsedEventArgs e) + { + for (int i = 0; i < m_clients.Count(); i++) + { + //接收服务器数据计数 + m_clients[i].RevIndex--; + if (m_clients[i].RevIndex <= 0) + { + //判定掉线 + CloseReady(m_clients[i]); + return; + } + + //发送计数 + m_clients[i].SendIndex--; + if (m_clients[i].SendIndex <= 0)//需要发送心跳包了 + { + //重置倒计时计数 + m_clients[i].SendIndex = MaxSendIndexNum; + SendHeartbeatWithIndex(m_clients[i]); + } + } + } + #endregion + } +} diff --git a/NetLib/HaoYueNet.ClientNetwork/IOCPMode/TokenMsgPool.cs b/NetLib/HaoYueNet.ClientNetwork/IOCPMode/TokenMsgPool.cs new file mode 100644 index 0000000..bd171db --- /dev/null +++ b/NetLib/HaoYueNet.ClientNetwork/IOCPMode/TokenMsgPool.cs @@ -0,0 +1,130 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Net.Sockets; +using System.Text; + +namespace HaoYueNet.ClientNetwork.IOCPMode +{ + public class TokenWithMsg + { + public AsyncUserToken token; + public ushort CMDID; + //public UInt16 Error; + public byte[] data; + public bool bHeartbeat; + } + + public class TokenMsgPool + { + //Stack msg_pool; + Queue msg_pool; + + public TokenMsgPool(int capacity) + { + msg_pool = new Queue(capacity); + } + + /// + /// 向 Queue 的末尾添加一个对象。 + /// + /// + public void Enqueue(TokenWithMsg item) + { + lock (msg_pool) + { + msg_pool.Enqueue(item); + } + } + + //移除并返回在 Queue 的开头的对象。 + public TokenWithMsg Dequeue() + { + lock (msg_pool) + { + return msg_pool.Dequeue(); + } + } + + public int Count + { + get { return msg_pool.Count; } + } + + public void Clear() + { + msg_pool.Clear(); + } + } + + /* + public class TokenWithMsg + { + public AsyncUserToken token; + public byte[] message; + } + + public class TokenMsgPool + { + //Stack msg_pool; + Queue msg_pool; + + public TokenMsgPool(int capacity) + { + //msg_pool = new Stack(capacity); + msg_pool = new Queue(capacity); + } + + //public void Push(TokenWithMsg item) + //{ + // if (item == null) { throw new ArgumentNullException("Items added to a SocketAsyncEventArgsPool cannot be null"); } + // lock (msg_pool) + // { + // msg_pool.Push(item); + // } + //} + + /// + /// 向 Queue 的末尾添加一个对象。 + /// + /// + public void Enqueue(TokenWithMsg item) + { + lock (msg_pool) + { + msg_pool.Enqueue(item); + } + } + + //移除并返回在 Queue 的开头的对象。 + public TokenWithMsg Dequeue() + { + lock (msg_pool) + { + return msg_pool.Dequeue(); + } + } + + //// Removes a SocketAsyncEventArgs instance from the pool + //// and returns the object removed from the pool + //public TokenWithMsg Pop() + //{ + // lock (msg_pool) + // { + // return msg_pool.Pop(); + // } + //} + + // The number of SocketAsyncEventArgs instances in the pool + public int Count + { + get { return msg_pool.Count; } + } + + public void Clear() + { + msg_pool.Clear(); + } + } + */ +} diff --git a/Simple/ClientSaeaTest/ClientSaeaTest.csproj b/Simple/ClientSaeaTest/ClientSaeaTest.csproj new file mode 100644 index 0000000..37af9ce --- /dev/null +++ b/Simple/ClientSaeaTest/ClientSaeaTest.csproj @@ -0,0 +1,14 @@ + + + + Exe + net8.0 + enable + enable + + + + + + + diff --git a/Simple/ClientSaeaTest/Program.cs b/Simple/ClientSaeaTest/Program.cs new file mode 100644 index 0000000..6ef3c7c --- /dev/null +++ b/Simple/ClientSaeaTest/Program.cs @@ -0,0 +1,19 @@ +using System.Net.Sockets; + +namespace ClientSaeaTest +{ + internal class Program + { + static SaeaClient client = new SaeaClient(1024,1024); + static void Main(string[] args) + { + client.Init(); + client.Start(); + Socket socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); + client.Connect("139.186.160.243",1000); + + Console.WriteLine("Hello, World!"); + Console.ReadLine(); + } + } +} diff --git a/Simple/ClientSaeaTest/SaeaClient.cs b/Simple/ClientSaeaTest/SaeaClient.cs new file mode 100644 index 0000000..0184e75 --- /dev/null +++ b/Simple/ClientSaeaTest/SaeaClient.cs @@ -0,0 +1,69 @@ +using HaoYueNet.ClientNetwork.IOCPMode; +using System.Net.Sockets; + +namespace ClientSaeaTest +{ + internal class SaeaClient : TcpSaeaClient + { + public SaeaClient(int numConnections, int receiveBufferSize) + : base(numConnections, receiveBufferSize) + { + OnClientNumberChange += ClientNumberChange; + OnReceive += ReceiveData; + OnDisconnected += OnDisconnect; + OnNetLog += OnShowNetLog; + } + + + private void ClientNumberChange(int num, AsyncUserToken token) + { + Console.WriteLine("Client数发生变化"); + } + + public void Connect(string ip, int port) + { + Socket socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); + base.StartConnect(ip, port, socket); + } + + /// + /// 接受包回调 + /// + /// 协议ID + /// 错误编号 + /// 业务数据 + private void ReceiveData(AsyncUserToken token, int CMDID,int Error, byte[] data) + { + DataCallBack(token.Socket, CMDID, Error,data); + } + + public void DataCallBack(Socket sk, int CMDID, int Error, byte[] data) + { + Console.WriteLine("收到消息 CMDID =>" + CMDID + " 数据长度=>" + data.Length); + try + { + //抛出网络数据 + //NetMsg.Instance.PostNetMsgEvent(CMDID, sk, data); + } + catch (Exception ex) + { + Console.WriteLine("逻辑处理错误:" + ex.ToString()); + } + } + + /// + /// 断开连接 + /// + /// + public void OnDisconnect(AsyncUserToken token) + { + Console.WriteLine("断开连接"); + //ServerManager.g_ClientMgr.SetClientOfflineForSocket(token.Socket); + } + + public void OnShowNetLog(string msg) + { + //ServerManager.g_Log.Debug(msg); + } + } +}