{
  File:    BlockingTCPServer.pas
  Purpose: Blocking-mode TCP listening component. Data is received on
           dedicated threads (one receive thread per connected client).
  Author:  Yin Jin
  History:
    2005.12.23: added the file description header (Yin Jin)
}
unit BlockingTCPServer;

interface

uses
  WinSock2, RealICQSocket, RealICQProxy,
  SysUtils, Classes, Windows, Forms;

type
  TAcptThread = class;
  TRecvThread = class;

  // Raised after a successful send() with the number of bytes accepted.
  TBlockingTCPServerSendedDataEvent = procedure(Sender: TObject; RecvThread: TRecvThread; SendBytes: Integer) of object;
  // Raised after recv() appended RecvBytes bytes to the thread's buffer.
  TBlockingTCPServerReceivedDataEvent = procedure(Sender: TObject; RecvThread: TRecvThread; RecvBytes: Integer) of object;
  // Raised when a receive thread has been created for an accepted client.
  TBlockingTCPServerConnectedEvent = procedure(Sender: TObject; RecvThread: TRecvThread) of object;
  // Raised when a client socket has been closed by TRecvThread.DoDisconnect.
  TBlockingTCPServerDisconnectedEvent = procedure(Sender: TObject; RecvThread: TRecvThread) of object;
  // Raised just before send(); Buf is passed by reference so the handler
  // may modify the outgoing bytes in place.
  TBlockingTCPServerBeforeSendDataEvent = procedure(Sender: TObject; var Buf; Size: Integer) of object;

  // TCP server that accepts connections on one thread (TAcptThread) and
  // services each client on its own thread (TRecvThread).
  TBlockingTCPServer = class
  private
    FActive: Boolean;
    FSocket: TSocket;             // listening socket, INVALID_SOCKET when stopped
    FListenPort: Cardinal;
    FRecvBufferSize: Integer;     // size of each client's receive buffer
    FAcptThread: TAcptThread;
    FRecvThreads: TList;          // live TRecvThread instances

    FOnStart: TNotifyEvent;
    FOnStop: TNotifyEvent;
    FOnSendedData: TBlockingTCPServerSendedDataEvent;
    FOnReceivedData: TBlockingTCPServerReceivedDataEvent;
    FOnConnected: TBlockingTCPServerConnectedEvent;
    FOnDisconnected: TBlockingTCPServerDisconnectedEvent;
    FOnBeforeSendData: TBlockingTCPServerBeforeSendDataEvent;

    procedure SetRecvBufferSize(Value: Integer);
  protected
    procedure DoStart;
    procedure DoStop;
    procedure DoConnected(RecvThread: TRecvThread);
    procedure DoDisconnected(RecvThread: TRecvThread);
    procedure DoReceivedData(RecvThread: TRecvThread; RecvBytes: Integer);
    procedure DoSendedData(RecvThread: TRecvThread; SendBytes: Integer);
    procedure DoBeforeSendData(var Buf; Size: Integer);
  public
    constructor Create;
    destructor Destroy; override;

    procedure Start(AListenPort: Cardinal);
    procedure Stop;
  published
    property Active: Boolean read FActive;
    property ListenPort: Cardinal read FListenPort;
    property RecvBufferSize: Integer read FRecvBufferSize write SetRecvBufferSize;

    property OnStart: TNotifyEvent read FOnStart write FOnStart;
    // Fixed: the setter used to write FOnStart, so assigning OnStop replaced
    // the start handler and OnStop itself was never invoked.
    property OnStop: TNotifyEvent read FOnStop write FOnStop;
    property OnSendedData: TBlockingTCPServerSendedDataEvent read FOnSendedData write FOnSendedData;
    property OnReceivedData: TBlockingTCPServerReceivedDataEvent read FOnReceivedData write FOnReceivedData;
    property OnConnected: TBlockingTCPServerConnectedEvent read FOnConnected write FOnConnected;
    property OnDisconnected: TBlockingTCPServerDisconnectedEvent read FOnDisconnected write FOnDisconnected;
    property OnBeforeSendData: TBlockingTCPServerBeforeSendDataEvent read FOnBeforeSendData write FOnBeforeSendData;
  end;

  // Thread class that accepts incoming connections
  TAcptThread = class(TThread)
  private
    FBlockingTCPServer: TBlockingTCPServer;  // set to nil by the server's Stop
    FClientSocket: TSocket;                  // most recently accepted socket
    FClientAddr: TSockAddrIn;
    FAddrLength: Integer;
    procedure CreateRecvThread;
  protected
    procedure Execute; override;
  public
    constructor Create(ABlockingTCPServer: TBlockingTCPServer);
    destructor Destroy; override;
  end;

  // Thread class that receives data from one client
  TRecvThread = class(TThread)
  private
    FEncryptCriticalSection: TRTLCriticalSection;  // serialises SendBuffer calls
    FBlockingTCPServer: TBlockingTCPServer;        // set to nil by the server's Stop
    FClientSocket: TSocket;
    FClientAddr: TSockAddrIn;
    FData: TObject;                                // caller-owned slot, never touched here
    FCallSynchronize: Boolean;
    FNoDelay: Boolean;

    FBuf: array of Byte;                 // receive buffer
    FNotProcessedBufferLength: Integer;  // bytes in FBuf not yet consumed
    FRecvBytes: Integer;                 // result of the latest recv()

    procedure DoDisconnect;
    procedure DoReceivedData;

    function GetIPAddress: String;
    function GetPort: Integer;

    procedure SetNoDelay(Value: Boolean);
  protected
    procedure Execute; override;
  public
    constructor Create(ABlockingTCPServer: TBlockingTCPServer; AClientSocket: TSocket; AClientAddr: TSockAddrIn);
    destructor Destroy; override;

    procedure Disconnect;
    procedure SendBuffer(var Buf; Size: Integer);
    procedure CopyRecvBufferTo(var Buf; Offset: Integer; Size: Integer);
    procedure CutRecvBufferTo(var Buf; Offset: Integer; Size: Integer);

    property Data: TObject read FData write FData;
    property NotProcessedBufferLength: Integer read FNotProcessedBufferLength;
    property IPAddress: String read GetIPAddress;
    property Port: Integer read GetPort;
    // When True (the default set in Create) OnReceivedData is raised through
    // Synchronize; when False it is raised directly on the receive thread.
    property CallSynchronize: Boolean read FCallSynchronize write FCallSynchronize;
    property NoDelay: Boolean read FNoDelay write SetNoDelay;
  end;

