BlockingTCPServer.pas 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509
  1. {
  2. 文件名:BlockingTCPServer.pas
  3. 功 能:阻塞方式TCP监听组件,单独线程接收数据(每个客户端一个线程)。
  4. 建 立:尹进
  5. 历 史:
  6. 2005.12.23:补文件说明信息(尹进)
  7. }
  8. unit BlockingTCPServer;
  9. interface
  10. uses
  11. WinSock2, RealICQSocket, RealICQProxy,
  12. SysUtils, Classes, Windows, Forms;
  13. type
  14. TAcptThread = class;
  15. TRecvThread = class;
  16. TBlockingTCPServerSendedDataEvent = procedure(Sender: TObject; RecvThread: TRecvThread; SendBytes: Integer) of object;
  17. TBlockingTCPServerReceivedDataEvent = procedure(Sender: TObject; RecvThread: TRecvThread; RecvBytes: Integer) of object;
  18. TBlockingTCPServerConnectedEvent = procedure(Sender: TObject; RecvThread: TRecvThread) of object;
  19. TBlockingTCPServerDisconnectedEvent = procedure(Sender: TObject; RecvThread: TRecvThread) of object;
  20. TBlockingTCPServerBeforeSendDataEvent = procedure(Sender: TObject; var Buf; Size: Integer) of object;
  21. TBlockingTCPServer = class
  22. private
  23. FActive: Boolean;
  24. FSocket: TSocket;
  25. FListenPort: Cardinal;
  26. FRecvBufferSize: Integer;
  27. FAcptThread: TAcptThread;
  28. FRecvThreads: TList;
  29. FOnStart: TNotifyEvent;
  30. FOnStop: TNotifyEvent;
  31. FOnSendedData: TBlockingTCPServerSendedDataEvent;
  32. FOnReceivedData: TBlockingTCPServerReceivedDataEvent;
  33. FOnConnected: TBlockingTCPServerConnectedEvent;
  34. FOnDisconnected: TBlockingTCPServerDisconnectedEvent;
  35. FOnBeforeSendData: TBlockingTCPServerBeforeSendDataEvent;
  36. procedure SetRecvBufferSize(Value:Integer);
  37. protected
  38. procedure DoStart;
  39. procedure DoStop;
  40. procedure DoConnected(RecvThread: TRecvThread);
  41. procedure DoDisconnected(RecvThread: TRecvThread);
  42. procedure DoReceivedData(RecvThread: TRecvThread; RecvBytes: Integer);
  43. procedure DoSendedData(RecvThread: TRecvThread; SendBytes: Integer);
  44. procedure DoBeforeSendData(var Buf; Size: Integer);
  45. public
  46. constructor Create;
  47. destructor Destroy; override;
  48. procedure Start(AListenPort: Cardinal);
  49. procedure Stop;
  50. published
  51. property Active: Boolean read FActive;
  52. property ListenPort: Cardinal read FListenPort;
  53. property RecvBufferSize: Integer read FRecvBufferSize write SetRecvBufferSize;
  54. property OnStart: TNotifyEvent read FOnStart write FOnStart;
  55. property OnStop: TNotifyEvent read FOnStop write FOnStart;
  56. property OnSendedData: TBlockingTCPServerSendedDataEvent read FOnSendedData write FOnSendedData;
  57. property OnReceivedData: TBlockingTCPServerReceivedDataEvent read FOnReceivedData write FOnReceivedData;
  58. property OnConnected: TBlockingTCPServerConnectedEvent read FOnConnected write FOnConnected;
  59. property OnDisconnected: TBlockingTCPServerDisconnectedEvent read FOnDisconnected write FOnDisconnected;
  60. property OnBeforeSendData: TBlockingTCPServerBeforeSendDataEvent read FOnBeforeSendData write FOnBeforeSendData;
  61. end;
  62. //接收连接的线程类
  63. TAcptThread = class(TThread)
  64. private
  65. FBlockingTCPServer: TBlockingTCPServer;
  66. FClientSocket: TSocket;
  67. FClientAddr: TSockAddrIn;
  68. FAddrLength: Integer;
  69. procedure CreateRecvThread;
  70. protected
  71. procedure Execute; override;
  72. public
  73. constructor Create(ABlockingTCPServer: TBlockingTCPServer);
  74. destructor Destroy; override;
  75. end;
  76. //接收数据的线程类
  77. TRecvThread = class(TThread)
  78. private
  79. FEncryptCriticalSection: TRTLCriticalSection;
  80. FBlockingTCPServer: TBlockingTCPServer;
  81. FClientSocket: TSocket;
  82. FClientAddr: TSockAddrIn;
  83. FData: TObject;
  84. FCallSynchronize: Boolean;
  85. FNoDelay: Boolean;
  86. FBuf: array of Byte;
  87. FNotProcessedBufferLength: Integer;
  88. FRecvBytes: Integer;
  89. procedure DoDisconnect;
  90. procedure DoReceivedData;
  91. function GetIPAddress: String;
  92. function GetPort: Integer;
  93. procedure SetNoDelay(Value: Boolean);
  94. protected
  95. procedure Execute; override;
  96. public
  97. constructor Create(ABlockingTCPServer: TBlockingTCPServer; AClientSocket: TSocket; AClientAddr: TSockAddrIn);
  98. destructor Destroy; override;
  99. procedure Disconnect;
  100. procedure SendBuffer(var Buf; Size: Integer);
  101. procedure CopyRecvBufferTo(var Buf; Offset: Integer; Size: Integer);
  102. procedure CutRecvBufferTo(var Buf; Offset: Integer; Size: Integer);
  103. property Data: TObject read FData write FData;
  104. property NotProcessedBufferLength: Integer read FNotProcessedBufferLength;
  105. property IPAddress: String read GetIPAddress;
  106. property Port: Integer read GetPort;
  107. property CallSynchronize: Boolean read FCallSynchronize write FCallSynchronize;
  108. property NoDelay: Boolean read FNoDelay write SetNoDelay;
  109. end;
  110. //procedure Register;
  111. implementation
  112. {TRecvThread}
  113. //------------------------------------------------------------------------------
  114. procedure TRecvThread.SetNoDelay(Value: Boolean);
  115. begin
  116. if FClientSocket <> INVALID_SOCKET then
  117. begin
  118. FNoDelay := Value;
  119. setsockopt(FClientSocket, IPPROTO_TCP, TCP_NODELAY, @FNoDelay, SizeOf(FNoDelay));
  120. end;
  121. end;
  122. //------------------------------------------------------------------------------
  123. function TRecvThread.GetIPAddress: String;
  124. begin
  125. Result := inet_ntoa(FClientAddr.sin_addr);
  126. end;
  127. //------------------------------------------------------------------------------
  128. function TRecvThread.GetPort: Integer;
  129. begin
  130. Result := ntohs(FClientAddr.sin_port);
  131. end;
  132. //------------------------------------------------------------------------------
  133. procedure TRecvThread.Disconnect;
  134. begin
  135. DoDisconnect;
  136. end;
  137. //------------------------------------------------------------------------------
  138. procedure TRecvThread.SendBuffer(var Buf; Size: Integer);
  139. var
  140. SendBytes,
  141. ErrorCode:Integer;
  142. begin
  143. try
  144. EnterCriticalSection(FEncryptCriticalSection);
  145. FBlockingTCPServer.DoBeforeSendData(Buf, Size);
  146. SendBytes:= send(FClientSocket, Buf, Size, 0);
  147. if SendBytes = 0 then
  148. begin
  149. Disconnect;
  150. Exit;
  151. end;
  152. if SendBytes = SOCKET_ERROR then
  153. begin
  154. ErrorCode := GetLastError;
  155. if (ErrorCode = WSAECONNABORTED) or
  156. (ErrorCode = WSAECONNRESET) or
  157. (ErrorCode = WSAETIMEDOUT) or
  158. (ErrorCode = WSAENOTSOCK) then
  159. begin
  160. DoDisconnect;
  161. end;
  162. end;
  163. finally
  164. LeaveCriticalSection(FEncryptCriticalSection);
  165. end;
  166. end;
  167. //------------------------------------------------------------------------------
  168. procedure TRecvThread.DoReceivedData;
  169. begin
  170. if Assigned(FBlockingTCPServer) then
  171. FBlockingTCPServer.DoReceivedData(Self, FRecvBytes);
  172. end;
  173. //------------------------------------------------------------------------------
  174. procedure TRecvThread.DoDisconnect;
  175. begin
  176. try
  177. try
  178. shutdown(FClientSocket, SD_BOTH);
  179. finally
  180. closeSocket(FClientSocket);
  181. FClientSocket := INVALID_SOCKET;
  182. end;
  183. finally
  184. if Assigned(FBlockingTCPServer) then
  185. FBlockingTCPServer.DoDisconnected(Self);
  186. end;
  187. end;
  188. //------------------------------------------------------------------------------
  189. procedure TRecvThread.Execute;
  190. var
  191. ErrorCode: Integer;
  192. begin
  193. FreeOnTerminate := True;
  194. while not Terminated do
  195. begin
  196. FRecvBytes := recv(FClientSocket, FBuf[FNotProcessedBufferLength], Length(FBuf)-FNotProcessedBufferLength, 0);
  197. if FRecvBytes = SOCKET_ERROR then
  198. begin
  199. ErrorCode := GetLastError;
  200. if (ErrorCode = WSAECONNABORTED) or
  201. (ErrorCode = WSAECONNRESET) or
  202. (ErrorCode = WSAETIMEDOUT) or
  203. (ErrorCode = WSAENOTSOCK) then
  204. begin
  205. Synchronize(DoDisconnect);
  206. break;
  207. end
  208. else
  209. begin
  210. Sleep(1);
  211. Continue;
  212. end;
  213. end;
  214. if FRecvBytes = 0 then
  215. begin
  216. Synchronize(DoDisconnect);
  217. break;
  218. end
  219. else
  220. begin
  221. FNotProcessedBufferLength := FNotProcessedBufferLength + FRecvBytes;
  222. try
  223. if FCallSynchronize then
  224. Synchronize(DoReceivedData)
  225. else
  226. DoReceivedData;
  227. except
  228. continue;
  229. end;
  230. end;
  231. end;
  232. end;
  233. //------------------------------------------------------------------------------
  234. constructor TRecvThread.Create(ABlockingTCPServer: TBlockingTCPServer; AClientSocket: TSocket; AClientAddr: TSockAddrIn);
  235. begin
  236. inherited Create(True);
  237. InitializeCriticalSection(FEncryptCriticalSection);
  238. FCallSynchronize := True;
  239. FData := nil;
  240. FBlockingTCPServer := ABlockingTCPServer;
  241. FClientSocket := AClientSocket;
  242. FClientAddr := AClientAddr;
  243. SetLength(FBuf,FBlockingTCPServer.RecvBufferSize);
  244. FNotProcessedBufferLength := 0;
  245. with FBlockingTCPServer.FRecvThreads do
  246. begin
  247. Add(Self);
  248. end;
  249. FBlockingTCPServer.DoConnected(Self);
  250. Resume;
  251. end;
  252. //------------------------------------------------------------------------------
  253. destructor TRecvThread.Destroy;
  254. begin
  255. try
  256. if Assigned(FBlockingTCPServer) then
  257. begin
  258. with FBlockingTCPServer.FRecvThreads do
  259. begin
  260. Remove(Self);
  261. end;
  262. end;
  263. finally
  264. EnterCriticalSection(FEncryptCriticalSection);
  265. DeleteCriticalSection(FEncryptCriticalSection);
  266. inherited Destroy;
  267. end;
  268. end;
  269. //------------------------------------------------------------------------------
  270. procedure TRecvThread.CopyRecvBufferTo(var Buf; Offset: Integer; Size: Integer);
  271. begin
  272. CopyMemory(@Buf,@FBuf[Offset],Size);
  273. end;
  274. //------------------------------------------------------------------------------
  275. procedure TRecvThread.CutRecvBufferTo(var Buf; Offset: Integer; Size: Integer);
  276. begin
  277. CopyMemory(@Buf,@FBuf[Offset],Size);
  278. CopyMemory(FBuf,@FBuf[Offset+Size],Length(FBuf)-(Offset+Size));
  279. FNotProcessedBufferLength := FNotProcessedBufferLength - (Offset+Size);
  280. end;
  281. {TAcptThread}
  282. //------------------------------------------------------------------------------
  283. procedure TAcptThread.CreateRecvThread;
  284. begin
  285. TRecvThread.Create(FBlockingTCPServer, FClientSocket, FClientAddr);
  286. end;
  287. //------------------------------------------------------------------------------
  288. procedure TAcptThread.Execute;
  289. begin
  290. while not Terminated do
  291. begin
  292. try
  293. FAddrLength := SizeOf(FClientAddr);
  294. FClientSocket := accept(FBlockingTCPServer.FSocket, FClientAddr, FAddrLength);
  295. if FClientSocket <> INVALID_SOCKET then Synchronize(CreateRecvThread);
  296. except
  297. break;
  298. end;
  299. end;
  300. end;
  301. //------------------------------------------------------------------------------
  302. constructor TAcptThread.Create(ABlockingTCPServer: TBlockingTCPServer);
  303. begin
  304. inherited Create(True);
  305. FreeOnTerminate := True;
  306. FBlockingTCPServer := ABlockingTCPServer;
  307. Resume;
  308. end;
  309. //------------------------------------------------------------------------------
  310. destructor TAcptThread.Destroy;
  311. begin
  312. if FBlockingTCPServer <> nil then FBlockingTCPServer.FAcptThread := nil;
  313. inherited Destroy;
  314. end;
  315. {TBlockingTCPServer}
  316. //------------------------------------------------------------------------------
  317. procedure TBlockingTCPServer.Start(AListenPort: Cardinal);
  318. var
  319. serverAddr: TSockAddrIn;
  320. lastError: Integer;
  321. begin
  322. if FSocket = INVALID_SOCKET then FSocket := Socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
  323. if FSocket = INVALID_SOCKET then raise TSocketException.CreateFmt('创建套接字失败,错误代码:%d',[WSAGetLastError]);
  324. serverAddr.sin_family:= AF_INET;
  325. serverAddr.sin_port:= htons(AListenPort);
  326. serverAddr.sin_addr.S_addr:= htonl(INADDR_ANY);
  327. //绑定端口
  328. lastError := bind(FSocket, @serverAddr, SizeOf(serverAddr));
  329. if lastError = SOCKET_ERROR then
  330. begin
  331. closeSocket(FSocket);
  332. FSocket:= INVALID_SOCKET;
  333. raise TSocketException.CreateFmt('绑定TCP端口 %d 失败,错误代码:%d',[AListenPort, lastError]);
  334. end;
  335. //开始监听
  336. lastError := listen(FSocket, 5);
  337. if lastError <> 0 then raise TSocketException.CreateFmt('在TCP端口 %d 监听失败,错误代码:%d', [lastError]);
  338. FActive := True;
  339. FListenPort := AListenPort;
  340. if FAcptThread <> nil then FAcptThread.Terminate;
  341. FAcptThread := TAcptThread.Create(Self);
  342. DoStart;
  343. end;
  344. //------------------------------------------------------------------------------
  345. procedure TBlockingTCPServer.Stop;
  346. var
  347. ARecvThread: TRecvThread;
  348. begin
  349. if FSocket <> INVALID_SOCKET then
  350. begin
  351. try
  352. try
  353. shutdown(FSocket, SD_BOTH);
  354. finally
  355. closeSocket(FSocket);
  356. FSocket := INVALID_SOCKET;
  357. end;
  358. finally
  359. FActive := False;
  360. FListenPort := 0;
  361. DoStop;
  362. end;
  363. end;
  364. if FAcptThread <> nil then
  365. begin
  366. FAcptThread.Terminate;
  367. FAcptThread.FBlockingTCPServer := nil;
  368. end;
  369. while FRecvThreads.Count > 0 do
  370. begin
  371. ARecvThread := TRecvThread(FRecvThreads.Items[0]);
  372. FRecvThreads.Delete(0);
  373. ARecvThread.FBlockingTCPServer := nil;
  374. ARecvThread.Disconnect;
  375. ARecvThread.Terminate;
  376. end;
  377. end;
  378. //------------------------------------------------------------------------------
  379. procedure TBlockingTCPServer.DoStart;
  380. begin
  381. if Assigned(FOnStart) then FOnStart(Self);
  382. end;
  383. //------------------------------------------------------------------------------
  384. procedure TBlockingTCPServer.DoStop;
  385. begin
  386. if Assigned(FOnStop) then FOnStop(Self);
  387. end;
  388. //------------------------------------------------------------------------------
  389. procedure TBlockingTCPServer.SetRecvBufferSize(Value: Integer);
  390. begin
  391. if (Value<1) or (Value>65535) then raise TSocketException.Create('缓冲区大小必须为1-65535之间的数值');
  392. if FActive then raise TSocketException.Create('监听状态不允许更改缓冲大小');
  393. FRecvBufferSize := Value;
  394. end;
  395. //------------------------------------------------------------------------------
  396. procedure TBlockingTCPServer.DoConnected(RecvThread: TRecvThread);
  397. begin
  398. if Assigned(FOnConnected) then FOnConnected(Self, RecvThread);
  399. end;
  400. //------------------------------------------------------------------------------
  401. procedure TBlockingTCPServer.DoDisconnected(RecvThread: TRecvThread);
  402. begin
  403. if Assigned(FOnDisconnected) then FOnDisconnected(Self, RecvThread);
  404. end;
  405. //------------------------------------------------------------------------------
  406. procedure TBlockingTCPServer.DoReceivedData(RecvThread: TRecvThread; RecvBytes: Integer);
  407. begin
  408. if Assigned(FOnReceivedData) then FOnReceivedData(Self, RecvThread, RecvBytes);
  409. end;
  410. //------------------------------------------------------------------------------
  411. procedure TBlockingTCPServer.DoSendedData(RecvThread: TRecvThread; SendBytes: Integer);
  412. begin
  413. if Assigned(FOnSendedData) then FOnSendedData(Self, RecvThread, SendBytes);
  414. end;
  415. //------------------------------------------------------------------------------
  416. procedure TBlockingTCPServer.DoBeforeSendData(var Buf; Size: Integer);
  417. begin
  418. if Assigned(FOnBeforeSendData) then FOnBeforeSendData(Self, Buf, Size);
  419. end;
  420. //------------------------------------------------------------------------------
  421. constructor TBlockingTCPServer.Create;
  422. begin
  423. inherited Create;
  424. FActive := False;
  425. FSocket := INVALID_SOCKET;
  426. FListenPort := 0;
  427. FRecvBufferSize := 8192 * 2;
  428. FRecvThreads := TList.Create;
  429. FAcptThread := nil;
  430. end;
  431. //------------------------------------------------------------------------------
  432. destructor TBlockingTCPServer.Destroy;
  433. begin
  434. if Active then Stop;
  435. if FAcptThread <> nil then FAcptThread.Terminate;
  436. FRecvThreads.Clear;
  437. FreeAndNil(FRecvThreads);
  438. inherited Destroy;
  439. end;
  440. end.