完善IOCP库处理

This commit is contained in:
sin365 2023-06-29 19:20:14 +08:00
parent 7eafd2e59d
commit 5c377d5d4b
13 changed files with 93 additions and 194 deletions

Binary file not shown.

View File

@ -111,7 +111,7 @@ namespace HaoYueNet.ClientNetworkNet4x
OnCloseReady(); OnCloseReady();
return; return;
} }
LogOut("发送心跳包"); //LogOut("发送心跳包");
} }
/// <summary> /// <summary>
@ -229,7 +229,7 @@ namespace HaoYueNet.ClientNetworkNet4x
//不处理心跳包 //不处理心跳包
if (data.Length == 1 && data[0] == 0x00) if (data.Length == 1 && data[0] == 0x00)
{ {
LogOut("收到心跳包"); //LogOut("收到心跳包");
return; return;
} }

View File

@ -44,17 +44,48 @@ namespace HaoYueNet.ServerNetwork
public BufferManager m_bufferManager; public BufferManager m_bufferManager;
public const int opsToAlloc = 2; public const int opsToAlloc = 2;
Socket listenSocket; //监听Socket Socket listenSocket; //监听Socket
public SocketEventPool m_pool; public SocketEventPool m_Receivepool;
public SocketEventPool m_Sendpool; public SocketEventPool m_Sendpool;
public TokenMsgPool msg_pool; public TokenMsgPool msg_pool;
//public Dictionary<AsyncUserToken, SocketAsyncEventArgs> _DictAsyncUserTokenSendSAEA = new Dictionary<AsyncUserToken, SocketAsyncEventArgs>();
public int m_clientCount; //连接的客户端数量 public int m_clientCount; //连接的客户端数量
public Semaphore m_maxNumberAcceptedClients; public Semaphore m_maxNumberAcceptedClients;//信号量
List<AsyncUserToken> m_clients; //客户端列表 List<AsyncUserToken> m_clients; //客户端列表
public Dictionary<Socket, AsyncUserToken> _DictSocketAsyncUserToken = new Dictionary<Socket, AsyncUserToken>(); public Dictionary<Socket, AsyncUserToken> _DictSocketAsyncUserToken = new Dictionary<Socket, AsyncUserToken>();
#region Token管理
void ClearUserToken()
{
lock (_DictSocketAsyncUserToken)
{
m_clients.Clear();
_DictSocketAsyncUserToken.Clear();
}
}
void AddUserToken(AsyncUserToken userToken)
{
lock (_DictSocketAsyncUserToken)
{
m_clients.Add(userToken);
_DictSocketAsyncUserToken.Add(userToken.Socket, userToken);
}
}
void RemoveUserToken(AsyncUserToken userToken)
{
lock (_DictSocketAsyncUserToken)
{
m_clients.Remove(userToken);
_DictSocketAsyncUserToken.Remove(userToken.Socket);
}
}
#endregion
#region #region
/// <summary> /// <summary>
@ -83,12 +114,9 @@ namespace HaoYueNet.ServerNetwork
/// 接收到客户端的数据事件 /// 接收到客户端的数据事件
/// </summary> /// </summary>
public event OnReceiveData ReceiveClientData; public event OnReceiveData ReceiveClientData;
#endregion #endregion
#region #region
/// <summary> /// <summary>
/// 获取客户端列表 /// 获取客户端列表
/// </summary> /// </summary>
@ -106,10 +134,10 @@ namespace HaoYueNet.ServerNetwork
m_maxConnectNum = numConnections; m_maxConnectNum = numConnections;
m_revBufferSize = receiveBufferSize; m_revBufferSize = receiveBufferSize;
// allocate buffers such that the maximum number of sockets can have one outstanding read and // allocate buffers such that the maximum number of sockets can have one outstanding read and
//write posted to the socket simultaneously //write posted to the socket simultaneously
m_bufferManager = new BufferManager(receiveBufferSize * numConnections * opsToAlloc, receiveBufferSize); m_bufferManager = new BufferManager(receiveBufferSize * numConnections * opsToAlloc, receiveBufferSize);
m_pool = new SocketEventPool(numConnections); m_Receivepool = new SocketEventPool(numConnections);
m_Sendpool = new SocketEventPool(numConnections); m_Sendpool = new SocketEventPool(numConnections);
msg_pool = new TokenMsgPool(numConnections); msg_pool = new TokenMsgPool(numConnections);
@ -134,23 +162,21 @@ namespace HaoYueNet.ServerNetwork
readWriteEventArg = new SocketAsyncEventArgs(); readWriteEventArg = new SocketAsyncEventArgs();
readWriteEventArg.Completed += new EventHandler<SocketAsyncEventArgs>(IO_Completed); readWriteEventArg.Completed += new EventHandler<SocketAsyncEventArgs>(IO_Completed);
readWriteEventArg.UserToken = new AsyncUserToken(); readWriteEventArg.UserToken = new AsyncUserToken();
// assign a byte buffer from the buffer pool to the SocketAsyncEventArg object // assign a byte buffer from the buffer pool to the SocketAsyncEventArg object
m_bufferManager.SetBuffer(readWriteEventArg); m_bufferManager.SetBuffer(readWriteEventArg);
// add SocketAsyncEventArg to the pool // add SocketAsyncEventArg to the pool
m_pool.Push(readWriteEventArg); m_Receivepool.Push(readWriteEventArg);
} }
//尝试
for (int i = 0; i < m_maxConnectNum; i++) for (int i = 0; i < m_maxConnectNum; i++)
{ {
readWriteEventArg = new SocketAsyncEventArgs(); readWriteEventArg = new SocketAsyncEventArgs();
readWriteEventArg.Completed += new EventHandler<SocketAsyncEventArgs>(IO_Completed2); readWriteEventArg.Completed += new EventHandler<SocketAsyncEventArgs>(IO_Completed);
readWriteEventArg.UserToken = new AsyncUserToken(); readWriteEventArg.UserToken = new AsyncUserToken();
// assign a byte buffer from the buffer pool to the SocketAsyncEventArg object //发送是否需要如此设置 TODO
m_bufferManager.SetBuffer(readWriteEventArg); m_bufferManager.SetBuffer(readWriteEventArg);
// add SocketAsyncEventArg to the pool
m_Sendpool.Push(readWriteEventArg); m_Sendpool.Push(readWriteEventArg);
} }
} }
@ -166,7 +192,7 @@ namespace HaoYueNet.ServerNetwork
{ {
try try
{ {
m_clients.Clear(); ClearUserToken();
listenSocket = new Socket(localEndPoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp); listenSocket = new Socket(localEndPoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
if (bReuseAddress) if (bReuseAddress)
@ -216,9 +242,7 @@ namespace HaoYueNet.ServerNetwork
listenSocket.Close(); listenSocket.Close();
int c_count = m_clients.Count; int c_count = m_clients.Count;
lock (m_clients) { m_clients.Clear(); } ClearUserToken();
//补充处理
lock (_DictSocketAsyncUserToken) { _DictSocketAsyncUserToken.Clear(); }
if (ClientNumberChange != null) if (ClientNumberChange != null)
ClientNumberChange(-c_count, null); ClientNumberChange(-c_count, null);
@ -274,7 +298,7 @@ namespace HaoYueNet.ServerNetwork
Interlocked.Increment(ref m_clientCount); Interlocked.Increment(ref m_clientCount);
// Get the socket for the accepted client connection and put it into the // Get the socket for the accepted client connection and put it into the
//ReadEventArg object user token //ReadEventArg object user token
SocketAsyncEventArgs readEventArgs = m_pool.Pop(); SocketAsyncEventArgs readEventArgs = m_Receivepool.Pop();
AsyncUserToken userToken = (AsyncUserToken)readEventArgs.UserToken; AsyncUserToken userToken = (AsyncUserToken)readEventArgs.UserToken;
userToken.Socket = e.AcceptSocket; userToken.Socket = e.AcceptSocket;
userToken.ConnectTime = DateTime.Now; userToken.ConnectTime = DateTime.Now;
@ -285,13 +309,9 @@ namespace HaoYueNet.ServerNetwork
userToken.RevIndex = MaxRevIndexNum; userToken.RevIndex = MaxRevIndexNum;
userToken.SendIndex = MaxSendIndexNum; userToken.SendIndex = MaxSendIndexNum;
lock (m_clients) { m_clients.Add(userToken); } AddUserToken(userToken);
//补充处理 ClientNumberChange?.Invoke(1, userToken);
lock (_DictSocketAsyncUserToken) { _DictSocketAsyncUserToken.Add(userToken.Socket, userToken); }
if (ClientNumberChange != null)
ClientNumberChange(1, userToken);
if (!e.AcceptSocket.ReceiveAsync(readEventArgs)) if (!e.AcceptSocket.ReceiveAsync(readEventArgs))
{ {
ProcessReceive(readEventArgs); ProcessReceive(readEventArgs);
@ -324,22 +344,6 @@ namespace HaoYueNet.ServerNetwork
} }
} }
void IO_Completed2(object sender, SocketAsyncEventArgs e)
{
// determine which type of operation just completed and call the associated handler
Console.WriteLine("就他妈从来没进过");
//switch (e.LastOperation)
//{
// case SocketAsyncOperation.Receive:
// ProcessReceive(e);
// break;
// case SocketAsyncOperation.Send:
// ProcessSend2(e);
// break;
// default:
// throw new ArgumentException("The last operation completed on the socket was not a receive or send");
//}
}
// This method is invoked when an asynchronous receive operation completes. // This method is invoked when an asynchronous receive operation completes.
// If the remote host closed the connection, then the socket is closed. // If the remote host closed the connection, then the socket is closed.
@ -400,7 +404,9 @@ namespace HaoYueNet.ServerNetwork
//继续接收. 为什么要这么写,请看Socket.ReceiveAsync方法的说明 //继续接收. 为什么要这么写,请看Socket.ReceiveAsync方法的说明
if (!token.Socket.ReceiveAsync(e)) if (!token.Socket.ReceiveAsync(e))
{
this.ProcessReceive(e); this.ProcessReceive(e);
}
} }
else else
{ {
@ -413,51 +419,18 @@ namespace HaoYueNet.ServerNetwork
} }
} }
// This method is invoked when an asynchronous send operation completes.
// The method issues another receive on the socket to read any additional
// data sent from the client
//
// <param name="e"></param>
private void ProcessSend(SocketAsyncEventArgs e) private void ProcessSend(SocketAsyncEventArgs e)
{ {
if (e.SocketError == SocketError.Success) if (e.SocketError == SocketError.Success)
{ {
// done echoing data back to the client //TODO
AsyncUserToken token = (AsyncUserToken)e.UserToken;
// read the next block of data send from the client
bool willRaiseEvent = token.Socket.ReceiveAsync(e);
if (!willRaiseEvent)
{
ProcessReceive(e);
}
} }
else else
{ {
CloseClientSocket(e); CloseClientSocket(e);
return;
} }
} ReleaseSocketAsyncEventArgs(e);
private void ProcessSend2(SocketAsyncEventArgs e)
{
if (e.SocketError == SocketError.Success)
{
// done echoing data back to the client
//AsyncUserToken token = (AsyncUserToken)e.UserToken;
// read the next block of data send from the client
//bool willRaiseEvent = token.Socket.ReceiveAsync(e);
//if (!willRaiseEvent)
//{
// ProcessReceive(e);
//}
}
else
{
CloseClientSocket(e);
}
e.SetBuffer(null, 0, 0);
m_Sendpool.Push(e);
//Console.WriteLine("发送完毕压进回对象池");
SendForMsgPool(); SendForMsgPool();
} }
@ -469,15 +442,7 @@ namespace HaoYueNet.ServerNetwork
//调用关闭连接 //调用关闭连接
OnClose(token); OnClose(token);
lock (m_clients) { m_clients.Remove(token); } RemoveUserToken(token);
//补充处理
lock (_DictSocketAsyncUserToken) { _DictSocketAsyncUserToken.Remove(token.Socket); }
////尝试1
//m_Sendpool.Push(_DictAsyncUserTokenSendSAEA[token]);
//lock (_DictAsyncUserTokenSendSAEA) { _DictAsyncUserTokenSendSAEA.Remove(token); }
////尝试1结束
//如果有事件,则调用事件,发送客户端数量变化通知 //如果有事件,则调用事件,发送客户端数量变化通知
if (ClientNumberChange != null) if (ClientNumberChange != null)
@ -493,40 +458,24 @@ namespace HaoYueNet.ServerNetwork
Interlocked.Decrement(ref m_clientCount); Interlocked.Decrement(ref m_clientCount);
m_maxNumberAcceptedClients.Release(); m_maxNumberAcceptedClients.Release();
// Free the SocketAsyncEventArg so they can be reused by another client // Free the SocketAsyncEventArg so they can be reused by another client
e.UserToken = new AsyncUserToken(); ReleaseSocketAsyncEventArgs(e);
m_pool.Push(e); }
void ReleaseSocketAsyncEventArgs(SocketAsyncEventArgs saea)
{
saea.UserToken = null;//TODO
saea.SetBuffer(null, 0, 0);
switch (saea.LastOperation)
{
case SocketAsyncOperation.Receive:
m_Receivepool.Push(saea);
break;
case SocketAsyncOperation.Send:
m_Sendpool.Push(saea);
break;
}
} }
/// <summary>
/// 对数据进行打包,然后再发送
/// </summary>
/// <param name="token"></param>
/// <param name="message"></param>
/// <returns></returns>
//public void SendMessage(AsyncUserToken token, byte[] message)
//{
// if (token == null || token.Socket == null || !token.Socket.Connected)
// return;
// try
// {
// //对要发送的消息,制定简单协议,头4字节指定包的大小,方便客户端接收(协议可以自己定)
// byte[] buff = new byte[message.Length + 4];
// byte[] len = BitConverter.GetBytes(message.Length);
// Array.Copy(len, buff, 4);
// Array.Copy(message, 0, buff, 4, message.Length);
// //token.Socket.Send(buff); //这句也可以发送, 可根据自己的需要来选择
// //新建异步发送对象, 发送消息
// SocketAsyncEventArgs sendArg = new SocketAsyncEventArgs();
// sendArg.UserToken = token;
// sendArg.SetBuffer(buff, 0, buff.Length); //将数据放置进去.
// token.Socket.SendAsync(sendArg);
// }
// catch (Exception e)
// {
// //RuncomLib.Log.LogUtils.Info("SendMessage - Error:" + e.Message);
// }
//}
//bool flag_SendForMsgPool = false;
int sendrun = 0; int sendrun = 0;
public void SendForMsgPool() public void SendForMsgPool()
{ {
@ -561,72 +510,43 @@ namespace HaoYueNet.ServerNetwork
} }
public void SendMessage(AsyncUserToken token, byte[] message) public void SendMessage(AsyncUserToken token, byte[] message,bool dontNeedHead = false)
{ {
if (token == null || token.Socket == null || !token.Socket.Connected) if (token == null || token.Socket == null || !token.Socket.Connected)
return; return;
try try
{ {
message = SendDataWithHead(message); if (!dontNeedHead)
{
message = SendDataWithHead(message);
}
//尝试2 (发送的时候从队列取,动态绑定
//Console.WriteLine("队列取出 并 发送!!!!");
if (m_Sendpool.Count > 0) if (m_Sendpool.Count > 0)
{ {
SocketAsyncEventArgs myreadEventArgs = m_Sendpool.Pop(); SocketAsyncEventArgs myreadEventArgs = m_Sendpool.Pop();
myreadEventArgs.UserToken = token; myreadEventArgs.UserToken = token;
myreadEventArgs.AcceptSocket = token.Socket; myreadEventArgs.AcceptSocket = token.Socket;
myreadEventArgs.SetBuffer(message, 0, message.Length); //将数据放置进去. myreadEventArgs.SetBuffer(message, 0, message.Length); //将数据放置进去.
token.Socket.SendAsync(myreadEventArgs);
//得了,先回去吧 //若不需要等待
m_Sendpool.Push(myreadEventArgs); if (!token.Socket.SendAsync(myreadEventArgs))
{
m_Sendpool.Push(myreadEventArgs);
}
return; return;
} }
else else
{ {
//先压入队列等待m_Sendpool回收
msg_pool.Enqueue(new TokenWithMsg() { token = token, message = message }); msg_pool.Enqueue(new TokenWithMsg() { token = token, message = message });
//Console.WriteLine("压入消息发送队列MSG_Pool"); //Console.WriteLine("压入消息发送队列MSG_Pool");
return; return;
} }
//尝试结束
////尝试1
//Console.WriteLine("发送!!!!");
//SocketAsyncEventArgs myreadEventArgs;
//if (!_DictAsyncUserTokenSendSAEA.ContainsKey(token))
//{
// myreadEventArgs = m_Sendpool.Pop();
// myreadEventArgs.UserToken = token;
// _DictAsyncUserTokenSendSAEA.Add(token, myreadEventArgs);
// myreadEventArgs.AcceptSocket = token.Socket;
// //myreadEventArgs.AcceptSocket.RemoteEndPoint = token.Remote;
// token.IPAddress = ((IPEndPoint)(myreadEventArgs.AcceptSocket.RemoteEndPoint)).Address;
//}
//else
//{
// myreadEventArgs = _DictAsyncUserTokenSendSAEA[token];
//}
//myreadEventArgs.SetBuffer(message, 0, message.Length); //将数据放置进去.
//token.Socket.SendAsync(myreadEventArgs);
//return;
////尝试1结束
//新建异步发送对象, 发送消息
SocketAsyncEventArgs sendArg = new SocketAsyncEventArgs();
sendArg.UserToken = token;
sendArg.SetBuffer(message, 0, message.Length); //将数据放置进去.
token.Socket.SendAsync(sendArg);
} }
catch (Exception e) catch (Exception e)
{ {
Console.WriteLine(e.ToString()); Console.WriteLine(e.ToString());
//RuncomLib.Log.LogUtils.Info("SendMessage - Error:" + e.Message);
} }
} }
//拼接长度 //拼接长度
@ -672,12 +592,7 @@ namespace HaoYueNet.ServerNetwork
{ {
OnClose(token); OnClose(token);
lock (m_clients) { m_clients.Remove(token); } RemoveUserToken(token);
//补充处理
lock (_DictSocketAsyncUserToken) { _DictSocketAsyncUserToken.Remove(token.Socket); }
//如果有事件,则调用事件,发送客户端数量变化通知 //如果有事件,则调用事件,发送客户端数量变化通知
if (ClientNumberChange != null) if (ClientNumberChange != null)
@ -692,15 +607,6 @@ namespace HaoYueNet.ServerNetwork
// decrement the counter keeping track of the total number of clients connected to the server // decrement the counter keeping track of the total number of clients connected to the server
Interlocked.Decrement(ref m_clientCount); Interlocked.Decrement(ref m_clientCount);
m_maxNumberAcceptedClients.Release(); m_maxNumberAcceptedClients.Release();
//试着加入一个释放
//token.Socket.Dispose();
// Free the SocketAsyncEventArg so they can be reused by another client
//e.UserToken = new AsyncUserToken();
//m_pool.Push(e);
//这里直接注释了进程池,需要验证是否会出问题
} }
/// <summary> /// <summary>
@ -754,14 +660,9 @@ namespace HaoYueNet.ServerNetwork
return; return;
try try
{ {
Console.WriteLine(DateTime.Now.ToString() + "发送心跳包"); //Console.WriteLine(DateTime.Now.ToString() + "发送心跳包");
token.SendIndex = MaxSendIndexNum; token.SendIndex = MaxSendIndexNum;
SendMessage(token, HeartbeatData, true);
//新建异步发送对象, 发送消息
SocketAsyncEventArgs sendArg = new SocketAsyncEventArgs();
sendArg.UserToken = token;
sendArg.SetBuffer(HeartbeatData, 0, HeartbeatData.Length); //将数据放置进去.
token.Socket.SendAsync(sendArg);
} }
catch (Exception e) catch (Exception e)
{ {
@ -776,7 +677,7 @@ namespace HaoYueNet.ServerNetwork
if (data.Length == 1 && data[0] == 0x00)//心跳包 if (data.Length == 1 && data[0] == 0x00)//心跳包
{ {
Console.WriteLine("收到心跳包"); //Console.WriteLine("收到心跳包");
//无处理 //无处理
} }
else else
@ -802,7 +703,6 @@ namespace HaoYueNet.ServerNetwork
{ {
for (int i = 0; i < m_clients.Count(); i++) for (int i = 0; i < m_clients.Count(); i++)
{ {
//Console.WriteLine("RevIndex->{0} SendIndex->{1}", m_clients[i].RevIndex, m_clients[i].SendIndex);
//接收服务器数据计数 //接收服务器数据计数
m_clients[i].RevIndex--; m_clients[i].RevIndex--;
if (m_clients[i].RevIndex <= 0) if (m_clients[i].RevIndex <= 0)

View File

@ -10,15 +10,14 @@ namespace ServerCore.NetWork
public IOCPNetWork(int numConnections, int receiveBufferSize) public IOCPNetWork(int numConnections, int receiveBufferSize)
: base(numConnections, receiveBufferSize) : base(numConnections, receiveBufferSize)
{ {
m_clientCount = 0; //m_clientCount = 0;
m_maxConnectNum = numConnections; //m_maxConnectNum = numConnections;
m_revBufferSize = receiveBufferSize; //m_revBufferSize = receiveBufferSize;
// allocate buffers such that the maximum number of sockets can have one outstanding read and //// allocate buffers such that the maximum number of sockets can have one outstanding read and
//write posted to the socket simultaneously ////write posted to the socket simultaneously
m_bufferManager = new BufferManager(receiveBufferSize * numConnections * opsToAlloc, receiveBufferSize); //m_bufferManager = new BufferManager(receiveBufferSize * numConnections * opsToAlloc, receiveBufferSize);
//m_pool = new SocketEventPool(numConnections);
m_pool = new SocketEventPool(numConnections); //m_maxNumberAcceptedClients = new Semaphore(numConnections, numConnections);
m_maxNumberAcceptedClients = new Semaphore(numConnections, numConnections);
ClientNumberChange += IOCPNetWork_ClientNumberChange; ClientNumberChange += IOCPNetWork_ClientNumberChange;