//procedure Register;

implementation

{TRecvThread}
//------------------------------------------------------------------------------
// Enables or disables TCP_NODELAY on the client socket. Nothing happens
// (and FNoDelay is not updated) once the socket has been closed.
procedure TRecvThread.SetNoDelay(Value: Boolean);
var
  OptVal: Integer;
begin
  if FClientSocket <> INVALID_SOCKET then
  begin
    FNoDelay := Value;
    // Winsock documents boolean options as int-sized BOOL values, so pass
    // a 4-byte integer instead of the 1-byte Pascal Boolean field.
    OptVal := Ord(Value);
    setsockopt(FClientSocket, IPPROTO_TCP, TCP_NODELAY, @OptVal, SizeOf(OptVal));
  end;
end;

//------------------------------------------------------------------------------
// Returns the peer address recorded at accept time in dotted notation.
function TRecvThread.GetIPAddress: String;
begin
  Result := inet_ntoa(FClientAddr.sin_addr);
end;

//------------------------------------------------------------------------------
// Returns the peer port recorded at accept time, in host byte order.
function TRecvThread.GetPort: Integer;
begin
  Result := ntohs(FClientAddr.sin_port);
end;

//------------------------------------------------------------------------------
// Public entry point for closing the client connection; runs on the
// calling thread.
procedure TRecvThread.Disconnect;
begin
  DoDisconnect;
end;

//------------------------------------------------------------------------------
// Sends Size bytes from Buf to the client.
//   - OnBeforeSendData is raised first and may rewrite Buf in place.
//   - A send() result of 0, or one of the "connection is gone" error codes,
//     closes the connection.
//   - A successful send raises OnSendedData with the byte count that send()
//     reported (which may be smaller than Size).
// Calls are serialised with a critical section.
procedure TRecvThread.SendBuffer(var Buf; Size: Integer);
var
  SendBytes, ErrorCode: Integer;
