合理使用ArrayPool

This commit is contained in:
sin365 2024-06-27 15:49:56 +08:00
parent c1a3650a95
commit 3036ef0e05
10 changed files with 71 additions and 218 deletions

View File

@ -1,4 +1,5 @@
//using HunterProtobufCore;
using System.Buffers;
using System.IO;
using System.Net;
using System.Net.Sockets;
@ -364,90 +365,67 @@ namespace HaoYueNet.ClientNetwork.IOCPMode
{
try
{
// check if the remote host closed the connection
AsyncUserToken token = (AsyncUserToken)e.UserToken;
if (e.BytesTransferred > 0 && e.SocketError == SocketError.Success)
{
//读取数据
//byte[] data = new byte[e.BytesTransferred];
//Array.Copy(e.Buffer, e.Offset, data, 0, e.BytesTransferred);
//lock (token.Buffer)
lock (token.memoryStream)
{
//token.Buffer.AddRange(data);
token.memoryStream.Write(e.Buffer, e.Offset, e.BytesTransferred);
do
{
//如果包头不完整
//if (token.Buffer.Count < 4)
if (token.memoryStream.Length < 4)
break;
////判断包的长度
//byte[] lenBytes = token.Buffer.GetRange(0, 4).ToArray();
//int packageLen = BitConverter.ToInt32(lenBytes, 0) - 4;
//if (packageLen > token.Buffer.Count - 4)
//{ //长度不够时,退出循环,让程序继续接收
// break;
//}
if (token.memoryStream.Length < 4) break;//包头不完整,继续接收
long FristBeginPos = token.memoryStream.Position;
byte[] lenBytes = new byte[4];
//从Byte池申请
byte[] lenBytes = ArrayPool<byte>.Shared.Rent(4);
token.memoryStream.Seek(0, SeekOrigin.Begin);
token.memoryStream.Read(lenBytes, 0, 4);
int packageLen = BitConverter.ToInt32(lenBytes, 0) - 4;
//归还byte[]
ArrayPool<byte>.Shared.Return(lenBytes);
if (packageLen > token.memoryStream.Length - 4)
{
token.memoryStream.Seek(FristBeginPos, SeekOrigin.Begin);
//长度不够时,退出循环,让程序继续接收
break;
break;//长度不够时,退出循环,让程序继续接收
}
////包够长时,则提取出来,交给后面的程序去处理
//byte[] rev = token.Buffer.GetRange(4, packageLen).ToArray();
//申请byte池 一定要记得回收!!
byte[] rev_fromArrayPool = ArrayPool<byte>.Shared.Rent(packageLen);
byte[] rev = new byte[packageLen];
token.memoryStream.Seek(4, SeekOrigin.Begin);
token.memoryStream.Read(rev, 0, packageLen);
//从数据池中移除这组数据
//lock (token.Buffer)
//{
// token.Buffer.RemoveRange(0, packageLen + 4);
//}
token.memoryStream.Read(rev_fromArrayPool, 0, packageLen);
token.memoryStream.Seek(FristBeginPos, SeekOrigin.Begin);
//从数据池中移除这组数据
lock (token.memoryStream)
{
//token.memoryStream.Position = 0;
//token.memoryStream.SetLength(0);
int numberOfBytesToRemove = packageLen + 4;
byte[] buf = token.memoryStream.GetBuffer();
Buffer.BlockCopy(buf, numberOfBytesToRemove, buf, 0, (int)token.memoryStream.Length - numberOfBytesToRemove);
token.memoryStream.SetLength(token.memoryStream.Length - numberOfBytesToRemove);
}
DataCallBackReady(token, rev);
//用Span内存切片因为来自ArrayPool的byte长度可能大于本身申请的长度
Span<byte> rev_span = rev_fromArrayPool;
rev_span = rev_span.Slice(0, packageLen);
DataCallBackReady(token, rev_span);
//这里API处理完后,并没有返回结果,当然结果是要返回的,却不是在这里, 这里的代码只管接收.
//若要返回结果,可在API处理中调用此类对象的SendMessage方法,统一打包发送.不要被微软的示例给迷惑了.
//} while (token.Buffer.Count > 4);
//回收这里依赖DataCallBackReady中有一次数据拷贝这个后续还要进一步精进性能优化否则不能在这里回收否则影响业务层
ArrayPool<byte>.Shared.Return(rev_fromArrayPool);
} while (token.memoryStream.Length > 4);
}
//继续接收. 为什么要这么写,请看Socket.ReceiveAsync方法的说明
//如果返回为False则代表此刻已经完成不必等待完成端口回调则直接调用ProcessReceive
if (!token.Socket.ReceiveAsync(e))
{
ProcessReceive(e);
}
this.ProcessReceive(e);
}
else
{
//尝试性,清理数据
//清理数据
token.memoryStream.SetLength(0);
token.memoryStream.Seek(0, SeekOrigin.Begin);
CloseClientSocket(e);
}
}
@ -642,7 +620,7 @@ namespace HaoYueNet.ClientNetwork.IOCPMode
#endregion
#region
private void DataCallBackReady(AsyncUserToken sk, byte[] data)
private void DataCallBackReady(AsyncUserToken sk, Span<byte> data)
{
//增加接收计数
sk.RevIndex = MaxRevIndexNum;

View File

@ -257,8 +257,8 @@ namespace HaoYueNet.ClientNetwork
OnReceiveData(CmdID, Error, resultdata);
}
MemoryStream reciveMemoryStream = new MemoryStream();//开辟一个内存流
byte[] reciveBuffer = new byte[1024 * 1024 * 2];
MemoryStream reciveMemoryStream = new MemoryStream();//开辟一个反复使用的内存流
byte[] reciveBuffer = new byte[1024 * 1024 * 2];//开辟一个反复使用的byte[]
private void Recive(object o)
{
var client = o as Socket;
@ -312,12 +312,6 @@ namespace HaoYueNet.ClientNetwork
//↓↓↓↓↓↓↓↓ ↓↓↓
if (getData.Length - StartIndex < HeadLength || HeadLength == -1)
{
/*
memoryStream.Close();//关闭内存流
memoryStream.Dispose();//释放内存资源
memoryStream = new MemoryStream();//创建新的内存流
*/
//流复用的方式 不用重新new申请
reciveMemoryStream.Position = 0;
reciveMemoryStream.SetLength(0);
@ -327,16 +321,7 @@ namespace HaoYueNet.ClientNetwork
}
else
{
//把头去掉,就可以吃了,蛋白质是牛肉的六倍
//DataCallBackReady(getData.Skip(StartIndex+4).Take(HeadLength-4).ToArray());
int CoreLenght = HeadLength - 4;
//改为Array.Copy 提升效率
//byte[] retData = new byte[CoreLenght];
//Array.Copy(getData, StartIndex + 4, retData, 0, CoreLenght);
//DataCallBackReady(retData);
//用Span
Span<byte> getData_span = getData;
getData_span = getData_span.Slice(StartIndex + 4, CoreLenght);

View File

@ -182,24 +182,6 @@ namespace HaoYueNet.ClientNetwork
/// </summary>
public event delegate_str OnLogOut;
///// <summary>
///// 用于调用者回调的虚函数
///// </summary>
///// <param name="data"></param>
//public virtual void DataCallBack(int CMDID,int ERRCODE,byte[] data)
//{
//}
///// <summary>
///// 断开连接
///// </summary>
///// <param name="sk"></param>
//public virtual void OnClose()
//{
//}
/// <summary>
/// 做好处理的连接管理
/// </summary>
@ -236,16 +218,12 @@ namespace HaoYueNet.ClientNetwork
return;
}
/*
HunterNet_S2C _c2s = DeSerizlize<HunterNet_S2C>(data);
OnDataCallBack(_c2s.HunterNetCoreCmdID, _c2s.HunterNetCoreERRORCode, _c2s.HunterNetCoreData.ToArray());
*/
HunterNet_S2C.AnalysisPkgData(data, out ushort CmdID, out ushort Error, out byte[] resultdata);
OnDataCallBack(CmdID, Error, resultdata);
}
MemoryStream reciveMemoryStream = new MemoryStream();//开辟一个内存流
byte[] reciveBuffer = new byte[1024 * 1024 * 2];
MemoryStream reciveMemoryStream = new MemoryStream();//开辟一个反复使用的内存流
byte[] reciveBuffer = new byte[1024 * 1024 * 2];//开辟一个反复使用的byte[]
private void Recive(object o)
{
var client = o as Socket;
@ -299,12 +277,6 @@ namespace HaoYueNet.ClientNetwork
//↓↓↓↓↓↓↓↓ ↓↓↓
if (getData.Length - StartIndex < HeadLength || HeadLength == -1)
{
/*
memoryStream.Close();//关闭内存流
memoryStream.Dispose();//释放内存资源
memoryStream = new MemoryStream();//创建新的内存流
*/
//流复用的方式 不用重新new申请
reciveMemoryStream.Position = 0;
reciveMemoryStream.SetLength(0);
@ -314,16 +286,8 @@ namespace HaoYueNet.ClientNetwork
}
else
{
//把头去掉,就可以吃了,蛋白质是牛肉的六倍
//DataCallBackReady(getData.Skip(StartIndex+4).Take(HeadLength-4).ToArray());
int CoreLenght = HeadLength - 4;
//改为Array.Copy 提升效率
//byte[] retData = new byte[CoreLenght];
//Array.Copy(getData, StartIndex + 4, retData, 0, CoreLenght);
//DataCallBackReady(retData);
//用Span
Span<byte> getData_span = getData;
getData_span = getData_span.Slice(StartIndex + 4, CoreLenght);

View File

@ -1,4 +1,5 @@
using System.Net;
using System.Buffers;
using System.Net;
using System.Net.Sockets;
using static HaoYueNet.ClientNetwork.BaseData;
@ -190,17 +191,20 @@ namespace HaoYueNet.ClientNetwork.OtherMode
}
MemoryStream reciveMemoryStream = new MemoryStream();//开辟一个内存流
byte[] reciveBuffer = new byte[1024 * 1024 * 2];
//byte[] reciveBuffer = new byte[1024 * 1024 * 2];
private void Recive(object o)
{
var client = o as Socket;
while (true)
{
//申请byte池 一定要记得回收!!
byte[] rev_fromArrayPool = ArrayPool<byte>.Shared.Rent(1024 * 1024 * 2);
int effective = 0;
try
{
effective = client.Receive(reciveBuffer);
effective = client.Receive(rev_fromArrayPool);
if (effective == 0)//为0表示已经断开连接放到后面处理
{
//清理数据
@ -208,6 +212,8 @@ namespace HaoYueNet.ClientNetwork.OtherMode
reciveMemoryStream.Seek(0, SeekOrigin.Begin);
//远程主机强迫关闭了一个现有的连接
OnCloseReady(client);
//回收
ArrayPool<byte>.Shared.Return(rev_fromArrayPool);
return;
}
}
@ -223,7 +229,11 @@ namespace HaoYueNet.ClientNetwork.OtherMode
//断开连接
}
reciveMemoryStream.Write(reciveBuffer, 0, effective);//将接受到的数据写入内存流中
reciveMemoryStream.Write(rev_fromArrayPool, 0, effective);//将接受到的数据写入内存流中
//回收
ArrayPool<byte>.Shared.Return(rev_fromArrayPool);
DataCallBackReady(client, reciveMemoryStream.ToArray());
//流复用的方式 不用重新new申请
reciveMemoryStream.Position = 0;

View File

@ -192,8 +192,8 @@ namespace HaoYueNet.ClientNetwork.OtherMode
OnReceiveData(data);
}
MemoryStream reciveMemoryStream = new MemoryStream();//开辟一个内存流
byte[] reciveBuffer = new byte[1024 * 1024 * 2];
MemoryStream reciveMemoryStream = new MemoryStream();//开辟一个反复使用的内存流
byte[] reciveBuffer = new byte[1024 * 1024 * 2];//开辟一个反复使用的byte[]
private void Recive(object o)
{
var client = o as Socket;

View File

@ -1,4 +1,5 @@
using System.Net.Sockets;
using System.Runtime.InteropServices;
namespace HaoYueNet.ServerNetwork
{

View File

@ -1,35 +0,0 @@
using System;
using System.Buffers;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace HaoYueNet.ServerNetwork.NetWork
{
public static class ArrayPoolManager
{
static ArrayPool<byte> instance = ArrayPool<byte>.Shared;
/// <summary>
/// 租用指定大小byte数组
/// </summary>
/// <param name="lenght"></param>
/// <returns></returns>
public static byte[] RentByteArr(int lenght)
{
return instance.Rent(lenght);
}
/// <summary>
/// 将数组归还给池
/// </summary>
/// <param name="lenght"></param>
/// <returns></returns>
public static void ReturnByteArr(byte[] byteArr)
{
instance.Return(byteArr);
}
}
}

View File

@ -373,51 +373,32 @@ namespace HaoYueNet.ServerNetwork
{
try
{
// check if the remote host closed the connection
AsyncUserToken token = (AsyncUserToken)e.UserToken;
if (e.BytesTransferred > 0 && e.SocketError == SocketError.Success)
{
//读取数据
//byte[] data = new byte[e.BytesTransferred];
//Array.Copy(e.Buffer, e.Offset, data, 0, e.BytesTransferred);
//lock (token.Buffer)
lock(token.memoryStream)
{
//token.Buffer.AddRange(data);
token.memoryStream.Write(e.Buffer, e.Offset, e.BytesTransferred);
do
{
//DataCallBackReady(token, data);
////从数据池中移除这组数据
//lock (token.Buffer)
//{
// token.Buffer.Clear();
//}
DataCallBackReady(token, token.memoryStream.ToArray());
//流复用的方式 不用重新new申请
token.memoryStream.Position = 0;
token.memoryStream.SetLength(0);
//这里API处理完后,并没有返回结果,当然结果是要返回的,却不是在这里, 这里的代码只管接收.
//若要返回结果,可在API处理中调用此类对象的SendMessage方法,统一打包发送.不要被微软的示例给迷惑了.
} while (token.memoryStream.Length > 0);
//} while (token.Buffer.Count > 4);
}
//继续接收. 为什么要这么写,请看Socket.ReceiveAsync方法的说明
if (!token.Socket.ReceiveAsync(e))
{
this.ProcessReceive(e);
}
}
else
{
//清理数据
token.memoryStream.SetLength(0);
token.memoryStream.Seek(0, SeekOrigin.Begin);
CloseClientSocket(e);
}
}

View File

@ -1,6 +1,5 @@
//using HunterProtobufCore;
using HaoYueNet.ServerNetwork.NetWork;
using System.IO;
using System.Buffers;
using System.Net;
using System.Net.Sockets;
using static HaoYueNet.ServerNetwork.BaseData;
@ -375,96 +374,67 @@ namespace HaoYueNet.ServerNetwork
{
try
{
// check if the remote host closed the connection
AsyncUserToken token = (AsyncUserToken)e.UserToken;
if (e.BytesTransferred > 0 && e.SocketError == SocketError.Success)
{
//读取数据
//byte[] data = new byte[e.BytesTransferred];
//Array.Copy(e.Buffer, e.Offset, data, 0, e.BytesTransferred);
//lock (token.Buffer)
lock(token.memoryStream)
{
//token.Buffer.AddRange(data);
token.memoryStream.Write(e.Buffer, e.Offset, e.BytesTransferred);
do
{
//如果包头不完整
//if (token.Buffer.Count < 4)
if (token.memoryStream.Length < 4)
break;
////判断包的长度
//byte[] lenBytes = token.Buffer.GetRange(0, 4).ToArray();
//int packageLen = BitConverter.ToInt32(lenBytes, 0) - 4;
//if (packageLen > token.Buffer.Count - 4)
//{ //长度不够时,退出循环,让程序继续接收
// break;
//}
if (token.memoryStream.Length < 4) break;//包头不完整,继续接收
long FristBeginPos = token.memoryStream.Position;
//byte[] lenBytes = new byte[4];
byte[] lenBytes = ArrayPoolManager.RentByteArr(4);
//从Byte池申请
byte[] lenBytes = ArrayPool<byte>.Shared.Rent(4);
token.memoryStream.Seek(0, SeekOrigin.Begin);
token.memoryStream.Read(lenBytes, 0, 4);
int packageLen = BitConverter.ToInt32(lenBytes, 0) - 4;
ArrayPoolManager.ReturnByteArr(lenBytes);
//归还byte[]
ArrayPool<byte>.Shared.Return(lenBytes);
if (packageLen > token.memoryStream.Length - 4)
{
token.memoryStream.Seek(FristBeginPos, SeekOrigin.Begin);
//长度不够时,退出循环,让程序继续接收
break;
break;//长度不够时,退出循环,让程序继续接收
}
////包够长时,则提取出来,交给后面的程序去处理
//byte[] rev = token.Buffer.GetRange(4, packageLen).ToArray();
byte[] rev = new byte[packageLen];
//申请byte池 一定要记得回收!!
byte[] rev_fromArrayPool = ArrayPool<byte>.Shared.Rent(packageLen);
token.memoryStream.Seek(4, SeekOrigin.Begin);
token.memoryStream.Read(rev, 0, packageLen);
//从数据池中移除这组数据
//lock (token.Buffer)
//{
// token.Buffer.RemoveRange(0, packageLen + 4);
//}
token.memoryStream.Read(rev_fromArrayPool, 0, packageLen);
token.memoryStream.Seek(FristBeginPos, SeekOrigin.Begin);
//从数据池中移除这组数据
lock (token.memoryStream)
{
//token.memoryStream.Position = 0;
//token.memoryStream.SetLength(0);
int numberOfBytesToRemove = packageLen + 4;
byte[] buf = token.memoryStream.GetBuffer();
Buffer.BlockCopy(buf, numberOfBytesToRemove, buf, 0, (int)token.memoryStream.Length - numberOfBytesToRemove);
token.memoryStream.SetLength(token.memoryStream.Length - numberOfBytesToRemove);
}
DataCallBackReady(token, rev);
//用Span内存切片因为来自ArrayPool的byte长度可能大于本身申请的长度
Span<byte> rev_span = rev_fromArrayPool;
rev_span = rev_span.Slice(0, packageLen);
DataCallBackReady(token, rev_span);
//这里API处理完后,并没有返回结果,当然结果是要返回的,却不是在这里, 这里的代码只管接收.
//若要返回结果,可在API处理中调用此类对象的SendMessage方法,统一打包发送.不要被微软的示例给迷惑了.
//} while (token.Buffer.Count > 4);
//回收这里依赖DataCallBackReady中有一次数据拷贝这个后续还要进一步精进性能优化否则不能在这里回收否则影响业务层
ArrayPool<byte>.Shared.Return(rev_fromArrayPool);
} while (token.memoryStream.Length > 4);
}
//继续接收. 为什么要这么写,请看Socket.ReceiveAsync方法的说明
//如果返回为False则代表此刻已经完成不必等待完成端口回调则直接调用ProcessReceive
if (!token.Socket.ReceiveAsync(e))
{
this.ProcessReceive(e);
}
}
else
{
//尝试性,清理数据
//清理数据
token.memoryStream.SetLength(0);
token.memoryStream.Seek(0, SeekOrigin.Begin);
CloseClientSocket(e);
}
}
@ -489,8 +459,6 @@ namespace HaoYueNet.ServerNetwork
}
void IO_Completed(object sender, SocketAsyncEventArgs e)
{
// determine which type of operation just completed and call the associated handler
switch (e.LastOperation)
{
case SocketAsyncOperation.Receive:
@ -661,7 +629,7 @@ namespace HaoYueNet.ServerNetwork
#endregion
#region
private void DataCallBackReady(AsyncUserToken sk, byte[] data)
private void DataCallBackReady(AsyncUserToken sk, Span<byte> data)
{
//增加接收计数
sk.RevIndex = MaxRevIndexNum;

View File

@ -260,8 +260,9 @@ namespace HaoYueNet.ClientNetwork.Standard2
OnReceiveData(CmdID, Error, resultdata);
}
MemoryStream reciveMemoryStream = new MemoryStream();//开辟一个内存流
byte[] reciveBuffer = new byte[1024 * 1024 * 2];
MemoryStream reciveMemoryStream = new MemoryStream();//开辟一个反复使用的内存流
byte[] reciveBuffer = new byte[1024 * 1024 * 2];//开辟一个反复使用的byte[]
private void Recive(object o)
{
var client = o as Socket;