From d9534a1a24d9e9aaac3da5cd806a008ab6843e00 Mon Sep 17 00:00:00 2001 From: sin365 <353374337@qq.com> Date: Tue, 16 Jan 2024 14:38:19 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E5=8E=9F=E5=A7=8B=E6=A8=A1?= =?UTF-8?q?=E5=BC=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../NetworkHelperCore_SourceMode.cs | 246 ++++++ .../SourceMode/TcpSaeaServer_SourceMode.cs | 727 ++++++++++++++++++ .../SourceMode/TokenMsgPool_SourceMode.cs | 57 ++ 3 files changed, 1030 insertions(+) create mode 100644 NetLib/HaoYueNet.ClientNetwork/NetworkHelperCore_SourceMode.cs create mode 100644 NetLib/HaoYueNet.ServerNetwork/NetWork/SourceMode/TcpSaeaServer_SourceMode.cs create mode 100644 NetLib/HaoYueNet.ServerNetwork/NetWork/SourceMode/TokenMsgPool_SourceMode.cs diff --git a/NetLib/HaoYueNet.ClientNetwork/NetworkHelperCore_SourceMode.cs b/NetLib/HaoYueNet.ClientNetwork/NetworkHelperCore_SourceMode.cs new file mode 100644 index 0000000..e6a3c4d --- /dev/null +++ b/NetLib/HaoYueNet.ClientNetwork/NetworkHelperCore_SourceMode.cs @@ -0,0 +1,246 @@ +using System.Net; +using System.Net.Sockets; +using static HaoYueNet.ClientNetwork.BaseData; + +namespace HaoYueNet.ClientNetwork +{ + public class NetworkHelperCore_SourceMode + { + private Socket client; + + ////响应倒计时计数最大值 + //private static int MaxRevIndexNum = 6; + + ////发送倒计时计数最大值 + //private static int MaxSendIndexNum = 3; + + //响应倒计时计数最大值 + private static int MaxRevIndexNum = 50; + + //发送倒计时计数最大值 + private static int MaxSendIndexNum = 3; + + //响应倒计时计数 + private static int RevIndex=0; + //发送倒计时计数 + private static int SendIndex=0; + + //计时器间隔 + private static int TimerInterval = 3000; + + public static string LastConnectIP; + public static int LastConnectPort; + public bool bDetailedLog = false; + + public bool Init(string IP, int port,bool isHadDetailedLog = true, bool bBindReuseAddress = false,int bBindport = 0) + { + LogOut("==>初始化网络核心"); + + bDetailedLog = isHadDetailedLog; + RevIndex = MaxRevIndexNum; + SendIndex = MaxSendIndexNum; + + client = new Socket(SocketType.Stream, ProtocolType.Tcp); + if (bBindReuseAddress) + { + client.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReuseAddress, true); + IPEndPoint ipe = new IPEndPoint(IPAddress.Any, Convert.ToInt32(bBindport)); + client.Bind(ipe); + } + LastConnectIP = IP; + LastConnectPort = port; + return Connect(IP, port); + } + + bool Connect(string IP, int port) + { + //带回调的 + try + { + if(bDetailedLog) + LogOut("连接到远程IP " + IP + ":" + port); + else + LogOut("连接到远程服务"); + + client.Connect(IP, port); + Thread thread = new Thread(Recive); + thread.IsBackground = true; + thread.Start(client); + int localport = ((IPEndPoint)client.LocalEndPoint).Port; + + if (bDetailedLog) + LogOut($"连接成功!连接到远程IP->{IP}:{port} | 本地端口->{localport}"); + else + LogOut("连接成功!"); + + if (bDetailedLog) + LogOut("开启心跳包检测"); + + OnConnected?.Invoke(true); + return true; + } + catch (Exception ex) + { + if (bDetailedLog) + LogOut("连接失败:" + ex.ToString()); + else + LogOut("连接失败"); + + OnConnected?.Invoke(false); + return false; + } + } + + ~NetworkHelperCore_SourceMode() + { + client.Close(); + } + + private void SendToSocket(byte[] data) + { + //已拼接包长度,这里不再需要拼接长度 + //data = SendDataWithHead(data); + try + { + SendWithIndex(data); + } + catch (Exception ex) + { + //连接断开 + OnCloseReady(); + return; + } + //LogOut("发送消息,消息长度=> "+data.Length); + } + + private void SendHeartbeat() + { + try + { + SendWithIndex(HeartbeatData); + } + catch (Exception ex) + { + //连接断开 + OnCloseReady(); + return; + } + //LogOut("发送心跳包"); + } + + /// + /// 发送数据并计数 + /// + /// + private void SendWithIndex(byte[] data) + { + //增加发送计数 + SendIndex = MaxSendIndexNum; + //发送数据 + client.Send(data); + } + + /// + /// 供外部调用 发送消息 + /// + /// + /// 序列化之后的数据 + public void SendToServer(byte[] data) + { + //LogOut("准备数据 data=> "+data); + SendToSocket(data); + } + + #region 事件定义 + public delegate void OnReceiveDataHandler(byte[] data); + public delegate void OnConnectedHandler(bool IsConnected); + public delegate void OnCloseHandler(); + public delegate void OnLogOutHandler(string Msg); + #endregion + + public event OnConnectedHandler OnConnected; + public event OnReceiveDataHandler OnReceiveData; + public event OnCloseHandler OnClose; + /// + /// 网络库调试日志输出 + /// + public event OnLogOutHandler OnLogOut; + + /// + /// 做好处理的连接管理 + /// + private void OnCloseReady() + { + LogOut("关闭连接"); + //关闭Socket连接 + client.Close(); + OnClose?.Invoke(); + } + + /// + /// 主动关闭连接 + /// + public void CloseConntect() + { + OnCloseReady(); + } + + private void DataCallBackReady(byte[] data) + { + //增加接收计数 + RevIndex = MaxRevIndexNum; + OnReceiveData(data); + } + + MemoryStream memoryStream = new MemoryStream();//开辟一个内存流 + private void Recive(object o) + { + var client = o as Socket; + //MemoryStream memoryStream = new MemoryStream();//开辟一个内存流 + + while (true) + { + byte[] buffer = new byte[1024 * 1024 * 2]; + int effective=0; + try + { + effective = client.Receive(buffer); + if (effective == 0) + { + continue; + } + } + catch(Exception ex) + { + //远程主机强迫关闭了一个现有的连接 + OnCloseReady(); + return; + //断开连接 + } + memoryStream.Write(buffer, 0, effective);//将接受到的数据写入内存流中 + while (true) + { + if (effective > 0)//如果接受到的消息不为0(不为空) + { + DataCallBackReady(memoryStream.ToArray()); + //流复用的方式 不用重新new申请 + memoryStream.Position = 0; + memoryStream.SetLength(0); + } + } + } + } + + + public void LogOut(string Msg) + { + //Console.WriteLine(Msg); + OnLogOut?.Invoke(Msg); + } + + public Socket GetClientSocket() + { + return client; + } + } +} diff --git a/NetLib/HaoYueNet.ServerNetwork/NetWork/SourceMode/TcpSaeaServer_SourceMode.cs b/NetLib/HaoYueNet.ServerNetwork/NetWork/SourceMode/TcpSaeaServer_SourceMode.cs new file mode 100644 index 0000000..808e313 --- /dev/null +++ b/NetLib/HaoYueNet.ServerNetwork/NetWork/SourceMode/TcpSaeaServer_SourceMode.cs @@ -0,0 +1,727 @@ +//using HunterProtobufCore; +using System.Net; +using System.Net.Sockets; +using static HaoYueNet.ServerNetwork.BaseData; + +namespace HaoYueNet.ServerNetwork +{ + public class TcpSaeaServer_SourceMode + { + #region 定义属性 + protected int MaxRevIndexNum = 50;//响应倒计时计数最大值 + protected int MaxSendIndexNum = 3;//发送倒计时计数最大值 + protected static int TimerInterval = 3000;//计时器间隔 + protected System.Timers.Timer _heartTimer;//心跳包计数器 + public int m_maxConnectNum; //最大连接数 + public int m_revBufferSize; //最大接收字节数 + protected BufferManager m_bufferManager; + protected const int opsToAlloc = 2; + Socket listenSocket; //监听Socket + protected SocketEventPool m_Receivepool; + protected SocketEventPool m_Sendpool; + protected TokenMsgPool_SourceMode msg_pool; + protected int m_clientCount; //连接的客户端数量 + protected Semaphore m_maxNumberAcceptedClients;//信号量 + protected Dictionary _DictSocketAsyncUserToken = new Dictionary(); + List m_clients; //客户端列表 + public List ClientList { private set { m_clients = value; } get { return m_clients; } } //获取客户端列表 + #endregion + + #region 定义委托 + /// + /// 客户端连接数量变化时触发 + /// + /// 当前增加客户的个数(用户退出时为负数,增加时为正数,一般为1) + /// 增加用户的信息 + public delegate void OnClientNumberChangeHandler(int num, AsyncUserToken token); + /// + /// 接收到客户端的数据 + /// + /// 客户端 + /// 客户端数据 + public delegate void OnReceiveDataHandler(AsyncUserToken sk, int CMDID, byte[] data); + /// + /// 断开连接 + /// + /// + public delegate void OnDisconnectHandler(AsyncUserToken sk); + /// + /// 日志 + /// + /// + public delegate void OnNetLogHandler(string msg); + #endregion + + #region 定义事件 + /// + /// 客户端连接数量变化事件 + /// + public event OnClientNumberChangeHandler OnClientNumberChange; + /// + /// 接收到客户端的数据事件 + /// + public event OnReceiveDataHandler OnReceive; + /// + /// 接收到客户端的断开连接 + /// + public event OnDisconnectHandler OnDisconnected; + /// + /// 网络库内部输出 + /// + public event OnNetLogHandler OnNetLog; + #endregion + + /// + /// 构造函数 + /// + /// 最大连接数 + /// 缓存区大小 + public TcpSaeaServer_SourceMode(int numConnections, int receiveBufferSize) + { + m_clientCount = 0; + m_maxConnectNum = numConnections; + m_revBufferSize = receiveBufferSize; + // allocate buffers such that the maximum number of sockets can have one outstanding read and + //write posted to the socket simultaneously + m_bufferManager = new BufferManager(receiveBufferSize * numConnections * opsToAlloc, receiveBufferSize); + + m_Receivepool = new SocketEventPool(numConnections); + m_Sendpool = new SocketEventPool(numConnections); + + msg_pool = new TokenMsgPool_SourceMode(numConnections); + + m_maxNumberAcceptedClients = new Semaphore(numConnections, numConnections); + } + + #region Client操作 + /// + /// 初始化 + /// + public void Init() + { + // Allocates one large byte buffer which all I/O operations use a piece of. This gaurds + // against memory fragmentation + m_bufferManager.InitBuffer(); + m_clients = new List(); + // preallocate pool of SocketAsyncEventArgs objects + SocketAsyncEventArgs readWriteEventArg; + + for (int i = 0; i < m_maxConnectNum; i++) + { + readWriteEventArg = new SocketAsyncEventArgs(); + readWriteEventArg.Completed += new EventHandler(IO_Completed); + readWriteEventArg.UserToken = new AsyncUserToken(); + // assign a byte buffer from the buffer pool to the SocketAsyncEventArg object + m_bufferManager.SetBuffer(readWriteEventArg); + // add SocketAsyncEventArg to the pool + m_Receivepool.Push(readWriteEventArg); + } + + for (int i = 0; i < m_maxConnectNum; i++) + { + readWriteEventArg = new SocketAsyncEventArgs(); + readWriteEventArg.Completed += new EventHandler(IO_Completed); + readWriteEventArg.UserToken = new AsyncUserToken(); + + //发送是否需要如此设置 TODO + m_bufferManager.SetBuffer(readWriteEventArg); + + m_Sendpool.Push(readWriteEventArg); + } + OutNetLog("初始化完毕"); + } + /// + /// 启动服务 + /// + /// + /// 是否端口重用 + /// + public bool Start(IPEndPoint localEndPoint, bool bReuseAddress = false) + { + try + { + ClearUserToken(); + listenSocket = new Socket(localEndPoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp); + + if (bReuseAddress) + { + listenSocket.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReuseAddress, true); + } + + listenSocket.Bind(localEndPoint); + // start the server with a listen backlog of 100 connections + listenSocket.Listen(m_maxConnectNum); + // post accepts on the listening socket + StartAccept(null); + + OutNetLog("监听:" + listenSocket.LocalEndPoint.ToString()); + + _heartTimer = new System.Timers.Timer(); + _heartTimer.Interval = TimerInterval; + _heartTimer.Elapsed += CheckUpdatetimer_Elapsed; + _heartTimer.AutoReset = true; + _heartTimer.Enabled = true; + OutNetLog("开启定时心跳包"); + + return true; + } + catch (Exception) + { + return false; + } + } + /// + /// 停止服务 + /// + public void Stop() + { + foreach (AsyncUserToken token in m_clients) + { + try + { + token.Socket.Shutdown(SocketShutdown.Both); + } + catch (Exception) { } + } + try + { + listenSocket.Shutdown(SocketShutdown.Both); + } + catch (Exception) { } + + listenSocket.Close(); + int c_count = m_clients.Count; + ClearUserToken(); + + if (OnClientNumberChange != null) + OnClientNumberChange(-c_count, null); + } + public void CloseClient(AsyncUserToken token) + { + try {token.Socket.Shutdown(SocketShutdown.Both);} + catch (Exception) { } + } + /// + /// 关闭客户端连接 + /// + /// + void CloseClientSocket(SocketAsyncEventArgs e) + { + AsyncUserToken token = e.UserToken as AsyncUserToken; + CloseReady(token); + // 释放SocketAsyncEventArg,以便其他客户端可以重用它们 + ReleaseSocketAsyncEventArgs(e); + } + void CloseReady(AsyncUserToken token) + { + OnDisconnected?.Invoke(token); + RemoveUserToken(token); + //如果有事件,则调用事件,发送客户端数量变化通知 + OnClientNumberChange?.Invoke(-1, token); + // 关闭与客户端关联的套接字 + try { token.Socket.Shutdown(SocketShutdown.Send); } catch (Exception) { } + token.Socket.Close(); + // 递减计数器以跟踪连接到服务器的客户端总数 + Interlocked.Decrement(ref m_clientCount); + m_maxNumberAcceptedClients.Release(); + } + #endregion + + #region Token管理 + public AsyncUserToken GetAsyncUserTokenForSocket(Socket sk) + { + return _DictSocketAsyncUserToken.ContainsKey(sk) ? _DictSocketAsyncUserToken[sk] : null; + } + void AddUserToken(AsyncUserToken userToken) + { + lock (_DictSocketAsyncUserToken) + { + m_clients.Add(userToken); + _DictSocketAsyncUserToken.Add(userToken.Socket, userToken); + } + } + void RemoveUserToken(AsyncUserToken userToken) + { + lock (_DictSocketAsyncUserToken) + { + m_clients.Remove(userToken); + _DictSocketAsyncUserToken.Remove(userToken.Socket); + } + } + void ClearUserToken() + { + lock (_DictSocketAsyncUserToken) + { + m_clients.Clear(); + _DictSocketAsyncUserToken.Clear(); + } + } + /// + /// 回收SocketAsyncEventArgs + /// + /// + /// + void ReleaseSocketAsyncEventArgs(SocketAsyncEventArgs saea) + { + //saea.UserToken = null;//TODO + //saea.SetBuffer(null, 0, 0); + //saea.Dispose(); + //↑ 这里不要自作主张去清东西,否则回收回去不可用 + + switch (saea.LastOperation) + { + case SocketAsyncOperation.Receive: + m_Receivepool.Push(saea); + break; + case SocketAsyncOperation.Send: + m_Sendpool.Push(saea); + break; + default: + throw new ArgumentException("ReleaseSocketAsyncEventArgs > The last operation completed on the socket was not a receive or send"); + } + + } + #endregion + + #region 监听IOCP循环 + /// + /// 开始接受客户端的连接请求的操作 + /// + /// 在服务器的侦听套接字上发出接受操作时要使用的上下文对象 + public void StartAccept(SocketAsyncEventArgs acceptEventArg) + { + if (acceptEventArg == null) + { + acceptEventArg = new SocketAsyncEventArgs(); + acceptEventArg.Completed += new EventHandler(AcceptEventArg_Completed); + } + else + { + // socket must be cleared since the context object is being reused + acceptEventArg.AcceptSocket = null; + } + + m_maxNumberAcceptedClients.WaitOne(); + if (!listenSocket.AcceptAsync(acceptEventArg)) + { + ProcessAccept(acceptEventArg); + } + } + /// + /// 此方法是与Socket关联的回调方法。AcceptAsync操作,并在接受操作完成时调用 + /// + /// + /// + void AcceptEventArg_Completed(object sender, SocketAsyncEventArgs e) + { + ProcessAccept(e); + } + private void ProcessAccept(SocketAsyncEventArgs e) + { + try + { + Interlocked.Increment(ref m_clientCount); + // Get the socket for the accepted client connection and put it into the + //ReadEventArg object user token + SocketAsyncEventArgs readEventArgs = m_Receivepool.Pop(); + //TODO readEventArgs.UserToken这里的 UserToken 有可能是空 + AsyncUserToken userToken; + if (readEventArgs.UserToken == null) + readEventArgs.UserToken = new AsyncUserToken(); + + userToken = (AsyncUserToken)readEventArgs.UserToken; + userToken.Socket = e.AcceptSocket; + userToken.ConnectTime = DateTime.Now; + userToken.Remote = e.AcceptSocket.RemoteEndPoint; + userToken.IPAddress = ((IPEndPoint)(e.AcceptSocket.RemoteEndPoint)).Address; + + + userToken.RevIndex = MaxRevIndexNum; + userToken.SendIndex = MaxSendIndexNum; + + AddUserToken(userToken); + + OnClientNumberChange?.Invoke(1, userToken); + if (!e.AcceptSocket.ReceiveAsync(readEventArgs)) + { + ProcessReceive(readEventArgs); + } + } + catch (Exception me) + { + //RuncomLib.Log.LogUtils.Info(me.Message + "\r\n" + me.StackTrace); + } + + // Accept the next connection request + if (e.SocketError == SocketError.OperationAborted) return; + StartAccept(e); + } + #endregion + + #region 收发IOCP循环 + /// 当异步接收操作完成时,会调用此方法。 + /// 如果远程主机关闭了连接,则套接字关闭。 + /// 如果接收到数据,则将数据回显到客户端。 + /// + /// + private void ProcessReceive(SocketAsyncEventArgs e) + { + try + { + // check if the remote host closed the connection + AsyncUserToken token = (AsyncUserToken)e.UserToken; + if (e.BytesTransferred > 0 && e.SocketError == SocketError.Success) + { + //读取数据 + //byte[] data = new byte[e.BytesTransferred]; + //Array.Copy(e.Buffer, e.Offset, data, 0, e.BytesTransferred); + //lock (token.Buffer) + lock(token.memoryStream) + { + //token.Buffer.AddRange(data); + token.memoryStream.Write(e.Buffer, e.Offset, e.BytesTransferred); + } + do + { + //如果包头不完整 + //if (token.Buffer.Count < 4) + if (token.memoryStream.Length < 4) + break; + + //判断包的长度 + //byte[] lenBytes = token.Buffer.GetRange(0, 4).ToArray(); + //int packageLen = BitConverter.ToInt32(lenBytes, 0) - 4; + //if (packageLen > token.Buffer.Count - 4) + //{ //长度不够时,退出循环,让程序继续接收 + // break; + //} + + long FristBeginPos = token.memoryStream.Position; + byte[] lenBytes = new byte[4]; + token.memoryStream.Seek(0, SeekOrigin.Begin); + token.memoryStream.Read(lenBytes,0,4); + int packageLen = BitConverter.ToInt32(lenBytes, 0) - 4; + if (packageLen > token.memoryStream.Length - 4) + { //长度不够时,退出循环,让程序继续接收 + break; + } + + //包够长时,则提取出来,交给后面的程序去处理 + //byte[] rev = token.Buffer.GetRange(4, packageLen).ToArray(); + + byte[] rev = new byte[packageLen]; + token.memoryStream.Seek(4, SeekOrigin.Begin); + token.memoryStream.Read(rev, 0, packageLen); + + ////从数据池中移除这组数据 + //lock (token.Buffer) + //{ + // token.Buffer.RemoveRange(0, packageLen + 4); + //} + + token.memoryStream.Seek(FristBeginPos, SeekOrigin.Begin); + //从数据池中移除这组数据 + lock (token.memoryStream) + { + int numberOfBytesToRemove = packageLen + 4; + byte[] buf = token.memoryStream.GetBuffer(); + Buffer.BlockCopy(buf, numberOfBytesToRemove, buf, 0, (int)token.memoryStream.Length - numberOfBytesToRemove); + token.memoryStream.SetLength(token.memoryStream.Length - numberOfBytesToRemove); + } + + DataCallBackReady(token, rev); + + //这里API处理完后,并没有返回结果,当然结果是要返回的,却不是在这里, 这里的代码只管接收. + //若要返回结果,可在API处理中调用此类对象的SendMessage方法,统一打包发送.不要被微软的示例给迷惑了. + //} while (token.Buffer.Count > 4); + } while (token.memoryStream.Length > 4); + + //继续接收. 为什么要这么写,请看Socket.ReceiveAsync方法的说明 + if (!token.Socket.ReceiveAsync(e)) + { + this.ProcessReceive(e); + } + } + else + { + CloseClientSocket(e); + } + } + catch (Exception xe) + { + //RuncomLib.Log.LogUtils.Info(xe.Message + "\r\n" + xe.StackTrace); + } + } + private void ProcessSend(SocketAsyncEventArgs e) + { + if (e.SocketError == SocketError.Success) + { + //TODO + } + else + { + CloseClientSocket(e); + return; + } + ReleaseSocketAsyncEventArgs(e); + SendForMsgPool(); + } + void IO_Completed(object sender, SocketAsyncEventArgs e) + { + // determine which type of operation just completed and call the associated handler + + switch (e.LastOperation) + { + case SocketAsyncOperation.Receive: + ProcessReceive(e); + break; + case SocketAsyncOperation.Send: + ProcessSend(e); + break; + default: + throw new ArgumentException("The last operation completed on the socket was not a receive or send"); + } + } + #endregion + + #region 发送 + int sendrun = 0; + /// + /// 对外暴露的发送消息 + /// + /// + /// 序列化之后的数据 + public void SendToSocket(Socket sk, int CMDID, int ERRCODE, byte[] data) + { + AsyncUserToken token = GetAsyncUserTokenForSocket(sk); + /*HunterNet_S2C _s2cdata = new HunterNet_S2C(); + _s2cdata.HunterNetCoreCmdID = CMDID; + _s2cdata.HunterNetCoreData = ByteString.CopyFrom(data); + _s2cdata.HunterNetCoreERRORCode = ERRCODE; + byte[] _finaldata = Serizlize(_s2cdata);*/ + + //byte[] _finaldata = HunterNet_S2C.CreatePkgData((ushort)CMDID, (ushort)ERRCODE, data); + + SendWithIndex(token, (ushort)CMDID, (ushort)ERRCODE, data); + } + void SendForMsgPool() + { + //if (flag_SendForMsgPool) return; + try + { + if (sendrun < msg_pool.Count || msg_pool.Count < 1) + return; + + sendrun++; + while (msg_pool.Count > 0) + { + try + { + TokenWithMsg_SourceMode msg = msg_pool.Dequeue(); + //OutNetLog("从信息池取出发送"); + //是心跳包 + if (msg.bHeartbeat) + { + SendHeartbeatMessage(msg.token); + } + else + { + SendMessage(msg.token,msg.data); + } + msg = null; + } + catch + { + OutNetLog("==============================================>"); + } + } + sendrun--; + OutNetLog("!!!!!!!!!!!!!!!!!!!!!!!!!!"); + } + catch (Exception ex) + { + OutNetLog(ex.ToString()); + } + + } + + /// + /// 发送心跳包 + /// + /// + void SendHeartbeatMessage(AsyncUserToken token) + { + if (token == null || token.Socket == null || !token.Socket.Connected) + return; + try + { + if (m_Sendpool.Count > 0) + { + SocketAsyncEventArgs myreadEventArgs = m_Sendpool.Pop(); + myreadEventArgs.UserToken = token; + myreadEventArgs.AcceptSocket = token.Socket; + //直接写入SocketAsyncEventArgs的Buff + HunterNet_Heartbeat.SetDataToSocketAsyncEventArgs(myreadEventArgs); + + //若不需要等待 + if (!token.Socket.SendAsync(myreadEventArgs)) + { + m_Sendpool.Push(myreadEventArgs); + } + return; + } + else + { + //先压入队列,等待m_Sendpool回收 + msg_pool.Enqueue(new TokenWithMsg_SourceMode() { token = token, bHeartbeat = true }); + //OutNetLog("!!!!压入消息发送队列MSG_Pool"); + return; + } + } + catch (Exception e) + { + OutNetLog(e.ToString()); + } + } + + /// + /// 发送数据并计数 + /// + /// + void SendWithIndex(AsyncUserToken token, UInt16 CmdID, UInt16 ERRCODE, byte[] data) + { + try + { + //发送数据 + SendMessage(token, CmdID, ERRCODE, data); + token.SendIndex = MaxSendIndexNum; + } + catch + { + CloseReady(token); + } + } + + void SendMessage(AsyncUserToken token, byte[] data) + { + if (token == null || token.Socket == null || !token.Socket.Connected) + return; + try + { + if (m_Sendpool.Count > 0) + { + SocketAsyncEventArgs myreadEventArgs = m_Sendpool.Pop(); + myreadEventArgs.UserToken = token; + myreadEventArgs.AcceptSocket = token.Socket; + myreadEventArgs.SetBuffer(data, 0, data.Length); //将数据放置进去. + + //若不需要等待 + if (!token.Socket.SendAsync(myreadEventArgs)) + { + m_Sendpool.Push(myreadEventArgs); + } + return; + } + else + { + //先压入队列,等待m_Sendpool回收 + msg_pool.Enqueue(new TokenWithMsg_SourceMode() { token = token, data = data ,bHeartbeat = false}); + //OutNetLog("!!!!压入消息发送队列MSG_Pool"); + return; + } + } + catch (Exception e) + { + OutNetLog(e.ToString()); + } + } + #endregion + + #region 处理前预备 + private void DataCallBackReady(AsyncUserToken sk, byte[] data) + { + //增加接收计数 + sk.RevIndex = MaxRevIndexNum; + + if (data.Length == 1 && data[0] == 0x00)//心跳包 + { + //OutNetLog("收到心跳包"); + //无处理 + } + else + { + try + { + //将数据包交给后台处理,这里你也可以新开个线程来处理.加快速度. + /* + HunterNet_C2S _s2c = DeSerizlize(data); + OnReceive?.Invoke(sk, (int)_s2c.HunterNetCoreCmdID, _s2c.HunterNetCoreData.ToArray()); + //DataCallBack(sk, (int)_s2c.HunterNetCoreCmdID, _s2c.HunterNetCoreData.ToArray()); + */ + HunterNet_C2S.AnalysisPkgData(data, out ushort CmdID, out byte[] resultdata); + OnReceive?.Invoke(sk, CmdID, resultdata); + } + catch (Exception ex) + { + OutNetLog("数据解析错误"); + } + } + } + private void OutNetLog(string msg) + { + OnNetLog?.Invoke(msg); + } + #endregion + + #region 心跳包 + /// + /// 发送心跳包 + /// + /// + /// + private void SendHeartbeatWithIndex(AsyncUserToken token) + { + if (token == null || token.Socket == null || !token.Socket.Connected) + return; + try + { + //OutNetLog(DateTime.Now.ToString() + "发送心跳包"); + token.SendIndex = MaxSendIndexNum; + SendHeartbeatMessage(token); + } + catch (Exception e) + { + CloseReady(token); + } + } + /// + /// 心跳包时钟事件 + /// + /// + /// + private void CheckUpdatetimer_Elapsed(object sender, System.Timers.ElapsedEventArgs e) + { + for (int i = 0; i < m_clients.Count(); i++) + { + //接收服务器数据计数 + m_clients[i].RevIndex--; + if (m_clients[i].RevIndex <= 0) + { + //判定掉线 + CloseReady(m_clients[i]); + return; + } + + //发送计数 + m_clients[i].SendIndex--; + if (m_clients[i].SendIndex <= 0)//需要发送心跳包了 + { + //重置倒计时计数 + m_clients[i].SendIndex = MaxSendIndexNum; + SendHeartbeatWithIndex(m_clients[i]); + } + } + } + #endregion + } +} diff --git a/NetLib/HaoYueNet.ServerNetwork/NetWork/SourceMode/TokenMsgPool_SourceMode.cs b/NetLib/HaoYueNet.ServerNetwork/NetWork/SourceMode/TokenMsgPool_SourceMode.cs new file mode 100644 index 0000000..ffb1c06 --- /dev/null +++ b/NetLib/HaoYueNet.ServerNetwork/NetWork/SourceMode/TokenMsgPool_SourceMode.cs @@ -0,0 +1,57 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Net.Sockets; +using System.Text; + +namespace HaoYueNet.ServerNetwork +{ + public class TokenWithMsg_SourceMode + { + public AsyncUserToken token; + public byte[] data; + public bool bHeartbeat; + } + + public class TokenMsgPool_SourceMode + { + //Stack msg_pool; + Queue msg_pool; + + public TokenMsgPool_SourceMode(int capacity) + { + msg_pool = new Queue(capacity); + } + + /// + /// 向 Queue 的末尾添加一个对象。 + /// + /// + public void Enqueue(TokenWithMsg_SourceMode item) + { + lock (msg_pool) + { + msg_pool.Enqueue(item); + } + } + + //移除并返回在 Queue 的开头的对象。 + public TokenWithMsg_SourceMode Dequeue() + { + lock (msg_pool) + { + return msg_pool.Dequeue(); + } + } + + public int Count + { + get { return msg_pool.Count; } + } + + public void Clear() + { + msg_pool.Clear(); + } + } +}