| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872 |
- {******************************************************************************}
- { CnPack For Delphi/C++Builder }
- { 中国人自己的开放源码第三方开发包 }
- { (C)Copyright 2001-2018 CnPack 开发组 }
- { ------------------------------------ }
- { }
- { 本开发包是开源的自由软件,您可以遵照 CnPack 的发布协议来修 }
- { 改和重新发布这一程序。 }
- { }
- { 发布这一开发包的目的是希望它有用,但没有任何担保。甚至没有 }
- { 适合特定目的而隐含的担保。更详细的情况请参阅 CnPack 发布协议。 }
- { }
- { 您应该已经和开发包一起收到一份 CnPack 发布协议的副本。如果 }
- { 还没有,可访问我们的网站: }
- { }
- { 网站地址:http://www.cnpack.org }
- { 电子邮件:master@cnpack.org }
- { }
- {******************************************************************************}
- unit CnIocpSocketAdapter;
- {* |<PRE>
- ================================================================================
- * 软件名称:Windows完成端口封装单元
- * 单元名称:Windows完成端口封装实现单元
- * 单元作者:cnwinds
- * 菩提(cxmld@126.com)、Childe Ng、Liu Xiao 移植修改
- * 备 注:
- * 开发平台:PWin2000Pro + Delphi 7.01
- * 兼容测试:PWin9X/2000/XP + Delphi 5/6/7 + C++Builder 5/6
- * 本 地 化:该单元中的字符串均符合本地化处理方式
- * 单元标识:$Id$
- * 修改记录:2008.11.04 V1.0
- * 创建单元
- ================================================================================
- |</PRE>}
- {
- Windows完成端口封装单元。
- 如果不指定并发线程数则使用(CPU个数*2+2)作为默认并发线程个数。
- create by cnwinds, 2007-3-1
- Modify by cnwinds 2007-3-20
- + 增加面向非连接的套接口收发支持
- Modify by cnwinds 2007-4-10
- * 修正了Udp的10054错误
- Modify by cnwinds 2007-4-18
- + 为ISocketIocpEvent接口增加GUID,使得接口之间可以导航
- Modify by cnwinds 2007-4-19
- + 将接口分离成面向流和面向包的两种类型。用户使用更方便。
- 本类似乎还有些小问题,不知如何解决
- 1.在测试程序中,关闭测试程序时, MEMO中会快速闪过一些数据
- 这些好像是错误信息,不知是怎么来的.
- 2.在测试程序中,发送和接收是0..100有101次,并应有202条信息,
- 实际没有这么多, 没有找到原因
- 3.以上二个问题是原类就有的,所以移植后的组件也有此问题
- }
- interface
- {$I CnPack.inc}
- uses
- Windows, SysUtils, Classes, WinSock, CnNativeDecl, CnIocpSimpleMemPool;
- const
- CN_MAX_WSABUF_COUNT = 8;
- SCnErrorCompletePortError = 'Error in Completion IO. Errro code %d';
- SCnErrorCallbackException = 'Exception in CallBacl: %s';
- SCnErrorSendBufferOverflow = 'Send Buffer Overflow. Max %d WSABUF.';
- // SCnErrorCompletePortError = '完成端口发生错误,错误代码(%d)';
- // SCnErrorCallbackException = '回调用户事件中发生异常(%s)';
- // SCnErrorSendBufferOverflow = '发送缓冲区超过大小!目前程序定义最多只能发送 %d 个WSABUF!';
- type
- WSABUF = packed record
- len: U_LONG; { the length of the buffer }
- buf: PChar; { the pointer to the buffer }
- end {WSABUF};
- PWSABUF = ^WSABUF;
- LPWSABUF = PWSABUF;
- WSAOVERLAPPED = TOverlapped;
- TWSAOverlapped = WSAOverlapped;
- PWSAOverlapped = ^WSAOverlapped;
- LPWSAOVERLAPPED = PWSAOverlapped;
- TServiceType = LongInt;
- TFlowSpec = packed record
- TokenRate, // In Bytes/sec
- TokenBucketSize, // In Bytes
- PeakBandwidth, // In Bytes/sec
- Latency, // In microseconds
- DelayVariation: LongInt; // In microseconds
- ServiceType: TServiceType;
- MaxSduSize, MinimumPolicedSize: LongInt; // In Bytes
- end;
- PFlowSpec = ^TFLOWSPEC;
- QOS = packed record
- SendingFlowspec: TFlowSpec; { the flow spec for data sending }
- ReceivingFlowspec: TFlowSpec; { the flow spec for data receiving }
- ProviderSpecific: WSABUF; { additional provider specific stuff }
- end;
- TQualityOfService = QOS;
- PQOS = ^QOS;
- LPQOS = PQOS;
- LPCONDITIONPROC = function(lpCallerId: LPWSABUF; lpCallerData: LPWSABUF;
- lpSQOS, lpGQOS: LPQOS; lpCalleeId, lpCalleeData: LPWSABUF;
- g: DWORD; dwCallbackData: DWORD): Integer; stdcall;
- LPWSAOVERLAPPED_COMPLETION_ROUTINE = procedure(const dwError, cbTransferred:
- DWORD; const lpOverlapped: LPWSAOVERLAPPED; const dwFlags: DWORD); stdcall;
- // Peer 地址信息
- PPeerAddress = ^TPeerAddress;
- TPeerAddress = packed record
- Ip: Integer; // IP (主机字节顺序)
- Port: Integer; // 端口
- end;
- TCnIocpSocketAdapter = class;
- TSocketIocpThread = class;
- TSocketOverlappedType = (sotUnknow, sotSend, sotRecv, sotSendTo, sotRecvFrom);
- TSocketOverlapped = record
- Overlapped: TOverlapped;
- SocketOverlappedType: TSocketOverlappedType;
- Iocp: TCnIocpSocketAdapter;
- Param: Pointer;
- Buffer: array[0..CN_MAX_WSABUF_COUNT - 1] of WSABUF;
- // 用于TCP接收
- TransfferBuffer: array[0..CN_MAX_WSABUF_COUNT - 1] of WSABUF;
- BufCount: Cardinal;
- WantBytesCount: Cardinal;
- TransferredBytesCount: Cardinal;
- SocketHandle: TSocket;
- SocketType: Cardinal;
- ToAddr: TSockAddr;
- FromAddr: TSockAddr;
- FromLen: Cardinal;
- end;
- PSocketOverlapped = ^TSocketOverlapped;
- TCnIocpSendEvent = procedure (Sender: TObject; Error, Transferred: Cardinal;
- Buffer: PWSABUF; BufCount: Cardinal; Param: Pointer) of object;
- TCnIocpRecvEvent = procedure (Sender: TObject; Error, Transferred: Cardinal;
- Buffer: PWSABUF; BufCount: Cardinal; Param: Pointer) of object;
- TCnIocpSendToEvent = procedure (Sender: TObject; Error, Transferred: Cardinal;
- Buffer: PWSABUF; BufCount: Cardinal; Param: Pointer; ToAddr: PPeerAddress) of object;
- TCnIocpRecvFromEvent = procedure (Sender: TObject; Error, Transferred: Cardinal;
- Buffer: PWSABUF; BufCount: Cardinal; Param: Pointer; FromAddr: PPeerAddress) of object;
- { TCnIocpSocketAdapter }
- TCnIocpSocketAdapter = class(TComponent)
- private
- FIocpHandle: THandle;
- FSocketIocpThreadArray: array of TSocketIocpThread;
- FOnSendEvent : TCnIocpSendEvent;
- FOnRecvEvent : TCnIocpRecvEvent;
- FOnSendToEvent : TCnIocpSendToEvent;
- FOnRecvFromEvent: TCnIocpRecvFromEvent;
- // FMemeryPoolType: Integer;
- FMemoryPool: TCnIocpSimpleMemPool;
- function SolveConnectResetBug(SocketHandle: TSocket): DWord;
- {* 修复UDP的BUG}
- function GetThreadCount: Integer;
- {* 获取线程总数}
- procedure CreateCompletionIo(var ConcurrentThreads: Cardinal;
- var NumberOfThreads: Cardinal);
- {* 创建完成端口,创建多线程,在构造函数中调用}
- procedure DestroyCompletionIo(var IocpHandle: THandle);
- {* 释放完成端口,和多线程}
- procedure ThrowException;
- function CreateOverlapped(Buffer: PWSABUF; BufCount: Cardinal;
- Param: Pointer): PSocketOverlapped; overload;
- {* 申请TSocketOverlapped变量}
- procedure DestroyOverlapped(SocketOverlapped: PSocketOverlapped);
- {* 释放TSocketOverlapped变量}
- procedure QueuedCompletionStatus(Milliseconds: Cardinal = INFINITE);
- {* 完成端口完成之后的处理, 被线程调用}
- procedure SetMemoryPool(const Value: TCnIocpSimpleMemPool);
- protected
- procedure DoSendEvent(Sender: TObject; Error, Transferred: Cardinal;
- Buffer: PWSABUF; BufCount: Cardinal; Param: Pointer);
- procedure DoRecvEvent(Sender: TObject; Error, Transferred: Cardinal;
- Buffer: PWSABUF; BufCount: Cardinal; Param: Pointer);
- procedure DoSendToEvent(Sender: TObject; Error, Transferred: Cardinal;
- Buffer: PWSABUF; BufCount: Cardinal; Param: Pointer; ToAddr: PPeerAddress);
- procedure DoRecvFromEvent(Sender: TObject; Error, Transferred: Cardinal;
- Buffer: PWSABUF; BufCount: Cardinal; Param: Pointer; FromAddr: PPeerAddress);
- procedure InternalRentMemory(var MemoryPtr: Pointer);
- procedure InternalReturnMemory(MemoryPtr: Pointer);
- public
- constructor Create(AOwner: TComponent); override;
- destructor Destroy; override;
- procedure AssicoateSocket(SocketHandle: TSocket);
- procedure Recv(SocketHandle: TSocket; Memory: PChar; MemLength: Cardinal;
- Param: Pointer); overload;
- procedure Recv(SocketHandle: TSocket; Buffer: PWSABUF; BufCount: Cardinal;
- Param: Pointer); overload;
- procedure Send(SocketHandle: TSocket; Memory: PChar; MemLength: Cardinal;
- Param: Pointer); overload;
- procedure Send(SocketHandle: TSocket; Buffer: PWSABUF; BufCount: Cardinal;
- Param: Pointer); overload;
- procedure RecvFrom(SocketHandle: TSocket; Memory: PChar; MemLength: Cardinal;
- Param: Pointer); overload;
- procedure RecvFrom(SocketHandle: TSocket; Buffer: PWSABUF; BufCount: Cardinal;
- Param: Pointer); overload;
- procedure SendTo(SocketHandle: TSocket; Memory: PChar; MemLength: Cardinal;
- ToAddr: PPeerAddress; Param: Pointer); overload;
- procedure SendTo(SocketHandle: TSocket; Buffer: PWSABUF; BufCount: Cardinal;
- ToAddr: PPeerAddress; Param: Pointer); overload;
- published
- property ThreadCount: Integer read GetThreadCount;
- property MemoryPool: TCnIocpSimpleMemPool read FMemoryPool write SetMemoryPool;
- property OnSendEvent: TCnIocpSendEvent read FOnSendEvent write FOnSendEvent;
- property OnRecvEvent: TCnIocpRecvEvent read FOnRecvEvent write FOnRecvEvent;
- property OnSendToEvent: TCnIocpSendToEvent read FOnSendToEvent write FOnSendToEvent;
- property OnRecvFromEvent: TCnIocpRecvFromEvent read FOnRecvFromEvent write FOnRecvFromEvent;
- end;
- { TSocketIocpThread }
- TSocketIocpThread = class(TThread)
- protected
- FSocketIocp: TCnIocpSocketAdapter;
- FIsRunning: Boolean;
- procedure Execute; override;
- public
- constructor Create(SocketIocp: TCnIocpSocketAdapter);
- procedure Stop;
- end;
- function WSARecv(s: TSocket; lpBuffers: LPWSABUF; dwBufferCount: DWORD; var
- lpNumberOfBytesRecvd: DWORD; var lpFlags: DWORD;
- lpOverlapped: LPWSAOVERLAPPED; lpCompletionRoutine:
- LPWSAOVERLAPPED_COMPLETION_ROUTINE): Integer; stdcall;
- function WSASend(s: TSocket; lpBuffers: LPWSABUF; dwBufferCount: DWORD; var
- lpNumberOfBytesSent: DWORD; dwFlags: DWORD;
- lpOverlapped: LPWSAOVERLAPPED; lpCompletionRoutine:
- LPWSAOVERLAPPED_COMPLETION_ROUTINE): Integer; stdcall;
- function WSAIoctl(s: TSocket; dwIoControlCode: DWORD; lpvInBuffer: Pointer;
- cbInBuffer: DWORD; lpvOutBuffer: Pointer; cbOutBuffer: DWORD;
- lpcbBytesReturned: LPDWORD; lpOverlapped: LPWSAOVERLAPPED;
- lpCompletionRoutine: LPWSAOVERLAPPED_COMPLETION_ROUTINE): Integer; stdcall;
- function WSARecvFrom(s: TSocket; lpBuffers: LPWSABUF; dwBufferCount: DWORD; var
- lpNumberOfBytesRecvd: DWORD; var lpFlags: DWORD;
- lpFrom: PSockAddr; lpFromlen: PInteger; lpOverlapped: LPWSAOVERLAPPED;
- lpCompletionRoutine: LPWSAOVERLAPPED_COMPLETION_ROUTINE): Integer; stdcall;
- function WSASendTo(s: TSocket; lpBuffers: LPWSABUF; dwBufferCount: DWORD; var
- lpNumberOfBytesSent: DWORD; dwFlags: DWORD;
- lpTo: PSockAddr; iTolen: Integer; lpOverlapped: LPWSAOVERLAPPED;
- lpCompletionRoutine: LPWSAOVERLAPPED_COMPLETION_ROUTINE): Integer; stdcall;
-
- implementation
- const
- WINSOCK2_DLL = 'ws2_32.dll';
- function WSAIoctl; external WINSOCK2_DLL name 'WSAIoctl';
- function WSARecv; external WINSOCK2_DLL name 'WSARecv';
- function WSARecvFrom; external WINSOCK2_DLL name 'WSARecvFrom';
- function WSASend; external WINSOCK2_DLL name 'WSASend';
- function WSASendTo; external WINSOCK2_DLL name 'WSASendTo';
- function PeerAddress2SockAddr(PeerAddr: PPeerAddress): TSockAddr;
- begin
- Result.sin_family := AF_INET;
- Result.sin_addr.s_addr := htonl(PeerAddr.Ip);
- Result.sin_port := htons(PeerAddr.Port);
- end;
- function SockAddr2PeerAddress(SockAddr: PSockAddr): TPeerAddress;
- begin
- Result.Ip := ntohl(SockAddr.sin_addr.s_addr);
- Result.Port := ntohs(SockAddr.sin_port);
- end;
- destructor TCnIocpSocketAdapter.Destroy;
- begin
- if not (csDesigning in ComponentState) then
- DestroyCompletionIo(FIocpHandle);
- inherited;
- end;
- constructor TCnIocpSocketAdapter.Create(AOwner: TComponent);
- var k, l: Cardinal;
- begin
- inherited Create(AOwner);
- if not (csDesigning in ComponentState) then
- begin
- k := 0;
- l := 0;
- CreateCompletionIo(k, l);
- // FMemeryPoolType := CnSimpleMemoryPool.GetFreeMemoryType();
- // CnSimpleMemoryPool.RegisterMemoryType(FMemeryPoolType, nil, nil);
- // CnMemoryPool.SetParam(FMemeryPoolType, 0 * 2 + 5);
- // 这一句话为什么有错误,请调试
- end;
- end;
- procedure TCnIocpSocketAdapter.CreateCompletionIo(var ConcurrentThreads: Cardinal;
- var NumberOfThreads: Cardinal);
- var
- I: Integer;
- SystemInfo: TSystemInfo;
- begin
- if ConcurrentThreads = 0 then
- begin
- GetSystemInfo(SystemInfo);
- ConcurrentThreads := SystemInfo.dwNumberOfProcessors * 2 + 2;
- end;
- if NumberOfThreads < ConcurrentThreads then
- NumberOfThreads := ConcurrentThreads;
- FIocpHandle := CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0,
- ConcurrentThreads);
- if FIocpHandle = 0 then ThrowException;
- SetLength(FSocketIocpThreadArray, NumberOfThreads);
- for I := Low(FSocketIocpThreadArray) to High(FSocketIocpThreadArray) do
- begin
- FSocketIocpThreadArray[I] := TSocketIocpThread.Create(Self);
- FSocketIocpThreadArray[I].Priority := tpHigher;
- end;
- end;
- procedure TCnIocpSocketAdapter.DestroyCompletionIo(var IocpHandle: THandle);
- var
- I: Integer;
- begin
- // 通知线程停止,并发出通知消息。
- for I := Low(FSocketIocpThreadArray) to High(FSocketIocpThreadArray) do
- FSocketIocpThreadArray[I].Stop;
- for I := Low(FSocketIocpThreadArray) to High(FSocketIocpThreadArray) do
- PostQueuedCompletionStatus(FIocpHandle, 0, 0, nil);
- // 等待线程停止并释放
- for I := Low(FSocketIocpThreadArray) to High(FSocketIocpThreadArray) do
- begin
- FSocketIocpThreadArray[I].WaitFor;
- FreeAndNil(FSocketIocpThreadArray[I]);
- end;
- SetLength(FSocketIocpThreadArray, 0);
- CloseHandle(IocpHandle);
- IocpHandle := INVALID_HANDLE_VALUE;
- end;
- procedure TCnIocpSocketAdapter.ThrowException;
- begin
- raise Exception.Create(Format(SCnErrorCompletePortError, [GetLastError]));
- end;
- procedure TCnIocpSocketAdapter.AssicoateSocket(SocketHandle: TSocket);
- var
- Handle: THandle;
- Val: Integer;
- Len: Integer;
- begin
- // 注意:UDP在.net 2003以前的开发库中有产生10054的bug。需要修复。
- Len := SizeOf(Val);
- if getsockopt(SocketHandle, SOL_SOCKET, SO_TYPE, @Val, Len) = 0 then
- begin
- if Val = SOCK_DGRAM then
- begin
- if SolveConnectResetBug(SocketHandle) <> 0 then
- raise Exception.Create('Can NOT Fix Udp 10054 Error.');
- end;
- end;
- Handle := CreateIoCompletionPort(SocketHandle, FIocpHandle, 0, 0);
- if Handle = 0 then
- ThrowException;
- end;
- function TCnIocpSocketAdapter.SolveConnectResetBug(SocketHandle: TSocket): DWord;
- const
- SIO_UDP_CONNRESET = $80000000 or $18000000 or 12;
- var
- NewBehavior: Boolean;
- BytesReturned: DWord;
- Status: DWord;
- begin
- NewBehavior := False;
- BytesReturned := 0;
- Status := WSAIoctl(SocketHandle, SIO_UDP_CONNRESET,
- @NewBehavior, SizeOf(NewBehavior),
- nil, 0, @BytesReturned, nil, nil);
- Result := Status;
- end;
- function TCnIocpSocketAdapter.CreateOverlapped(Buffer: PWSABUF; BufCount: Cardinal;
- Param: Pointer): PSocketOverlapped;
- begin
- // 建立重叠IO内存块
- InternalRentMemory(Pointer(Result));
-
- Result.SocketOverlappedType := sotUnknow;
- if BufCount > SizeOf(Result.Buffer) then
- raise Exception.CreateFmt(SCnErrorSendBufferOverflow, [CN_MAX_WSABUF_COUNT]);
- Move(Buffer^, Result.Buffer, SizeOf(WSABUF) * BufCount);
- Move(Buffer^, Result.TransfferBuffer, SizeOf(WSABUF) * BufCount);
- Result.BufCount := BufCount;
- Result.Iocp := Self;
- Result.Param := Param;
- Result.WantBytesCount := 0;
- Result.TransferredBytesCount := 0;
- Result.SocketType := 0;
- end;
- procedure TCnIocpSocketAdapter.DestroyOverlapped(SocketOverlapped: PSocketOverlapped);
- begin
- // 释放重叠IO内存块
- SocketOverlapped.Iocp := nil;
- InternalReturnMemory(SocketOverlapped);
- end;
- procedure TCnIocpSocketAdapter.DoRecvEvent(Sender: TObject; Error, Transferred: Cardinal; Buffer: PWSABUF;
- BufCount: Cardinal; Param: Pointer);
- begin
- if Assigned(FOnSendEvent) then
- FOnRecvEvent(Sender, Error, Transferred, Buffer, BufCount, Param);
- end;
- procedure TCnIocpSocketAdapter.DoRecvFromEvent(Sender: TObject; Error,
- Transferred: Cardinal; Buffer: PWSABUF; BufCount: Cardinal; Param: Pointer;
- FromAddr: PPeerAddress);
- begin
- if Assigned(FOnRecvFromEvent) then
- FOnRecvFromEvent(Sender, Error, Transferred, Buffer, BufCount, Param,FromAddr);
- end;
- procedure TCnIocpSocketAdapter.DoSendEvent(Sender: TObject; Error, Transferred: Cardinal;
- Buffer: PWSABUF; BufCount: Cardinal; Param: Pointer);
- begin
- if Assigned(FOnSendEvent) then
- FOnSendEvent(Sender, Error, Transferred, Buffer, BufCount, Param);
- end;
- procedure TCnIocpSocketAdapter.DoSendToEvent(Sender: TObject; Error,
- Transferred: Cardinal; Buffer: PWSABUF; BufCount: Cardinal; Param: Pointer;
- ToAddr: PPeerAddress);
- begin
- if Assigned(FOnSendToEvent) then
- FOnSendToEvent(Sender, Error, Transferred, Buffer, BufCount, Param, ToAddr);
- end;
- function TCnIocpSocketAdapter.GetThreadCount: Integer;
- begin
- Result := Length(FSocketIocpThreadArray);
- end;
- procedure TCnIocpSocketAdapter.QueuedCompletionStatus(Milliseconds: Cardinal);
- var
- NumberOfBytesTransferred: Cardinal;
- SocketOverlapped: PSocketOverlapped;
- Error: Cardinal;
- Tmp: TCnNativePointer;
- IsFreeOverlapped: Boolean;
- procedure CallbackEvent;
- var
- PeerAddr: TPeerAddress;
- begin
- try
- // 产生回调事件
- case SocketOverlapped.SocketOverlappedType of
- sotSend:
- begin
- DoSendEvent(Self, Error,
- NumberOfBytesTransferred, @SocketOverlapped.Buffer,
- SocketOverlapped.BufCount, SocketOverlapped.Param);
- end;
- sotRecv:
- begin
- DoRecvEvent(Self, Error,
- NumberOfBytesTransferred, @SocketOverlapped.Buffer,
- SocketOverlapped.BufCount, SocketOverlapped.Param);
- end;
- sotSendTo:
- begin
- PeerAddr := SockAddr2PeerAddress(@SocketOverlapped.ToAddr);
- DoSendToEvent(Self, Error,
- NumberOfBytesTransferred, @SocketOverlapped.Buffer,
- SocketOverlapped.BufCount, SocketOverlapped.Param,
- @PeerAddr);
- end;
- sotRecvFrom:
- begin
- PeerAddr := SockAddr2PeerAddress(@SocketOverlapped.FromAddr);
- DoRecvFromEvent(Self, Error,
- NumberOfBytesTransferred, @SocketOverlapped.Buffer,
- SocketOverlapped.BufCount, SocketOverlapped.Param,
- @PeerAddr);
- end;
- end;
- except
- On E: Exception do
- ;
- end;
- end;
- procedure ProcessEvent(var IsFreeOverlapped: Boolean);
- var
- I: Integer;
- WsaBuf: PWSABUF;
- Flags: Cardinal;
- Count: Cardinal;
- begin
- IsFreeOverlapped := True;
- // 发生了错误,直接调用回调事件让用户处理
- if (Error <> 0) then
- begin
- CallbackEvent;
- Exit;
- end;
- // 非TCP接收,直接调用回调事件让用户处理
- if not ((SocketOverlapped.SocketType = SOCK_STREAM) and
- (SocketOverlapped.SocketOverlappedType = sotRecv)) then
- begin
- CallbackEvent;
- Exit;
- end;
- {
- TCP在接收大块的数据时,返回的事件可能告知你接收了一部分数据,
- 你需要继续请求后续的数据。这在用户端会造成一些麻烦。
- 下面的代码让用户端只在收到了期望的所有数据后才得到事件回调。
- }
- if (NumberOfBytesTransferred + SocketOverlapped.TransferredBytesCount =
- SocketOverlapped.WantBytesCount) then
- begin
- // 已经收到了用户期望的数据个数
- NumberOfBytesTransferred := SocketOverlapped.WantBytesCount;
- CallbackEvent;
- Exit;
- end;
- // 继续接收后续的数据
- Inc(SocketOverlapped.TransferredBytesCount, NumberOfBytesTransferred);
- for I := 0 to SocketOverlapped.BufCount - 1 do
- begin
- WsaBuf := PWSABUF(@SocketOverlapped.TransfferBuffer[I]);
- if WsaBuf.len <> 0 then
- begin
- if WsaBuf.len >= Integer(NumberOfBytesTransferred) then
- begin
- Dec(WsaBuf.len, NumberOfBytesTransferred);
- Inc(WsaBuf.buf, NumberOfBytesTransferred);
- Break;
- end else
- begin
- Dec(NumberOfBytesTransferred, WsaBuf.len);
- WsaBuf.len := 0;
- end;
- end;
- end;
- Flags := 0;
- Count := SocketOverlapped.WantBytesCount -
- SocketOverlapped.TransferredBytesCount;
- if WSARecv(SocketOverlapped.SocketHandle,
- @SocketOverlapped.TransfferBuffer,
- SocketOverlapped.BufCount,
- Count, Flags,
- PWSAOverlapped(SocketOverlapped), nil) = SOCKET_ERROR then
- begin
- if GetLastError <> ERROR_IO_PENDING then
- // 发送失败,压入一个接收失败的事件(当做连接断开处理)
- PostQueuedCompletionStatus(FIocpHandle, 0, 0,
- POverlapped(SocketOverlapped));
- end;
- IsFreeOverlapped := False;
- end;
- begin
- Error := 0;
- IsFreeOverlapped := False;
- try
- if GetQueuedCompletionStatus(FIocpHandle, NumberOfBytesTransferred,
- Tmp, POverlapped(SocketOverlapped), Milliseconds) then
- begin
- if SocketOverlapped <> nil then
- begin
- if NumberOfBytesTransferred = 0 then Error := WSAECONNRESET;
- ProcessEvent(IsFreeOverlapped);
- end;
- end else
- begin
- if SocketOverlapped <> nil then
- begin
- Error := GetLastError;
- ProcessEvent(IsFreeOverlapped);
- end else
- if GetLastError <> WAIT_TIMEOUT then ThrowException;
- end;
- finally
- if (SocketOverlapped <> nil) and IsFreeOverlapped then
- DestroyOverlapped(SocketOverlapped);
- end;
- end;
- procedure TCnIocpSocketAdapter.Recv(SocketHandle: TSocket; Buffer: PWSABUF;
- BufCount: Cardinal; Param: Pointer);
- var
- SocketOverlapped: PSocketOverlapped;
- NumberOfBytesRecvd: Cardinal;
- Flags: Cardinal;
- I: Integer;
- Len: Integer;
- begin
- SocketOverlapped := CreateOverlapped(Buffer, BufCount, Param);
- SocketOverlapped.SocketOverlappedType := sotRecv;
- SocketOverlapped.SocketHandle := SocketHandle;
- Len := SizeOf(SocketOverlapped.SocketHandle);
- if 0 <> getsockopt(SocketHandle, SOL_SOCKET, SO_TYPE,
- @SocketOverlapped.SocketType, Len) then ThrowException;
- for I := 0 to BufCount - 1 do
- Inc(SocketOverlapped.WantBytesCount,
- PWSABUF(Integer(Buffer) + I * SizeOf(WSABUF)).len);
- Flags := 0;
- if WSARecv(SocketOverlapped.SocketHandle,
- @SocketOverlapped.TransfferBuffer,
- SocketOverlapped.BufCount, NumberOfBytesRecvd, Flags,
- PWSAOverlapped(SocketOverlapped), nil) = SOCKET_ERROR then
- begin
- if GetLastError <> ERROR_IO_PENDING then ThrowException;
- end;
- end;
- procedure TCnIocpSocketAdapter.Recv(SocketHandle: TSocket; Memory: PChar;
- MemLength: Cardinal; Param: Pointer);
- var
- Buffer: WSABUF;
- begin
- Buffer.len := MemLength;
- Buffer.buf := Memory;
- Recv(SocketHandle, PWSABUF(@Buffer), 1, Param);
- end;
- procedure TCnIocpSocketAdapter.Send(SocketHandle: TSocket; Buffer: PWSABUF;
- BufCount: Cardinal; Param: Pointer);
- var
- SocketOverlapped: PSocketOverlapped;
- NumberOfBytesSent: Cardinal;
- I: Integer;
- Len: Integer;
- begin
- SocketOverlapped := CreateOverlapped(Buffer, BufCount, Param);
- SocketOverlapped.SocketOverlappedType := sotSend;
- SocketOverlapped.SocketHandle := SocketHandle;
- Len := SizeOf(SocketOverlapped.SocketHandle);
- if 0 <> getsockopt(SocketHandle, SOL_SOCKET, SO_TYPE,
- @SocketOverlapped.SocketType, Len) then ThrowException;
- for I := 0 to BufCount - 1 do
- Inc(SocketOverlapped.WantBytesCount,
- PWSABUF(Integer(Buffer) + I * SizeOf(WSABUF)).len);
-
- if WSASend(SocketOverlapped.SocketHandle,
- @SocketOverlapped.TransfferBuffer,
- SocketOverlapped.BufCount, NumberOfBytesSent, 0,
- PWSAOverlapped(SocketOverlapped), nil) = SOCKET_ERROR then
- begin
- if GetLastError <> ERROR_IO_PENDING then ThrowException;
- end;
- end;
- procedure TCnIocpSocketAdapter.Send(SocketHandle: TSocket; Memory: PChar;
- MemLength: Cardinal; Param: Pointer);
- var
- Buffer: WSABUF;
- begin
- Buffer.len := MemLength;
- Buffer.buf := Memory;
- Send(SocketHandle, PWSABUF(@Buffer), 1, Param);
- end;
- procedure TCnIocpSocketAdapter.RecvFrom(SocketHandle: TSocket; Buffer: PWSABUF;
- BufCount: Cardinal; Param: Pointer);
- var
- SocketOverlapped: PSocketOverlapped;
- NumberOfBytesRecvd: Cardinal;
- Flags: Cardinal;
- I: Integer;
- Len: Integer;
- begin
- SocketOverlapped := CreateOverlapped(Buffer, BufCount, Param);
- SocketOverlapped.SocketOverlappedType := sotRecvFrom;
- SocketOverlapped.SocketHandle := SocketHandle;
- Len := SizeOf(SocketOverlapped.SocketHandle);
- if 0 <> getsockopt(SocketHandle, SOL_SOCKET, SO_TYPE,
- @SocketOverlapped.SocketType, Len) then ThrowException;
- SocketOverlapped.FromLen := SizeOf(TSockAddr);
- for I := 0 to BufCount - 1 do
- Inc(SocketOverlapped.WantBytesCount,
- PWSABUF(Integer(Buffer) + I * SizeOf(WSABUF)).len);
- Flags := 0;
- if WSARecvFrom(SocketOverlapped.SocketHandle,
- @SocketOverlapped.TransfferBuffer,
- SocketOverlapped.BufCount, NumberOfBytesRecvd, Flags,
- @SocketOverlapped.FromAddr, @SocketOverlapped.FromLen,
- PWSAOverlapped(SocketOverlapped), nil) = SOCKET_ERROR then
- begin
- if GetLastError <> ERROR_IO_PENDING then ThrowException;
- end;
- end;
- procedure TCnIocpSocketAdapter.RecvFrom(SocketHandle: TSocket; Memory: PChar;
- MemLength: Cardinal; Param: Pointer);
- var
- Buffer: WSABUF;
- begin
- Buffer.len := MemLength;
- Buffer.buf := Memory;
- RecvFrom(SocketHandle, PWSABUF(@Buffer), 1, Param);
- end;
- procedure TCnIocpSocketAdapter.SendTo(SocketHandle: TSocket; Buffer: PWSABUF;
- BufCount: Cardinal; ToAddr: PPeerAddress; Param: Pointer);
- var
- SocketOverlapped: PSocketOverlapped;
- NumberOfBytesSent: Cardinal;
- I: Integer;
- Len: Integer;
- begin
- SocketOverlapped := CreateOverlapped(Buffer, BufCount, Param);
- SocketOverlapped.SocketOverlappedType := sotSendTo;
- SocketOverlapped.SocketHandle := SocketHandle;
- Len := SizeOf(SocketOverlapped.SocketHandle);
- if 0 <> getsockopt(SocketHandle, SOL_SOCKET, SO_TYPE,
- @SocketOverlapped.SocketType, Len) then ThrowException;
- SocketOverlapped.ToAddr := PeerAddress2SockAddr(ToAddr);
- for I := 0 to BufCount - 1 do
- Inc(SocketOverlapped.WantBytesCount,
- PWSABUF(Integer(Buffer) + I * SizeOf(WSABUF)).len);
- if WSASendTo(SocketOverlapped.SocketHandle,
- @SocketOverlapped.TransfferBuffer,
- SocketOverlapped.BufCount, NumberOfBytesSent, 0,
- @SocketOverlapped.ToAddr, SizeOf(TSockAddr),
- PWSAOverlapped(SocketOverlapped), nil) = SOCKET_ERROR then
- begin
- if GetLastError <> ERROR_IO_PENDING then ThrowException;
- end;
- end;
- procedure TCnIocpSocketAdapter.SetMemoryPool(const Value: TCnIocpSimpleMemPool);
- begin
- if Value <> nil then
- begin
- //修改分配内存的大小
- Value.MemorySize := SizeOf(TSocketOverlapped);
- end;
- FMemoryPool := Value;
- end;
- procedure TCnIocpSocketAdapter.SendTo(SocketHandle: TSocket; Memory: PChar;
- MemLength: Cardinal; ToAddr: PPeerAddress; Param: Pointer);
- var
- Buffer: WSABUF;
- begin
- Buffer.len := MemLength;
- Buffer.buf := Memory;
- SendTo(SocketHandle, PWSABUF(@Buffer), 1, ToAddr, Param);
- end;
- { TSocketIocpThread }
- constructor TSocketIocpThread.Create(SocketIocp: TCnIocpSocketAdapter);
- begin
- FSocketIocp := SocketIocp;
- FIsRunning := True;
- inherited Create(False);
- end;
- procedure TSocketIocpThread.Execute;
- begin
- try
- while FIsRunning do
- begin
- FSocketIocp.QueuedCompletionStatus;
- end;
- except
- // nothing
- end;
- end;
- procedure TSocketIocpThread.Stop;
- begin
- FIsRunning := False;
- end;
- procedure TCnIocpSocketAdapter.InternalRentMemory(var MemoryPtr: Pointer);
- begin
- if FMemoryPool = nil then
- MemoryPtr := Pointer(GlobalAlloc(GPTR, SizeOf(TSocketOverlapped)))
- else
- FMemoryPool.RentMemory(MemoryPtr);
- end;
- procedure TCnIocpSocketAdapter.InternalReturnMemory(MemoryPtr: Pointer);
- begin
- if FMemoryPool = nil then
- GlobalFree(Cardinal(MemoryPtr))
- else
- FMemoryPool.ReturnMemory(MemoryPtr);
- end;
- initialization
-
- finalization
- // nothing
- end.
|