{ 文件名:BlockingTCPClient.pas 功 能:阻塞方式TCP客户端组件,单独线程接收数据。 建 立:尹进 历 史: 2005.12.23:补文件说明信息(尹进) } unit BlockingTCPClient; interface uses WinSock2, RealICQSocket, RealICQProxy, SysUtils, Classes, Windows; type {$M+} TBlockingTCPClientRecvThread = class; TBlockingTCPClientReceivedDataEvent = procedure(Sender: TObject; RecvThread: TBlockingTCPClientRecvThread; RecvBytes: Integer) of object; TBlockingTCPClientSendedDataEvent = procedure(Sender: TObject; SendBytes: Integer) of object; TBlockingTCPClientBeforeSendDataEvent = procedure(Sender: TObject; var Buf; Size: Integer) of object; TBlockingTCPClient = class private FEncryptCriticalSection: TRTLCriticalSection; FConnected: Boolean; FSocket: TSocket; FRemoteAddress: String; FRemotePort: Integer; FProxy: TProxy; FRecvBufferSize: Integer; FCallSynchronize: Boolean; FLocalAddress: String; FLocalPort: Integer; FNoDelay: Boolean; FOnConnected: TNotifyEvent; FOnDisconnected: TNotifyEvent; FOnReceivedData: TBlockingTCPClientReceivedDataEvent; FOnSendedData: TBlockingTCPClientSendedDataEvent; FOnBeforeSendData: TBlockingTCPClientBeforeSendDataEvent; procedure SetRemoteAddress(Value:String); procedure SetRemotePort(Value:Integer); procedure SetProxy(Value:TProxy); procedure SetRecvBufferSize(Value:Integer); procedure SetNoDelay(Value: Boolean); protected procedure DoConnected; procedure DoDisconnected; procedure DoReceivedData(RecvThread: TBlockingTCPClientRecvThread; RecvBytes: Integer); procedure DoSendedData(SendBytes: Integer); procedure DoBeforeSendData(var Buf; Size: Integer); public constructor Create; destructor Destroy; override; procedure Connect(StartRecvThread: Boolean = True); procedure Disconnect; procedure SendBuffer(var Buf; Size: Integer); property SocketNO: TSocket read FSocket; property NoDelay: Boolean read FNoDelay write SetNoDelay; property CallSynchronize: Boolean read FCallSynchronize write FCallSynchronize; property RemoteAddress: String read FRemoteAddress write SetRemoteAddress; property RemotePort: Integer read FRemotePort write SetRemotePort; property Proxy: TProxy read FProxy write SetProxy; property Connected: Boolean read FConnected; property RecvBufferSize: Integer read FRecvBufferSize write SetRecvBufferSize; property LocalAddress: String read FLocalAddress write FLocalAddress; property LocalPort: Integer read FLocalPort write FLocalPort; property OnConnected: TNotifyEvent read FOnConnected write FOnConnected; property OnDisconnected: TNotifyEvent read FOnDisconnected write FOnDisconnected; property OnReceivedData: TBlockingTCPClientReceivedDataEvent read FOnReceivedData write FOnReceivedData; property OnSendedData: TBlockingTCPClientSendedDataEvent read FOnSendedData write FOnSendedData; property OnBeforeSendData: TBlockingTCPClientBeforeSendDataEvent read FOnBeforeSendData write FOnBeforeSendData; end; //用于接收数据的线程 TBlockingTCPClientRecvThread = class(TThread) private FBlockingTCPClient: TBlockingTCPClient; FBuf: array of Byte; FNotProcessedBufferLength: Integer; FRecvBytes: Integer; procedure DoDisconnect; procedure DoReceivedData(); protected procedure Execute; override; public constructor Create(ABlockingTCPClient: TBlockingTCPClient); destructor Destroy; override; procedure CopyRecvBufferTo(var Buf; Offset: Integer; Size: Integer); procedure CutRecvBufferTo(var Buf; Offset: Integer; Size: Integer); published property NotProcessedBufferLength: Integer read FNotProcessedBufferLength; end; implementation uses LoggerImport; {TBlockingTCPClient} //------------------------------------------------------------------------------ procedure TBlockingTCPClient.SetNoDelay(Value: Boolean); begin if FSocket <> INVALID_SOCKET then begin FNoDelay := Value; setsockopt(FSocket, IPPROTO_TCP, TCP_NODELAY, @FNoDelay, SizeOf(FNoDelay)); end; end; //------------------------------------------------------------------------------ constructor TBlockingTCPClient.Create; begin inherited Create; InitializeCriticalSection(FEncryptCriticalSection); FCallSynchronize := True; FNoDelay := False; FConnected := False; FSocket := INVALID_SOCKET; FProxy := TProxy.Create; FRecvBufferSize := 65534; // FRecvBufferSize := 8192 * 2; end; //------------------------------------------------------------------------------ destructor TBlockingTCPClient.Destroy; begin try if Connected then Disconnect; FProxy.Free; finally EnterCriticalSection(FEncryptCriticalSection); DeleteCriticalSection(FEncryptCriticalSection); inherited Destroy; end; end; //------------------------------------------------------------------------------ procedure TBlockingTCPClient.Connect(StartRecvThread: Boolean = True); var serverAddr: TSockAddrIn; BindProxyAddr: TSockAddrIn; lastError: Integer; ARemoteIP: String; ret:Integer; LocalAddr: TSockAddrIn; Length: Integer; begin if FRemoteAddress = '' then raise TSocketException.Create('Server address empty'); if FRemotePort = 0 then raise TSocketException.Create('Server port empty'); if Connected then Disconnect; case FProxy.ProxyType of ptNone: begin FSocket := Socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); if FSocket = INVALID_SOCKET then raise TSocketException.CreateFmt('ErrorCode:%d',[WSAGetLastError]); if not HostToIP(FRemoteAddress, ARemoteIP) then ARemoteIP := FRemoteAddress; serverAddr.sin_family:= AF_INET; serverAddr.sin_port:= htons(FRemotePort); serverAddr.sin_addr.S_addr:= inet_addr(PAnsiChar(ARemoteIP)); ret := WinSock2.connect(FSocket,@serverAddr,SizeOf(serverAddr)); if ret = SOCKET_ERROR then begin lastError := WSAGetLastError(); if lastError <> 0 then raise TSocketException.CreateFmt('连接失败:%d',[lastError]); end; end; ptSocks5: begin if FProxy.Address = '' then raise TSocketException.Create('Proxy server address Empty'); if FProxy.Port = 0 then raise TSocketException.Create('Proxy server port Empty'); FSocket := ConnectToSocks5Proxy(FRemoteAddress, FRemotePort, FProxy.Address, FProxy.Port, FProxy.Username, FProxy.Password, ppTCP, BindProxyAddr); end; ptHttp: begin if FProxy.Address = '' then raise TSocketException.Create('Proxy server address Empty'); if FProxy.Port = 0 then raise TSocketException.Create('Proxy server port Empty'); FSocket := ConnectToHttpProxy(FRemoteAddress, FRemotePort, FProxy.Address, FProxy.Port, FProxy.Username, FProxy.Password, FProxy.Domain); end; end; Length := SizeOf(LocalAddr); getSockname(FSocket, LocalAddr, Length); FLocalAddress := inet_ntoa(LocalAddr.sin_addr); FLocalPort := ntohs(localAddr.sin_port); FConnected := True; DoConnected; if StartRecvThread then TBlockingTCPClientRecvThread.Create(Self); end; //------------------------------------------------------------------------------ procedure TBlockingTCPClient.Disconnect; begin if FSocket <> INVALID_SOCKET then begin try try shutdown(FSocket, SD_BOTH); finally closeSocket(FSocket); FSocket := INVALID_SOCKET; end; finally FConnected := False; DoDisconnected; end; end; end; //------------------------------------------------------------------------------ procedure TBlockingTCPClient.SendBuffer(var Buf; Size: Integer); var SendBytes, ErrorCode: Integer; begin ErrorCode := 0; try EnterCriticalSection(FEncryptCriticalSection); DoBeforeSendData(Buf, Size); SendBytes:= send(FSocket, Buf, Size, 0); if SendBytes = 0 then begin Info('SendBytes = 0:' + SysErrorMessage(ErrorCode),'TBlockingTCPClient.SendBuffer'); Disconnect; Exit; end; if SendBytes = SOCKET_ERROR then begin ErrorCode := GetLastError; if (ErrorCode = WSAECONNABORTED) or (ErrorCode = WSAECONNRESET) or (ErrorCode = WSAETIMEDOUT) or (ErrorCode = WSAENOTSOCK) then begin Info('发送数据时报错:' + SysErrorMessage(ErrorCode),'TBlockingTCPClient.SendBuffer'); Disconnect; end; end else begin DoSendedData(SendBytes); end; finally LeaveCriticalSection(FEncryptCriticalSection); end; end; //------------------------------------------------------------------------------ procedure TBlockingTCPClient.DoConnected; begin if Assigned(FOnConnected) then FOnConnected(Self); end; //------------------------------------------------------------------------------ procedure TBlockingTCPClient.DoBeforeSendData(var Buf; Size: Integer); begin if Assigned(FOnBeforeSendData) then FOnBeforeSendData(Self, Buf, Size); end; //------------------------------------------------------------------------------ procedure TBlockingTCPClient.DoSendedData(SendBytes: Integer); begin if Assigned(FOnSendedData) then FOnSendedData(Self, SendBytes); end; //------------------------------------------------------------------------------ procedure TBlockingTCPClient.DoReceivedData(RecvThread: TBlockingTCPClientRecvThread; RecvBytes: Integer); begin if Assigned(FOnReceivedData) then FOnReceivedData(Self, RecvThread, RecvBytes); end; //------------------------------------------------------------------------------ procedure TBlockingTCPClient.DoDisconnected; begin if Assigned(FOnDisconnected) then FOnDisconnected(Self); end; //------------------------------------------------------------------------------ procedure TBlockingTCPClient.SetRemoteAddress(Value: String); begin FRemoteAddress := Value; end; //------------------------------------------------------------------------------ procedure TBlockingTCPClient.SetRemotePort(Value: Integer); begin if (Value<0) or (Value>65535) then raise TSocketException.Create('端口号必须为0-65535之间的数值'); FRemotePort := Value; end; //------------------------------------------------------------------------------ procedure TBlockingTCPClient.SetProxy(Value: TProxy); begin if Assigned(Value) then FProxy.Assign(Value); end; //------------------------------------------------------------------------------ procedure TBlockingTCPClient.SetRecvBufferSize(Value: Integer); begin if (Value<1) or (Value>65535) then raise TSocketException.Create('缓冲区大小必须为1-65535之间的数值'); if Connected then raise TSocketException.Create('连接已建立时不允许更改缓冲大小'); FRecvBufferSize := Value; end; {TBlockingTCPClientRecvThread} //------------------------------------------------------------------------------ procedure TBlockingTCPClientRecvThread.Execute; var ErrorCode: Integer; begin ErrorCode := 0; FreeOnTerminate := True; while (not Terminated) do begin FRecvBytes := recv(FBlockingTCPClient.FSocket, FBuf[FNotProcessedBufferLength], Length(FBuf)-FNotProcessedBufferLength, 0); if FRecvBytes = SOCKET_ERROR then begin ErrorCode := GetLastError; if (ErrorCode = WSAECONNABORTED) or (ErrorCode = WSAECONNRESET) or (ErrorCode = WSAETIMEDOUT) or (ErrorCode = WSAENOTSOCK) then begin try Error('接收数据时报错:' + SysErrorMessage(ErrorCode) + ' ' +FBlockingTCPClient.FRemoteAddress + ':' + IntToStr(FBlockingTCPClient.FRemotePort),'TBlockingTCPClientRecvThread.Execute'); Synchronize(DoDisconnect); except end; Exit; end else begin Sleep(1); Continue; end; end; if FRecvBytes = 0 then begin try Error('RecvBytes = 0' + SysErrorMessage(ErrorCode),'recv'); Synchronize(DoDisconnect); except end; Exit; end else begin FNotProcessedBufferLength := FNotProcessedBufferLength + FRecvBytes; try if FBlockingTCPClient.FCallSynchronize then Synchronize(DoReceivedData) else DoReceivedData; except end; end; end; end; //------------------------------------------------------------------------------ procedure TBlockingTCPClientRecvThread.DoReceivedData(); begin if Assigned(FBlockingTCPClient) then FBlockingTCPClient.DoReceivedData(Self, FRecvBytes); end; //------------------------------------------------------------------------------ procedure TBlockingTCPClientRecvThread.DoDisconnect; begin // Info('断开:' + SysErrorMessage(GetLastError),'DoDisconnect'); if Assigned(FBlockingTCPClient) then FBlockingTCPClient.Disconnect; end; //------------------------------------------------------------------------------ procedure TBlockingTCPClientRecvThread.CopyRecvBufferTo(var Buf; Offset: Integer; Size: Integer); begin CopyMemory(@Buf,@FBuf[Offset],Size); end; //------------------------------------------------------------------------------ procedure TBlockingTCPClientRecvThread.CutRecvBufferTo(var Buf; Offset: Integer; Size: Integer); begin CopyMemory(@Buf, @FBuf[Offset], Size); CopyMemory(FBuf, @FBuf[Offset+Size], Length(FBuf)-(Offset+Size)); FNotProcessedBufferLength := FNotProcessedBufferLength - (Offset+Size); end; //------------------------------------------------------------------------------ constructor TBlockingTCPClientRecvThread.Create(ABlockingTCPClient: TBlockingTCPClient); begin inherited Create(True); FBlockingTCPClient := ABlockingTCPClient; SetLength(FBuf,FBlockingTCPClient.RecvBufferSize); FNotProcessedBufferLength := 0; Resume; end; //------------------------------------------------------------------------------ destructor TBlockingTCPClientRecvThread.Destroy; begin inherited Destroy; end; end.