| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509 |
- {
- 文件名:BlockingTCPServer.pas
- 功 能:阻塞方式TCP监听组件,单独线程接收数据(每个客户端一个线程)。
- 建 立:尹进
- 历 史:
- 2005.12.23:补文件说明信息(尹进)
- }
- unit BlockingTCPServer;
- interface
- uses
- WinSock2, RealICQSocket, RealICQProxy,
- SysUtils, Classes, Windows, Forms;
- type
- // Forward declarations: the event signatures and TBlockingTCPServer below
- // refer to the thread classes before their full declarations appear.
- TAcptThread = class;
- TRecvThread = class;
- 
- // Handler signature for OnSendedData.
- // NOTE(review): nothing in this unit calls DoSendedData, so this event does
- // not appear to be raised anywhere - confirm before relying on it.
- TBlockingTCPServerSendedDataEvent = procedure(Sender: TObject; RecvThread: TRecvThread; SendBytes: Integer) of object;
- // Handler signature for OnReceivedData; RecvBytes is the value returned by
- // the most recent recv call made by RecvThread.
- TBlockingTCPServerReceivedDataEvent = procedure(Sender: TObject; RecvThread: TRecvThread; RecvBytes: Integer) of object;
- // Handler signature for OnConnected; raised from the TRecvThread constructor.
- TBlockingTCPServerConnectedEvent = procedure(Sender: TObject; RecvThread: TRecvThread) of object;
- // Handler signature for OnDisconnected; raised from TRecvThread.DoDisconnect.
- TBlockingTCPServerDisconnectedEvent = procedure(Sender: TObject; RecvThread: TRecvThread) of object;
- // Handler signature for OnBeforeSendData; raised by TRecvThread.SendBuffer
- // just before send. Buf is passed by reference, so a handler can rewrite
- // the outgoing bytes in place.
- TBlockingTCPServerBeforeSendDataEvent = procedure(Sender: TObject; var Buf; Size: Integer) of object;
- // Blocking TCP listener. Start binds and listens on a port and spawns one
- // accept thread (TAcptThread); every accepted connection gets its own
- // TRecvThread, tracked in FRecvThreads until Stop detaches it.
- TBlockingTCPServer = class
- private
-   FActive: Boolean;              // True between a successful Start and Stop
-   FSocket: TSocket;              // listening socket, INVALID_SOCKET when closed
-   FListenPort: Cardinal;         // port passed to Start, 0 after Stop
-   FRecvBufferSize: Integer;      // size of each TRecvThread receive buffer
-   FAcptThread: TAcptThread;      // accept loop thread
-   FRecvThreads: TList;           // live TRecvThread instances
-   FOnStart: TNotifyEvent;
-   FOnStop: TNotifyEvent;
-   FOnSendedData: TBlockingTCPServerSendedDataEvent;
-   FOnReceivedData: TBlockingTCPServerReceivedDataEvent;
-   FOnConnected: TBlockingTCPServerConnectedEvent;
-   FOnDisconnected: TBlockingTCPServerDisconnectedEvent;
-   FOnBeforeSendData: TBlockingTCPServerBeforeSendDataEvent;
-   procedure SetRecvBufferSize(Value:Integer);
- protected
-   // Event dispatchers; each one calls its handler only when assigned.
-   procedure DoStart;
-   procedure DoStop;
-   procedure DoConnected(RecvThread: TRecvThread);
-   procedure DoDisconnected(RecvThread: TRecvThread);
-   procedure DoReceivedData(RecvThread: TRecvThread; RecvBytes: Integer);
-   procedure DoSendedData(RecvThread: TRecvThread; SendBytes: Integer);
-   procedure DoBeforeSendData(var Buf; Size: Integer);
- public
-   constructor Create;
-   destructor Destroy; override;
-   // Binds to AListenPort on all interfaces and starts accepting clients.
-   procedure Start(AListenPort: Cardinal);
-   // Closes the listening socket and disconnects every client.
-   procedure Stop;
- published
-   property Active: Boolean read FActive;
-   property ListenPort: Cardinal read FListenPort;
-   // Accepts 1..65535 and can only be changed while the server is inactive.
-   property RecvBufferSize: Integer read FRecvBufferSize write SetRecvBufferSize;
-   property OnStart: TNotifyEvent read FOnStart write FOnStart;
-   // Fixed: the setter used to store into FOnStart, so assigning OnStop
-   // replaced the OnStart handler and OnStop itself never fired.
-   property OnStop: TNotifyEvent read FOnStop write FOnStop;
-   property OnSendedData: TBlockingTCPServerSendedDataEvent read FOnSendedData write FOnSendedData;
-   property OnReceivedData: TBlockingTCPServerReceivedDataEvent read FOnReceivedData write FOnReceivedData;
-   property OnConnected: TBlockingTCPServerConnectedEvent read FOnConnected write FOnConnected;
-   property OnDisconnected: TBlockingTCPServerDisconnectedEvent read FOnDisconnected write FOnDisconnected;
-   property OnBeforeSendData: TBlockingTCPServerBeforeSendDataEvent read FOnBeforeSendData write FOnBeforeSendData;
- end;
- // Thread class that accepts incoming connections.
- // Execute loops on accept() against the server's listening socket and hands
- // each accepted socket to CreateRecvThread through Synchronize.
- TAcptThread = class(TThread)
- private
- FBlockingTCPServer: TBlockingTCPServer; // owning server; TBlockingTCPServer.Stop sets this to nil
- FClientSocket: TSocket; // socket returned by the most recent accept call
- FClientAddr: TSockAddrIn; // peer address filled in by accept
- FAddrLength: Integer; // address-size argument passed to accept
- procedure CreateRecvThread; // builds a TRecvThread for FClientSocket
- protected
- procedure Execute; override;
- public
- constructor Create(ABlockingTCPServer: TBlockingTCPServer);
- destructor Destroy; override;
- end;
- // Thread class that receives data; one instance is created per accepted
- // client socket. Execute loops on recv(), appends incoming bytes to FBuf
- // and reports them through the server's received-data dispatcher.
- TRecvThread = class(TThread)
- private
- FEncryptCriticalSection: TRTLCriticalSection; // held by SendBuffer around the before-send hook and send()
- FBlockingTCPServer: TBlockingTCPServer; // owning server; TBlockingTCPServer.Stop sets this to nil
- FClientSocket: TSocket; // client socket, INVALID_SOCKET once DoDisconnect has run
- FClientAddr: TSockAddrIn; // peer address captured at accept time
- FData: TObject; // caller-supplied reference; this unit only initialises it to nil
- FCallSynchronize: Boolean; // True: received-data notification goes through Synchronize
- FNoDelay: Boolean; // last value passed to SetNoDelay
- FBuf: array of Byte; // receive buffer, sized from the server's RecvBufferSize
- FNotProcessedBufferLength: Integer; // bytes accumulated in FBuf; reduced by CutRecvBufferTo
- FRecvBytes: Integer; // result of the most recent recv call
- procedure DoDisconnect;
- procedure DoReceivedData;
- function GetIPAddress: String;
- function GetPort: Integer;
- 
- procedure SetNoDelay(Value: Boolean);
- protected
- procedure Execute; override;
- public
- constructor Create(ABlockingTCPServer: TBlockingTCPServer; AClientSocket: TSocket; AClientAddr: TSockAddrIn);
- destructor Destroy; override;
- // Forwards to DoDisconnect on the calling thread.
- procedure Disconnect;
- // Runs the server's before-send hook on Buf, then sends Size bytes.
- procedure SendBuffer(var Buf; Size: Integer);
- // Copies Size bytes starting at FBuf[Offset] into Buf; FBuf is left unchanged.
- procedure CopyRecvBufferTo(var Buf; Offset: Integer; Size: Integer);
- // Copies like CopyRecvBufferTo, then shifts the bytes after Offset+Size to
- // the front of FBuf and lowers NotProcessedBufferLength by Offset+Size.
- procedure CutRecvBufferTo(var Buf; Offset: Integer; Size: Integer);
- property Data: TObject read FData write FData;
- property NotProcessedBufferLength: Integer read FNotProcessedBufferLength;
- property IPAddress: String read GetIPAddress;
- property Port: Integer read GetPort;
- property CallSynchronize: Boolean read FCallSynchronize write FCallSynchronize;
- property NoDelay: Boolean read FNoDelay write SetNoDelay;
- end;
-
- //procedure Register;
-
- implementation
- {TRecvThread}
- //------------------------------------------------------------------------------
- // Property setter for NoDelay: records Value and applies TCP_NODELAY to the
- // client socket. Does nothing once the socket has been closed.
- procedure TRecvThread.SetNoDelay(Value: Boolean);
- var
-   OptVal: Integer;
- begin
-   if FClientSocket <> INVALID_SOCKET then
-   begin
-     FNoDelay := Value;
-     // Winsock documents Boolean socket options as int-sized values; the
-     // previous code passed the 1-byte Boolean field and its size directly.
-     OptVal := Ord(Value);
-     setsockopt(FClientSocket, IPPROTO_TCP, TCP_NODELAY, @OptVal, SizeOf(OptVal));
-   end;
- end;
- //------------------------------------------------------------------------------
- // Getter for IPAddress: the peer's IPv4 address as produced by inet_ntoa
- // for the address stored at construction time.
- function TRecvThread.GetIPAddress: String;
- begin
-   Result := String(inet_ntoa(FClientAddr.sin_addr));
- end;
- //------------------------------------------------------------------------------
- // Getter for Port: converts the stored peer port from network to host
- // byte order.
- function TRecvThread.GetPort: Integer;
- var
-   NetOrderPort: Word;
- begin
-   NetOrderPort := FClientAddr.sin_port;
-   Result := ntohs(NetOrderPort);
- end;
- //------------------------------------------------------------------------------
- // Public entry point for closing this client's connection. It forwards
- // straight to DoDisconnect on the calling thread (no Synchronize here).
- procedure TRecvThread.Disconnect;
- begin
- DoDisconnect;
- end;
- //------------------------------------------------------------------------------
- // Sends Size bytes from Buf to the client.
- // The server's before-send hook runs first and may rewrite Buf in place.
- // The hook and the send happen under FEncryptCriticalSection, so concurrent
- // callers are serialised. A connection-level send error triggers
- // DoDisconnect; other errors are ignored, as before.
- procedure TRecvThread.SendBuffer(var Buf; Size: Integer);
- var
-   SendBytes,
-   ErrorCode: Integer;
- begin
-   // send() with a zero length succeeds and returns 0, which the check below
-   // would misread as a reason to drop the connection.
-   if Size <= 0 then Exit;
-   // Acquire outside the try so the finally never releases a lock that was
-   // not taken.
-   EnterCriticalSection(FEncryptCriticalSection);
-   try
-     // Stop detaches the thread by clearing FBlockingTCPServer; every other
-     // use of the field in this class is guarded the same way.
-     if Assigned(FBlockingTCPServer) then
-       FBlockingTCPServer.DoBeforeSendData(Buf, Size);
-     SendBytes := send(FClientSocket, Buf, Size, 0);
-     if SendBytes = 0 then
-     begin
-       Disconnect;
-       Exit;
-     end;
-     if SendBytes = SOCKET_ERROR then
-     begin
-       // Winsock errors are retrieved with WSAGetLastError.
-       ErrorCode := WSAGetLastError;
-       if (ErrorCode = WSAECONNABORTED) or
-          (ErrorCode = WSAECONNRESET) or
-          (ErrorCode = WSAETIMEDOUT) or
-          (ErrorCode = WSAENOTSOCK) then
-       begin
-         DoDisconnect;
-       end;
-     end;
-   finally
-     LeaveCriticalSection(FEncryptCriticalSection);
-   end;
- end;
- //------------------------------------------------------------------------------
- // Reports the byte count of the latest recv to the owning server, unless
- // the server has already detached itself from this thread.
- procedure TRecvThread.DoReceivedData;
- begin
-   if not Assigned(FBlockingTCPServer) then Exit;
-   FBlockingTCPServer.DoReceivedData(Self, FRecvBytes);
- end;
- //------------------------------------------------------------------------------
- // Shuts down and closes the client socket, then notifies the owning server
- // (if still attached) through DoDisconnected.
- procedure TRecvThread.DoDisconnect;
- begin
-   // Idempotent: after an explicit Disconnect the receive loop sees the
-   // closed socket and calls this again. Without the guard that second call
-   // closed INVALID_SOCKET and raised OnDisconnected a second time.
-   if FClientSocket = INVALID_SOCKET then Exit;
-   try
-     try
-       shutdown(FClientSocket, SD_BOTH);
-     finally
-       closeSocket(FClientSocket);
-       FClientSocket := INVALID_SOCKET;
-     end;
-   finally
-     if Assigned(FBlockingTCPServer) then
-       FBlockingTCPServer.DoDisconnected(Self);
-   end;
- end;
- //------------------------------------------------------------------------------
- // Receive loop. Appends incoming bytes to FBuf after the bytes not yet
- // consumed, then reports them through DoReceivedData (via Synchronize when
- // CallSynchronize is True). The loop ends, after a synchronized
- // DoDisconnect, when the peer closes the connection, a connection-level
- // error occurs, or the buffer has no free space left.
- procedure TRecvThread.Execute;
- var
-   ErrorCode,
-   FreeSpace: Integer;
- begin
-   FreeOnTerminate := True;
-   while not Terminated do
-   begin
-     FreeSpace := Length(FBuf) - FNotProcessedBufferLength;
-     // With no room left (or a corrupted counter) the old code indexed past
-     // the buffer and called recv with length 0. Handle it explicitly: the
-     // handler is not consuming data, so the connection is dropped.
-     if (FNotProcessedBufferLength < 0) or (FreeSpace <= 0) then
-     begin
-       Synchronize(DoDisconnect);
-       Break;
-     end;
-     FRecvBytes := recv(FClientSocket, FBuf[FNotProcessedBufferLength], FreeSpace, 0);
-     if FRecvBytes = SOCKET_ERROR then
-     begin
-       // Winsock errors are retrieved with WSAGetLastError.
-       ErrorCode := WSAGetLastError;
-       case ErrorCode of
-         WSAECONNABORTED, WSAECONNRESET, WSAETIMEDOUT, WSAENOTSOCK,
-         // These are also unrecoverable for this socket; retrying them
-         // would spin in the Sleep(1) branch forever.
-         WSAENETDOWN, WSAENETRESET, WSAENOTCONN, WSAESHUTDOWN:
-           begin
-             Synchronize(DoDisconnect);
-             Break;
-           end;
-       else
-         begin
-           // Any other error is treated as transient: back off and retry.
-           Sleep(1);
-           Continue;
-         end;
-       end;
-     end;
-     if FRecvBytes = 0 then
-     begin
-       // recv returning 0 means the peer closed the connection gracefully.
-       Synchronize(DoDisconnect);
-       Break;
-     end
-     else
-     begin
-       FNotProcessedBufferLength := FNotProcessedBufferLength + FRecvBytes;
-       try
-         if FCallSynchronize then
-           Synchronize(DoReceivedData)
-         else
-           DoReceivedData;
-       except
-         // Deliberate best effort: a failing handler must not kill the
-         // receive loop.
-         Continue;
-       end;
-     end;
-   end;
- end;
- //------------------------------------------------------------------------------
- // Builds a receive thread for one accepted client: stores the socket and
- // peer address, allocates the receive buffer from the server's
- // RecvBufferSize, registers itself in the server's thread list, raises the
- // connected notification and finally starts running.
- constructor TRecvThread.Create(ABlockingTCPServer: TBlockingTCPServer; AClientSocket: TSocket; AClientAddr: TSockAddrIn);
- begin
-   inherited Create(True);
-   InitializeCriticalSection(FEncryptCriticalSection);
-   FBlockingTCPServer := ABlockingTCPServer;
-   FClientSocket := AClientSocket;
-   FClientAddr := AClientAddr;
-   FData := nil;
-   FCallSynchronize := True;
-   FNotProcessedBufferLength := 0;
-   SetLength(FBuf, FBlockingTCPServer.RecvBufferSize);
-   FBlockingTCPServer.FRecvThreads.Add(Self);
-   // The connected notification is raised before the thread starts
-   // receiving.
-   FBlockingTCPServer.DoConnected(Self);
-   Resume;
- end;
- //------------------------------------------------------------------------------
- // Unregisters the thread from the owning server (if still attached) and
- // releases the send lock.
- destructor TRecvThread.Destroy;
- begin
-   try
-     if Assigned(FBlockingTCPServer) then
-       FBlockingTCPServer.FRecvThreads.Remove(Self);
-   finally
-     // Taking the lock once lets a SendBuffer call that is in progress
-     // finish. It is released again before deletion, because deleting a
-     // critical section that is still owned is documented as leaving waiters
-     // in an undefined state.
-     EnterCriticalSection(FEncryptCriticalSection);
-     LeaveCriticalSection(FEncryptCriticalSection);
-     DeleteCriticalSection(FEncryptCriticalSection);
-     inherited Destroy;
-   end;
- end;
- //------------------------------------------------------------------------------
- // Copies Size bytes starting at FBuf[Offset] into Buf. The receive buffer
- // and NotProcessedBufferLength are left untouched; no range validation is
- // performed.
- procedure TRecvThread.CopyRecvBufferTo(var Buf; Offset: Integer; Size: Integer);
- var
-   Target,
-   Source: Pointer;
- begin
-   Target := @Buf;
-   Source := @FBuf[Offset];
-   CopyMemory(Target, Source, Size);
- end;
- //------------------------------------------------------------------------------
- // Copies Size bytes starting at FBuf[Offset] into Buf, then discards the
- // first Offset+Size bytes of the receive buffer by moving the remaining
- // unprocessed bytes to the front.
- // Raises TSocketException when the requested range is not fully inside the
- // unprocessed data. Previously such a call drove NotProcessedBufferLength
- // negative, and the next recv then wrote before the start of the buffer.
- procedure TRecvThread.CutRecvBufferTo(var Buf; Offset: Integer; Size: Integer);
- var
-   Consumed,
-   Remaining: Integer;
- begin
-   // Written so that Offset + Size cannot overflow before it is checked.
-   if (Offset < 0) or (Size < 0) or
-      (Offset > FNotProcessedBufferLength) or
-      (Size > FNotProcessedBufferLength - Offset) then
-     raise TSocketException.CreateFmt(
-       'CutRecvBufferTo: range (offset %d, size %d) exceeds %d buffered bytes',
-       [Offset, Size, FNotProcessedBufferLength]);
-   Consumed := Offset + Size;
-   Remaining := FNotProcessedBufferLength - Consumed;
-   if Size > 0 then
-     Move(FBuf[Offset], Buf, Size);
-   // Only the live tail is shifted; Move handles overlapping regions. The
-   // guard avoids indexing FBuf[Length(FBuf)] when everything is consumed.
-   if Remaining > 0 then
-     Move(FBuf[Consumed], FBuf[0], Remaining);
-   FNotProcessedBufferLength := Remaining;
- end;
- {TAcptThread}
- //------------------------------------------------------------------------------
- // Invoked through Synchronize from Execute: wraps the socket just returned
- // by accept in a new TRecvThread owned by the server.
- procedure TAcptThread.CreateRecvThread;
- begin
-   // Stop clears FBlockingTCPServer. A connection accepted around that moment
-   // used to reach TRecvThread.Create with a nil server (access violation)
-   // and the accepted socket was never closed.
-   if FBlockingTCPServer = nil then
-   begin
-     closesocket(FClientSocket);
-     FClientSocket := INVALID_SOCKET;
-     Exit;
-   end;
-   TRecvThread.Create(FBlockingTCPServer, FClientSocket, FClientAddr);
- end;
- //------------------------------------------------------------------------------
- // Accept loop: blocks in accept() on the server's listening socket and
- // passes every accepted connection to CreateRecvThread via Synchronize.
- // Ends when the thread is terminated, the server detaches itself, or an
- // exception escapes an iteration.
- procedure TAcptThread.Execute;
- var
-   Server: TBlockingTCPServer;
- begin
-   while not Terminated do
-   begin
-     try
-       // Read the back-pointer once per iteration. Stop sets it to nil, and
-       // the loop previously relied on the resulting access violation being
-       // caught below in order to exit.
-       Server := FBlockingTCPServer;
-       if Server = nil then Break;
-       FAddrLength := SizeOf(FClientAddr);
-       FClientSocket := accept(Server.FSocket, FClientAddr, FAddrLength);
-       if FClientSocket <> INVALID_SOCKET then
-         Synchronize(CreateRecvThread)
-       else if not Terminated then
-         // accept failed (for example the listening socket was closed);
-         // yield instead of spinning at full speed until Terminated is seen.
-         Sleep(1);
-     except
-       Break;
-     end;
-   end;
- end;
- //------------------------------------------------------------------------------
- // Creates the accept thread for ABlockingTCPServer. The thread frees itself
- // on termination and starts running at the end of the constructor.
- constructor TAcptThread.Create(ABlockingTCPServer: TBlockingTCPServer);
- begin
-   inherited Create(True);
-   FBlockingTCPServer := ABlockingTCPServer;
-   FreeOnTerminate := True;
-   Resume;
- end;
- //------------------------------------------------------------------------------
- // Clears the owning server's reference to this thread before it is freed.
- destructor TAcptThread.Destroy;
- begin
-   // Only clear the reference when it still points at this instance. An
-   // older accept thread finishing late must not wipe the reference to a
-   // thread created by a later Start.
-   if (FBlockingTCPServer <> nil) and (FBlockingTCPServer.FAcptThread = Self) then
-     FBlockingTCPServer.FAcptThread := nil;
-   inherited Destroy;
- end;
- {TBlockingTCPServer}
- //------------------------------------------------------------------------------
- // Creates (if needed) the listening socket, binds it to AListenPort on all
- // interfaces, starts listening with a backlog of 5, spawns the accept
- // thread and raises OnStart.
- // Raises TSocketException when the server is already active or when
- // socket creation, bind or listen fails; on bind/listen failure the socket
- // is closed again.
- procedure TBlockingTCPServer.Start(AListenPort: Cardinal);
- var
-   serverAddr: TSockAddrIn;
-   lastError: Integer;
- begin
-   // Binding the already bound socket fails, and the failure path below
-   // would then close the live listening socket while Active stayed True.
-   if FActive then
-     raise TSocketException.Create('Server is already listening; call Stop before Start');
-   if FSocket = INVALID_SOCKET then FSocket := Socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
-   if FSocket = INVALID_SOCKET then raise TSocketException.CreateFmt('创建套接字失败,错误代码:%d',[WSAGetLastError]);
-   // Zero the structure so the padding bytes are defined.
-   FillChar(serverAddr, SizeOf(serverAddr), 0);
-   serverAddr.sin_family:= AF_INET;
-   serverAddr.sin_port:= htons(AListenPort);
-   serverAddr.sin_addr.S_addr:= htonl(INADDR_ANY);
-   // Bind the port.
-   if bind(FSocket, @serverAddr, SizeOf(serverAddr)) = SOCKET_ERROR then
-   begin
-     // Capture the Winsock error before closesocket can overwrite it; the
-     // old message reported bind's return value (-1) instead.
-     lastError := WSAGetLastError;
-     closeSocket(FSocket);
-     FSocket:= INVALID_SOCKET;
-     raise TSocketException.CreateFmt('绑定TCP端口 %d 失败,错误代码:%d',[AListenPort, lastError]);
-   end;
-   // Start listening.
-   if listen(FSocket, 5) <> 0 then
-   begin
-     lastError := WSAGetLastError;
-     // Close the bound socket so a later Start can bind again.
-     closeSocket(FSocket);
-     FSocket := INVALID_SOCKET;
-     // The format string has two placeholders; it used to receive a single
-     // argument, which made Format itself raise.
-     raise TSocketException.CreateFmt('在TCP端口 %d 监听失败,错误代码:%d', [AListenPort, lastError]);
-   end;
-   FActive := True;
-   FListenPort := AListenPort;
-   // A leftover FAcptThread value can only belong to a thread that Stop has
-   // already terminated and detached (and that may have freed itself), so it
-   // is not touched here; the reference is simply replaced.
-   FAcptThread := TAcptThread.Create(Self);
-   DoStart;
- end;
- //------------------------------------------------------------------------------
- // Closes the listening socket (raising OnStop), terminates and detaches the
- // accept thread, then detaches, disconnects and terminates every client
- // thread. Client threads are detached before being disconnected, so no
- // OnDisconnected event is raised for them.
- procedure TBlockingTCPServer.Stop;
- var
-   ARecvThread: TRecvThread;
- begin
-   if FSocket <> INVALID_SOCKET then
-   begin
-     try
-       try
-         shutdown(FSocket, SD_BOTH);
-       finally
-         closeSocket(FSocket);
-         FSocket := INVALID_SOCKET;
-       end;
-     finally
-       FActive := False;
-       FListenPort := 0;
-       DoStop;
-     end;
-   end;
-   if FAcptThread <> nil then
-   begin
-     FAcptThread.Terminate;
-     FAcptThread.FBlockingTCPServer := nil;
-     // The thread frees itself and, once detached, can no longer clear this
-     // reference from its destructor. Drop it here so later Start/Destroy
-     // calls never touch a freed thread object.
-     FAcptThread := nil;
-   end;
-   while FRecvThreads.Count > 0 do
-   begin
-     ARecvThread := TRecvThread(FRecvThreads.Items[0]);
-     FRecvThreads.Delete(0);
-     // Detach first so the thread stops calling back into this server.
-     ARecvThread.FBlockingTCPServer := nil;
-     ARecvThread.Disconnect;
-     ARecvThread.Terminate;
-   end;
- end;
- //------------------------------------------------------------------------------
- // Raises OnStart when a handler is attached.
- procedure TBlockingTCPServer.DoStart;
- begin
-   if not Assigned(FOnStart) then Exit;
-   FOnStart(Self);
- end;
- //------------------------------------------------------------------------------
- // Raises OnStop when a handler is attached.
- procedure TBlockingTCPServer.DoStop;
- begin
-   if not Assigned(FOnStop) then Exit;
-   FOnStop(Self);
- end;
- //------------------------------------------------------------------------------
- // Property setter for RecvBufferSize. Rejects values outside 1..65535 and
- // any change while the server is active; both cases raise TSocketException
- // (the range check is evaluated first).
- procedure TBlockingTCPServer.SetRecvBufferSize(Value: Integer);
- begin
-   if not ((Value >= 1) and (Value <= 65535)) then
-     raise TSocketException.Create('缓冲区大小必须为1-65535之间的数值');
-   if FActive then
-     raise TSocketException.Create('监听状态不允许更改缓冲大小');
-   FRecvBufferSize := Value;
- end;
- //------------------------------------------------------------------------------
- // Raises OnConnected for RecvThread when a handler is attached.
- procedure TBlockingTCPServer.DoConnected(RecvThread: TRecvThread);
- begin
-   if not Assigned(FOnConnected) then Exit;
-   FOnConnected(Self, RecvThread);
- end;
- //------------------------------------------------------------------------------
- // Raises OnDisconnected for RecvThread when a handler is attached.
- procedure TBlockingTCPServer.DoDisconnected(RecvThread: TRecvThread);
- begin
-   if not Assigned(FOnDisconnected) then Exit;
-   FOnDisconnected(Self, RecvThread);
- end;
- //------------------------------------------------------------------------------
- // Raises OnReceivedData with the thread and byte count when a handler is
- // attached.
- procedure TBlockingTCPServer.DoReceivedData(RecvThread: TRecvThread; RecvBytes: Integer);
- begin
-   if not Assigned(FOnReceivedData) then Exit;
-   FOnReceivedData(Self, RecvThread, RecvBytes);
- end;
- //------------------------------------------------------------------------------
- // Raises OnSendedData with the thread and byte count when a handler is
- // attached.
- procedure TBlockingTCPServer.DoSendedData(RecvThread: TRecvThread; SendBytes: Integer);
- begin
-   if not Assigned(FOnSendedData) then Exit;
-   FOnSendedData(Self, RecvThread, SendBytes);
- end;
- //------------------------------------------------------------------------------
- // Raises OnBeforeSendData when a handler is attached. Buf is passed by
- // reference, so the handler may modify the outgoing bytes.
- procedure TBlockingTCPServer.DoBeforeSendData(var Buf; Size: Integer);
- begin
-   if not Assigned(FOnBeforeSendData) then Exit;
-   FOnBeforeSendData(Self, Buf, Size);
- end;
-
- //------------------------------------------------------------------------------
- // Creates an inactive server: no socket, no accept thread, an empty client
- // thread list and a default receive buffer size of 16384 bytes.
- constructor TBlockingTCPServer.Create;
- begin
-   inherited Create;
-   FRecvThreads := TList.Create;
-   FAcptThread := nil;
-   FSocket := INVALID_SOCKET;
-   FListenPort := 0;
-   FActive := False;
-   FRecvBufferSize := 2 * 8192;
- end;
- //------------------------------------------------------------------------------
- // Stops the server if it is still active, releases a listening socket
- // that was created but never became active, and frees the client list.
- destructor TBlockingTCPServer.Destroy;
- begin
-   if Active then
-     Stop
-   else if FSocket <> INVALID_SOCKET then
-   begin
-     // A socket can exist without the server being active; close it so the
-     // handle is not leaked.
-     closesocket(FSocket);
-     FSocket := INVALID_SOCKET;
-   end;
-   // FAcptThread is deliberately not touched here. Stop has already
-   // terminated and detached the accept thread, and because that thread
-   // frees itself the stored reference may point at freed memory.
-   // TList's destructor releases the item storage, so no Clear is needed.
-   FreeAndNil(FRecvThreads);
-   inherited Destroy;
- end;
- end.
|