begin
  // Acquire before entering the try block so that the finally section never
  // releases a lock that was not taken.
  EnterCriticalSection(FEncryptCriticalSection);
  try
    // The server's Stop detaches the thread by setting FBlockingTCPServer
    // to nil; guard against dereferencing it.
    if Assigned(FBlockingTCPServer) then
      FBlockingTCPServer.DoBeforeSendData(Buf, Size);

    SendBytes := send(FClientSocket, Buf, Size, 0);
    if SendBytes = 0 then
    begin
      Disconnect;
      Exit;
    end;

    if SendBytes = SOCKET_ERROR then
    begin
      ErrorCode := WSAGetLastError;
      if (ErrorCode = WSAECONNABORTED) or (ErrorCode = WSAECONNRESET) or
         (ErrorCode = WSAETIMEDOUT) or (ErrorCode = WSAENOTSOCK) then
      begin
        DoDisconnect;
      end;
    end
    else if Assigned(FBlockingTCPServer) then
      // OnSendedData was declared but never raised anywhere in the unit.
      FBlockingTCPServer.DoSendedData(Self, SendBytes);
  finally
    LeaveCriticalSection(FEncryptCriticalSection);
  end;
end;

//------------------------------------------------------------------------------
// Forwards the latest recv() byte count to the server's OnReceivedData.
procedure TRecvThread.DoReceivedData;
begin
  if Assigned(FBlockingTCPServer) then
    FBlockingTCPServer.DoReceivedData(Self, FRecvBytes);
end;

//------------------------------------------------------------------------------
// Shuts down and closes the client socket, then raises OnDisconnected if
// the thread is still attached to a server. Safe to call more than once:
// later calls are no-ops, so OnDisconnected fires at most once per
// connection (previously an explicit Disconnect followed by the receive
// loop noticing the closed socket raised it twice).
procedure TRecvThread.DoDisconnect;
begin
  if FClientSocket = INVALID_SOCKET then
    Exit;
  try
    try
      shutdown(FClientSocket, SD_BOTH);
    finally
      closesocket(FClientSocket);
      FClientSocket := INVALID_SOCKET;
    end;
  finally
    if Assigned(FBlockingTCPServer) then
      FBlockingTCPServer.DoDisconnected(Self);
  end;
end;

//------------------------------------------------------------------------------
// Receive loop. Appends incoming bytes after the unprocessed part of FBuf
// and raises OnReceivedData for every successful recv(). The loop ends when
// the thread is terminated, the peer closes the connection, a fatal socket
// error occurs, or the buffer is completely full.
procedure TRecvThread.Execute;
var
  ErrorCode, FreeBytes: Integer;
begin
  FreeOnTerminate := True;
  while not Terminated do
  begin
    FreeBytes := Length(FBuf) - FNotProcessedBufferLength;
    if FreeBytes <= 0 then
    begin
      // The handler did not consume anything and the buffer is full. The
      // original code reached the same outcome implicitly (recv with a zero
      // length returns 0, which is treated as a disconnect) but indexed
      // FBuf one past its end on the way, which fails under range checking.
      Synchronize(DoDisconnect);
      Break;
    end;

    FRecvBytes := recv(FClientSocket, FBuf[FNotProcessedBufferLength], FreeBytes, 0);

    if FRecvBytes = SOCKET_ERROR then
    begin
      ErrorCode := WSAGetLastError;
      if (ErrorCode = WSAECONNABORTED) or (ErrorCode = WSAECONNRESET) or
         (ErrorCode = WSAETIMEDOUT) or (ErrorCode = WSAENOTSOCK) then
      begin
        Synchronize(DoDisconnect);
        Break;
      end
      else
      begin
        // Any other error is treated as transient: back off briefly, retry.
        Sleep(1);
        Continue;
      end;
    end;

    if FRecvBytes = 0 then
    begin
      // Zero bytes from recv(): the connection was closed.
      Synchronize(DoDisconnect);
      Break;
    end
    else
    begin
      FNotProcessedBufferLength := FNotProcessedBufferLength + FRecvBytes;
      try
        if FCallSynchronize then
          Synchronize(DoReceivedData)
        else
          DoReceivedData;
      except
        // Deliberate best effort: an exception in the data handler must not
        // kill the receive loop.
        Continue;
      end;
    end;
  end;
