IdWorkOpUnits.pas 9.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440
  1. { $HDR$}
  2. {**********************************************************************}
  3. { Unit archived using Team Coherence }
  4. { Team Coherence is Copyright 2002 by Quality Software Components }
  5. { }
  6. { For further information / comments, visit our WEB site at }
  7. { http://www.TeamCoherence.com }
  8. {**********************************************************************}
  9. {}
  10. { $Log: 56088: IdWorkOpUnits.pas
  11. {
  12. Rev 1.4 6/11/2004 8:40:12 AM DSiders
  13. Added "Do not Localize" comments.
  14. }
  15. {
  16. { Rev 1.3 2004.05.06 1:47:28 PM czhower
  17. { Now uses IndexOf
  18. }
  19. {
  20. { Rev 1.2 2004.04.22 11:45:18 PM czhower
  21. { Bug fixes
  22. }
  23. {
  24. { Rev 1.1 2004.02.09 9:16:58 PM czhower
  25. { Updated to compile and match lib changes.
  26. }
  27. {
  28. { Rev 1.0 2004.02.03 12:39:10 AM czhower
  29. { Move
  30. }
  31. {
  32. { Rev 1.14 2003.10.19 2:50:42 PM czhower
  33. { Fiber cleanup
  34. }
  35. {
  36. { Rev 1.13 2003.10.11 5:44:20 PM czhower
  37. { Chained servers now functional.
  38. }
  39. {
  40. { Rev 1.12 2003.07.17 4:42:08 PM czhower
  41. { More IOCP improvements.
  42. }
  43. {
  44. { Rev 1.11 2003.07.17 3:55:18 PM czhower
  45. { Removed IdIOChainEngineIOCP and merged it into TIdChaingEngine in
  46. { IdIOHandlerChain.pas.
  47. }
  48. {
  49. { Rev 1.7 2003.07.14 11:00:52 PM czhower
  50. { More IOCP fixes.
  51. }
  52. {
  53. { Rev 1.6 2003.07.14 12:54:34 AM czhower
  54. { Fixed graceful close detection if it occurs after connect.
  55. }
  56. {
  57. Rev 1.5 7/7/2003 1:25:26 PM BGooijen
  58. Added BytesSent property to TIdWorkOpUnitWriteFile
  59. }
  60. {
  61. Rev 1.4 7/5/2003 11:47:14 PM BGooijen
  62. Added TIdWorkOpUnitCheckForDisconnect and TIdWorkOpUnitWriteFile
  63. }
  64. {
  65. Rev 1.3 3/27/2003 2:43:06 PM BGooijen
  66. Added woWriteStream and woWriteBuffer
  67. }
  68. {
  69. { Rev 1.2 3/22/2003 09:45:30 PM JPMugaas
  70. { Now should compile under D4.
  71. }
  72. {
  73. Rev 1.1 3/2/2003 12:36:26 AM BGooijen
  74. Added woReadBuffer and TIdWorkOpUnitReadBuffer to read a buffer. Now
  75. ReadBuffer doesn't use ReadStream any more.
  76. TIdIOHandlerChain.ReadLn now supports MaxLineLength (splitting, and
  77. exceptions).
  78. woReadLn doesn't check the intire buffer any more, but continued where it
  79. stopped the last time.
  80. Added basic support for timeouts (probably only on read operations, and maybe
  81. connect), accuratie of timeout is currently 500msec.
  82. }
  83. {
  84. Rev 1.0 2/27/2003 10:11:50 PM BGooijen
  85. WorkOpUnits combined in one file
  86. }
  87. unit IdWorkOpUnits;
  88. interface
  89. uses
  90. Classes,
  91. IdWorkOpUnit, IdGlobal,
  92. SysUtils;
  93. type
  94. TIdWorkOpUnitStreamBaseRead = class(TIdWorkOpUnitRead)
  95. protected
  96. FStream: TStream;
  97. public
  98. constructor Create(AStream: TStream); reintroduce; virtual;
  99. end;
  100. TIdWorkOpUnitStreamBaseWrite = class(TIdWorkOpUnitWrite)
  101. protected
  102. FFreeStream: Boolean;
  103. FStream: TStream;
  104. public
  105. constructor Create(
  106. AStream: TStream;
  107. AFreeStream: Boolean = True
  108. ); reintroduce; virtual;
  109. destructor Destroy; override;
  110. end;
  111. TIdWorkOpUnitWriteBuffer = class(TIdWorkOpUnitWrite)
  112. protected
  113. FBuffer: Pointer;
  114. FFreeBuffer: Boolean;
  115. FSize: Integer;
  116. //
  117. procedure Processing(ABytes: Integer); override;
  118. procedure Starting; override;
  119. public
  120. constructor Create(ABuffer: Pointer; ASize: Integer;
  121. AFreeBuffer: Boolean = True); reintroduce; virtual;
  122. destructor Destroy; override;
  123. end;
  124. TIdWorkOpUnitWriteFile = class(TIdWorkOpUnitWrite)
  125. protected
  126. FFilename: String;
  127. FBytesSent: Integer;
  128. //
  129. procedure Processing(ABytes: Integer); override;
  130. procedure Starting; override;
  131. public
  132. constructor Create(AFileName: string); reintroduce;
  133. end;
  134. TIdWorkOpUnitWriteStream = class(TIdWorkOpUnitStreamBaseWrite)
  135. protected
  136. FCount: Integer;
  137. FStartPos: Integer;
  138. //
  139. procedure Processing(ABytes: Integer); override;
  140. procedure Starting; override;
  141. public
  142. constructor Create(AStream: TStream; AStartPos, ACount: Integer;
  143. AFreeStream: Boolean); reintroduce; virtual;
  144. end;
  145. TIdWorkOpUnitWaitConnected = class(TIdWorkOpUnit)
  146. protected
  147. procedure Starting; override;
  148. public
  149. procedure Process(
  150. AOverlapped: PIdOverlapped;
  151. AByteCount: Integer
  152. ); override;
  153. end;
  154. TIdWorkOpUnitReadSized = class(TIdWorkOpUnitRead)
  155. protected
  156. FSize: Integer;
  157. //
  158. procedure Processing(
  159. ABuffer: TIdBytes
  160. ); override;
  161. public
  162. constructor Create(ASize: Integer); reintroduce;
  163. end;
  164. TIdWorkOpUnitReadSizedStream = class(TIdWorkOpUnitStreamBaseRead)
  165. protected
  166. FSize: Integer;
  167. //
  168. procedure Processing(
  169. ABuffer: TIdBytes
  170. ); override;
  171. public
  172. constructor Create(AStream: TStream; ASize: Integer);
  173. reintroduce;
  174. end;
  175. TIdWorkOpUnitReadLn = class(TIdWorkOpUnitRead)
  176. protected
  177. FLastPos: Integer;
  178. FMaxLength: Integer;
  179. FTerminator: string;
  180. //
  181. procedure Processing(
  182. ABuffer: TIdBytes
  183. ); override;
  184. public
  185. constructor Create(
  186. ATerminator: string;
  187. AMaxLength: Integer
  188. ); reintroduce;
  189. end;
  190. TIdWorkOpUnitReadUntilDisconnect = class(TIdWorkOpUnitStreamBaseRead)
  191. protected
  192. procedure Processing(
  193. ABuffer: TIdBytes
  194. ); override;
  195. end;
  196. TIdWorkOpUnitReadAvailable = class(TIdWorkOpUnitRead)
  197. protected
  198. procedure Processing(
  199. ABuffer: TIdBytes
  200. ); override;
  201. end;
  202. implementation
  203. { TIdWorkOpUnitWriteStream }
  204. constructor TIdWorkOpUnitWriteStream.Create(AStream: TStream; AStartPos,ACount:integer; AFreeStream: Boolean);
  205. begin
  206. inherited Create(AStream, AFreeStream);
  207. FStream.Position := AStartPos;
  208. FCount := ACount;
  209. end;
  210. procedure TIdWorkOpUnitWriteStream.Processing(ABytes: Integer);
  211. //TODO: This used to use pages from IdBuffer, which because of .Net do not exist
  212. // anymore. We need to maybe keep a local persistent buffer instead then for
  213. // storage reasons.
  214. var
  215. LBuffer: TIdBytes;
  216. LSize: Integer;
  217. begin
  218. FCount := FCount - ABytes;
  219. if FCount = 0 then begin
  220. Complete;
  221. end else begin
  222. FStream.Position := ABytes;
  223. //
  224. //TODO: Dont hard code this value. Also find an optimal size for IOCP
  225. LSize := Min(FCount, WOPageSize);
  226. SetLength(LBuffer, LSize);
  227. //
  228. FStream.ReadBuffer(LBuffer[0], LSize);
  229. Write(@LBuffer[0], LSize);
  230. end;
  231. end;
  232. procedure TIdWorkOpUnitWriteStream.Starting;
  233. begin
  234. Processing(0);
  235. end;
  236. { TIdWorkOpUnitWriteBuffer }
  237. constructor TIdWorkOpUnitWriteBuffer.Create(ABuffer: pointer; ASize: integer; AFreeBuffer: Boolean = True);
  238. begin
  239. inherited Create;
  240. FSize := ASize;
  241. FBuffer := ABuffer;
  242. FFreeBuffer := AFreeBuffer;
  243. end;
  244. destructor TIdWorkOpUnitWriteBuffer.Destroy;
  245. begin
  246. if FFreeBuffer then begin
  247. FreeMem(FBuffer);
  248. FBuffer := nil;
  249. end;
  250. inherited;
  251. end;
  252. procedure TIdWorkOpUnitWriteBuffer.Processing(ABytes: Integer);
  253. begin
  254. //TODO: Change the pointer to a type that points to bytes
  255. FBuffer := Pointer(Cardinal(FBuffer) + Cardinal(ABytes));
  256. FSize := FSize - ABytes;
  257. if FSize = 0 then begin
  258. Complete;
  259. end else begin
  260. //TODO: Reduce this down so it never sends more than a page
  261. Write(FBuffer, Min(FSize, WOPageSize));
  262. end;
  263. end;
  264. procedure TIdWorkOpUnitWriteBuffer.Starting;
  265. begin
  266. Processing(0);
  267. end;
  268. { TIdWorkOpUnitWriteFile }
  269. constructor TIdWorkOpUnitWriteFile.Create(AFileName:string);
  270. begin
  271. inherited Create;
  272. FFilename := AFileName;
  273. end;
  274. procedure TIdWorkOpUnitWriteFile.Processing(ABytes: Integer);
  275. begin
  276. Assert(False, 'Need to implement WriteFile, also add to a bubble'); {do not localize}
  277. end;
  278. procedure TIdWorkOpUnitWriteFile.Starting;
  279. begin
  280. end;
  281. { TIdWorkOpUnitSizedStream }
  282. constructor TIdWorkOpUnitReadSizedStream.Create(AStream: TStream; ASize:integer);
  283. begin
  284. inherited Create(AStream);
  285. FSize := ASize;
  286. end;
  287. procedure TIdWorkOpUnitWaitConnected.Process(
  288. AOverlapped: PIdOverlapped;
  289. AByteCount: Integer
  290. );
  291. begin
  292. end;
  293. procedure TIdWorkOpUnitWaitConnected.Starting;
  294. begin
  295. end;
  296. { TIdWorkOpUnitReadLn }
  297. constructor TIdWorkOpUnitReadLn.Create(
  298. ATerminator: string;
  299. AMaxLength: Integer);
  300. begin
  301. inherited Create;
  302. FLastPos := 1;
  303. FTerminator := ATerminator;
  304. FMaxLength := AMaxLength;
  305. end;
  306. procedure TIdWorkOpUnitReadLn.Processing(
  307. ABuffer: TIdBytes
  308. );
  309. begin
  310. //TODO: ReadLn is very common. Need to optimize this class and maybe
  311. // even pass pack the result directly so we dont search twice.
  312. //Also allow for hinting from the user.
  313. IOHandler.InputBuffer.Write(ABuffer);
  314. if not IOHandler.Connected then begin
  315. Complete;
  316. end else if IOHandler.InputBuffer.IndexOf(FTerminator, FLastPos) = -1 then begin
  317. Read;
  318. end else begin
  319. Complete;
  320. end;
  321. end;
  322. procedure TIdWorkOpUnitReadUntilDisconnect.Processing(
  323. ABuffer: TIdBytes
  324. );
  325. begin
  326. // 0 is disconnected, so keep requesting til 0
  327. if Length(ABuffer) = 0 then begin
  328. Complete;
  329. end else begin
  330. FStream.WriteBuffer(ABuffer[0], Length(ABuffer));
  331. Read;
  332. end;
  333. end;
  334. { TIdWorkOpUnitReadAvailable }
  335. procedure TIdWorkOpUnitReadAvailable.Processing(
  336. ABuffer: TIdBytes
  337. );
  338. begin
  339. Complete;
  340. end;
  341. { TIdWorkOpUnitReadSized }
  342. constructor TIdWorkOpUnitReadSized.Create(ASize: Integer);
  343. begin
  344. inherited Create;
  345. FSize := ASize;
  346. end;
  347. procedure TIdWorkOpUnitReadSized.Processing(
  348. ABuffer: TIdBytes
  349. );
  350. begin
  351. IOHandler.InputBuffer.Write(ABuffer);
  352. FSize := FSize - Length(ABuffer);
  353. if FSize = 0 then begin
  354. Complete;
  355. end else begin
  356. Read;
  357. end;
  358. end;
  359. { TIdWorkOpUnitStreamBaseRead }
  360. constructor TIdWorkOpUnitStreamBaseRead.Create(AStream: TStream);
  361. begin
  362. inherited Create;
  363. FStream := AStream;
  364. end;
  365. { TIdWorkOpUnitStreamBaseWrite }
  366. constructor TIdWorkOpUnitStreamBaseWrite.Create(AStream: TStream;
  367. AFreeStream: Boolean);
  368. begin
  369. inherited Create;
  370. FStream := AStream;
  371. FFreeStream := AFreeStream;
  372. end;
  373. destructor TIdWorkOpUnitStreamBaseWrite.Destroy;
  374. begin
  375. if FFreeStream then begin
  376. FreeAndNil(FStream);
  377. end;
  378. inherited;
  379. end;
  380. procedure TIdWorkOpUnitReadSizedStream.Processing(
  381. ABuffer: TIdBytes
  382. );
  383. begin
  384. FStream.WriteBuffer(ABuffer[0], Length(ABuffer));
  385. FSize := FSize - Length(ABuffer);
  386. if FSize = 0 then begin
  387. Complete;
  388. end else begin
  389. Read;
  390. end;
  391. end;
  392. end.