BlockingTCPServer.pas 15 KB


  1. {
  2. 文件名:BlockingTCPServer.pas
  3. 功 能:阻塞方式TCP监听组件,单独线程接收数据(每个客户端一个线程)。
  4. 建 立:尹进
  5. 历 史:
  6. 2005.12.23:补文件说明信息(尹进)
  7. }
  8. unit BlockingTCPServer;
  9. interface
  10. uses
  11. WinSock2, RealICQSocket, RealICQProxy,
  12. SysUtils, Classes, Windows, Forms;
  13. type
  14. {$M+}
  15. TAcptThread = class;
  16. TRecvThread = class;
  17. TBlockingTCPServerSendedDataEvent = procedure(Sender: TObject; RecvThread: TRecvThread; SendBytes: Integer) of object;
  18. TBlockingTCPServerReceivedDataEvent = procedure(Sender: TObject; RecvThread: TRecvThread; RecvBytes: Integer) of object;
  19. TBlockingTCPServerConnectedEvent = procedure(Sender: TObject; RecvThread: TRecvThread) of object;
  20. TBlockingTCPServerDisconnectedEvent = procedure(Sender: TObject; RecvThread: TRecvThread) of object;
  21. TBlockingTCPServerBeforeSendDataEvent = procedure(Sender: TObject; var Buf; Size: Integer) of object;
  22. TBlockingTCPServer = class
  23. private
  24. FActive: Boolean;
  25. FSocket: TSocket;
  26. FListenPort: Cardinal;
  27. FRecvBufferSize: Integer;
  28. FAcptThread: TAcptThread;
  29. FRecvThreads: TList;
  30. FOnStart: TNotifyEvent;
  31. FOnStop: TNotifyEvent;
  32. FOnSendedData: TBlockingTCPServerSendedDataEvent;
  33. FOnReceivedData: TBlockingTCPServerReceivedDataEvent;
  34. FOnConnected: TBlockingTCPServerConnectedEvent;
  35. FOnDisconnected: TBlockingTCPServerDisconnectedEvent;
  36. FOnBeforeSendData: TBlockingTCPServerBeforeSendDataEvent;
  37. procedure SetRecvBufferSize(Value:Integer);
  38. protected
  39. procedure DoStart;
  40. procedure DoStop;
  41. procedure DoConnected(RecvThread: TRecvThread);
  42. procedure DoDisconnected(RecvThread: TRecvThread);
  43. procedure DoReceivedData(RecvThread: TRecvThread; RecvBytes: Integer);
  44. procedure DoSendedData(RecvThread: TRecvThread; SendBytes: Integer);
  45. procedure DoBeforeSendData(var Buf; Size: Integer);
  46. public
  47. constructor Create;
  48. destructor Destroy; override;
  49. procedure Start(AListenPort: Cardinal);
  50. procedure Stop;
  51. published
  52. property Active: Boolean read FActive;
  53. property ListenPort: Cardinal read FListenPort;
  54. property RecvBufferSize: Integer read FRecvBufferSize write SetRecvBufferSize;
  55. property OnStart: TNotifyEvent read FOnStart write FOnStart;
  56. property OnStop: TNotifyEvent read FOnStop write FOnStart;
  57. property OnSendedData: TBlockingTCPServerSendedDataEvent read FOnSendedData write FOnSendedData;
  58. property OnReceivedData: TBlockingTCPServerReceivedDataEvent read FOnReceivedData write FOnReceivedData;
  59. property OnConnected: TBlockingTCPServerConnectedEvent read FOnConnected write FOnConnected;
  60. property OnDisconnected: TBlockingTCPServerDisconnectedEvent read FOnDisconnected write FOnDisconnected;
  61. property OnBeforeSendData: TBlockingTCPServerBeforeSendDataEvent read FOnBeforeSendData write FOnBeforeSendData;
  62. end;
  63. //接收连接的线程类
  64. TAcptThread = class(TThread)
  65. private
  66. FBlockingTCPServer: TBlockingTCPServer;
  67. FClientSocket: TSocket;
  68. FClientAddr: TSockAddrIn;
  69. FAddrLength: Integer;
  70. procedure CreateRecvThread;
  71. protected
  72. procedure Execute; override;
  73. public
  74. constructor Create(ABlockingTCPServer: TBlockingTCPServer);
  75. destructor Destroy; override;
  76. end;
  77. //接收数据的线程类
  78. TRecvThread = class(TThread)
  79. private
  80. FEncryptCriticalSection: TRTLCriticalSection;
  81. FBlockingTCPServer: TBlockingTCPServer;
  82. FClientSocket: TSocket;
  83. FClientAddr: TSockAddrIn;
  84. FData: TObject;
  85. FCallSynchronize: Boolean;
  86. FNoDelay: Boolean;
  87. FBuf: array of Byte;
  88. FNotProcessedBufferLength: Integer;
  89. FRecvBytes: Integer;
  90. procedure DoDisconnect;
  91. procedure DoReceivedData;
  92. function GetIPAddress: String;
  93. function GetPort: Integer;
  94. procedure SetNoDelay(Value: Boolean);
  95. protected
  96. procedure Execute; override;
  97. public
  98. constructor Create(ABlockingTCPServer: TBlockingTCPServer; AClientSocket: TSocket; AClientAddr: TSockAddrIn);
  99. destructor Destroy; override;
  100. procedure Disconnect;
  101. procedure SendBuffer(var Buf; Size: Integer);
  102. procedure CopyRecvBufferTo(var Buf; Offset: Integer; Size: Integer);
  103. procedure CutRecvBufferTo(var Buf; Offset: Integer; Size: Integer);
  104. property Data: TObject read FData write FData;
  105. property NotProcessedBufferLength: Integer read FNotProcessedBufferLength;
  106. property IPAddress: String read GetIPAddress;
  107. property Port: Integer read GetPort;
  108. property CallSynchronize: Boolean read FCallSynchronize write FCallSynchronize;
  109. property NoDelay: Boolean read FNoDelay write SetNoDelay;
  110. end;
  111. //procedure Register;
  112. implementation
  113. {TRecvThread}
  114. //------------------------------------------------------------------------------
  115. procedure TRecvThread.SetNoDelay(Value: Boolean);
  116. begin
  117. if FClientSocket <> INVALID_SOCKET then
  118. begin
  119. FNoDelay := Value;
  120. setsockopt(FClientSocket, IPPROTO_TCP, TCP_NODELAY, @FNoDelay, SizeOf(FNoDelay));
  121. end;
  122. end;
  123. //------------------------------------------------------------------------------
  124. function TRecvThread.GetIPAddress: String;
  125. begin
  126. Result := inet_ntoa(FClientAddr.sin_addr);
  127. end;
  128. //------------------------------------------------------------------------------
  129. function TRecvThread.GetPort: Integer;
  130. begin
  131. Result := ntohs(FClientAddr.sin_port);
  132. end;
  133. //------------------------------------------------------------------------------
  134. procedure TRecvThread.Disconnect;
  135. begin
  136. DoDisconnect;
  137. end;
  138. //------------------------------------------------------------------------------
  139. procedure TRecvThread.SendBuffer(var Buf; Size: Integer);
  140. var
  141. SendBytes,
  142. ErrorCode:Integer;
  143. begin
  144. try
  145. EnterCriticalSection(FEncryptCriticalSection);
  146. FBlockingTCPServer.DoBeforeSendData(Buf, Size);
  147. SendBytes:= send(FClientSocket, Buf, Size, 0);
  148. if SendBytes = 0 then
  149. begin
  150. Disconnect;
  151. Exit;
  152. end;
  153. if SendBytes = SOCKET_ERROR then
  154. begin
  155. ErrorCode := GetLastError;
  156. if (ErrorCode = WSAECONNABORTED) or
  157. (ErrorCode = WSAECONNRESET) or
  158. (ErrorCode = WSAETIMEDOUT) or
  159. (ErrorCode = WSAENOTSOCK) then
  160. begin
  161. DoDisconnect;
  162. end;
  163. end;
  164. finally
  165. LeaveCriticalSection(FEncryptCriticalSection);
  166. end;
  167. end;
  168. //------------------------------------------------------------------------------
  169. procedure TRecvThread.DoReceivedData;
  170. begin
  171. if Assigned(FBlockingTCPServer) then
  172. FBlockingTCPServer.DoReceivedData(Self, FRecvBytes);
  173. end;
  174. //------------------------------------------------------------------------------
  175. procedure TRecvThread.DoDisconnect;
  176. begin
  177. try
  178. try
  179. shutdown(FClientSocket, SD_BOTH);
  180. finally
  181. closeSocket(FClientSocket);
  182. FClientSocket := INVALID_SOCKET;
  183. end;
  184. finally
  185. if Assigned(FBlockingTCPServer) then
  186. FBlockingTCPServer.DoDisconnected(Self);
  187. end;
  188. end;
  189. //------------------------------------------------------------------------------
  190. procedure TRecvThread.Execute;
  191. var
  192. ErrorCode: Integer;
  193. begin
  194. FreeOnTerminate := True;
  195. while not Terminated do
  196. begin
  197. FRecvBytes := recv(FClientSocket, FBuf[FNotProcessedBufferLength], Length(FBuf)-FNotProcessedBufferLength, 0);
  198. if FRecvBytes = SOCKET_ERROR then
  199. begin
  200. ErrorCode := GetLastError;
  201. if (ErrorCode = WSAECONNABORTED) or
  202. (ErrorCode = WSAECONNRESET) or
  203. (ErrorCode = WSAETIMEDOUT) or
  204. (ErrorCode = WSAENOTSOCK) then
  205. begin
  206. Synchronize(DoDisconnect);
  207. break;
  208. end
  209. else
  210. begin
  211. Sleep(1);
  212. Continue;
  213. end;
  214. end;
  215. if FRecvBytes = 0 then
  216. begin
  217. Synchronize(DoDisconnect);
  218. break;
  219. end
  220. else
  221. begin
  222. FNotProcessedBufferLength := FNotProcessedBufferLength + FRecvBytes;
  223. try
  224. if FCallSynchronize then
  225. Synchronize(DoReceivedData)
  226. else
  227. DoReceivedData;
  228. except
  229. continue;
  230. end;
  231. end;
  232. end;
  233. end;
  234. //------------------------------------------------------------------------------
  235. constructor TRecvThread.Create(ABlockingTCPServer: TBlockingTCPServer; AClientSocket: TSocket; AClientAddr: TSockAddrIn);
  236. begin
  237. inherited Create(True);
  238. InitializeCriticalSection(FEncryptCriticalSection);
  239. FCallSynchronize := True;
  240. FData := nil;
  241. FBlockingTCPServer := ABlockingTCPServer;
  242. FClientSocket := AClientSocket;
  243. FClientAddr := AClientAddr;
  244. SetLength(FBuf,FBlockingTCPServer.RecvBufferSize);
  245. FNotProcessedBufferLength := 0;
  246. with FBlockingTCPServer.FRecvThreads do
  247. begin
  248. Add(Self);
  249. end;
  250. FBlockingTCPServer.DoConnected(Self);
  251. Resume;
  252. end;
  253. //------------------------------------------------------------------------------
  254. destructor TRecvThread.Destroy;
  255. begin
  256. try
  257. if Assigned(FBlockingTCPServer) then
  258. begin
  259. with FBlockingTCPServer.FRecvThreads do
  260. begin
  261. Remove(Self);
  262. end;
  263. end;
  264. finally
  265. EnterCriticalSection(FEncryptCriticalSection);
  266. DeleteCriticalSection(FEncryptCriticalSection);
  267. inherited Destroy;
  268. end;
  269. end;
  270. //------------------------------------------------------------------------------
  271. procedure TRecvThread.CopyRecvBufferTo(var Buf; Offset: Integer; Size: Integer);
  272. begin
  273. CopyMemory(@Buf,@FBuf[Offset],Size);
  274. end;
  275. //------------------------------------------------------------------------------
  276. procedure TRecvThread.CutRecvBufferTo(var Buf; Offset: Integer; Size: Integer);
  277. begin
  278. CopyMemory(@Buf,@FBuf[Offset],Size);
  279. CopyMemory(FBuf,@FBuf[Offset+Size],Length(FBuf)-(Offset+Size));
  280. FNotProcessedBufferLength := FNotProcessedBufferLength - (Offset+Size);
  281. end;
  282. {TAcptThread}
  283. //------------------------------------------------------------------------------
  284. procedure TAcptThread.CreateRecvThread;
  285. begin
  286. TRecvThread.Create(FBlockingTCPServer, FClientSocket, FClientAddr);
  287. end;
  288. //------------------------------------------------------------------------------
  289. procedure TAcptThread.Execute;
  290. begin
  291. while not Terminated do
  292. begin
  293. try
  294. FAddrLength := SizeOf(FClientAddr);
  295. FClientSocket := accept(FBlockingTCPServer.FSocket, FClientAddr, FAddrLength);
  296. if FClientSocket <> INVALID_SOCKET then Synchronize(CreateRecvThread);
  297. except
  298. break;
  299. end;
  300. end;
  301. end;
  302. //------------------------------------------------------------------------------
  303. constructor TAcptThread.Create(ABlockingTCPServer: TBlockingTCPServer);
  304. begin
  305. inherited Create(True);
  306. FreeOnTerminate := True;
  307. FBlockingTCPServer := ABlockingTCPServer;
  308. Resume;
  309. end;
  310. //------------------------------------------------------------------------------
  311. destructor TAcptThread.Destroy;
  312. begin
  313. if FBlockingTCPServer <> nil then FBlockingTCPServer.FAcptThread := nil;
  314. inherited Destroy;
  315. end;
  316. {TBlockingTCPServer}
  317. //------------------------------------------------------------------------------
  318. procedure TBlockingTCPServer.Start(AListenPort: Cardinal);
  319. var
  320. serverAddr: TSockAddrIn;
  321. lastError: Integer;
  322. begin
  323. if FSocket = INVALID_SOCKET then FSocket := Socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
  324. if FSocket = INVALID_SOCKET then raise TSocketException.CreateFmt('创建套接字失败,错误代码:%d',[WSAGetLastError]);
  325. serverAddr.sin_family:= AF_INET;
  326. serverAddr.sin_port:= htons(AListenPort);
  327. serverAddr.sin_addr.S_addr:= htonl(INADDR_ANY);
  328. //绑定端口
  329. lastError := bind(FSocket, @serverAddr, SizeOf(serverAddr));
  330. if lastError = SOCKET_ERROR then
  331. begin
  332. closeSocket(FSocket);
  333. FSocket:= INVALID_SOCKET;
  334. raise TSocketException.CreateFmt('绑定TCP端口 %d 失败,错误代码:%d',[AListenPort, lastError]);
  335. end;
  336. //开始监听
  337. lastError := listen(FSocket, 5);
  338. if lastError <> 0 then raise TSocketException.CreateFmt('在TCP端口 %d 监听失败,错误代码:%d', [lastError]);
  339. FActive := True;
  340. FListenPort := AListenPort;
  341. if FAcptThread <> nil then FAcptThread.Terminate;
  342. FAcptThread := TAcptThread.Create(Self);
  343. DoStart;
  344. end;
  345. //------------------------------------------------------------------------------
  346. procedure TBlockingTCPServer.Stop;
  347. var
  348. ARecvThread: TRecvThread;
  349. begin
  350. if FSocket <> INVALID_SOCKET then
  351. begin
  352. try
  353. try
  354. shutdown(FSocket, SD_BOTH);
  355. finally
  356. closeSocket(FSocket);
  357. FSocket := INVALID_SOCKET;
  358. end;
  359. finally
  360. FActive := False;
  361. FListenPort := 0;
  362. DoStop;
  363. end;
  364. end;
  365. if FAcptThread <> nil then
  366. begin
  367. FAcptThread.Terminate;
  368. FAcptThread.FBlockingTCPServer := nil;
  369. end;
  370. while FRecvThreads.Count > 0 do
  371. begin
  372. ARecvThread := TRecvThread(FRecvThreads.Items[0]);
  373. FRecvThreads.Delete(0);
  374. ARecvThread.FBlockingTCPServer := nil;
  375. ARecvThread.Disconnect;
  376. ARecvThread.Terminate;
  377. end;
  378. end;
  379. //------------------------------------------------------------------------------
  380. procedure TBlockingTCPServer.DoStart;
  381. begin
  382. if Assigned(FOnStart) then FOnStart(Self);
  383. end;
  384. //------------------------------------------------------------------------------
  385. procedure TBlockingTCPServer.DoStop;
  386. begin
  387. if Assigned(FOnStop) then FOnStop(Self);
  388. end;
  389. //------------------------------------------------------------------------------
  390. procedure TBlockingTCPServer.SetRecvBufferSize(Value: Integer);
  391. begin
  392. if (Value<1) or (Value>65535) then raise TSocketException.Create('缓冲区大小必须为1-65535之间的数值');
  393. if FActive then raise TSocketException.Create('监听状态不允许更改缓冲大小');
  394. FRecvBufferSize := Value;
  395. end;
  396. //------------------------------------------------------------------------------
  397. procedure TBlockingTCPServer.DoConnected(RecvThread: TRecvThread);
  398. begin
  399. if Assigned(FOnConnected) then FOnConnected(Self, RecvThread);
  400. end;
  401. //------------------------------------------------------------------------------
  402. procedure TBlockingTCPServer.DoDisconnected(RecvThread: TRecvThread);
  403. begin
  404. if Assigned(FOnDisconnected) then FOnDisconnected(Self, RecvThread);
  405. end;
  406. //------------------------------------------------------------------------------
  407. procedure TBlockingTCPServer.DoReceivedData(RecvThread: TRecvThread; RecvBytes: Integer);
  408. begin
  409. if Assigned(FOnReceivedData) then FOnReceivedData(Self, RecvThread, RecvBytes);
  410. end;
  411. //------------------------------------------------------------------------------
  412. procedure TBlockingTCPServer.DoSendedData(RecvThread: TRecvThread; SendBytes: Integer);
  413. begin
  414. if Assigned(FOnSendedData) then FOnSendedData(Self, RecvThread, SendBytes);
  415. end;
  416. //------------------------------------------------------------------------------
  417. procedure TBlockingTCPServer.DoBeforeSendData(var Buf; Size: Integer);
  418. begin
  419. if Assigned(FOnBeforeSendData) then FOnBeforeSendData(Self, Buf, Size);
  420. end;
  421. //------------------------------------------------------------------------------
  422. constructor TBlockingTCPServer.Create;
  423. begin
  424. inherited Create;
  425. FActive := False;
  426. FSocket := INVALID_SOCKET;
  427. FListenPort := 0;
  428. FRecvBufferSize := 8192 * 2;
  429. FRecvThreads := TList.Create;
  430. FAcptThread := nil;
  431. end;
  432. //------------------------------------------------------------------------------
  433. destructor TBlockingTCPServer.Destroy;
  434. begin
  435. if Active then Stop;
  436. if FAcptThread <> nil then FAcptThread.Terminate;
  437. FRecvThreads.Clear;
  438. FreeAndNil(FRecvThreads);
  439. inherited Destroy;
  440. end;
  441. end.