end;

//------------------------------------------------------------------------------
// Creates the receive thread for an accepted client, registers it in the
// server's thread list, raises OnConnected and only then starts the thread.
// ABlockingTCPServer must not be nil (it is dereferenced here).
constructor TRecvThread.Create(ABlockingTCPServer: TBlockingTCPServer; AClientSocket: TSocket; AClientAddr: TSockAddrIn);
begin
  inherited Create(True);

  InitializeCriticalSection(FEncryptCriticalSection);

  FCallSynchronize := True;
  FData := nil;
  FBlockingTCPServer := ABlockingTCPServer;
  FClientSocket := AClientSocket;
  FClientAddr := AClientAddr;
  SetLength(FBuf, FBlockingTCPServer.RecvBufferSize);
  FNotProcessedBufferLength := 0;

  FBlockingTCPServer.FRecvThreads.Add(Self);
  FBlockingTCPServer.DoConnected(Self);

  Resume;
end;

//------------------------------------------------------------------------------
// Unregisters the thread from the server (if still attached) and releases
// the send lock.
// NOTE(review): FRecvThreads is a plain TList and is also modified by the
// server's Stop; nothing in this unit serialises the two - confirm which
// thread runs this destructor for FreeOnTerminate threads.
destructor TRecvThread.Destroy;
begin
  try
    if Assigned(FBlockingTCPServer) then
      FBlockingTCPServer.FRecvThreads.Remove(Self);
  finally
    // Enter/Leave waits for a SendBuffer call that may still be in flight;
    // the lock is released before deletion because deleting a critical
    // section that is still owned is not supported.
    EnterCriticalSection(FEncryptCriticalSection);
    LeaveCriticalSection(FEncryptCriticalSection);
    DeleteCriticalSection(FEncryptCriticalSection);
    inherited Destroy;
  end;
end;

//------------------------------------------------------------------------------
// Copies Size bytes starting at FBuf[Offset] into Buf without consuming
// them. The caller is responsible for Offset/Size lying inside the buffer.
procedure TRecvThread.CopyRecvBufferTo(var Buf; Offset: Integer; Size: Integer);
begin
  // Skip empty copies so that FBuf[Offset] is never evaluated for an index
  // one past the end of the buffer.
  if Size > 0 then
    CopyMemory(@Buf, @FBuf[Offset], Size);
end;

//------------------------------------------------------------------------------
// Copies Size bytes starting at FBuf[Offset] into Buf, then discards the
// first Offset+Size bytes of the buffer by moving the rest to the front.
// The caller is responsible for Offset/Size lying inside the buffer.
procedure TRecvThread.CutRecvBufferTo(var Buf; Offset: Integer; Size: Integer);
var
  Consumed, Remaining: Integer;
begin
  if Size > 0 then
    CopyMemory(@Buf, @FBuf[Offset], Size);

  Consumed := Offset + Size;
  Remaining := Length(FBuf) - Consumed;
  // Only shift when something is left; otherwise FBuf[Consumed] would be
  // one past the end of the array.
  if Remaining > 0 then
    CopyMemory(@FBuf[0], @FBuf[Consumed], Remaining);

  FNotProcessedBufferLength := FNotProcessedBufferLength - Consumed;
  // Never let the counter go negative: Execute uses it as an index into FBuf.
  if FNotProcessedBufferLength < 0 then
    FNotProcessedBufferLength := 0;
end;

{TAcptThread}
//------------------------------------------------------------------------------
// Runs through Synchronize: creates the receive thread for the socket that
// accept() just returned. The new thread registers itself with the server
// and frees itself on termination, so the reference is not kept here.
procedure TAcptThread.CreateRecvThread;
begin
  if FBlockingTCPServer = nil then
  begin
    // The server was stopped between accept() and this call; close the
    // socket instead of leaking it.
    closesocket(FClientSocket);
    FClientSocket := INVALID_SOCKET;
    Exit;
  end;
  TRecvThread.Create(FBlockingTCPServer, FClientSocket, FClientAddr);
end;

