IdIOHandlerChain.pas 25 KB


  1. { $HDR$}
  2. {**********************************************************************}
  3. { Unit archived using Team Coherence }
  4. { Team Coherence is Copyright 2002 by Quality Software Components }
  5. { }
  6. { For further information / comments, visit our WEB site at }
  7. { http://www.TeamCoherence.com }
  8. {**********************************************************************}
  9. {}
  10. { $Log: 56074: IdIOHandlerChain.pas
  11. {
  12. { Rev 1.6 9/16/2004 8:11:40 PM JPMugaas
  13. { Should compile again.
  14. }
  15. {
  16. Rev 1.5 6/11/2004 8:39:58 AM DSiders
  17. Added "Do not Localize" comments.
  18. }
  19. {
  20. { Rev 1.4 2004.05.06 1:47:26 PM czhower
  21. { Now uses IndexOf
  22. }
  23. {
  24. { Rev 1.3 2004.04.13 10:37:56 PM czhower
  25. { Updates
  26. }
  27. {
  28. { Rev 1.2 2004.03.07 11:46:08 AM czhower
  29. { Flushbuffer fix + other minor ones found
  30. }
  31. {
  32. { Rev 1.1 2004.02.09 9:16:44 PM czhower
  33. { Updated to compile and match lib changes.
  34. }
  35. {
  36. { Rev 1.0 2004.02.03 12:38:56 AM czhower
  37. { Move
  38. }
  39. {
  40. { Rev 1.6 2003.10.24 10:37:38 AM czhower
  41. { IdStream
  42. }
  43. {
  44. { Rev 1.5 2003.10.19 4:38:32 PM czhower
  45. { Updates
  46. }
  47. {
  48. { Rev 1.4 2003.10.19 2:50:40 PM czhower
  49. { Fiber cleanup
  50. }
  51. {
  52. { Rev 1.3 2003.10.14 11:17:02 PM czhower
  53. { Updates to match core changes.
  54. }
  55. {
  56. { Rev 1.2 2003.10.11 5:43:30 PM czhower
  57. { Chained servers now functional.
  58. }
  59. {
  60. { Rev 1.1 2003.09.19 10:09:40 PM czhower
  61. { Next stage of fiber support in servers.
  62. }
  63. {
  64. { Rev 1.0 8/16/2003 11:09:08 AM JPMugaas
  65. { Moved from Indy Core dir as part of package reorg
  66. }
  67. {
  68. { Rev 1.49 2003.07.17 4:42:06 PM czhower
  69. { More IOCP improvements.
  70. }
  71. {
  72. { Rev 1.45 2003.07.14 11:46:46 PM czhower
  73. { IOCP now passes all bubbles.
  74. }
  75. {
  76. { Rev 1.43 2003.07.14 1:10:52 AM czhower
  77. { Now passes all bubble tests for chained stack.
  78. }
  79. {
  80. Rev 1.41 7/7/2003 1:34:06 PM BGooijen
  81. Added WriteFile(...)
  82. }
  83. {
  84. Rev 1.40 7/3/2003 2:03:52 PM BGooijen
  85. IOCP works server-side now
  86. }
  87. {
  88. { Rev 1.39 2003.06.30 5:41:54 PM czhower
  89. { -Fixed AV that occurred sometimes when sockets were closed with chains
  90. { -Consolidated code that was marked by a todo for merging as it no longer
  91. { needed to be separate
  92. { -Removed some older code that was no longer necessary
  93. {
  94. { Passes bubble tests.
  95. }
  96. {
  97. Rev 1.38 6/29/2003 10:56:26 PM BGooijen
  98. Removed .Memory from the buffer, and added some extra methods
  99. }
  100. {
  101. { Rev 1.37 2003.06.25 4:30:02 PM czhower
  102. { Temp hack fix for AV problem. Working on real solution now.
  103. }
  104. {
  105. Rev 1.36 6/24/2003 11:17:44 PM BGooijen
  106. change in TIdIOHandlerChain.ReadLn, LTermPos= 0 is now handled differently
  107. }
  108. {
  109. { Rev 1.35 23/6/2003 22:33:18 GGrieve
  110. { fix CheckForDataOnSource - specify timeout
  111. }
  112. {
  113. { Rev 1.34 6/22/2003 11:22:22 PM JPMugaas
  114. { Should now compile.
  115. }
  116. {
  117. Rev 1.33 6/4/2003 1:08:40 AM BGooijen
  118. Added CheckForDataOnSource and removed some (duplicate) code
  119. }
  120. {
  121. Rev 1.32 6/3/2003 8:07:20 PM BGooijen
  122. Added TIdIOHandlerChain.AllData
  123. }
  124. {
  125. Rev 1.31 5/11/2003 2:37:58 PM BGooijen
  126. Bindings are updated now
  127. }
  128. {
  129. Rev 1.30 5/11/2003 12:00:08 PM BGooijen
  130. }
  131. {
  132. Rev 1.29 5/11/2003 12:03:16 AM BGooijen
  133. }
  134. {
  135. { Rev 1.28 2003.05.09 10:59:24 PM czhower
  136. }
  137. {
  138. { Rev 1.27 2003.04.22 9:48:50 PM czhower
  139. }
  140. {
  141. { Rev 1.25 2003.04.17 11:01:14 PM czhower
  142. }
  143. {
  144. { Rev 1.19 2003.04.10 10:51:04 PM czhower
  145. }
  146. {
  147. Rev 1.18 4/2/2003 3:39:26 PM BGooijen
  148. Added Intercepts
  149. }
  150. {
  151. Rev 1.17 3/29/2003 5:53:52 PM BGooijen
  152. added AfterAccept
  153. }
  154. {
  155. Rev 1.16 3/27/2003 2:57:58 PM BGooijen
  156. Added a RawWrite for streams, implemented WriteStream, changed
  157. WriteToDestination to use TIdWorkOpUnitWriteBuffer
  158. }
  159. {
  160. { Rev 1.15 2003.03.26 12:20:28 AM czhower
  161. { Moved visibility of execute to protected.
  162. }
  163. {
  164. Rev 1.14 3/25/2003 11:07:58 PM BGooijen
  165. ChainEngine descends now from TIdBaseComponent
  166. }
  167. {
  168. { Rev 1.13 3/25/2003 01:33:48 AM JPMugaas
  169. { Fixed compiler warnings.
  170. }
  171. {
  172. Rev 1.12 3/24/2003 11:03:50 PM BGooijen
  173. Various fixes to readln:
  174. - uses connection default now
  175. - doesn't raise an exception on timeout any more
  176. }
  177. {
  178. { Rev 1.11 2003.03.13 1:22:58 PM czhower
  179. { Typo fixed. lenth --> Length
  180. }
  181. {
  182. Rev 1.10 3/13/2003 10:18:20 AM BGooijen
  183. Server side fibers, bug fixes
  184. }
  185. {
  186. Rev 1.9 3/2/2003 12:36:22 AM BGooijen
  187. Added woReadBuffer and TIdWorkOpUnitReadBuffer to read a buffer. Now
  188. ReadBuffer doesn't use ReadStream any more.
  189. TIdIOHandlerChain.ReadLn now supports MaxLineLength (splitting, and
  190. exceptions).
  191. woReadLn doesn't check the entire buffer any more, but continues where it
  192. stopped the last time.
  193. Added basic support for timeouts (probably only on read operations, and maybe
  194. connect), accuracy of timeout is currently 500msec.
  195. }
  196. {
  197. Rev 1.8 2/28/2003 10:15:16 PM BGooijen
  198. bugfix: changed some occurrences of FRecvBuffer to FInputBuffer
  199. }
  200. {
  201. Rev 1.7 2/27/2003 10:11:12 PM BGooijen
  202. }
  203. {
  204. Rev 1.6 2/26/2003 1:08:52 PM BGooijen
  205. }
  206. {
  207. Rev 1.5 2/25/2003 10:36:28 PM BGooijen
  208. Added more opcodes, methods, and moved opcodes to separate files.
  209. }
  210. {
  211. { Rev 1.4 2003.02.25 9:02:32 PM czhower
  212. { Hand off to Bas
  213. }
  214. {
  215. { Rev 1.3 2003.02.25 1:36:04 AM czhower
  216. }
  217. {
  218. { Rev 1.2 2002.12.11 11:00:58 AM czhower
  219. }
  220. {
  221. { Rev 1.1 2002.12.07 12:26:06 AM czhower
  222. }
  223. {
  224. { Rev 1.0 11/13/2002 08:45:00 AM JPMugaas
  225. }
  226. unit IdIOHandlerChain;
  227. interface
  228. uses
  229. Classes
  230. , IdBaseComponent, IdBuffer, IdGlobal, IdIOHandler, IdIOHandlerSocket
  231. , IdFiber, IdThreadSafe, IdWorkOpUnit, IdStackConsts, IdWinsock2, IdThread
  232. , IdFiberWeaver, IdStream, IdStreamVCL
  233. , Windows;
  type
  // Connection strategy selected in TIdIOHandlerChain.ConnectClient.
  TIdConnectMode = (cmNonBlock, cmIOCP);

  // Forward declarations: the engine, the IO handler and the engine thread
  // refer to each other in their declarations below.
  TIdIOHandlerChain = class;
  TIdChainEngineThread = class;
  { Engine built around a Windows I/O completion port. It holds the port handle
    and the TIdChainEngineThread that repeatedly calls Execute to service it. }
  TIdChainEngine = class(TIdBaseComponent)
  protected
    // Handle returned by CreateIoCompletionPort in InitComponent.
    FCompletionPort: THandle;
    // Worker thread; only created when not in design mode.
    FThread: TIdChainEngineThread;
    //
    // Waits for one completion packet and dispatches it.
    procedure Execute;
    function GetInputBuffer(const AIOHandler: TIdIOHandler): TIdBuffer;
    procedure InitComponent; override;
    procedure SetIOHandlerOptions(AIOHandler: TIdIOHandlerChain);
    // Posts the terminate packet that wakes the worker thread.
    procedure Terminating;
  public
    procedure AddWork(AWorkOpUnit: TIdWorkOpUnit);
    procedure BeforeDestruction; override;
    destructor Destroy; override;
    procedure RemoveSocket(AIOHandler: TIdIOHandlerChain);
    procedure SocketAccepted(AIOHandler: TIdIOHandlerChain);
  end;
  { Socket IO handler whose read and write operations are expressed as work op
    units handed to a TIdChainEngine (see QueueAndWait). When a unit completes,
    WorkOpUnitCompleted re-adds the unit's fiber to the fiber weaver. }
  TIdIOHandlerChain = class(TIdIOHandlerSocket)
  protected
    FChainEngine: TIdChainEngine;
    FConnectMode: TIdConnectMode;
    FFiber: TIdFiber;
    FFiberWeaver: TIdFiberWeaver;
    // Allocated in Create, released in Destroy.
    FOverlapped: PIdOverlapped;
    //
    procedure ConnectClient; override;
    procedure QueueAndWait(AWorkOpUnit: TIdWorkOpUnit;
      ATimeout: Integer = IdTimeoutDefault;
      AFreeWorkOpUnit: Boolean = True;
      AAllowGracefulException: Boolean = True);
    procedure WorkOpUnitCompleted(AWorkOpUnit: TIdWorkOpUnit);
  public
    procedure AfterAccept; override;
    function AllData: string; override;
    procedure CheckForDataOnSource(ATimeout : Integer = 0); override;
    procedure CheckForDisconnect(ARaiseExceptionIfDisconnected: Boolean = True;
      AIgnoreBuffer: Boolean = False); override;
    constructor Create(AOwner: TComponent; AChainEngine: TIdChainEngine;
      AFiberWeaver: TIdFiberWeaver; AFiber: TIdFiber); reintroduce; virtual;
    destructor Destroy; override;
    procedure Open; override;
    function ReadFromSource(ARaiseExceptionIfDisconnected: Boolean = True;
      ATimeout: Integer = IdTimeoutDefault;
      ARaiseExceptionOnTimeout: Boolean = True): Integer; override;
    procedure ReadStream(AStream: TIdStreamVCL; AByteCount: Integer;
      AReadUntilDisconnect: Boolean); override;
    // TODO: Allow ReadBuffer to bypass the internal buffer. Will it really
    // help? Only ReadBuffer would be able to use this optimization in most
    // cases and it is not used by many. Most calls are to stream (disk) based
    // or strings as ReadLn.
    procedure ReadBytes(var VBuffer: TIdBytes; AByteCount: Integer;
      AAppend: Boolean = True); override;
    function ReadLn(ATerminator: string = LF;
      ATimeout: Integer = IdTimeoutDefault;
      AMaxLineLength: Integer = -1): string; override;
    // Previous signature, kept for reference:
    // function WriteFile(
    // AFile: string;
    // AEnableTransferFile: Boolean
    // ): Cardinal; override;
    function WriteFile(const AFile: String;
      AEnableTransferFile: Boolean): Cardinal; override;
    { Previous signature, kept for reference:
      procedure Write(
      AStream: TIdStream;
      ASize: Integer = 0;
      AWriteByteCount: Boolean = False);
      override; }
    procedure Write(AStream: TIdStreamVCL; ASize: Integer = 0;
      AWriteByteCount: Boolean = False); override;
    procedure WriteDirect(ABuffer: TIdBytes); override;
    //
    property ConnectMode: TIdConnectMode read FConnectMode write FConnectMode;
    property Overlapped: PIdOverlapped read FOverlapped;
  end;
  { Thread whose Run method calls Execute on the TIdChainEngine that owns it. }
  TIdChainEngineThread = class(TIdThread)
  protected
    // Engine passed to Create; Run forwards to its Execute method.
    FChainEngine: TIdChainEngine;
  public
    constructor Create(AOwner: TIdChainEngine; const AName: string); reintroduce;
    procedure Run; override;
    // Inherited property made public here.
    property Terminated;
  end;
  343. implementation
  344. uses
  345. IdComponent, IdException, IdExceptionCore, IdStack, IdResourceStrings, IdWorkOpUnits,
  346. IdStackWindows,
  347. SysUtils;
  const
    // Completion key posted by TIdChainEngine.Terminating; TIdChainEngine.Execute
    // skips all processing for a packet carrying this key.
    GCompletionKeyTerminate = $F0F0F0F0;
  350. { TIdIOHandlerChain }
  { Queues a TIdWorkOpUnitReadAvailable and waits for it with the given timeout.
    A read timeout is swallowed; every other exception reaches the caller. }
  procedure TIdIOHandlerChain.CheckForDataOnSource(ATimeout: Integer = 0);
  begin
    // TODO: Change this so we dont have to rely on an exception trap
    try
      QueueAndWait(TIdWorkOpUnitReadAvailable.Create, ATimeout, True, False);
    except
      // Only the timeout is handled. An exception that matches no "on" clause
      // is re-raised automatically, so no explicit else/raise is needed.
      on EIdReadTimeout do begin
        // Nothing
      end;
    end;
  end;
  { Connects the binding according to ConnectMode, then queues a
    TIdWorkOpUnitWaitConnected and refreshes the local and peer binding info. }
  procedure TIdIOHandlerChain.ConnectClient;
  begin
    // TODO: Non blocking does not support Socks
    Binding.OverLapped := ConnectMode = cmIOCP;
    inherited;
    if ConnectMode = cmNonBlock then begin
      //TODO: Non blocking DNS resolution too?
      Binding.SetPeer(GWindowsStack.ResolveHost(Host), Port);
      // Socket is switched to non blocking first, so Connect does not block
      GWindowsStack.SetBlocking(Binding.Handle, False);
      Binding.Connect;
    end else if ConnectMode = cmIOCP then begin
      //TODO: For now we are doing blocking, just to get it to work. fix later
      // IOCP was not designed for connects, so we'll have to do some monkeying
      // maybe even create an engine thread just to watch for connect events.
      //TODO: Resolution too?
      Binding.SetPeer(GStack.ResolveHost(Host), Port);
      // Blocking connect; non blocking mode is enabled only afterwards
      Binding.Connect;
      GWindowsStack.SetBlocking(Binding.Handle, False);
    end else begin
      EIdException.Toss('Unrecognized ConnectMode'); {do not localize}
    end;
    QueueAndWait(TIdWorkOpUnitWaitConnected.Create);
    //Update the bindings
    Binding.UpdateBindingLocal;
    //TODO: Could Peer binding ever be other than what we specified above? Need to reread it?
    Binding.UpdateBindingPeer;
  end;
  { Hands this handler to the chain engine, which associates its socket with
    the engine's completion port (see TIdChainEngine.SocketAccepted). }
  procedure TIdIOHandlerChain.AfterAccept;
  begin
    FChainEngine.SocketAccepted(Self);
  end;
  { Currently only forwards to the inherited Open. }
  procedure TIdIOHandlerChain.Open;
  begin
    // Anything ConnectClient depends on must be set up before this call:
    // the inherited Open is what actually connects and calls ConnectClient.
    inherited Open;
  end;
  { Detects a closed connection and optionally raises for it.

    The connection counts as disconnected when ClosedGracefully is set (the
    binding is then closed if still allocated) or when no binding is allocated.
    RaiseConnClosedGracefully is called only if ARaiseExceptionIfDisconnected is
    True, an input buffer exists, and that buffer is empty or AIgnoreBuffer is
    True. }
  procedure TIdIOHandlerChain.CheckForDisconnect(
    ARaiseExceptionIfDisconnected: Boolean; AIgnoreBuffer: Boolean);
  var
    LGone: Boolean;
  begin
    // ClosedGracefully // Server disconnected
    // IOHandler = nil // Client disconnected
    LGone := ClosedGracefully;
    if LGone then begin
      if BindingAllocated then begin
        Close;
        // Call event handlers to inform the user program that we were disconnected
        // DoStatus(hsDisconnected);
        //DoOnDisconnected;
      end;
    end else begin
      LGone := not BindingAllocated;
    end;
    if not LGone then begin
      Exit;
    end;
    if not Assigned(FInputBuffer) then begin
      Exit;
    end;
    // Do not raise unless all data has been read by the user
    if ((FInputBuffer.Size = 0) or AIgnoreBuffer) and ARaiseExceptionIfDisconnected then begin
      RaiseConnClosedGracefully;
    end;
  end;
  { Not supported by this handler: sets Result to 0 and then tosses an
    EIdException reporting a fall through in this class. }
  function TIdIOHandlerChain.ReadFromSource(
    ARaiseExceptionIfDisconnected: Boolean;
    ATimeout: Integer;
    ARaiseExceptionOnTimeout: Boolean): Integer;
  begin
    // Result is assigned before the toss so it is always defined.
    Result := 0;
    EIdException.Toss('Fall through error in ' + ClassName); {do not localize}
  end;
  { Reads into AStream.VCLStream: either AByteCount bytes, or until the
    connection is closed when AReadUntilDisconnect is True. }
  procedure TIdIOHandlerChain.ReadStream(AStream: TIdStreamVCL; AByteCount: Integer;
    AReadUntilDisconnect: Boolean);
  begin
    if not AReadUntilDisconnect then begin
      QueueAndWait(TIdWorkOpUnitReadSizedStream.Create(AStream.VCLStream, AByteCount));
      Exit;
    end;
    // The disconnect is the expected end of this read, so QueueAndWait is told
    // not to raise for a graceful close.
    QueueAndWait(TIdWorkOpUnitReadUntilDisconnect.Create(AStream.VCLStream), -1,
      True, False);
  end;
  { Extracts AByteCount bytes from the input buffer into VBuffer, first waiting
    for the missing bytes when the buffer holds fewer than requested.
    AByteCount must not be negative; zero is a no-op. }
  procedure TIdIOHandlerChain.ReadBytes(var VBuffer: TIdBytes;
    AByteCount: Integer; AAppend: Boolean = True);
  begin
    EIdException.IfFalse(AByteCount >= 0);
    if AByteCount <= 0 then begin
      Exit;
    end;
    if AByteCount > FInputBuffer.Size then begin
      // Only the shortfall is requested from the engine
      QueueAndWait(TIdWorkOpUnitReadSized.Create(AByteCount - FInputBuffer.Size));
    end;
    Assert(FInputBuffer.Size >= AByteCount);
    FInputBuffer.ExtractToBytes(VBuffer, AByteCount, AAppend);
  end;
  { Reads one line terminated by ATerminator from the input buffer.

    ATerminator    - line terminator; '' is treated as LF.
    ATimeout       - passed to QueueAndWait when more data has to be awaited.
    AMaxLineLength - -1 means use MaxLineLength; 0 means no limit.

    Returns the line without its terminator (a trailing CR is also removed when
    the terminator is LF). When the line exceeds AMaxLineLength, FMaxLineAction
    decides: maException tosses an EIdException, maSplit returns the first
    AMaxLineLength bytes and sets FReadLnSplit. On a read timeout the result is
    '' and FReadLnTimedOut is set. }
  function TIdIOHandlerChain.ReadLn(ATerminator: string = LF;
    ATimeout: Integer = IdTimeoutDefault; AMaxLineLength: Integer = -1): string;
  var
    LTermPos: Integer;
    LTooLong: Boolean;
  begin
    // A string Result is not guaranteed to start out empty; make the timeout
    // and maException paths return a defined value.
    Result := '';
    if AMaxLineLength = -1 then begin
      AMaxLineLength := MaxLineLength;
    end;
    // User may pass '' if they need to pass arguments beyond the first.
    if ATerminator = '' then begin
      ATerminator := LF;
    end;
    FReadLnSplit := False;
    FReadLnTimedOut := False;
    try
      LTermPos := FInputBuffer.IndexOf(ATerminator) + 1;
      if (LTermPos = 0) and ((AMaxLineLength = 0)
        or (FInputBuffer.Size < AMaxLineLength)) then begin
        QueueAndWait(TIdWorkOpUnitReadLn.Create(ATerminator, AMaxLineLength),
          ATimeout);
        LTermPos := FInputBuffer.IndexOf(ATerminator) + 1;
      end;
      // The line is over length when the terminator lies beyond the limit, or
      // when no terminator was found but the buffer already holds at least the
      // limit. The wait above is skipped in that second case, so it must be
      // handled here rather than tripping the assertion below.
      LTooLong := (AMaxLineLength <> 0)
        and ((LTermPos > AMaxLineLength)
          or ((LTermPos = 0) and (FInputBuffer.Size >= AMaxLineLength)));
      if LTooLong then begin
        case FMaxLineAction of
          // TODO: find the right exception class here
          maException: EIdException.Toss('MaxLineLength exceded'); {do not localize}
          maSplit: begin
            Result := FInputBuffer.Extract(AMaxLineLength);
            // Let the caller see that this is only part of a line
            FReadLnSplit := True;
          end;
        end;
      end else begin
        // LTermPos cannot be 0 here, and the code below can't handle it properly
        Assert(LTermPos > 0);
        Result := FInputBuffer.Extract(LTermPos - 1);
        if (ATerminator = LF) and (Copy(Result, Length(Result), 1) = CR) then begin
          Delete(Result, Length(Result), 1);
        end;
        FInputBuffer.Extract(Length(ATerminator));// remove the terminator
      end;
    except
      on E: EIdReadTimeout do begin
        FReadLnTimedOut := True;
      end;
    end;
  end;
  { Reads until the connection is closed and returns everything received as a
    string. The read is bracketed by BeginWork/EndWork for wmRead. }
  function TIdIOHandlerChain.AllData: string;
  var
    LCollected: TStringStream;
  begin
    BeginWork(wmRead);
    try
      Result := '';
      LCollected := TStringStream.Create('');
      try
        // The disconnect ends the read, so no graceful-close exception is wanted
        QueueAndWait(TIdWorkOpUnitReadUntilDisconnect.Create(LCollected), -1,
          True, False);
        Result := LCollected.DataString;
      finally
        FreeAndNil(LCollected);
      end;
    finally
      EndWork(wmRead);
    end;
  end;
  { Sends the file named AFile by queueing a TIdWorkOpUnitWriteFile.
    AEnableTransferFile is currently ignored. Always returns 0: reporting the
    number of bytes sent is disabled (see the commented line below). }
  function TIdIOHandlerChain.WriteFile(
    const AFile: String;
    AEnableTransferFile: Boolean): Cardinal;
  var
    LFileUnit: TIdWorkOpUnitWriteFile;
  begin
    //BGO: we ignore AEnableTransferFile for now
    Result := 0;
    // if not Assigned(Intercept) then begin
    LFileUnit := TIdWorkOpUnitWriteFile.Create(AFile);
    try
      // AFreeWorkOpUnit is False: the unit is owned and released here
      QueueAndWait(LFileUnit, IdTimeoutDefault, False);
    finally
      // Result := LFileUnit.BytesSent;
      FreeAndNil(LFileUnit);
    end;
    // end else begin
    // inherited WriteFile(AFile, AEnableTransferFile);
    // end;
  end;
  { Writes the contents of AStream.VCLStream to the connection in blocks of at
    most 128K.

    ASize > 0 - write ASize bytes starting at the current stream position.
    ASize = 0 - write the whole stream from its beginning.
    ASize < 0 - write everything from the current position to the end.
    When AWriteByteCount is True the byte count is written before the data. }
  procedure TIdIOHandlerChain.Write(
    AStream: TIdStreamVCL;
    ASize: Integer = 0;
    AWriteByteCount: Boolean = False
    );
  const
    CBlockSize = 128 * 1024; // 128K blocks
  var
    LOffset: Integer;
    LChunk: Integer;
  begin
    if ASize > 0 then begin //">0" ACount bytes
      LOffset := AStream.VCLStream.Seek(0, soFromCurrent);
    end else if ASize = 0 then begin //"0" ALL
      LOffset := 0;
      ASize := AStream.VCLStream.Seek(0, soFromEnd);
      AStream.VCLStream.Seek(0, soFromBeginning);
    end else begin //"-1" All from current position
      LOffset := AStream.VCLStream.Seek(0, soFromCurrent);
      ASize := AStream.VCLStream.Seek(0, soFromEnd) - LOffset;
      AStream.VCLStream.Seek(LOffset, soFromBeginning);
    end;
    if AWriteByteCount then begin
      Write(ASize);
    end;
    // BeginWork(wmWrite, ASize);
    while ASize > 0 do begin
      LChunk := ASize;
      if LChunk > CBlockSize then begin
        LChunk := CBlockSize;
      end;
      QueueAndWait(TIdWorkOpUnitWriteStream.Create(AStream.VCLStream, LOffset,
        LChunk, False));
      Dec(ASize, LChunk);
      Inc(LOffset, LChunk);
    end;
    // EndWork(wmWrite);
  end;
  { Writes the bytes in ABuffer to the connection by queueing a
    TIdWorkOpUnitWriteBuffer and waiting for it. An empty buffer is a no-op. }
  procedure TIdIOHandlerChain.WriteDirect(
    ABuffer: TIdBytes
    );
  begin
    // @ABuffer[0] is not valid for an empty array (range error when range
    // checking is on), and there is nothing to send anyway.
    // NOTE(review): TIdChainEngine.Execute treats a completion with zero bytes
    // transferred as a closed socket, so a zero-length write should presumably
    // never be queued - confirm against TIdWorkOpUnitWriteBuffer.
    if Length(ABuffer) = 0 then begin
      Exit;
    end;
    QueueAndWait(TIdWorkOpUnitWriteBuffer.Create(@ABuffer[0], Length(ABuffer), False));
  end;
  { Prepares AWorkOpUnit, hands it to the chain engine and checks the outcome.

    AWorkOpUnit             - unit to run; freed on exit when AFreeWorkOpUnit is
                              True, including when an exception is raised.
    ATimeout                - IdTimeoutInfinite: no timeout. IdTimeoutDefault:
                              use FReadTimeout (no timeout when it is <= 0).
                              Anything else: milliseconds from now.
    AAllowGracefulException - forwarded to CheckForDisconnect before and after
                              the operation.

    Re-raises any exception recorded on the unit and tosses EIdReadTimeout when
    the unit reports that it timed out. }
  procedure TIdIOHandlerChain.QueueAndWait(
    AWorkOpUnit: TIdWorkOpUnit;
    ATimeout: Integer = IdTimeoutDefault;
    AFreeWorkOpUnit: Boolean = True;
    AAllowGracefulException: Boolean = True
    );
  begin
    try
      CheckForDisconnect(AAllowGracefulException);
      // A TimeOutAt of 0 means "no timeout"
      if (ATimeout = IdTimeoutInfinite)
        or ((ATimeout = IdTimeoutDefault) and (FReadTimeout <= 0)) then begin
        AWorkOpUnit.TimeOutAt := 0;
      end else if ATimeout = IdTimeoutDefault then begin
        // The Cardinal cast keeps the compiler from widening the expression to
        // Int64, which can incur a performance penalty.
        AWorkOpUnit.TimeOutAt := GetTickCount + Cardinal(FReadTimeout);
      end else begin
        // Same Cardinal cast as above, for the same reason.
        AWorkOpUnit.TimeOutAt := GetTickCount + Cardinal(ATimeout);
      end;
      //
      AWorkOpUnit.Fiber := FFiber;
      AWorkOpUnit.IOHandler := Self;
      AWorkOpUnit.OnCompleted := WorkOpUnitCompleted;
      AWorkOpUnit.SocketHandle := Binding.Handle;
      // Add to queue and wait to be rescheduled when work is completed
      FChainEngine.AddWork(AWorkOpUnit);
      // Check to see if we need to reraise an exception
      AWorkOpUnit.RaiseException;
      // Check for timeout
      if AWorkOpUnit.TimedOut then begin
        EIdReadTimeout.Toss('Timed out'); {do not localize}
      end;
      // Check to see if it was closed during this operation
      CheckForDisconnect(AAllowGracefulException);
    finally
      if AFreeWorkOpUnit then begin
        AWorkOpUnit.Free;
      end;
    end;
  end;
  { Creates the handler and binds it to its chain engine, fiber weaver and
    fiber. Each of the three must be assigned, otherwise an exception is raised.
    Also allocates the zero-filled overlapped record and its buffer. }
  constructor TIdIOHandlerChain.Create(
    AOwner: TComponent;
    AChainEngine: TIdChainEngine;
    AFiberWeaver: TIdFiberWeaver;
    AFiber: TIdFiber
    );
  begin
    inherited Create(AOwner);

    // Chain engine: validated, stored, then asked to apply its options to us
    EIdException.IfNotAssigned(AChainEngine, 'No chain engine specified.'); {do not localize}
    FChainEngine := AChainEngine;
    FChainEngine.SetIOHandlerOptions(Self);

    // Fiber weaver
    EIdException.IfNotAssigned(AFiberWeaver, 'No fiber weaver specified.'); {do not localize}
    FFiberWeaver := AFiberWeaver;

    // Fiber
    EIdException.IfNotAssigned(AFiber, 'No fiber specified.'); {do not localize}
    FFiber := AFiber;

    // Initialize Overlapped structure
    New(FOverlapped);
    ZeroMemory(FOverlapped, SizeOf(TIdOverLapped));
    New(FOverlapped^.Buffer);
  end;
  { OnCompleted handler installed by QueueAndWait: gives the fiber of the
    completed unit back to the fiber weaver. }
  procedure TIdIOHandlerChain.WorkOpUnitCompleted(AWorkOpUnit: TIdWorkOpUnit);
  begin
    FFiberWeaver.Add(AWorkOpUnit.Fiber);
  end;
  { Detaches from the chain engine and releases the overlapped record.

    The destructor also runs when the constructor raises, for example when one
    of its "not assigned" checks fails. In that case FChainEngine and
    FOverlapped may still be nil, so both are checked before use. }
  destructor TIdIOHandlerChain.Destroy;
  begin
    // Tell the chain engine that we are closing and to remove any references to
    // us and cease any usage.
    // Do not do this in close, it can cause deadlocks because the engine can
    // call close while in its Execute.
    if FChainEngine <> nil then begin
      FChainEngine.RemoveSocket(Self);
    end;
    if FOverlapped <> nil then begin
      // Buffer is nil if the constructor failed between the two allocations
      if FOverlapped^.Buffer <> nil then begin
        Dispose(FOverlapped^.Buffer);
        FOverlapped^.Buffer := nil;
      end;
      Dispose(FOverlapped);
      FOverlapped := nil;
    end;
    inherited;
  end;
  668. { TIdChainEngine }
  { Stops and frees the worker thread, if there is one, before destruction
    proceeds. }
  procedure TIdChainEngine.BeforeDestruction;
  begin
    if Assigned(FThread) then begin
      // Order matters: flag the thread as terminated first, then post the
      // terminate packet (Terminating) so the wait in Execute returns, and only
      // then wait for the thread to finish.
      FThread.Terminate;
      Terminating;
      FThread.WaitFor;
      FreeAndNil(FThread);
    end;
    inherited;
  end;
  { Returns the input buffer of AIOHandler. AIOHandler is cast to
    TIdIOHandlerChain without a type check. }
  function TIdChainEngine.GetInputBuffer(const AIOHandler: TIdIOHandler): TIdBuffer;
  begin
    Result := TIdIOHandlerChain(AIOHandler).FInputBuffer;
  end;
  { Called from the TIdIOHandlerChain constructor: switches the handler to the
    cmIOCP connect mode. }
  procedure TIdChainEngine.SetIOHandlerOptions(AIOHandler: TIdIOHandlerChain);
  begin
    AIOHandler.ConnectMode := cmIOCP;
  end;
  { Associates the socket of an accepted connection with the completion port.
    Raises the last OS error when the association fails. }
  procedure TIdChainEngine.SocketAccepted(AIOHandler: TIdIOHandlerChain);
  var
    LPort: THandle;
  begin
    // Associate the socket with the completion port.
    LPort := CreateIoCompletionPort(AIOHandler.Binding.Handle, FCompletionPort, 0, 0);
    if LPort = 0 then begin
      RaiseLastOSError;
    end;
  end;
  { Posts a packet carrying GCompletionKeyTerminate to the completion port so
    that a pending wait in Execute returns. Raises the last OS error when the
    packet cannot be posted. }
  procedure TIdChainEngine.Terminating;
  begin
    if PostQueuedCompletionStatus(FCompletionPort, 0, GCompletionKeyTerminate,
      nil) then begin
      Exit;
    end;
    RaiseLastOSError;
  end;
  { Handles a single completion packet. Blocks without a timeout until a packet
    arrives; a packet with GCompletionKeyTerminate is ignored, any other packet
    is passed to the Process method of its work op unit. }
  procedure TIdChainEngine.Execute;
  var
    LBytesTransferred: DWord;
    LCompletionKey: DWord;
    LOverlapped: PIdOverlapped;
  begin
    // Wait forever on the completion port. If we are terminating, a terminate
    // signal is sent into the queue.
    // NOTE(review): when the call returns False nothing is done at all, not
    // even for a packet that belongs to a failed operation - confirm that the
    // affected work op unit gets completed some other way.
    if not GetQueuedCompletionStatus(FCompletionPort, LBytesTransferred,
      LCompletionKey, POverLapped(LOverlapped), INFINITE) then begin
      Exit;
    end;
    if LCompletionKey = GCompletionKeyTerminate then begin
      Exit;
    end;
    // Zero bytes transferred: socket has been closed
    if LBytesTransferred = 0 then begin
      LOverlapped.WorkOpUnit.IOHandler.CloseGracefully;
    end;
    LOverlapped.WorkOpUnit.Process(LOverlapped, LBytesTransferred);
  end;
  { Called from the TIdIOHandlerChain destructor. Deliberately does nothing at
    present; the former "fall through" exception is kept below, disabled. }
  procedure TIdChainEngine.RemoveSocket(AIOHandler: TIdIOHandlerChain);
  begin
    // raise EIdException.Create('Fall through error in ' + Self.ClassName+'.RemoveSocket');
  end;
  { Starts AWorkOpUnit. A TIdWorkOpUnitWaitConnected additionally has its socket
    associated with the completion port and is marked complete before Start is
    called. Raises the last OS error when the association fails. }
  procedure TIdChainEngine.AddWork(AWorkOpUnit: TIdWorkOpUnit);
  var
    LIsConnectWait: Boolean;
  begin
    LIsConnectWait := AWorkOpUnit is TIdWorkOpUnitWaitConnected;
    if LIsConnectWait then begin
      // Associate the socket with the completion port.
      if CreateIOCompletionPort(AWorkOpUnit.SocketHandle, FCompletionPort, 0, 0) = 0 then begin
        RaiseLastOSError;
      end;
      AWorkOpUnit.Complete;
    end;
    AWorkOpUnit.Start;
  end;
  { Closes the completion port handle and destroys the component.

    The handle is closed only if one was created: FCompletionPort is still 0
    when InitComponent failed, and closing a zero handle would itself fail.
    The inherited destructor always runs, even when closing the handle raises. }
  destructor TIdChainEngine.Destroy;
  var
    LPort: THandle;
  begin
    try
      LPort := FCompletionPort;
      // Cleared first so the handle can never be closed twice
      FCompletionPort := 0;
      if LPort <> 0 then begin
        if not CloseHandle(LPort) then begin
          RaiseLastOSError;
        end;
      end;
    finally
      inherited;
    end;
  end;
  { Creates the I/O completion port and, outside design mode, the worker thread.

    The port is created before the thread. The thread's Run method calls
    Execute, which waits on FCompletionPort, so the handle must be valid before
    the thread can start running. Raises the last OS error when the port cannot
    be created; no thread is created in that case. }
  procedure TIdChainEngine.InitComponent;
  begin
    {
    Notes on processor count and affinity, kept for future work:

    var SysInfo: TSystemInfo;
    GetSystemInfo(SysInfo);
    SysInfo.dwNumberOfProcessors
    Use GetSystemInfo instead. It will return all the info on the local
    system's architecture and will also return a valid ActiveProcessorMask
    which is a DWORD to be read as a bit array of the processors on the
    system...
    CZH> And next
    CZH> question - any one know off hand how to set affinity? :)
    Use the SetProcessAffinityMask or SetThreadAffinityMask API depending
    on whether you want to act on the whole process or just a single
    thread (SetThreadIdealProcessor is another way to do it: it just gives
    the scheduler a hint about where to run a thread without forcing it:
    good for keeping two threads doing IO with each other on the same
    processor).
    }
    inherited;
    //MS says destruction is automatic, but Google seems to say that this initial
    //one is not auto managed as MS says, and that CloseHandle should be called.
    FCompletionPort := CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 0);
    if FCompletionPort = 0 then begin
      RaiseLastOSError;
    end;
    if not (csDesigning in ComponentState) then begin
      // Cant use .Name, its not initialized yet in Create
      FThread := TIdChainEngineThread.Create(Self, 'Chain Engine'); {do not localize}
    end;
  end;
  779. { TIdChainEngineThread }
  { Creates the engine thread for AOwner, passing AName to the inherited
    constructor. }
  constructor TIdChainEngineThread.Create(AOwner: TIdChainEngine; const AName: string);
  begin
    // The engine reference is stored before the inherited constructor runs,
    // so Run never sees an unassigned FChainEngine.
    FChainEngine := AOwner;
    inherited Create(False, True, AName);
  end;
  788. (*procedure TIdChainEngineIOCP.TransmitFileIOCP(const AWorkOpUnit:TIdWorkOpUnitWriteFile;const AFilename:string);
  789. var
  790. LPOverlapped: PIdOverlapped;
  791. LHFile:THandle;
  792. begin
  793. New(LPOverlapped);
  794. ZeroMemory(LPOverlapped,sizeof(TIdOverLapped));
  795. New(LPOverlapped^.Buffer);
  796. LPOverlapped^.IOhandler:=TIdIOHandlerChain(AWorkOpUnit.IOhandler);
  797. LPOverlapped^.WorkOpUnit:=AWorkOpUnit;
  798. LHFile:=CreateFile(pchar(AFilename),GENERIC_READ,FILE_SHARE_READ,nil,OPEN_EXISTING,FILE_FLAG_SEQUENTIAL_SCAN,0);
  799. if LHFile=INVALID_HANDLE_VALUE then begin
  800. RaiseLastOSError;
  801. end;
  802. try
  803. if ServiceQueryTransmitFile(AWorkOpUnit.IOHandler.Binding.Handle,LHFile,0,0,POverlapped(LPOverlapped),nil,0) then begin
  804. AWorkOpUnit.Fiber.Relinquish;
  805. end else begin
  806. raise EIdException.Create('error in ServiceQueryTransmitFile');
  807. end;
  808. finally
  809. CloseHandle(LHFile);
  810. end;
  811. end;
  812. *)
  813. (*procedure TIdChainEngineIOCP.TransmitFileAsStream(const AWorkOpUnit:TIdWorkOpUnitWriteFile;const AFilename:string);
  814. procedure CopyWorkUnit(ASrc,ADst: TIdWorkOpUnit);
  815. begin
  816. ADst.IOHandler := ASrc.IOHandler;
  817. ADst.Fiber := ASrc.Fiber;
  818. ADst.OnCompleted := ASrc.OnCompleted;
  819. ADst.SocketHandle:= ASrc.SocketHandle;
  820. end;
  821. var
  822. LStream:TfileStream;
  823. LWorkOpUnit : TIdWorkOpUnitWriteStream;
  824. LBuf:pointer;
  825. LBufLen:integer;
  826. begin
  827. Assert(False, 'to do');
  828. LStream := TFileStream.Create(AFilename,fmOpenRead or fmShareDenyWrite);
  829. try
  830. LWorkOpUnit := TIdWorkOpUnitWriteStream.Create(LStream,0,LStream.size,false);
  831. try
  832. CopyWorkUnit(AWorkOpUnit,LWorkOpUnit);
  833. LBufLen:=Min(LStream.size,128*1024);
  834. getmem(LBuf,LBufLen);
  835. LWorkOpUnit.Stream.Position:=LWorkOpUnit.StartPos;
  836. LWorkOpUnit.Stream.Read(LBuf^,LBufLen);
  837. IssueWriteBuffer(LWorkOpUnit,LBuf,LBufLen);
  838. finally
  839. AWorkOpUnit.BytesSent := LStream.Size;
  840. LWorkOpUnit.free;
  841. end;
  842. finally
  843. LStream.free;
  844. end;
  845. end;
  846. *)
  { Thread body: lets the owning engine handle one completion packet per call
    (see TIdChainEngine.Execute). }
  procedure TIdChainEngineThread.Run;
  begin
    FChainEngine.Execute;
  end;
  851. end.