CnIocpSocketAdapter.pas 28 KB


  1. {******************************************************************************}
  2. { CnPack For Delphi/C++Builder }
  3. { 中国人自己的开放源码第三方开发包 }
  4. { (C)Copyright 2001-2018 CnPack 开发组 }
  5. { ------------------------------------ }
  6. { }
  7. { 本开发包是开源的自由软件,您可以遵照 CnPack 的发布协议来修 }
  8. { 改和重新发布这一程序。 }
  9. { }
  10. { 发布这一开发包的目的是希望它有用,但没有任何担保。甚至没有 }
  11. { 适合特定目的而隐含的担保。更详细的情况请参阅 CnPack 发布协议。 }
  12. { }
  13. { 您应该已经和开发包一起收到一份 CnPack 发布协议的副本。如果 }
  14. { 还没有,可访问我们的网站: }
  15. { }
  16. { 网站地址:http://www.cnpack.org }
  17. { 电子邮件:master@cnpack.org }
  18. { }
  19. {******************************************************************************}
  20. unit CnIocpSocketAdapter;
  21. {* |<PRE>
  22. ================================================================================
  23. * 软件名称:Windows完成端口封装单元
  24. * 单元名称:Windows完成端口封装实现单元
  25. * 单元作者:cnwinds
  26. * 菩提(cxmld@126.com)、Childe Ng、Liu Xiao 移植修改
  27. * 备 注:
  28. * 开发平台:PWin2000Pro + Delphi 7.01
  29. * 兼容测试:PWin9X/2000/XP + Delphi 5/6/7 + C++Builder 5/6
  30. * 本 地 化:该单元中的字符串均符合本地化处理方式
  31. * 单元标识:$Id$
  32. * 修改记录:2008.11.04 V1.0
  33. * 创建单元
  34. ================================================================================
  35. |</PRE>}
  36. {
  37. Windows完成端口封装单元。
  38. 如果不指定并发线程数则使用(CPU个数*2+2)作为默认并发线程个数。
  39. create by cnwinds, 2007-3-1
  40. Modify by cnwinds 2007-3-20
  41. + 增加面向非连接的套接口收发支持
  42. Modify by cnwinds 2007-4-10
  43. * 修正了Udp的10054错误
  44. Modify by cnwinds 2007-4-18
  45. + 为ISocketIocpEvent接口增加GUID,使得接口之间可以导航
  46. Modify by cnwinds 2007-4-19
  47. + 将接口分离成面向流和面向包的两种类型。用户使用更方便。
  48. 本类似乎还有些小问题,不知如何解决
  49. 1.在测试程序中,关闭测试程序时, MEMO中会快速闪过一些数据
  50. 这些好像是错误信息,不知是怎么来的.
  51. 2.在测试程序中,发送和接收是0..100有101次,并应有202条信息,
  52. 实际没有这么多, 没有找到原因
  53. 3.以上二个问题是原类就有的,所以移植后的组件也有此问题
  54. }
  55. interface
  56. {$I CnPack.inc}
  57. uses
  58. Windows, SysUtils, Classes, WinSock, CnNativeDecl, CnIocpSimpleMemPool;
  59. const
  60. CN_MAX_WSABUF_COUNT = 8;
  61. SCnErrorCompletePortError = 'Error in Completion IO. Errro code %d';
  62. SCnErrorCallbackException = 'Exception in CallBacl: %s';
  63. SCnErrorSendBufferOverflow = 'Send Buffer Overflow. Max %d WSABUF.';
  64. // SCnErrorCompletePortError = '完成端口发生错误,错误代码(%d)';
  65. // SCnErrorCallbackException = '回调用户事件中发生异常(%s)';
  66. // SCnErrorSendBufferOverflow = '发送缓冲区超过大小!目前程序定义最多只能发送 %d 个WSABUF!';
  67. type
  68. WSABUF = packed record
  69. len: U_LONG; { the length of the buffer }
  70. buf: PChar; { the pointer to the buffer }
  71. end {WSABUF};
  72. PWSABUF = ^WSABUF;
  73. LPWSABUF = PWSABUF;
  74. WSAOVERLAPPED = TOverlapped;
  75. TWSAOverlapped = WSAOverlapped;
  76. PWSAOverlapped = ^WSAOverlapped;
  77. LPWSAOVERLAPPED = PWSAOverlapped;
  78. TServiceType = LongInt;
  79. TFlowSpec = packed record
  80. TokenRate, // In Bytes/sec
  81. TokenBucketSize, // In Bytes
  82. PeakBandwidth, // In Bytes/sec
  83. Latency, // In microseconds
  84. DelayVariation: LongInt; // In microseconds
  85. ServiceType: TServiceType;
  86. MaxSduSize, MinimumPolicedSize: LongInt; // In Bytes
  87. end;
  88. PFlowSpec = ^TFLOWSPEC;
  89. QOS = packed record
  90. SendingFlowspec: TFlowSpec; { the flow spec for data sending }
  91. ReceivingFlowspec: TFlowSpec; { the flow spec for data receiving }
  92. ProviderSpecific: WSABUF; { additional provider specific stuff }
  93. end;
  94. TQualityOfService = QOS;
  95. PQOS = ^QOS;
  96. LPQOS = PQOS;
  97. LPCONDITIONPROC = function(lpCallerId: LPWSABUF; lpCallerData: LPWSABUF;
  98. lpSQOS, lpGQOS: LPQOS; lpCalleeId, lpCalleeData: LPWSABUF;
  99. g: DWORD; dwCallbackData: DWORD): Integer; stdcall;
  100. LPWSAOVERLAPPED_COMPLETION_ROUTINE = procedure(const dwError, cbTransferred:
  101. DWORD; const lpOverlapped: LPWSAOVERLAPPED; const dwFlags: DWORD); stdcall;
  102. // Peer 地址信息
  103. PPeerAddress = ^TPeerAddress;
  104. TPeerAddress = packed record
  105. Ip: Integer; // IP (主机字节顺序)
  106. Port: Integer; // 端口
  107. end;
  108. TCnIocpSocketAdapter = class;
  109. TSocketIocpThread = class;
  110. TSocketOverlappedType = (sotUnknow, sotSend, sotRecv, sotSendTo, sotRecvFrom);
  111. TSocketOverlapped = record
  112. Overlapped: TOverlapped;
  113. SocketOverlappedType: TSocketOverlappedType;
  114. Iocp: TCnIocpSocketAdapter;
  115. Param: Pointer;
  116. Buffer: array[0..CN_MAX_WSABUF_COUNT - 1] of WSABUF;
  117. // 用于TCP接收
  118. TransfferBuffer: array[0..CN_MAX_WSABUF_COUNT - 1] of WSABUF;
  119. BufCount: Cardinal;
  120. WantBytesCount: Cardinal;
  121. TransferredBytesCount: Cardinal;
  122. SocketHandle: TSocket;
  123. SocketType: Cardinal;
  124. ToAddr: TSockAddr;
  125. FromAddr: TSockAddr;
  126. FromLen: Cardinal;
  127. end;
  128. PSocketOverlapped = ^TSocketOverlapped;
  129. TCnIocpSendEvent = procedure (Sender: TObject; Error, Transferred: Cardinal;
  130. Buffer: PWSABUF; BufCount: Cardinal; Param: Pointer) of object;
  131. TCnIocpRecvEvent = procedure (Sender: TObject; Error, Transferred: Cardinal;
  132. Buffer: PWSABUF; BufCount: Cardinal; Param: Pointer) of object;
  133. TCnIocpSendToEvent = procedure (Sender: TObject; Error, Transferred: Cardinal;
  134. Buffer: PWSABUF; BufCount: Cardinal; Param: Pointer; ToAddr: PPeerAddress) of object;
  135. TCnIocpRecvFromEvent = procedure (Sender: TObject; Error, Transferred: Cardinal;
  136. Buffer: PWSABUF; BufCount: Cardinal; Param: Pointer; FromAddr: PPeerAddress) of object;
  137. { TCnIocpSocketAdapter }
  138. TCnIocpSocketAdapter = class(TComponent)
  139. private
  140. FIocpHandle: THandle;
  141. FSocketIocpThreadArray: array of TSocketIocpThread;
  142. FOnSendEvent : TCnIocpSendEvent;
  143. FOnRecvEvent : TCnIocpRecvEvent;
  144. FOnSendToEvent : TCnIocpSendToEvent;
  145. FOnRecvFromEvent: TCnIocpRecvFromEvent;
  146. // FMemeryPoolType: Integer;
  147. FMemoryPool: TCnIocpSimpleMemPool;
  148. function SolveConnectResetBug(SocketHandle: TSocket): DWord;
  149. {* 修复UDP的BUG}
  150. function GetThreadCount: Integer;
  151. {* 获取线程总数}
  152. procedure CreateCompletionIo(var ConcurrentThreads: Cardinal;
  153. var NumberOfThreads: Cardinal);
  154. {* 创建完成端口,创建多线程,在构造函数中调用}
  155. procedure DestroyCompletionIo(var IocpHandle: THandle);
  156. {* 释放完成端口,和多线程}
  157. procedure ThrowException;
  158. function CreateOverlapped(Buffer: PWSABUF; BufCount: Cardinal;
  159. Param: Pointer): PSocketOverlapped; overload;
  160. {* 申请TSocketOverlapped变量}
  161. procedure DestroyOverlapped(SocketOverlapped: PSocketOverlapped);
  162. {* 释放TSocketOverlapped变量}
  163. procedure QueuedCompletionStatus(Milliseconds: Cardinal = INFINITE);
  164. {* 完成端口完成之后的处理, 被线程调用}
  165. procedure SetMemoryPool(const Value: TCnIocpSimpleMemPool);
  166. protected
  167. procedure DoSendEvent(Sender: TObject; Error, Transferred: Cardinal;
  168. Buffer: PWSABUF; BufCount: Cardinal; Param: Pointer);
  169. procedure DoRecvEvent(Sender: TObject; Error, Transferred: Cardinal;
  170. Buffer: PWSABUF; BufCount: Cardinal; Param: Pointer);
  171. procedure DoSendToEvent(Sender: TObject; Error, Transferred: Cardinal;
  172. Buffer: PWSABUF; BufCount: Cardinal; Param: Pointer; ToAddr: PPeerAddress);
  173. procedure DoRecvFromEvent(Sender: TObject; Error, Transferred: Cardinal;
  174. Buffer: PWSABUF; BufCount: Cardinal; Param: Pointer; FromAddr: PPeerAddress);
  175. procedure InternalRentMemory(var MemoryPtr: Pointer);
  176. procedure InternalReturnMemory(MemoryPtr: Pointer);
  177. public
  178. constructor Create(AOwner: TComponent); override;
  179. destructor Destroy; override;
  180. procedure AssicoateSocket(SocketHandle: TSocket);
  181. procedure Recv(SocketHandle: TSocket; Memory: PChar; MemLength: Cardinal;
  182. Param: Pointer); overload;
  183. procedure Recv(SocketHandle: TSocket; Buffer: PWSABUF; BufCount: Cardinal;
  184. Param: Pointer); overload;
  185. procedure Send(SocketHandle: TSocket; Memory: PChar; MemLength: Cardinal;
  186. Param: Pointer); overload;
  187. procedure Send(SocketHandle: TSocket; Buffer: PWSABUF; BufCount: Cardinal;
  188. Param: Pointer); overload;
  189. procedure RecvFrom(SocketHandle: TSocket; Memory: PChar; MemLength: Cardinal;
  190. Param: Pointer); overload;
  191. procedure RecvFrom(SocketHandle: TSocket; Buffer: PWSABUF; BufCount: Cardinal;
  192. Param: Pointer); overload;
  193. procedure SendTo(SocketHandle: TSocket; Memory: PChar; MemLength: Cardinal;
  194. ToAddr: PPeerAddress; Param: Pointer); overload;
  195. procedure SendTo(SocketHandle: TSocket; Buffer: PWSABUF; BufCount: Cardinal;
  196. ToAddr: PPeerAddress; Param: Pointer); overload;
  197. published
  198. property ThreadCount: Integer read GetThreadCount;
  199. property MemoryPool: TCnIocpSimpleMemPool read FMemoryPool write SetMemoryPool;
  200. property OnSendEvent: TCnIocpSendEvent read FOnSendEvent write FOnSendEvent;
  201. property OnRecvEvent: TCnIocpRecvEvent read FOnRecvEvent write FOnRecvEvent;
  202. property OnSendToEvent: TCnIocpSendToEvent read FOnSendToEvent write FOnSendToEvent;
  203. property OnRecvFromEvent: TCnIocpRecvFromEvent read FOnRecvFromEvent write FOnRecvFromEvent;
  204. end;
  205. { TSocketIocpThread }
  206. TSocketIocpThread = class(TThread)
  207. protected
  208. FSocketIocp: TCnIocpSocketAdapter;
  209. FIsRunning: Boolean;
  210. procedure Execute; override;
  211. public
  212. constructor Create(SocketIocp: TCnIocpSocketAdapter);
  213. procedure Stop;
  214. end;
  215. function WSARecv(s: TSocket; lpBuffers: LPWSABUF; dwBufferCount: DWORD; var
  216. lpNumberOfBytesRecvd: DWORD; var lpFlags: DWORD;
  217. lpOverlapped: LPWSAOVERLAPPED; lpCompletionRoutine:
  218. LPWSAOVERLAPPED_COMPLETION_ROUTINE): Integer; stdcall;
  219. function WSASend(s: TSocket; lpBuffers: LPWSABUF; dwBufferCount: DWORD; var
  220. lpNumberOfBytesSent: DWORD; dwFlags: DWORD;
  221. lpOverlapped: LPWSAOVERLAPPED; lpCompletionRoutine:
  222. LPWSAOVERLAPPED_COMPLETION_ROUTINE): Integer; stdcall;
  223. function WSAIoctl(s: TSocket; dwIoControlCode: DWORD; lpvInBuffer: Pointer;
  224. cbInBuffer: DWORD; lpvOutBuffer: Pointer; cbOutBuffer: DWORD;
  225. lpcbBytesReturned: LPDWORD; lpOverlapped: LPWSAOVERLAPPED;
  226. lpCompletionRoutine: LPWSAOVERLAPPED_COMPLETION_ROUTINE): Integer; stdcall;
  227. function WSARecvFrom(s: TSocket; lpBuffers: LPWSABUF; dwBufferCount: DWORD; var
  228. lpNumberOfBytesRecvd: DWORD; var lpFlags: DWORD;
  229. lpFrom: PSockAddr; lpFromlen: PInteger; lpOverlapped: LPWSAOVERLAPPED;
  230. lpCompletionRoutine: LPWSAOVERLAPPED_COMPLETION_ROUTINE): Integer; stdcall;
  231. function WSASendTo(s: TSocket; lpBuffers: LPWSABUF; dwBufferCount: DWORD; var
  232. lpNumberOfBytesSent: DWORD; dwFlags: DWORD;
  233. lpTo: PSockAddr; iTolen: Integer; lpOverlapped: LPWSAOVERLAPPED;
  234. lpCompletionRoutine: LPWSAOVERLAPPED_COMPLETION_ROUTINE): Integer; stdcall;
  235. implementation
  236. const
  237. WINSOCK2_DLL = 'ws2_32.dll';
  238. function WSAIoctl; external WINSOCK2_DLL name 'WSAIoctl';
  239. function WSARecv; external WINSOCK2_DLL name 'WSARecv';
  240. function WSARecvFrom; external WINSOCK2_DLL name 'WSARecvFrom';
  241. function WSASend; external WINSOCK2_DLL name 'WSASend';
  242. function WSASendTo; external WINSOCK2_DLL name 'WSASendTo';
  243. function PeerAddress2SockAddr(PeerAddr: PPeerAddress): TSockAddr;
  244. begin
  245. Result.sin_family := AF_INET;
  246. Result.sin_addr.s_addr := htonl(PeerAddr.Ip);
  247. Result.sin_port := htons(PeerAddr.Port);
  248. end;
  249. function SockAddr2PeerAddress(SockAddr: PSockAddr): TPeerAddress;
  250. begin
  251. Result.Ip := ntohl(SockAddr.sin_addr.s_addr);
  252. Result.Port := ntohs(SockAddr.sin_port);
  253. end;
  254. destructor TCnIocpSocketAdapter.Destroy;
  255. begin
  256. if not (csDesigning in ComponentState) then
  257. DestroyCompletionIo(FIocpHandle);
  258. inherited;
  259. end;
  260. constructor TCnIocpSocketAdapter.Create(AOwner: TComponent);
  261. var k, l: Cardinal;
  262. begin
  263. inherited Create(AOwner);
  264. if not (csDesigning in ComponentState) then
  265. begin
  266. k := 0;
  267. l := 0;
  268. CreateCompletionIo(k, l);
  269. // FMemeryPoolType := CnSimpleMemoryPool.GetFreeMemoryType();
  270. // CnSimpleMemoryPool.RegisterMemoryType(FMemeryPoolType, nil, nil);
  271. // CnMemoryPool.SetParam(FMemeryPoolType, 0 * 2 + 5);
  272. // 这一句话为什么有错误,请调试
  273. end;
  274. end;
  275. procedure TCnIocpSocketAdapter.CreateCompletionIo(var ConcurrentThreads: Cardinal;
  276. var NumberOfThreads: Cardinal);
  277. var
  278. I: Integer;
  279. SystemInfo: TSystemInfo;
  280. begin
  281. if ConcurrentThreads = 0 then
  282. begin
  283. GetSystemInfo(SystemInfo);
  284. ConcurrentThreads := SystemInfo.dwNumberOfProcessors * 2 + 2;
  285. end;
  286. if NumberOfThreads < ConcurrentThreads then
  287. NumberOfThreads := ConcurrentThreads;
  288. FIocpHandle := CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0,
  289. ConcurrentThreads);
  290. if FIocpHandle = 0 then ThrowException;
  291. SetLength(FSocketIocpThreadArray, NumberOfThreads);
  292. for I := Low(FSocketIocpThreadArray) to High(FSocketIocpThreadArray) do
  293. begin
  294. FSocketIocpThreadArray[I] := TSocketIocpThread.Create(Self);
  295. FSocketIocpThreadArray[I].Priority := tpHigher;
  296. end;
  297. end;
  298. procedure TCnIocpSocketAdapter.DestroyCompletionIo(var IocpHandle: THandle);
  299. var
  300. I: Integer;
  301. begin
  302. // 通知线程停止,并发出通知消息。
  303. for I := Low(FSocketIocpThreadArray) to High(FSocketIocpThreadArray) do
  304. FSocketIocpThreadArray[I].Stop;
  305. for I := Low(FSocketIocpThreadArray) to High(FSocketIocpThreadArray) do
  306. PostQueuedCompletionStatus(FIocpHandle, 0, 0, nil);
  307. // 等待线程停止并释放
  308. for I := Low(FSocketIocpThreadArray) to High(FSocketIocpThreadArray) do
  309. begin
  310. FSocketIocpThreadArray[I].WaitFor;
  311. FreeAndNil(FSocketIocpThreadArray[I]);
  312. end;
  313. SetLength(FSocketIocpThreadArray, 0);
  314. CloseHandle(IocpHandle);
  315. IocpHandle := INVALID_HANDLE_VALUE;
  316. end;
  317. procedure TCnIocpSocketAdapter.ThrowException;
  318. begin
  319. raise Exception.Create(Format(SCnErrorCompletePortError, [GetLastError]));
  320. end;
  321. procedure TCnIocpSocketAdapter.AssicoateSocket(SocketHandle: TSocket);
  322. var
  323. Handle: THandle;
  324. Val: Integer;
  325. Len: Integer;
  326. begin
  327. // 注意:UDP在.net 2003以前的开发库中有产生10054的bug。需要修复。
  328. Len := SizeOf(Val);
  329. if getsockopt(SocketHandle, SOL_SOCKET, SO_TYPE, @Val, Len) = 0 then
  330. begin
  331. if Val = SOCK_DGRAM then
  332. begin
  333. if SolveConnectResetBug(SocketHandle) <> 0 then
  334. raise Exception.Create('Can NOT Fix Udp 10054 Error.');
  335. end;
  336. end;
  337. Handle := CreateIoCompletionPort(SocketHandle, FIocpHandle, 0, 0);
  338. if Handle = 0 then
  339. ThrowException;
  340. end;
  341. function TCnIocpSocketAdapter.SolveConnectResetBug(SocketHandle: TSocket): DWord;
  342. const
  343. SIO_UDP_CONNRESET = $80000000 or $18000000 or 12;
  344. var
  345. NewBehavior: Boolean;
  346. BytesReturned: DWord;
  347. Status: DWord;
  348. begin
  349. NewBehavior := False;
  350. BytesReturned := 0;
  351. Status := WSAIoctl(SocketHandle, SIO_UDP_CONNRESET,
  352. @NewBehavior, SizeOf(NewBehavior),
  353. nil, 0, @BytesReturned, nil, nil);
  354. Result := Status;
  355. end;
  356. function TCnIocpSocketAdapter.CreateOverlapped(Buffer: PWSABUF; BufCount: Cardinal;
  357. Param: Pointer): PSocketOverlapped;
  358. begin
  359. // 建立重叠IO内存块
  360. InternalRentMemory(Pointer(Result));
  361. Result.SocketOverlappedType := sotUnknow;
  362. if BufCount > SizeOf(Result.Buffer) then
  363. raise Exception.CreateFmt(SCnErrorSendBufferOverflow, [CN_MAX_WSABUF_COUNT]);
  364. Move(Buffer^, Result.Buffer, SizeOf(WSABUF) * BufCount);
  365. Move(Buffer^, Result.TransfferBuffer, SizeOf(WSABUF) * BufCount);
  366. Result.BufCount := BufCount;
  367. Result.Iocp := Self;
  368. Result.Param := Param;
  369. Result.WantBytesCount := 0;
  370. Result.TransferredBytesCount := 0;
  371. Result.SocketType := 0;
  372. end;
  373. procedure TCnIocpSocketAdapter.DestroyOverlapped(SocketOverlapped: PSocketOverlapped);
  374. begin
  375. // 释放重叠IO内存块
  376. SocketOverlapped.Iocp := nil;
  377. InternalReturnMemory(SocketOverlapped);
  378. end;
  379. procedure TCnIocpSocketAdapter.DoRecvEvent(Sender: TObject; Error, Transferred: Cardinal; Buffer: PWSABUF;
  380. BufCount: Cardinal; Param: Pointer);
  381. begin
  382. if Assigned(FOnSendEvent) then
  383. FOnRecvEvent(Sender, Error, Transferred, Buffer, BufCount, Param);
  384. end;
  385. procedure TCnIocpSocketAdapter.DoRecvFromEvent(Sender: TObject; Error,
  386. Transferred: Cardinal; Buffer: PWSABUF; BufCount: Cardinal; Param: Pointer;
  387. FromAddr: PPeerAddress);
  388. begin
  389. if Assigned(FOnRecvFromEvent) then
  390. FOnRecvFromEvent(Sender, Error, Transferred, Buffer, BufCount, Param,FromAddr);
  391. end;
  392. procedure TCnIocpSocketAdapter.DoSendEvent(Sender: TObject; Error, Transferred: Cardinal;
  393. Buffer: PWSABUF; BufCount: Cardinal; Param: Pointer);
  394. begin
  395. if Assigned(FOnSendEvent) then
  396. FOnSendEvent(Sender, Error, Transferred, Buffer, BufCount, Param);
  397. end;
  398. procedure TCnIocpSocketAdapter.DoSendToEvent(Sender: TObject; Error,
  399. Transferred: Cardinal; Buffer: PWSABUF; BufCount: Cardinal; Param: Pointer;
  400. ToAddr: PPeerAddress);
  401. begin
  402. if Assigned(FOnSendToEvent) then
  403. FOnSendToEvent(Sender, Error, Transferred, Buffer, BufCount, Param, ToAddr);
  404. end;
  405. function TCnIocpSocketAdapter.GetThreadCount: Integer;
  406. begin
  407. Result := Length(FSocketIocpThreadArray);
  408. end;
  409. procedure TCnIocpSocketAdapter.QueuedCompletionStatus(Milliseconds: Cardinal);
  410. var
  411. NumberOfBytesTransferred: Cardinal;
  412. SocketOverlapped: PSocketOverlapped;
  413. Error: Cardinal;
  414. Tmp: TCnNativePointer;
  415. IsFreeOverlapped: Boolean;
  416. procedure CallbackEvent;
  417. var
  418. PeerAddr: TPeerAddress;
  419. begin
  420. try
  421. // 产生回调事件
  422. case SocketOverlapped.SocketOverlappedType of
  423. sotSend:
  424. begin
  425. DoSendEvent(Self, Error,
  426. NumberOfBytesTransferred, @SocketOverlapped.Buffer,
  427. SocketOverlapped.BufCount, SocketOverlapped.Param);
  428. end;
  429. sotRecv:
  430. begin
  431. DoRecvEvent(Self, Error,
  432. NumberOfBytesTransferred, @SocketOverlapped.Buffer,
  433. SocketOverlapped.BufCount, SocketOverlapped.Param);
  434. end;
  435. sotSendTo:
  436. begin
  437. PeerAddr := SockAddr2PeerAddress(@SocketOverlapped.ToAddr);
  438. DoSendToEvent(Self, Error,
  439. NumberOfBytesTransferred, @SocketOverlapped.Buffer,
  440. SocketOverlapped.BufCount, SocketOverlapped.Param,
  441. @PeerAddr);
  442. end;
  443. sotRecvFrom:
  444. begin
  445. PeerAddr := SockAddr2PeerAddress(@SocketOverlapped.FromAddr);
  446. DoRecvFromEvent(Self, Error,
  447. NumberOfBytesTransferred, @SocketOverlapped.Buffer,
  448. SocketOverlapped.BufCount, SocketOverlapped.Param,
  449. @PeerAddr);
  450. end;
  451. end;
  452. except
  453. On E: Exception do
  454. ;
  455. end;
  456. end;
  457. procedure ProcessEvent(var IsFreeOverlapped: Boolean);
  458. var
  459. I: Integer;
  460. WsaBuf: PWSABUF;
  461. Flags: Cardinal;
  462. Count: Cardinal;
  463. begin
  464. IsFreeOverlapped := True;
  465. // 发生了错误,直接调用回调事件让用户处理
  466. if (Error <> 0) then
  467. begin
  468. CallbackEvent;
  469. Exit;
  470. end;
  471. // 非TCP接收,直接调用回调事件让用户处理
  472. if not ((SocketOverlapped.SocketType = SOCK_STREAM) and
  473. (SocketOverlapped.SocketOverlappedType = sotRecv)) then
  474. begin
  475. CallbackEvent;
  476. Exit;
  477. end;
  478. {
  479. TCP在接收大块的数据时,返回的事件可能告知你接收了一部分数据,
  480. 你需要继续请求后续的数据。这在用户端会造成一些麻烦。
  481. 下面的代码让用户端只在收到了期望的所有数据后才得到事件回调。
  482. }
  483. if (NumberOfBytesTransferred + SocketOverlapped.TransferredBytesCount =
  484. SocketOverlapped.WantBytesCount) then
  485. begin
  486. // 已经收到了用户期望的数据个数
  487. NumberOfBytesTransferred := SocketOverlapped.WantBytesCount;
  488. CallbackEvent;
  489. Exit;
  490. end;
  491. // 继续接收后续的数据
  492. Inc(SocketOverlapped.TransferredBytesCount, NumberOfBytesTransferred);
  493. for I := 0 to SocketOverlapped.BufCount - 1 do
  494. begin
  495. WsaBuf := PWSABUF(@SocketOverlapped.TransfferBuffer[I]);
  496. if WsaBuf.len <> 0 then
  497. begin
  498. if WsaBuf.len >= Integer(NumberOfBytesTransferred) then
  499. begin
  500. Dec(WsaBuf.len, NumberOfBytesTransferred);
  501. Inc(WsaBuf.buf, NumberOfBytesTransferred);
  502. Break;
  503. end else
  504. begin
  505. Dec(NumberOfBytesTransferred, WsaBuf.len);
  506. WsaBuf.len := 0;
  507. end;
  508. end;
  509. end;
  510. Flags := 0;
  511. Count := SocketOverlapped.WantBytesCount -
  512. SocketOverlapped.TransferredBytesCount;
  513. if WSARecv(SocketOverlapped.SocketHandle,
  514. @SocketOverlapped.TransfferBuffer,
  515. SocketOverlapped.BufCount,
  516. Count, Flags,
  517. PWSAOverlapped(SocketOverlapped), nil) = SOCKET_ERROR then
  518. begin
  519. if GetLastError <> ERROR_IO_PENDING then
  520. // 发送失败,压入一个接收失败的事件(当做连接断开处理)
  521. PostQueuedCompletionStatus(FIocpHandle, 0, 0,
  522. POverlapped(SocketOverlapped));
  523. end;
  524. IsFreeOverlapped := False;
  525. end;
  526. begin
  527. Error := 0;
  528. IsFreeOverlapped := False;
  529. try
  530. if GetQueuedCompletionStatus(FIocpHandle, NumberOfBytesTransferred,
  531. Tmp, POverlapped(SocketOverlapped), Milliseconds) then
  532. begin
  533. if SocketOverlapped <> nil then
  534. begin
  535. if NumberOfBytesTransferred = 0 then Error := WSAECONNRESET;
  536. ProcessEvent(IsFreeOverlapped);
  537. end;
  538. end else
  539. begin
  540. if SocketOverlapped <> nil then
  541. begin
  542. Error := GetLastError;
  543. ProcessEvent(IsFreeOverlapped);
  544. end else
  545. if GetLastError <> WAIT_TIMEOUT then ThrowException;
  546. end;
  547. finally
  548. if (SocketOverlapped <> nil) and IsFreeOverlapped then
  549. DestroyOverlapped(SocketOverlapped);
  550. end;
  551. end;
  552. procedure TCnIocpSocketAdapter.Recv(SocketHandle: TSocket; Buffer: PWSABUF;
  553. BufCount: Cardinal; Param: Pointer);
  554. var
  555. SocketOverlapped: PSocketOverlapped;
  556. NumberOfBytesRecvd: Cardinal;
  557. Flags: Cardinal;
  558. I: Integer;
  559. Len: Integer;
  560. begin
  561. SocketOverlapped := CreateOverlapped(Buffer, BufCount, Param);
  562. SocketOverlapped.SocketOverlappedType := sotRecv;
  563. SocketOverlapped.SocketHandle := SocketHandle;
  564. Len := SizeOf(SocketOverlapped.SocketHandle);
  565. if 0 <> getsockopt(SocketHandle, SOL_SOCKET, SO_TYPE,
  566. @SocketOverlapped.SocketType, Len) then ThrowException;
  567. for I := 0 to BufCount - 1 do
  568. Inc(SocketOverlapped.WantBytesCount,
  569. PWSABUF(Integer(Buffer) + I * SizeOf(WSABUF)).len);
  570. Flags := 0;
  571. if WSARecv(SocketOverlapped.SocketHandle,
  572. @SocketOverlapped.TransfferBuffer,
  573. SocketOverlapped.BufCount, NumberOfBytesRecvd, Flags,
  574. PWSAOverlapped(SocketOverlapped), nil) = SOCKET_ERROR then
  575. begin
  576. if GetLastError <> ERROR_IO_PENDING then ThrowException;
  577. end;
  578. end;
  579. procedure TCnIocpSocketAdapter.Recv(SocketHandle: TSocket; Memory: PChar;
  580. MemLength: Cardinal; Param: Pointer);
  581. var
  582. Buffer: WSABUF;
  583. begin
  584. Buffer.len := MemLength;
  585. Buffer.buf := Memory;
  586. Recv(SocketHandle, PWSABUF(@Buffer), 1, Param);
  587. end;
  588. procedure TCnIocpSocketAdapter.Send(SocketHandle: TSocket; Buffer: PWSABUF;
  589. BufCount: Cardinal; Param: Pointer);
  590. var
  591. SocketOverlapped: PSocketOverlapped;
  592. NumberOfBytesSent: Cardinal;
  593. I: Integer;
  594. Len: Integer;
  595. begin
  596. SocketOverlapped := CreateOverlapped(Buffer, BufCount, Param);
  597. SocketOverlapped.SocketOverlappedType := sotSend;
  598. SocketOverlapped.SocketHandle := SocketHandle;
  599. Len := SizeOf(SocketOverlapped.SocketHandle);
  600. if 0 <> getsockopt(SocketHandle, SOL_SOCKET, SO_TYPE,
  601. @SocketOverlapped.SocketType, Len) then ThrowException;
  602. for I := 0 to BufCount - 1 do
  603. Inc(SocketOverlapped.WantBytesCount,
  604. PWSABUF(Integer(Buffer) + I * SizeOf(WSABUF)).len);
  605. if WSASend(SocketOverlapped.SocketHandle,
  606. @SocketOverlapped.TransfferBuffer,
  607. SocketOverlapped.BufCount, NumberOfBytesSent, 0,
  608. PWSAOverlapped(SocketOverlapped), nil) = SOCKET_ERROR then
  609. begin
  610. if GetLastError <> ERROR_IO_PENDING then ThrowException;
  611. end;
  612. end;
  613. procedure TCnIocpSocketAdapter.Send(SocketHandle: TSocket; Memory: PChar;
  614. MemLength: Cardinal; Param: Pointer);
  615. var
  616. Buffer: WSABUF;
  617. begin
  618. Buffer.len := MemLength;
  619. Buffer.buf := Memory;
  620. Send(SocketHandle, PWSABUF(@Buffer), 1, Param);
  621. end;
  622. procedure TCnIocpSocketAdapter.RecvFrom(SocketHandle: TSocket; Buffer: PWSABUF;
  623. BufCount: Cardinal; Param: Pointer);
  624. var
  625. SocketOverlapped: PSocketOverlapped;
  626. NumberOfBytesRecvd: Cardinal;
  627. Flags: Cardinal;
  628. I: Integer;
  629. Len: Integer;
  630. begin
  631. SocketOverlapped := CreateOverlapped(Buffer, BufCount, Param);
  632. SocketOverlapped.SocketOverlappedType := sotRecvFrom;
  633. SocketOverlapped.SocketHandle := SocketHandle;
  634. Len := SizeOf(SocketOverlapped.SocketHandle);
  635. if 0 <> getsockopt(SocketHandle, SOL_SOCKET, SO_TYPE,
  636. @SocketOverlapped.SocketType, Len) then ThrowException;
  637. SocketOverlapped.FromLen := SizeOf(TSockAddr);
  638. for I := 0 to BufCount - 1 do
  639. Inc(SocketOverlapped.WantBytesCount,
  640. PWSABUF(Integer(Buffer) + I * SizeOf(WSABUF)).len);
  641. Flags := 0;
  642. if WSARecvFrom(SocketOverlapped.SocketHandle,
  643. @SocketOverlapped.TransfferBuffer,
  644. SocketOverlapped.BufCount, NumberOfBytesRecvd, Flags,
  645. @SocketOverlapped.FromAddr, @SocketOverlapped.FromLen,
  646. PWSAOverlapped(SocketOverlapped), nil) = SOCKET_ERROR then
  647. begin
  648. if GetLastError <> ERROR_IO_PENDING then ThrowException;
  649. end;
  650. end;
  651. procedure TCnIocpSocketAdapter.RecvFrom(SocketHandle: TSocket; Memory: PChar;
  652. MemLength: Cardinal; Param: Pointer);
  653. var
  654. Buffer: WSABUF;
  655. begin
  656. Buffer.len := MemLength;
  657. Buffer.buf := Memory;
  658. RecvFrom(SocketHandle, PWSABUF(@Buffer), 1, Param);
  659. end;
  660. procedure TCnIocpSocketAdapter.SendTo(SocketHandle: TSocket; Buffer: PWSABUF;
  661. BufCount: Cardinal; ToAddr: PPeerAddress; Param: Pointer);
  662. var
  663. SocketOverlapped: PSocketOverlapped;
  664. NumberOfBytesSent: Cardinal;
  665. I: Integer;
  666. Len: Integer;
  667. begin
  668. SocketOverlapped := CreateOverlapped(Buffer, BufCount, Param);
  669. SocketOverlapped.SocketOverlappedType := sotSendTo;
  670. SocketOverlapped.SocketHandle := SocketHandle;
  671. Len := SizeOf(SocketOverlapped.SocketHandle);
  672. if 0 <> getsockopt(SocketHandle, SOL_SOCKET, SO_TYPE,
  673. @SocketOverlapped.SocketType, Len) then ThrowException;
  674. SocketOverlapped.ToAddr := PeerAddress2SockAddr(ToAddr);
  675. for I := 0 to BufCount - 1 do
  676. Inc(SocketOverlapped.WantBytesCount,
  677. PWSABUF(Integer(Buffer) + I * SizeOf(WSABUF)).len);
  678. if WSASendTo(SocketOverlapped.SocketHandle,
  679. @SocketOverlapped.TransfferBuffer,
  680. SocketOverlapped.BufCount, NumberOfBytesSent, 0,
  681. @SocketOverlapped.ToAddr, SizeOf(TSockAddr),
  682. PWSAOverlapped(SocketOverlapped), nil) = SOCKET_ERROR then
  683. begin
  684. if GetLastError <> ERROR_IO_PENDING then ThrowException;
  685. end;
  686. end;
  687. procedure TCnIocpSocketAdapter.SetMemoryPool(const Value: TCnIocpSimpleMemPool);
  688. begin
  689. if Value <> nil then
  690. begin
  691. //修改分配内存的大小
  692. Value.MemorySize := SizeOf(TSocketOverlapped);
  693. end;
  694. FMemoryPool := Value;
  695. end;
  696. procedure TCnIocpSocketAdapter.SendTo(SocketHandle: TSocket; Memory: PChar;
  697. MemLength: Cardinal; ToAddr: PPeerAddress; Param: Pointer);
  698. var
  699. Buffer: WSABUF;
  700. begin
  701. Buffer.len := MemLength;
  702. Buffer.buf := Memory;
  703. SendTo(SocketHandle, PWSABUF(@Buffer), 1, ToAddr, Param);
  704. end;
  705. { TSocketIocpThread }
  706. constructor TSocketIocpThread.Create(SocketIocp: TCnIocpSocketAdapter);
  707. begin
  708. FSocketIocp := SocketIocp;
  709. FIsRunning := True;
  710. inherited Create(False);
  711. end;
  712. procedure TSocketIocpThread.Execute;
  713. begin
  714. try
  715. while FIsRunning do
  716. begin
  717. FSocketIocp.QueuedCompletionStatus;
  718. end;
  719. except
  720. // nothing
  721. end;
  722. end;
  723. procedure TSocketIocpThread.Stop;
  724. begin
  725. FIsRunning := False;
  726. end;
  727. procedure TCnIocpSocketAdapter.InternalRentMemory(var MemoryPtr: Pointer);
  728. begin
  729. if FMemoryPool = nil then
  730. MemoryPtr := Pointer(GlobalAlloc(GPTR, SizeOf(TSocketOverlapped)))
  731. else
  732. FMemoryPool.RentMemory(MemoryPtr);
  733. end;
  734. procedure TCnIocpSocketAdapter.InternalReturnMemory(MemoryPtr: Pointer);
  735. begin
  736. if FMemoryPool = nil then
  737. GlobalFree(Cardinal(MemoryPtr))
  738. else
  739. FMemoryPool.ReturnMemory(MemoryPtr);
  740. end;
  741. initialization
  742. finalization
  743. // nothing
  744. end.