//------------------------------------------------------------------------------
// Accept loop: blocks in accept() on the server's listening socket and
// hands every accepted connection to CreateRecvThread via Synchronize.
procedure TAcptThread.Execute;
begin
  while not Terminated do
  begin
    try
      // Stop detaches this thread by setting FBlockingTCPServer to nil.
      if FBlockingTCPServer = nil then
        Break;

      FAddrLength := SizeOf(FClientAddr);
      FClientSocket := accept(FBlockingTCPServer.FSocket, FClientAddr, FAddrLength);
      if FClientSocket <> INVALID_SOCKET then
        Synchronize(CreateRecvThread)
      else if not Terminated then
        // accept() failed (for example the listening socket was just
        // closed); yield instead of spinning at full speed.
        Sleep(1);
    except
      Break;
    end;
  end;
end;

//------------------------------------------------------------------------------
// Creates the accept thread for the given server and starts it at once.
// The thread frees itself when Execute returns.
constructor TAcptThread.Create(ABlockingTCPServer: TBlockingTCPServer);
begin
  inherited Create(True);
  FreeOnTerminate := True;
  FBlockingTCPServer := ABlockingTCPServer;
  Resume;
end;

//------------------------------------------------------------------------------
// Clears the server's back-reference, but only if it still points at this
// instance: Start may already have replaced it with a newer accept thread.
destructor TAcptThread.Destroy;
begin
  if (FBlockingTCPServer <> nil) and (FBlockingTCPServer.FAcptThread = Self) then
    FBlockingTCPServer.FAcptThread := nil;
  inherited Destroy;
end;

{TBlockingTCPServer}
//------------------------------------------------------------------------------
// Creates (if necessary), binds and listens on a TCP socket for
// AListenPort on all local addresses, starts the accept thread and raises
// OnStart.
// Raises TSocketException when the socket cannot be created, bound or put
// into listening state; in the latter two cases the socket is closed again.
procedure TBlockingTCPServer.Start(AListenPort: Cardinal);
var
  serverAddr: TSockAddrIn;
  lastError: Integer;
begin
  if FSocket = INVALID_SOCKET then
    FSocket := Socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
  if FSocket = INVALID_SOCKET then
    raise TSocketException.CreateFmt('创建套接字失败,错误代码:%d', [WSAGetLastError]);

  // Zero the whole record so the padding bytes are well defined.
  FillChar(serverAddr, SizeOf(serverAddr), 0);
  serverAddr.sin_family := AF_INET;
  serverAddr.sin_port := htons(AListenPort);
  serverAddr.sin_addr.S_addr := htonl(INADDR_ANY);

  // Bind the port
  if bind(FSocket, @serverAddr, SizeOf(serverAddr)) = SOCKET_ERROR then
  begin
    // Capture the Winsock error before closesocket can overwrite it. The
    // original reported bind's return value, which is always -1.
    lastError := WSAGetLastError;
    closesocket(FSocket);
    FSocket := INVALID_SOCKET;
    raise TSocketException.CreateFmt('绑定TCP端口 %d 失败,错误代码:%d', [AListenPort, lastError]);
  end;

  // Start listening
  if listen(FSocket, 5) <> 0 then
  begin
    lastError := WSAGetLastError;
    // Do not leave a bound, non-listening socket behind: a later Start
    // would fail at bind.
    closesocket(FSocket);
    FSocket := INVALID_SOCKET;
    // The format string has two placeholders; the original supplied only
    // one argument, so Format itself failed instead of reporting the error.
    raise TSocketException.CreateFmt('在TCP端口 %d 监听失败,错误代码:%d', [AListenPort, lastError]);
  end;

  FActive := True;
  FListenPort := AListenPort;

  if FAcptThread <> nil then
    FAcptThread.Terminate;
  FAcptThread := TAcptThread.Create(Self);

  DoStart;
end;

//------------------------------------------------------------------------------
// Closes the listening socket, raises OnStop, detaches and terminates the
// accept thread, then disconnects and terminates every client thread.
// Client threads are detached (FBlockingTCPServer := nil) before they are
// disconnected, so OnDisconnected is not raised for them.
procedure TBlockingTCPServer.Stop;
var
  ARecvThread: TRecvThread;
begin
  if FSocket <> INVALID_SOCKET then
  begin
    try
      try
        shutdown(FSocket, SD_BOTH);
      finally
        closesocket(FSocket);
        FSocket := INVALID_SOCKET;
      end;
    finally
      FActive := False;
      FListenPort := 0;
      DoStop;
    end;
  end;

  if FAcptThread <> nil then
  begin
    FAcptThread.Terminate;
    FAcptThread.FBlockingTCPServer := nil;
  end;

  while FRecvThreads.Count > 0 do
  begin
    ARecvThread := TRecvThread(FRecvThreads.Items[0]);
    FRecvThreads.Delete(0);
    ARecvThread.FBlockingTCPServer := nil;
    ARecvThread.Disconnect;
    ARecvThread.Terminate;
  end;
end;

//------------------------------------------------------------------------------
// Raises OnStart if a handler is assigned.
procedure TBlockingTCPServer.DoStart;
begin
  if Assigned(FOnStart) then
    FOnStart(Self);
end;

//------------------------------------------------------------------------------
// Raises OnStop if a handler is assigned.
procedure TBlockingTCPServer.DoStop;
begin
  if Assigned(FOnStop) then
    FOnStop(Self);
end;

//------------------------------------------------------------------------------
// Sets the per-client receive buffer size. Only values 1..65535 are
// accepted, and only while the server is not active; otherwise a
// TSocketException is raised.
procedure TBlockingTCPServer.SetRecvBufferSize(Value: Integer);
begin
  if (Value < 1) or (Value > 65535) then
    raise TSocketException.Create('缓冲区大小必须为1-65535之间的数值');
  if FActive then
    raise TSocketException.Create('监听状态不允许更改缓冲大小');
  FRecvBufferSize := Value;
end;

//------------------------------------------------------------------------------
// Raises OnConnected if a handler is assigned.
procedure TBlockingTCPServer.DoConnected(RecvThread: TRecvThread);
begin
  if Assigned(FOnConnected) then
    FOnConnected(Self, RecvThread);
end;

//------------------------------------------------------------------------------
// Raises OnDisconnected if a handler is assigned.
procedure TBlockingTCPServer.DoDisconnected(RecvThread: TRecvThread);
begin
  if Assigned(FOnDisconnected) then
    FOnDisconnected(Self, RecvThread);
end;

//------------------------------------------------------------------------------
// Raises OnReceivedData if a handler is assigned.
procedure TBlockingTCPServer.DoReceivedData(RecvThread: TRecvThread; RecvBytes: Integer);
begin
  if Assigned(FOnReceivedData) then
    FOnReceivedData(Self, RecvThread, RecvBytes);
end;

//------------------------------------------------------------------------------
// Raises OnSendedData if a handler is assigned.
procedure TBlockingTCPServer.DoSendedData(RecvThread: TRecvThread; SendBytes: Integer);
begin
  if Assigned(FOnSendedData) then
    FOnSendedData(Self, RecvThread, SendBytes);
end;

//------------------------------------------------------------------------------
// Raises OnBeforeSendData if a handler is assigned.
procedure TBlockingTCPServer.DoBeforeSendData(var Buf; Size: Integer);
begin
  if Assigned(FOnBeforeSendData) then
    FOnBeforeSendData(Self, Buf, Size);
end;

//------------------------------------------------------------------------------
// Creates an inactive server with a 16 KB (8192 * 2) receive buffer size.
constructor TBlockingTCPServer.Create;
begin
  inherited Create;
  FActive := False;
  FSocket := INVALID_SOCKET;
  FListenPort := 0;
  FRecvBufferSize := 8192 * 2;
  FRecvThreads := TList.Create;
  FAcptThread := nil;
end;

//------------------------------------------------------------------------------
// Stops the server if it is active and releases the thread list.
destructor TBlockingTCPServer.Destroy;
begin
  if Active then
    Stop;
  if FAcptThread <> nil then
  begin
    FAcptThread.Terminate;
    // Make sure the accept thread never writes back into this instance
    // after it has been freed.
    FAcptThread.FBlockingTCPServer := nil;
  end;
  FRecvThreads.Clear;
  FreeAndNil(FRecvThreads);
  inherited Destroy;
end;

end.