CnThreadPool.pas 38 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
7127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439
  1. {******************************************************************************}
  2. { CnPack For Delphi/C++Builder }
  3. { 中国人自己的开放源码第三方开发包 }
  4. { (C)Copyright 2001-2018 CnPack 开发组 }
  5. { ------------------------------------ }
  6. { }
  7. { 本开发包是开源的自由软件,您可以遵照 CnPack 的发布协议来修 }
  8. { 改和重新发布这一程序。 }
  9. { }
  10. { 发布这一开发包的目的是希望它有用,但没有任何担保。甚至没有 }
  11. { 适合特定目的而隐含的担保。更详细的情况请参阅 CnPack 发布协议。 }
  12. { }
  13. { 您应该已经和开发包一起收到一份 CnPack 发布协议的副本。如果 }
  14. { 还没有,可访问我们的网站: }
  15. { }
  16. { 网站地址:http://www.cnpack.org }
  17. { 电子邮件:master@cnpack.org }
  18. { }
  19. {******************************************************************************}
  20. unit CnThreadPool;
  21. {* |<PRE>
  22. ================================================================================
  23. * 软件名称:不可视工具组件包
  24. * 单元名称:线程池实现单元
  25. * 单元作者:Chinbo(Shenloqi)
  26. * 备 注:
  27. * 开发平台:PWin2000Pro + Delphi 7.0
  28. * 兼容测试:PWin9X/2000/XP + Delphi 5/6
  29. * 本 地 化:该单元中的字符串暂不符合本地化处理方式
  30. * 单元标识:$Id$
  31. * 修改记录:2008.4.1
  32. * 找到了当初这个线程池的主要参考实现单元,补上了原作者
  33. * Aleksej Petrov的版权信息。尽管在原来的基础上修正了很多问题也
  34. * 增强了一些功能,但是整体的思路和大的实现方法还是跟原来的一样,
  35. * 再次感谢Aleksej Petrov,也感谢Leeon帮我找到了原作者信息
  36. * 2007.7.13
  37. * 修改了DeadTaskAsNew会导致无限递归和不生效的BUG
  38. * 如果需要使用DeadTaskAsNew则需要实现TCnTaskDataObject.Clone方法
  39. * 增加了一个TCnThreadPool.AddRequests方法
  40. * 2004.8.9
  41. * 公开了TCnPoolingThread.StillWorking
  42. * 简单修正了TickCount相减的BUG
  43. * 2004.3.14
  44. * 修正一些BUG
  45. * 2003.12.24
  46. * 使用FTaskCount和FIdleThreadCount加快执行效率
  47. * 修正了MinAtLeast不能正确工作的BUG
  48. * 在DefaultGetInfo之中调用FreeFinishedThreads
  49. * 2003.12.21
  50. * 完成并测试单元
  51. * 2003.12.16
  52. * 创建单元,实现功能
  53. ================================================================================
  54. |</PRE>}
  55. (********************************************************
  56. This component is modified from Aleksej Petrov's threadpool, fixed some
  57. memory leaks and enhanced the scheduling implementation and fixed some bugs.
  58. {*************************************************************}
  59. { Component for processing request queue in pool of threads }
  60. { Copyright (c) 2001, Aleksej Petrov, AKPetrov@pisem.net }
  61. { Free for noncommercial use. }
  62. { Please contact with author for use in commercial projects. }
  63. ********************************************************)
  64. (********************************************************
  65. 单元说明:
  66. 该单元实现了线程池的功能
  67. 设计:
  68. 实现线程池,首先要抽象出任务,最简单的就是使用一个指
  69. 向某一结构的指针,不过这样做显然不是一个好的方法,还有
  70. 一种比较简单的方法就是使用对象;有了抽象的任务之后,就
  71. 要有一个对任务的管理列表,这可以简单的用TList实现,尽管
  72. TList的事件少了一些;然后就要有执行处理任务的线程,这最
  73. 好要从TThread继承;然后要有通知机制,任务量,计算时间的
  74. 估算,任务平衡机制。
  75. 最简单的实现是工作线程作完一次任务之后就休眠,管理线
  76. 程使用一个定时器定期给这些线程进行分配,管理,不过这样
  77. 效率不高;也可以在增加任务的时候就进行一次分配,不过这
  78. 样的主动分配对于线程的任务均衡,效率等都不是好的解决方
  79. 法,而信号量则会比较好的解决这个问题。而线程池中线程的
  80. 释放则可以简单通过计时器实现。
  81. 真正的问题是如何估计工作量,线程池的工作强度,然后决
  82. 定何时需要增加线程,何时需要减少线程:)
  83. 限制:
  84. 考虑到线程池的使用范围,所以在实现中采用了不少只有在
  85. NT系列才实现的API,所以该组件在NT系列操作系统上会有最好
  86. 的效果,当然也不支持非Windows环境。
  87. 因为使用了WaitableTimer,所以在9x环境下线程池不会减少
  88. 线程池中的线程数目,应该是可以通过SetTimer替代,不过
  89. SetTimer需要一个消息循环处理WM_TIMER消息,开销较大且不
  90. 容易实现:)
  91. 另外还有一个替代的方法就是mmsystem的timesetevent函数,
  92. 不过这个函数的开销应该比其他的更大
  93. 不过如果通过TTimer来实现的话,应该就可以实现跨平台的
  94. 组件了...
  95. 内存泄漏:
  96. 一般情况下该组件不会有内存泄露,然而当线程池被释放时
  97. 线程池中还有正在工作的线程,且这些线程依赖的外部环境被
  98. 破坏时,为了线程能够正常退出不得已使用了TerminateThread
  99. 函数,而这个函数不会清理线程分配的内存,就可能造成内存
  100. 泄露,幸运的是这种情形一般发生在应用程序退出时破坏了外
  101. 部环境所致,所以一旦应用程序退出,操作系统会做相应的清
  102. 理工作,所以实际上应该不算内存泄露:)
  103. 即使是使用了TerminateThread,也不应该会造成程序退出时
  104. 的种种异常,如RunTime Error 216
  105. 类及函数:
  106. TCnCriticalSection -- 临界区的封装
  107. 在NT中实现了TryEnterCriticalSection,在SyncObjs.pas中
  108. 的TCriticalSection没有封装这个函数,TCnCriticalSection
  109. 封装了它,且直接从TObject继承,开销应该小一些:)
  110. TryEnter -- TryEnterCriticalSection
  111. TryEnterEx -- 9x时候就是Enter,NT就是TryEnter
  112. TCnTaskDataObject -- 线程池线程处理数据的封装
  113. 线程池中的线程处理任务所需的数据的基类
  114. 一般情况下,要实现某种特定的任务就需要实现相应的一个
  115. 从该类继承的类
  116. Duplicate -- 是否与另一个处理数据相同,相同则不处理
  117. Info -- 信息,用于调试输出
  118. TCnPoolingThread -- 线程池中的线程
  119. 线程池中的线程的基类,一般情况下不需要继承该类就可以
  120. 实现大部分的操作,但在线程需要一些外部环境时可以继承该
  121. 类的构造和析构函数,另一种方法是可以在线程池的相关事件
  122. 中进行这些配置
  123. AverageWaitingTime -- 平均等待时间
  124. AverageProcessingTime -- 平均处理时间
  125. Duplicate -- 是否正在处理相同的任务
  126. Info -- 信息,用于调试输出
  127. IsDead -- 是否已死
  128. IsFinished -- 是否执行完成
  129. IsIdle -- 是否空闲
  130. NewAverage -- 计算平均值(特殊算法)
  131. StillWorking -- 表明线程仍然在运行
  132. Execute -- 线程执行函数,一般不需继承
  133. Create -- 构造函数
  134. Destroy -- 析构函数
  135. Terminate -- 结束线程
  136. TCnThreadPool -- 线程池
  137. 控件的事件并没有使用同步方式封装,所以这些事件的代码
  138. 要线程安全才可以
  139. HasSpareThread -- 有空闲的线程
  140. AverageWaitingTime -- 平均等待时间
  141. AverageProcessingTime -- 平均计算时间
  142. TaskCount -- 任务数目
  143. ThreadCount -- 线程数目
  144. CheckTaskEmpty -- 检查任务是否都已经完成
  145. GetRequest -- 从队列中获取任务
  146. DecreaseThreads -- 减少线程
  147. IncreaseThreads -- 增加线程
  148. FreeFinishedThreads -- 释放完成的线程
  149. KillDeadThreads -- 清除死线程
  150. Info -- 信息,用于调试输出
  151. OSIsWin9x -- 操作系统是Win9x
  152. AddRequest -- 增加任务
  153. RemoveRequest -- 从队列中删除任务
  154. CreateSpecial -- 创建自定义线程池线程的构造函数
  155. AdjustInterval -- 减少线程的时间间隔
  156. DeadTaskAsNew -- 将死线程的任务重新加到队列
  157. MinAtLeast -- 线程数不少于最小数目
  158. ThreadDeadTimeout -- 线程死亡超时
  159. ThreadsMinCount -- 最少线程数
  160. ThreadsMaxCount -- 最大线程数
  161. OnGetInfo -- 获取信息事件
  162. OnProcessRequest -- 处理任务事件
  163. OnQueueEmpty -- 队列空事件
  164. OnThreadInitializing -- 线程初始化事件
  165. OnThreadFinalizing -- 线程终止化事件
  166. ********************************************************)
  167. interface
  168. {$I CnPack.inc}
  169. {.$DEFINE DEBUG}//是否输出调试信息
  170. uses
  171. SysUtils, Windows, Classes,
  172. CnConsts, CnClasses, CnCompConsts;
  173. type
  { Lightweight object wrapper around a Win32 critical section. }
  TCnCriticalSection = class(TObject)
  protected
    FSection: TRTLCriticalSection; // the wrapped OS critical section
  public
    { Initializes the underlying critical section. }
    constructor Create;
    { Deletes the underlying critical section. }
    destructor Destroy; override;
    { Blocks until the section has been acquired. }
    procedure Enter;
    { Releases the section. }
    procedure Leave;
    { Non-blocking acquire; True when the section was obtained. }
    function TryEnter: Boolean;
    { Blocking Enter (always True) when the Win9x flag is set, TryEnter otherwise. }
    function TryEnterEx: Boolean;
  end;
  { Base class for the data describing one task queued in the pool. }
  TCnTaskDataObject = class(TObject)
  public
    { Abstract: descendants return a copy of this task object. }
    function Clone: TCnTaskDataObject; virtual; abstract;
    { Duplicate test against DataObj. Processing is True when DataObj is being
      processed by a thread and False when it sits in the queue. The base
      implementation always answers False. }
    function Duplicate(DataObj: TCnTaskDataObject;
      const Processing: Boolean): Boolean; virtual;
    { Text description; the base implementation returns the instance address in hex. }
    function Info: string; virtual;
  end;
  192. { TCnPoolingThread }
  TCnThreadPool = class; // forward declaration, the class is defined below

  { States a pooled thread passes through while executing. }
  TCnThreadState = (
    ctsInitializing, // running the pool's initialization hook
    ctsWaiting,      // blocked waiting for a request, the reduce timer or termination
    ctsGetting,      // fetching a request from the pool
    ctsProcessing,   // inside the request handler
    ctsProcessed,    // handler returned
    ctsTerminating,  // leaving the thread loop
    ctsForReduce     // woken by the reduce timer
  );
  { Worker thread owned by a TCnThreadPool. }
  TCnPoolingThread = class(TThread)
  private
    hInitFinished: THandle; // event set by Execute once the initialization hook has run
    sInitError: string;     // message of an exception raised by the initialization hook
  {$IFDEF DEBUG}
    procedure Trace(const Str: string);
  {$ENDIF DEBUG}
    { Kills the OS thread with TerminateThread. }
    procedure ForceTerminate;
  protected
    FAverageWaitingTime: Integer; // running average of wait durations (tick units)
    FAverageProcessing: Integer;  // running average of processing durations (tick units)
    uWaitingStart: DWORD;         // tick count when the current wait began
    uProcessingStart: DWORD;      // tick count when the current request began
    uStillWorking: DWORD;         // tick count of the last StillWorking call
    FWorkCount: Int64;            // number of requests fetched so far
    FCurState: TCnThreadState;
    hThreadTerminated: THandle;   // event set by Terminate(False) to wake the loop
    FPool: TCnThreadPool;         // owning pool
    FProcessingDataObject: TCnTaskDataObject;    // request currently held, nil when none
    csProcessingDataObject: TCnCriticalSection;  // guards FProcessingDataObject
    function AverageProcessingTime: DWORD;
    function AverageWaitingTime: DWORD;
    { Clone of the request currently held, or nil. }
    function CloneData: TCnTaskDataObject;
    { True when DataObj reports the request currently held as a duplicate. }
    function Duplicate(DataObj: TCnTaskDataObject): Boolean; virtual;
    function Info: string; virtual;
    function IsDead: Boolean; virtual;
    function IsFinished: Boolean; virtual;
    function IsIdle: Boolean; virtual;
    function NewAverage(OldAvg, NewVal: Integer): Integer; virtual;
    procedure Execute; override;
  public
    constructor Create(aPool: TCnThreadPool); virtual;
    destructor Destroy; override;
    { Records "now" as the last sign of life of the running handler. }
    procedure StillWorking;
    { Requests termination; with Force the OS thread is killed and the object freed. }
    procedure Terminate(const Force: Boolean = False);
  end;
  { Metaclass used by the pool to instantiate its worker threads. }
  TCnPoolingThreadClass = class of TCnPoolingThread;

  { TCnThreadPool }

  { Where AddRequest looks for duplicates: the queue and/or running threads. }
  TCheckDuplicate = (cdQueue, cdProcessing);
  TCheckDuplicates = set of TCheckDuplicate;

  { Handler that fills InfoText with diagnostic text. }
  TGetInfo = procedure(Sender: TCnThreadPool;
    var InfoText: string) of object;

  { Handler that processes aDataObj on worker thread aThread. }
  TProcessRequest = procedure(Sender: TCnThreadPool;
    aDataObj: TCnTaskDataObject;
    aThread: TCnPoolingThread) of object;

  { Reason passed to the queue-empty handler. }
  TEmptyKind = (ekQueueEmpty, ekTaskEmpty);
  TQueueEmpty = procedure(Sender: TCnThreadPool;
    EmptyKind: TEmptyKind) of object;

  { Handlers called from a worker thread when it starts and when it ends. }
  TThreadInPoolInitializing = procedure(Sender: TCnThreadPool;
    aThread: TCnPoolingThread) of object;
  TThreadInPoolFinalizing = procedure(Sender: TCnThreadPool;
    aThread: TCnPoolingThread) of object;
  { Thread pool component: queues task objects and hands them to worker threads. }
  TCnThreadPool = class(TCnComponent)
  private
    csQueueManagment: TCnCriticalSection;  // guards FQueue
    csThreadManagment: TCnCriticalSection; // guards the thread lists
    FQueue: TList;          // pending TCnTaskDataObject instances
    FThreads: TList;        // active TCnPoolingThread instances
    FThreadsKilling: TList; // threads asked to terminate, waiting to be freed
    FThreadsMinCount: Integer;
    FThreadsMaxCount: Integer;
    FThreadDeadTimeout: DWORD;
    FThreadClass: TCnPoolingThreadClass;
    FAdjustInterval: DWORD;
    FDeadTaskAsNew: Boolean;
    FMinAtLeast: Boolean;
    FIdleThreadCount: Integer; // maintained by the worker threads with Interlocked calls
    FTaskCount: Integer;
    FThreadInitializing: TThreadInPoolInitializing;
    FThreadFinalizing: TThreadInPoolFinalizing;
    FProcessRequest: TProcessRequest;
    FQueueEmpty: TQueueEmpty;
    FGetInfo: TGetInfo;
    procedure SetAdjustInterval(const Value: DWORD);
  {$IFDEF DEBUG}
    procedure Trace(const Str: string);
  {$ENDIF DEBUG}
  protected
    FLastGetPoint: Integer;
    hSemRequestCount: THandle; // semaphore released once per queued request
    hTimReduce: THandle;       // waitable timer (or fallback event) that triggers DecreaseThreads
    function HasSpareThread: Boolean;
    function HasTask: Boolean;
    function FinishedThreadsAreFull: Boolean; virtual;
    procedure CheckTaskEmpty;
    procedure GetRequest(var Request: TCnTaskDataObject);
    procedure DecreaseThreads;
    procedure IncreaseThreads;
    procedure FreeFinishedThreads;
    procedure KillDeadThreads;
    procedure DoProcessRequest(aDataObj: TCnTaskDataObject;
      aThread: TCnPoolingThread); virtual;
    procedure DoQueueEmpty(EmptyKind: TEmptyKind); virtual;
    procedure DoThreadInitializing(aThread: TCnPoolingThread); virtual;
    procedure DoThreadFinalizing(aThread: TCnPoolingThread); virtual;
    procedure GetComponentInfo(var AName, Author, Email, Comment: string); override;
  public
    uTerminateWaitTime: DWORD; // timeout (ms) the destructor waits for threads to end
    QueuePackCount: Integer;
    constructor Create(AOwner: TComponent); override;
    { Like Create, but worker threads are instances of AClass when it is not nil. }
    constructor CreateSpecial(AOwner: TComponent;
      AClass: TCnPoolingThreadClass);
    destructor Destroy; override;
    function AverageWaitingTime: Integer;
    function AverageProcessingTime: Integer;
    function Info: string;
    function OSIsWin9x: Boolean;
    function TaskCount: Integer;
    function ThreadCount: Integer;
    function ThreadInfo(const i: Integer): string;
    function ThreadKillingCount: Integer;
    function ThreadKillingInfo(const i: Integer): string;
    procedure DefaultGetInfo(Sender: TCnThreadPool; var InfoText: string);
    { Queues aDataObject; returns False (and frees the object) when it is a duplicate. }
    function AddRequest(aDataObject: TCnTaskDataObject;
      CheckDuplicate: TCheckDuplicates = [cdQueue]): Boolean;
    { Calls AddRequest for every element of aDataObjects. }
    procedure AddRequests(aDataObjects: array of TCnTaskDataObject;
      CheckDuplicate: TCheckDuplicates = [cdQueue]);
    procedure RemoveRequest(aDataObject: TCnTaskDataObject);
  published
    property AdjustInterval: DWORD
      read FAdjustInterval write SetAdjustInterval default 10000;
    property DeadTaskAsNew: Boolean
      read FDeadTaskAsNew write FDeadTaskAsNew default True;
    property MinAtLeast: Boolean
      read FMinAtLeast write FMinAtLeast default False;
    property ThreadDeadTimeout: DWORD
      read FThreadDeadTimeout write FThreadDeadTimeout default 0;
    property ThreadsMinCount: Integer
      read FThreadsMinCount write FThreadsMinCount default 0;
    property ThreadsMaxCount: Integer
      read FThreadsMaxCount write FThreadsMaxCount default 10;
    property OnGetInfo: TGetInfo
      read FGetInfo write FGetInfo;
    property OnProcessRequest: TProcessRequest
      read FProcessRequest write FProcessRequest;
    property OnQueueEmpty: TQueueEmpty
      read FQueueEmpty write FQueueEmpty;
    property OnThreadInitializing: TThreadInPoolInitializing
      read FThreadInitializing write FThreadInitializing;
    property OnThreadFinalizing: TThreadInPoolFinalizing
      read FThreadFinalizing write FThreadFinalizing;
  end;
  {$IFDEF DEBUG}
    { Signature of the routine that receives trace output. }
    TLogWriteProc = procedure(const Str: string; const ID: Integer);

  var
    { Trace sink called by the Trace methods; nil until assigned. }
    TraceLog: TLogWriteProc = nil;
  {$ENDIF DEBUG}

  const
    { Display name of every TCnThreadState value, indexed by the state. }
    CCnTHREADSTATE: array[TCnThreadState] of string = (
      'ctsInitializing',
      'ctsWaiting',
      'ctsGetting',
      'ctsProcessing',
      'ctsProcessed',
      'ctsTerminating',
      'ctsForReduce'
    );
  344. implementation
  345. uses
  346. Math;
  const
    { Largest Int64 value; upper bound for the per-thread work counter. }
    MaxInt64 = High(Int64);

  var
    { True when running on Win9x; read by TCnCriticalSection.TryEnterEx.
      NOTE(review): assigned outside the part of the unit shown here. }
    FOSIsWin9x: Boolean;
  {$IFDEF DEBUG}
  { Trace sink that sends "<ID>:<Str>" to the debugger output. }
  procedure SimpleTrace(const Str: string; const ID: Integer);
  var
    Line: string;
  begin
    Line := IntToStr(ID) + ':' + Str;
    OutputDebugString(PChar(Line));
  end;
  {$ENDIF DEBUG}
  { Returns the number of ticks elapsed between two GetTickCount readings.

    AOldTickCount - the earlier reading
    ANewTickCount - the later reading

    When the new reading is smaller than the old one the 32-bit counter is
    assumed to have wrapped exactly once. The wrapped distance is
    (High(Cardinal) - Old) ticks up to the maximum, plus one tick for the step
    from the maximum to zero, plus New ticks after zero. }
  function GetTickDiff(const AOldTickCount, ANewTickCount: Cardinal): Cardinal;
  begin
    if ANewTickCount >= AOldTickCount then
      Result := ANewTickCount - AOldTickCount
    else
      // Grouped so that no intermediate value leaves the Cardinal range,
      // which keeps the expression safe with overflow checking enabled.
      Result := (High(Cardinal) - AOldTickCount) + ANewTickCount + 1;
  end;
  368. { TCnCriticalSection }
  { Creates the object and initializes the wrapped critical section. }
  constructor TCnCriticalSection.Create;
  begin
    inherited Create;
    InitializeCriticalSection(FSection);
  end;
  { Deletes the wrapped critical section, then destroys the object. }
  destructor TCnCriticalSection.Destroy;
  begin
    DeleteCriticalSection(FSection);
    inherited Destroy;
  end;
  { Acquires the critical section, blocking until it is available. }
  procedure TCnCriticalSection.Enter;
  begin
    EnterCriticalSection(FSection);
  end;
  { Releases the critical section. }
  procedure TCnCriticalSection.Leave;
  begin
    LeaveCriticalSection(FSection);
  end;
  { Tries to acquire the critical section without blocking.
    Returns the result of TryEnterCriticalSection. }
  function TCnCriticalSection.TryEnter: Boolean;
  begin
    Result := TryEnterCriticalSection(FSection);
  end;
  { Portable try-enter: when the Win9x flag is set it falls back to a blocking
    Enter and reports True; otherwise it behaves exactly like TryEnter. }
  function TCnCriticalSection.TryEnterEx: Boolean;
  begin
    if not FOSIsWin9x then
    begin
      Result := TryEnter;
      Exit;
    end;
    Enter;
    Result := True;
  end;
  401. { TCnTaskDataObject }
  { Default duplicate test: no task is ever considered a duplicate.
    Both parameters are ignored by this base implementation. }
  function TCnTaskDataObject.Duplicate(DataObj: TCnTaskDataObject;
    const Processing: Boolean): Boolean;
  begin
    Result := False;
  end;
  { Default description: the instance address as an 8-digit hexadecimal string. }
  function TCnTaskDataObject.Info: string;
  const
    HexDigits = 8;
  begin
    Result := IntToHex(Cardinal(Self), HexDigits);
  end;
  411. { TCnPoolingThread }
  { Creates a worker thread for aPool, starts it and blocks until the thread
    has run the pool's initialization hook.
    Raises Exception with the recorded message when that hook failed. }
  constructor TCnPoolingThread.Create(aPool: TCnThreadPool);
  begin
  {$IFDEF DEBUG}
    Trace('TCnPoolingThread.Create');
  {$ENDIF DEBUG}
    // Created suspended so every field is set before Execute starts.
    inherited Create(True);
    FPool := aPool;
    FAverageWaitingTime := 0;
    FAverageProcessing := 0;
    FWorkCount := 0;
    sInitError := '';
    FreeOnTerminate := False;
    // Both events are manual-reset and initially non-signalled.
    hInitFinished := CreateEvent(nil, True, False, nil);
    hThreadTerminated := CreateEvent(nil, True, False, nil);
    csProcessingDataObject := TCnCriticalSection.Create;
    try
      Resume;
      // Execute sets hInitFinished after the initialization hook returned.
      WaitForSingleObject(hInitFinished, INFINITE);
      if Length(sInitError) > 0 then
        raise Exception.Create(sInitError);
    finally
      CloseHandle(hInitFinished);
    end;
  {$IFDEF DEBUG}
    Trace('TCnPoolingThread.Created OK');
  {$ENDIF DEBUG}
  end;
  { Frees any request still held, closes the termination event, releases the
    guard object and finally runs the inherited TThread destructor. }
  destructor TCnPoolingThread.Destroy;
  begin
  {$IFDEF DEBUG}
    Trace('TCnPoolingThread.Destroy');
  {$ENDIF DEBUG}
    FreeAndNil(FProcessingDataObject);
    CloseHandle(hThreadTerminated);
    csProcessingDataObject.Free;
    inherited Destroy;
  end;
  { Average processing time. While a request is being processed the time
    spent on it so far is folded into the stored average for the answer
    (the stored value itself is not modified). }
  function TCnPoolingThread.AverageProcessingTime: DWORD;
  var
    SoFar: Cardinal;
  begin
    if FCurState = ctsProcessing then
    begin
      SoFar := GetTickDiff(uProcessingStart, GetTickCount);
      Result := NewAverage(FAverageProcessing, SoFar);
    end
    else
      Result := FAverageProcessing;
  end;
  { Average waiting time. While the thread is waiting the length of the
    current wait is folded into the stored average for the answer
    (the stored value itself is not modified). }
  function TCnPoolingThread.AverageWaitingTime: DWORD;
  var
    SoFar: Cardinal;
  begin
    if FCurState in [ctsWaiting, ctsForReduce] then
    begin
      SoFar := GetTickDiff(uWaitingStart, GetTickCount);
      Result := NewAverage(FAverageWaitingTime, SoFar);
    end
    else
      Result := FAverageWaitingTime;
  end;
  { Tells whether DataObj regards the request this thread currently holds as
    a duplicate. Answers False when the thread holds no request. The check
    runs under the lock that guards the held request. }
  function TCnPoolingThread.Duplicate(DataObj: TCnTaskDataObject): Boolean;
  begin
    csProcessingDataObject.Enter;
    try
      if FProcessingDataObject = nil then
        Result := False
      else
        Result := DataObj.Duplicate(FProcessingDataObject, True);
    finally
      csProcessingDataObject.Leave;
    end;
  end;
  { Kills the OS thread immediately through TerminateThread, exit code 0. }
  procedure TCnPoolingThread.ForceTerminate;
  const
    ForcedExitCode = 0;
  begin
  {$IFDEF DEBUG}
    Trace('TCnPoolingThread.ForceTerminate');
  {$ENDIF DEBUG}
    TerminateThread(Handle, ForcedExitCode);
  end;
  { Thread body. Runs the pool's initialization hook, signals the constructor,
    then loops waiting on three handles: the pool's request semaphore, the
    pool's reduce timer and this thread's termination event. Each request is
    fetched, processed and freed here. The pool's finalization hook is called
    before the thread ends. }
  procedure TCnPoolingThread.Execute;
  type
    THandleID = (hidRequest, hidReduce, hidTerminate);
  var
    Elapsed: Integer;
    WaitSet: array[THandleID] of THandle;
    Signaled: DWORD;
  begin
  {$IFDEF DEBUG}
    Trace('TCnPoolingThread.Execute');
  {$ENDIF DEBUG}
    FCurState := ctsInitializing;
    try
      FPool.DoThreadInitializing(Self);
    except
      // The message is handed to the constructor, which re-raises it there.
      on E: Exception do
        sInitError := E.Message;
    end;
    SetEvent(hInitFinished);
  {$IFDEF DEBUG}
    Trace('TCnPoolingThread.Execute: Initialized');
  {$ENDIF DEBUG}
    WaitSet[hidRequest] := FPool.hSemRequestCount;
    WaitSet[hidReduce] := FPool.hTimReduce;
    WaitSet[hidTerminate] := hThreadTerminated;
    uWaitingStart := GetTickCount;
    FProcessingDataObject := nil;

    while not Terminated do
    begin
      // Entering the wait: count this thread as idle unless it already is.
      if not (FCurState in [ctsWaiting, ctsForReduce]) then
        InterlockedIncrement(FPool.FIdleThreadCount);
      FCurState := ctsWaiting;

      Signaled := WaitForMultipleObjects(Length(WaitSet), @WaitSet, False, INFINITE);
      case Signaled of
        WAIT_OBJECT_0 + Ord(hidRequest):
          begin
  {$IFDEF DEBUG}
            Trace('TCnPoolingThread.Execute: hidRequest');
  {$ENDIF DEBUG}
            Elapsed := GetTickDiff(uWaitingStart, GetTickCount);
            FAverageWaitingTime := NewAverage(FAverageWaitingTime, Elapsed);
            if FCurState in [ctsWaiting, ctsForReduce] then
              InterlockedDecrement(FPool.FIdleThreadCount);
            FCurState := ctsGetting;
            FPool.GetRequest(FProcessingDataObject);
            // Saturating counter: stops at the largest Int64 value.
            if FWorkCount < MaxInt64 then
              Inc(FWorkCount);
            uProcessingStart := GetTickCount;
            uStillWorking := uProcessingStart;
            FCurState := ctsProcessing;
            try
  {$IFDEF DEBUG}
              Trace('Processing: ' + FProcessingDataObject.Info);
  {$ENDIF DEBUG}
              FPool.DoProcessRequest(FProcessingDataObject, Self);
            except
              // Handler exceptions must not end the worker; they are only
              // traced in DEBUG builds and otherwise swallowed.
  {$IFDEF DEBUG}
              on E: Exception do
                Trace('OnProcessRequest Exception: ' + E.Message);
  {$ENDIF DEBUG}
            end;
            // The request object is released under its guard.
            csProcessingDataObject.Enter;
            try
              FreeAndNil(FProcessingDataObject);
            finally
              csProcessingDataObject.Leave;
            end;
            FAverageProcessing :=
              NewAverage(FAverageProcessing, GetTickDiff(uProcessingStart, GetTickCount));
            FCurState := ctsProcessed;
            FPool.CheckTaskEmpty;
            uWaitingStart := GetTickCount;
          end;

        WAIT_OBJECT_0 + Ord(hidReduce):
          begin
  {$IFDEF DEBUG}
            Trace('TCnPoolingThread.Execute: hidReduce');
  {$ENDIF DEBUG}
            if not (FCurState in [ctsWaiting, ctsForReduce]) then
              InterlockedIncrement(FPool.FIdleThreadCount);
            FCurState := ctsForReduce;
            FPool.DecreaseThreads;
          end;

        WAIT_OBJECT_0 + Ord(hidTerminate):
          begin
  {$IFDEF DEBUG}
            Trace('TCnPoolingThread.Execute: hidTerminate');
  {$ENDIF DEBUG}
            if FCurState in [ctsWaiting, ctsForReduce] then
              InterlockedDecrement(FPool.FIdleThreadCount);
            FCurState := ctsTerminating;
            Break;
          end;
      end;
    end;

    // Reached through Break or because Terminated became True; the idle
    // count is corrected only if the thread still counts as idle.
    if FCurState in [ctsWaiting, ctsForReduce] then
      InterlockedDecrement(FPool.FIdleThreadCount);
    FCurState := ctsTerminating;
    FPool.DoThreadFinalizing(Self);
  end;
  { Builds a one-line diagnostic string: both averages, the current state,
    the work counter and a description of the request being held.
    Outside Win9x the request is described only if its guard can be taken
    without blocking; on Win9x only nil / non-nil is reported, without locking. }
  function TCnPoolingThread.Info: string;
  var
    Detail: string;
  begin
    Result := 'AverageWaitingTime=' + IntToStr(AverageWaitingTime) + '; ' +
      'AverageProcessingTime=' + IntToStr(AverageProcessingTime) + '; ' +
      'FCurState=' + CCnTHREADSTATE[FCurState] + '; ' +
      'FWorkCount=' + IntToStr(FWorkCount);

    if FPool.OSIsWin9x then
    begin
      if FProcessingDataObject = nil then
        Result := Result + '; ' + 'FProcessingDataObject=nil'
      else
        Result := Result + '; ' + 'FProcessingDataObject!=nil';
      Exit;
    end;

    if csProcessingDataObject.TryEnter then
      try
        Result := Result + '; ' + 'FProcessingDataObject=';
        if FProcessingDataObject = nil then
          Detail := 'nil'
        else
          Detail := FProcessingDataObject.Info;
        Result := Result + Detail;
      finally
        csProcessingDataObject.Leave;
      end;
  end;
  { A thread counts as dead when it has been terminated, or when the pool has
    a non-zero ThreadDeadTimeout and the thread has been processing for
    longer than that since its last StillWorking call. }
  function TCnPoolingThread.IsDead: Boolean;
  begin
    Result := Terminated;
    if not Result then
      Result := (FPool.ThreadDeadTimeout > 0) and
        (FCurState = ctsProcessing) and
        (GetTickDiff(uStillWorking, GetTickCount) > FPool.ThreadDeadTimeout);
  {$IFDEF DEBUG}
    if Result then
      Trace('Thread is dead, Info = ' + Info);
  {$ENDIF DEBUG}
  end;
  { True when the OS thread handle is signalled, tested with a zero-timeout wait. }
  function TCnPoolingThread.IsFinished: Boolean;
  var
    WaitResult: DWORD;
  begin
    WaitResult := WaitForSingleObject(Handle, 0);
    Result := WaitResult = WAIT_OBJECT_0;
  end;
  { A thread is idle when it is waiting, its average wait exceeds 200 ticks
    and twice that average exceeds its average processing time. }
  function TCnPoolingThread.IsIdle: Boolean;
  begin
    Result := False;
    if not (FCurState in [ctsWaiting, ctsForReduce]) then
      Exit;
    if not (AverageWaitingTime > 200) then
      Exit;
    Result := AverageWaitingTime * 2 > AverageProcessingTime;
  end;
  { Folds a new sample into a running average.

    OldAvg - average so far
    NewVal - the new sample
    Returns the updated average.

    With eight or more processed requests the sample gets a fixed weight of
    1/8. During warm-up (fewer than eight) the old average is weighted by
    FWorkCount and the sample by 1, so the sum is divided by FWorkCount + 1.
    That keeps the result between OldAvg and NewVal and joins the 1/8 branch
    without a jump (FWorkCount = 7 gives (7 * Old + New) / 8).
    With no history the sample itself is returned. }
  function TCnPoolingThread.NewAverage(OldAvg, NewVal: Integer): Integer;
  begin
    if FWorkCount >= 8 then
      // Int64 intermediate so that OldAvg * 7 cannot overflow Integer.
      Result := (Int64(OldAvg) * 7 + NewVal) div 8
    else if FWorkCount > 0 then
      Result := (OldAvg * FWorkCount + NewVal) div (FWorkCount + 1)
    else
      Result := NewVal;
  end;
  { Heartbeat: stores the current tick count as the last sign of life, which
    IsDead compares against the pool's ThreadDeadTimeout. }
  procedure TCnPoolingThread.StillWorking;
  var
    Now32: DWORD;
  begin
    Now32 := GetTickCount;
    uStillWorking := Now32;
  end;
  { Marks the thread as terminated.
    Force = False: sets the termination event so the thread loop wakes up and
    leaves on its own.
    Force = True: kills the OS thread and frees this object; the instance
    must not be used after the call. }
  procedure TCnPoolingThread.Terminate(const Force: Boolean);
  begin
  {$IFDEF DEBUG}
    Trace('TCnPoolingThread.Terminate');
  {$ENDIF DEBUG}
    inherited Terminate;
    if not Force then
    begin
      SetEvent(hThreadTerminated);
      Exit;
    end;
    ForceTerminate;
    Free;
  end;
  {$IFDEF DEBUG}
  { Forwards Str to the TraceLog sink, tagged with this thread's ID. }
  procedure TCnPoolingThread.Trace(const Str: string);
  begin
    TraceLog(Str, Self.ThreadID);
  end;
  {$ENDIF DEBUG}
  { Returns a clone of the request currently held by this thread, or nil when
    there is none. The clone is made under the lock guarding the request. }
  function TCnPoolingThread.CloneData: TCnTaskDataObject;
  begin
    Result := nil;
    csProcessingDataObject.Enter;
    try
      if Assigned(FProcessingDataObject) then
        Result := FProcessingDataObject.Clone;
    finally
      csProcessingDataObject.Leave;
    end;
  end;
  670. { TCnThreadPool }
  { Creates the pool: its locks, lists, default settings, the request
    semaphore and the periodic timer that triggers thread reduction.

    Every published property is initialized to the value named in its
    "default" specifier. The streaming system omits a property whose value
    equals the declared default, so a constructor value that differs from it
    would make a design-time setting equal to the default load back as the
    constructor value. }
  constructor TCnThreadPool.Create(AOwner: TComponent);
  var
    DueTo: Int64;
  begin
  {$IFDEF DEBUG}
    Trace('TCnThreadPool.Create');
  {$ENDIF DEBUG}
    inherited;
    csQueueManagment := TCnCriticalSection.Create;
    csThreadManagment := TCnCriticalSection.Create;
    FQueue := TList.Create;
    FThreads := TList.Create;
    FThreadsKilling := TList.Create;
    FThreadsMinCount := 0;
    // Matches "default 10" on the published ThreadsMaxCount property.
    FThreadsMaxCount := 10;
    FThreadDeadTimeout := 0;
    FThreadClass := TCnPoolingThread;
    FAdjustInterval := 10000;
    FDeadTaskAsNew := True;
    FMinAtLeast := False;
    FLastGetPoint := 0;
    uTerminateWaitTime := 10000;
    QueuePackCount := 127;
    FIdleThreadCount := 0;
    FTaskCount := 0;
    // Counting semaphore: released once for every queued request.
    hSemRequestCount := CreateSemaphore(nil, 0, $7FFFFFFF, nil);
    // A negative due time is relative, so the first signal is almost immediate.
    DueTo := -1;
    hTimReduce := CreateWaitableTimer(nil, False, nil);
    if hTimReduce = 0 then
      // No waitable timer available: use an auto-reset event so worker
      // threads still have a valid handle to wait on.
      hTimReduce := CreateEvent(nil, False, False, nil)
    else
      SetWaitableTimer(hTimReduce, DueTo, FAdjustInterval, nil, nil, False);
  end;
  { Creates the pool as Create does, then selects AClass as the worker thread
    class. A nil AClass leaves the class chosen by Create in place. }
  constructor TCnThreadPool.CreateSpecial(AOwner: TComponent;
    AClass: TCnPoolingThreadClass);
  begin
    Create(AOwner);
    if Assigned(AClass) then
      FThreadClass := AClass;
  end;
  { Shuts the pool down: asks every worker thread to terminate, waits up to
    uTerminateWaitTime for them, frees the thread objects and the thread
    lists, frees every request still queued, and closes the semaphore and
    timer handles. }
  destructor TCnThreadPool.Destroy;
  var
    i, n: Integer;
    Handles: array of THandle;
  begin
  {$IFDEF DEBUG}
    Trace('TCnThreadPool.Destroy');
  {$ENDIF DEBUG}
    csThreadManagment.Enter;
    try
      SetLength(Handles, FThreads.Count);
      n := 0;
      for i := 0 to FThreads.Count - 1 do
        if FThreads[i] <> nil then
        begin
          Handles[n] := TCnPoolingThread(FThreads[i]).Handle;
          TCnPoolingThread(FThreads[i]).Terminate(False);
          Inc(n);
        end;
      // With no live threads Handles is empty: indexing Handles[0] would be
      // out of range and a wait on zero handles is invalid, so skip it.
      // NOTE(review): a single wait accepts at most MAXIMUM_WAIT_OBJECTS
      // handles; larger pools are not batched here.
      if n > 0 then
        WaitForMultipleObjects(n, @Handles[0], True, uTerminateWaitTime);
      for i := 0 to FThreads.Count - 1 do
      begin
        {if FThreads[i] <> nil then
          TCnPoolingThread(FThreads[i]).Terminate(True)
        else}
        TCnPoolingThread(FThreads[i]).Free;
      end;
      FThreads.Free;
      FreeFinishedThreads;
      for i := 0 to FThreadsKilling.Count - 1 do
      begin
        {if FThreadsKilling[i] <> nil then
          TCnPoolingThread(FThreadsKilling[i]).Terminate(True)
        else}
        TCnPoolingThread(FThreadsKilling[i]).Free;
      end;
      FThreadsKilling.Free;
    finally
      // The lock object is destroyed while held; it is never left.
      csThreadManagment.Free;
    end;
    csQueueManagment.Enter;
    try
      // Requests never picked up by a thread are owned by the pool.
      for i := FQueue.Count - 1 downto 0 do
        TObject(FQueue[i]).Free;
      FQueue.Free;
    finally
      csQueueManagment.Free;
    end;
    CloseHandle(hSemRequestCount);
    CloseHandle(hTimReduce);
    inherited;
  end;
  { Queues aDataObject for processing.

    CheckDuplicate selects the duplicate checks: cdQueue compares against the
    requests still queued, cdProcessing against the requests the worker
    threads currently hold. When a duplicate is found aDataObject is freed
    and False is returned. Otherwise the object is appended to the queue, the
    task count is incremented, the request semaphore is released once and
    True is returned. IncreaseThreads is called before the cdProcessing check.

    The queue lock is held for the whole operation and the thread lock is
    taken inside it. }
  function TCnThreadPool.AddRequest(aDataObject: TCnTaskDataObject;
    CheckDuplicate: TCheckDuplicates): Boolean;
  var
    Idx: Integer;
    Queued: TCnTaskDataObject;
    Worker: TCnPoolingThread;
  begin
  {$IFDEF DEBUG}
    Trace('AddRequest:' + aDataObject.Info);
  {$ENDIF DEBUG}
    Result := False;
    csQueueManagment.Enter;
    try
      if cdQueue in CheckDuplicate then
        for Idx := 0 to FQueue.Count - 1 do
        begin
          Queued := TCnTaskDataObject(FQueue[Idx]);
          if (Queued <> nil) and aDataObject.Duplicate(Queued, False) then
          begin
  {$IFDEF DEBUG}
            Trace('Duplicate:' + Queued.Info);
  {$ENDIF DEBUG}
            FreeAndNil(aDataObject);
            Exit;
          end;
        end;

      csThreadManagment.Enter;
      try
        IncreaseThreads;
        if cdProcessing in CheckDuplicate then
          for Idx := 0 to FThreads.Count - 1 do
          begin
            Worker := TCnPoolingThread(FThreads[Idx]);
            if Worker.Duplicate(aDataObject) then
            begin
  {$IFDEF DEBUG}
              Trace('Duplicate:' + Worker.FProcessingDataObject.Info);
  {$ENDIF DEBUG}
              FreeAndNil(aDataObject);
              Exit;
            end;
          end;
      finally
        csThreadManagment.Leave;
      end;

      FQueue.Add(aDataObject);
      Inc(FTaskCount);
      // Wakes one waiting worker thread.
      ReleaseSemaphore(hSemRequestCount, 1, nil);
  {$IFDEF DEBUG}
      Trace('ReleaseSemaphore');
  {$ENDIF DEBUG}
      Result := True;
    finally
      csQueueManagment.Leave;
    end;
  {$IFDEF DEBUG}
    Trace('Added Request:' + aDataObject.Info);
  {$ENDIF DEBUG}
  end;
  // Submits every element of aDataObjects through AddRequest, in array order,
  // applying the same CheckDuplicate options to each one. The individual
  // AddRequest results are discarded.
  procedure TCnThreadPool.AddRequests(
    aDataObjects: array of TCnTaskDataObject;
    CheckDuplicate: TCheckDuplicates);
  var
    Idx: Integer;
  begin
    Idx := 0;
    while Idx < Length(aDataObjects) do
    begin
      AddRequest(aDataObjects[Idx], CheckDuplicate);
      Inc(Idx);
    end;
  end;
  // Fires DoQueueEmpty(ekTaskEmpty) when no queue entry is left to fetch
  // (FLastGetPoint has reached the end of FQueue) and no thread in FThreads is
  // in the ctsProcessing state. The queue lock is held for the whole check,
  // including the event call; the thread lock only while scanning FThreads.
  procedure TCnThreadPool.CheckTaskEmpty;
  var
    Idx: Integer;
    AnyBusy: Boolean;
  begin
    csQueueManagment.Enter;
    try
      if FLastGetPoint >= FQueue.Count then
      begin
        AnyBusy := False;

        csThreadManagment.Enter;
        try
          Idx := 0;
          while (not AnyBusy) and (Idx < FThreads.Count) do
          begin
            AnyBusy := TCnPoolingThread(FThreads[Idx]).FCurState = ctsProcessing;
            Inc(Idx);
          end;
        finally
          csThreadManagment.Leave;
        end;

        if not AnyBusy then
          DoQueueEmpty(ekTaskEmpty);
      end;
    finally
      csQueueManagment.Leave;
    end;
  end;
  // Housekeeping pass that shrinks the pool by at most one thread.
  //
  // Does nothing when the thread lock cannot be taken immediately. Otherwise
  // it runs KillDeadThreads and FreeFinishedThreads, then walks FThreads from
  // the end down to index FThreadsMinCount and moves the first idle thread it
  // finds to FThreadsKilling after calling Terminate(False) on it.
  procedure TCnThreadPool.DecreaseThreads;
  var
    Idx, LowestIdx: Integer;
    Candidate: TCnPoolingThread;
  begin
  {$IFDEF DEBUG}
    Trace('TCnThreadPool.DecreaseThreads');
  {$ENDIF DEBUG}
    if not csThreadManagment.TryEnter then
      Exit;
    try
      KillDeadThreads;
      FreeFinishedThreads;

      // Both bounds are captured once, after the housekeeping calls above.
      Idx := FThreads.Count - 1;
      LowestIdx := FThreadsMinCount;
      while Idx >= LowestIdx do
      begin
        Candidate := TCnPoolingThread(FThreads[Idx]);
        if Candidate.IsIdle then
        begin
          Candidate.Terminate(False);
          FThreadsKilling.Add(Candidate);
          FThreads.Delete(Idx);
          Break;  // only one thread is retired per call
        end;
        Dec(Idx);
      end;
    finally
      csThreadManagment.Leave;
    end;
  end;
  // Builds the default multi-line status report for Sender into InfoText.
  //
  // The report starts with the pool settings and counters, followed by one
  // line per working thread (ThreadInfo) and one line per thread that is being
  // killed (ThreadKillingInfo). Lines are separated by CR/LF.
  // FreeFinishedThreads is called on Sender first.
  procedure TCnThreadPool.DefaultGetInfo(Sender: TCnThreadPool;
    var InfoText: string);
  const
    CRLF = #13#10;
  var
    Idx: Integer;
    Rule: string;
  begin
    Rule := StringOfChar('=', 15);

    Sender.FreeFinishedThreads;

    InfoText := 'MinCount=' + IntToStr(Sender.ThreadsMinCount) +
      '; MaxCount=' + IntToStr(Sender.ThreadsMaxCount) +
      '; AdjustInterval=' + IntToStr(Sender.AdjustInterval) +
      '; DeadTimeOut=' + IntToStr(Sender.ThreadDeadTimeout) + CRLF +
      'ThreadCount=' + IntToStr(Sender.ThreadCount) +
      '; KillingCount=' + IntToStr(Sender.ThreadKillingCount) +
      '; SpareThreadCount=' + IntToStr(Sender.FIdleThreadCount) +
      '; TaskCount=' + IntToStr(Sender.TaskCount) + CRLF +
      'AverageWaitingTime=' + IntToStr(Sender.AverageWaitingTime) +
      '; AverageProcessingTime=' + IntToStr(Sender.AverageProcessingTime) + CRLF +
      'Working Threads Info' + Rule;

    // ThreadCount returns -1 when the lock is busy; the loop is then skipped.
    for Idx := 0 to Sender.ThreadCount - 1 do
      InfoText := InfoText + CRLF + Sender.ThreadInfo(Idx);

    InfoText := InfoText + CRLF + 'Killing Threads Info' + Rule;

    for Idx := 0 to Sender.ThreadKillingCount - 1 do
      InfoText := InfoText + CRLF + Sender.ThreadKillingInfo(Idx);
  end;
  // Forwards a task to the FProcessRequest handler, passing the pool itself,
  // the task object and the thread. Does nothing when no handler is assigned.
  procedure TCnThreadPool.DoProcessRequest(aDataObj: TCnTaskDataObject;
    aThread: TCnPoolingThread);
  begin
    if not Assigned(FProcessRequest) then
      Exit;
    FProcessRequest(Self, aDataObj, aThread);
  end;
  // Calls the FQueueEmpty handler with the pool and EmptyKind.
  // Does nothing when no handler is assigned.
  procedure TCnThreadPool.DoQueueEmpty(EmptyKind: TEmptyKind);
  begin
    if not Assigned(FQueueEmpty) then
      Exit;
    FQueueEmpty(Self, EmptyKind);
  end;
  // Calls the FThreadFinalizing handler with the pool and aThread.
  // Does nothing when no handler is assigned.
  procedure TCnThreadPool.DoThreadFinalizing(aThread: TCnPoolingThread);
  begin
    if not Assigned(FThreadFinalizing) then
      Exit;
    FThreadFinalizing(Self, aThread);
  end;
  // Calls the FThreadInitializing handler with the pool and aThread.
  // Does nothing when no handler is assigned.
  procedure TCnThreadPool.DoThreadInitializing(aThread: TCnPoolingThread);
  begin
    if not Assigned(FThreadInitializing) then
      Exit;
    FThreadInitializing(Self, aThread);
  end;
  // Frees every thread in FThreadsKilling that reports IsFinished and removes
  // it from the list. The call is skipped entirely when the thread lock cannot
  // be taken immediately.
  procedure TCnThreadPool.FreeFinishedThreads;
  var
    Idx: Integer;
    Doomed: TCnPoolingThread;
  begin
    if not csThreadManagment.TryEnter then
      Exit;
    try
      // Walk backwards so deleting an entry does not disturb the indexes
      // that are still to be visited.
      Idx := FThreadsKilling.Count;
      while Idx > 0 do
      begin
        Dec(Idx);
        Doomed := TCnPoolingThread(FThreadsKilling[Idx]);
        if Doomed.IsFinished then
        begin
          Doomed.Free;
          FThreadsKilling.Delete(Idx);
        end;
      end;
    finally
      csThreadManagment.Leave;
    end;
  end;
  // Hands the next pending task from FQueue to the caller through Request.
  //
  // Consumed entries stay in the list as nil slots and FLastGetPoint marks the
  // next slot to read. The list is packed once it holds more than
  // QueuePackCount entries and the read position has passed three quarters of
  // it. When the last entry is taken, DoQueueEmpty(ekQueueEmpty) is fired and
  // the list is cleared. All of this happens under the queue lock.
  //
  // NOTE(review): the slot at FLastGetPoint is read without a bounds check, so
  // this appears to rely on being called only when a task is pending - confirm
  // against the caller.
  procedure TCnThreadPool.GetRequest(var Request: TCnTaskDataObject);
  var
    ShouldPack: Boolean;
  begin
  {$IFDEF DEBUG}
    Trace('TCnThreadPool.GetRequest');
  {$ENDIF DEBUG}
    csQueueManagment.Enter;
    try
      // Skip over slots that were already handed out.
      while FLastGetPoint < FQueue.Count do
      begin
        if FQueue[FLastGetPoint] <> nil then
          Break;
        Inc(FLastGetPoint);
      end;

      ShouldPack := (FQueue.Count > QueuePackCount) and
        (FLastGetPoint >= FQueue.Count * 3 div 4);
      if ShouldPack then
      begin
  {$IFDEF DEBUG}
        Trace('FQueue.Pack');
  {$ENDIF DEBUG}
        FQueue.Pack;
        FTaskCount := FQueue.Count;
        FLastGetPoint := 0;
      end;

      Request := TCnTaskDataObject(FQueue[FLastGetPoint]);
      FQueue[FLastGetPoint] := nil;
      Dec(FTaskCount);
      Inc(FLastGetPoint);

      if FLastGetPoint = FQueue.Count then
      begin
        DoQueueEmpty(ekQueueEmpty);
        FQueue.Clear;
        FTaskCount := 0;
        FLastGetPoint := 0;
      end;
    finally
      csQueueManagment.Leave;
    end;
  end;
  // True when the idle-thread counter (FIdleThreadCount) is at least one.
  function TCnThreadPool.HasSpareThread: Boolean;
  begin
    Result := FIdleThreadCount >= 1;
  end;
  // True when the pending-task counter (FTaskCount) is at least one.
  function TCnThreadPool.HasTask: Boolean;
  begin
    Result := FTaskCount >= 1;
  end;
  // Reports whether FThreadsKilling has reached its size limit. The limit is
  // half of FThreadsMaxCount when that is positive, otherwise a fixed 50.
  // The thread lock is taken (blocking) for the duration of the check.
  function TCnThreadPool.FinishedThreadsAreFull: Boolean;
  var
    Limit: Integer;
  begin
    csThreadManagment.Enter;
    try
      if FThreadsMaxCount > 0 then
        Limit := FThreadsMaxCount div 2
      else
        Limit := 50;
      Result := FThreadsKilling.Count >= Limit;
    finally
      csThreadManagment.Leave;
    end;
  end;
  // Grows the pool when needed. Runs under the thread lock, after
  // KillDeadThreads and FreeFinishedThreads, and picks exactly one of:
  //   1. no threads at all: create one;
  //   2. FMinAtLeast is set and the pool is below FThreadsMinCount: fill up to
  //      the minimum;
  //   3. the pool is below FThreadsMaxCount, tasks are pending and no thread is
  //      idle: create one more thread if the average waiting time is at most
  //      100 and the load estimate
  //        (AvgProc + AvgWait) * TaskCount > AvgProc * ThreadCount
  //      holds.
  procedure TCnThreadPool.IncreaseThreads;
  var
    AvgWait, AvgProc: Integer;
    Pending: Integer;
    Slot: Integer;

    // Creates one thread of FThreadClass and appends it to FThreads.
    // Any failure is swallowed; DEBUG builds trace Exception descendants.
    procedure SpawnWorker;
    begin
      try
        FThreads.Add(FThreadClass.Create(Self));
      except
  {$IFDEF DEBUG}
        on E: Exception do
          Trace('New thread Exception on ' + E.ClassName + ': ' + E.Message)
  {$ENDIF DEBUG}
      end
    end;

  begin
    csThreadManagment.Enter;
    try
      KillDeadThreads;
      FreeFinishedThreads;

      if FThreads.Count = 0 then
      begin
  {$IFDEF DEBUG}
        Trace('IncreaseThreads: FThreads.Count = 0');
  {$ENDIF DEBUG}
        SpawnWorker;
      end
      else if FMinAtLeast and (FThreads.Count < FThreadsMinCount) then
      begin
  {$IFDEF DEBUG}
        Trace('IncreaseThreads: FThreads.Count < FThreadsMinCount');
  {$ENDIF DEBUG}
        // The loop bounds are fixed on entry, so a failed creation is not
        // retried within this call.
        for Slot := FThreads.Count to FThreadsMinCount - 1 do
          SpawnWorker;
      end
      else if (FThreads.Count < FThreadsMaxCount) and HasTask and not HasSpareThread then
      begin
  {$IFDEF DEBUG}
        Trace('IncreaseThreads: FThreads.Count < FThreadsMaxCount');
  {$ENDIF DEBUG}
        Pending := TaskCount;
        if Pending > 0 then
        begin
          AvgWait := Max(AverageWaitingTime, 1);
          if AvgWait <= 100 then
          begin
            AvgProc := Max(AverageProcessingTime, 2);
  {$IFDEF DEBUG}
            Trace(Format(
              'ThreadCount(%D);ThreadsMaxCount(%D);AvgWait(%D);AvgProc(%D);TaskCount(%D);Killing(%D)',
              [FThreads.Count, FThreadsMaxCount, AvgWait, AvgProc, Pending, ThreadKillingCount]));
  {$ENDIF DEBUG}
            if (AvgProc + AvgWait) * Pending > AvgProc * FThreads.Count then
              SpawnWorker;
          end;
        end;
      end;
    finally
      csThreadManagment.Leave;
    end;
  end;
  // Returns a status report for the pool. When the thread lock cannot be
  // taken immediately the fixed text 'Too busy to get info.' is returned.
  // Otherwise the report comes from the FGetInfo handler if one is assigned,
  // or from DefaultGetInfo.
  function TCnThreadPool.Info: string;
  begin
    if not csThreadManagment.TryEnter then
    begin
      Result := 'Too busy to get info.';
      Exit;
    end;

    try
      if Assigned(FGetInfo) then
        FGetInfo(Self, Result)
      else
        DefaultGetInfo(Self, Result);
    finally
      csThreadManagment.Leave;
    end;
  end;
  // Moves every thread in FThreads that reports IsDead to FThreadsKilling,
  // calling Terminate(False) on it first.
  //
  // Nothing is done when FinishedThreadsAreFull is True. The scan itself is
  // skipped when the thread lock cannot be taken immediately. When
  // FDeadTaskAsNew is set, the task of each dead thread is cloned (CloneData)
  // and the clones are re-submitted through AddRequests, without duplicate
  // checking, after the lock has been released.
  procedure TCnThreadPool.KillDeadThreads;
  var
    i, iLen: Integer;
    LThread: TCnPoolingThread;
    LClone: TCnTaskDataObject;
    LObjects: array of TCnTaskDataObject;
  begin
    if FinishedThreadsAreFull then
      Exit;

    iLen := 0;
    if csThreadManagment.TryEnter then
    try
      for i := FThreads.Count - 1 downto 0 do
      begin
        LThread := TCnPoolingThread(FThreads[i]);
        if LThread.IsDead then
        begin
          if FDeadTaskAsNew then
          begin
            // Only real clones are kept. A nil entry must never reach FQueue,
            // because GetRequest treats nil slots as already-consumed entries.
            // NOTE(review): assumes CloneData can return nil, e.g. when the
            // thread holds no task - confirm against TCnPoolingThread.
            LClone := LThread.CloneData;
            if LClone <> nil then
            begin
              Inc(iLen);
              SetLength(LObjects, iLen);
              LObjects[iLen - 1] := LClone;
            end;
          end;
          LThread.Terminate(False);
          FThreadsKilling.Add(LThread);
          FThreads.Delete(i);
        end;
      end;
    finally
      csThreadManagment.Leave;
    end;

    AddRequests(LObjects, []);
  end;
  // Returns the unit-level FOSIsWin9x flag, which the initialization section
  // sets from GetVersionEx.
  function TCnThreadPool.OSIsWin9x: Boolean;
  begin
    Result := FOSIsWin9x;
  end;
  // Integer mean of AverageProcessingTime over all threads in FThreads.
  // Returns 20 when the pool has no threads. No lock is taken here.
  function TCnThreadPool.AverageProcessingTime: Integer;
  var
    Idx, Total: Integer;
  begin
    if FThreads.Count <= 0 then
    begin
      Result := 20;
      Exit;
    end;

    Total := 0;
    for Idx := 0 to FThreads.Count - 1 do
      Inc(Total, TCnPoolingThread(FThreads[Idx]).AverageProcessingTime);
    Result := Total div FThreads.Count;
  end;
  // Integer mean of AverageWaitingTime over all threads in FThreads.
  // Returns 10 when the pool has no threads. No lock is taken here.
  function TCnThreadPool.AverageWaitingTime: Integer;
  var
    Idx, Total: Integer;
  begin
    if FThreads.Count <= 0 then
    begin
      Result := 10;
      Exit;
    end;

    Total := 0;
    for Idx := 0 to FThreads.Count - 1 do
      Inc(Total, TCnPoolingThread(FThreads[Idx]).AverageWaitingTime);
    Result := Total div FThreads.Count;
  end;
  // Withdraws a task that is still waiting in the queue and frees it.
  //
  // The counter, the semaphore and the object are only touched when the task
  // was actually found in FQueue. A task that is no longer queued is left
  // alone: it is no longer this list's to account for, and freeing it here
  // could free an object that is in use elsewhere. A nil argument is ignored;
  // removing nil would delete an already-consumed slot in front of
  // FLastGetPoint and shift the pending entries underneath it.
  procedure TCnThreadPool.RemoveRequest(aDataObject: TCnTaskDataObject);
  begin
    if aDataObject = nil then
      Exit;

    csQueueManagment.Enter;
    try
      // FQueue is used as a TList here: Remove returns the former index of
      // the item, or -1 when the item was not in the list.
      if FQueue.Remove(aDataObject) >= 0 then
      begin
        Dec(FTaskCount);
        // AddRequest released one semaphore permit for this entry. Take it
        // back without waiting, so no worker is woken for a task that no
        // longer exists. A timeout means a worker already holds the permit.
        WaitForSingleObject(hSemRequestCount, 0);
        FreeAndNil(aDataObject);
      end;
    finally
      csQueueManagment.Leave;
    end;
  end;
  // Stores the new adjust interval and, when the waitable timer exists,
  // re-arms it with that interval as its period.
  //
  // Value - timer period; SetWaitableTimer interprets it in milliseconds.
  //
  // The due time passed to SetWaitableTimer must be initialised. A negative
  // value is a relative time in 100-nanosecond units, so the first signal is
  // scheduled one full interval from now.
  procedure TCnThreadPool.SetAdjustInterval(const Value: DWORD);
  var
    DueTo: Int64;
  begin
    FAdjustInterval := Value;
    if hTimReduce <> 0 then
    begin
      DueTo := -Int64(Value) * 10000;  // milliseconds -> relative 100 ns ticks
      SetWaitableTimer(hTimReduce, DueTo, Value, nil, nil, False);
    end;
  end;
  // Current value of the pending-task counter FTaskCount (read without a lock).
  function TCnThreadPool.TaskCount: Integer;
  begin
    Result := FTaskCount;
  end;
  // Number of threads in FThreads, or -1 when the thread lock cannot be taken
  // immediately.
  function TCnThreadPool.ThreadCount: Integer;
  begin
    Result := -1;
    if not csThreadManagment.TryEnter then
      Exit;
    try
      Result := FThreads.Count;
    finally
      csThreadManagment.Leave;
    end;
  end;
  // Returns the Info text of the working thread at index i.
  // The result is '' when i is outside 0..FThreads.Count - 1, or when the
  // thread lock cannot be taken immediately.
  function TCnThreadPool.ThreadInfo(const i: Integer): string;
  begin
    Result := '';
    if csThreadManagment.TryEnter then
    try
      // Reject negative indexes as well, so a bad index yields '' instead of
      // a list-index exception.
      if (i >= 0) and (i < FThreads.Count) then
        Result := TCnPoolingThread(FThreads[i]).Info;
    finally
      csThreadManagment.Leave;
    end;
  end;
  // Number of threads in FThreadsKilling, or -1 when the thread lock cannot
  // be taken immediately.
  function TCnThreadPool.ThreadKillingCount: Integer;
  begin
    Result := -1;
    if not csThreadManagment.TryEnter then
      Exit;
    try
      Result := FThreadsKilling.Count;
    finally
      csThreadManagment.Leave;
    end;
  end;
  // Returns the Info text of the thread at index i in FThreadsKilling.
  // The result is '' when i is outside 0..FThreadsKilling.Count - 1, or when
  // the thread lock cannot be taken immediately.
  function TCnThreadPool.ThreadKillingInfo(const i: Integer): string;
  begin
    Result := '';
    if csThreadManagment.TryEnter then
    try
      // Reject negative indexes as well, so a bad index yields '' instead of
      // a list-index exception.
      if (i >= 0) and (i < FThreadsKilling.Count) then
        Result := TCnPoolingThread(FThreadsKilling[i]).Info;
    finally
      csThreadManagment.Leave;
    end;
  end;
  // Fills the four output strings with this component's descriptive
  // constants: name, comment, author and author e-mail.
  procedure TCnThreadPool.GetComponentInfo(var AName, Author, Email,
    Comment: string);
  begin
    // What the component is.
    AName := SCnThreadPoolName;
    Comment := SCnThreadPoolComment;
    // Who wrote it.
    Author := SCnPack_Shenloqi;
    Email := SCnPack_ShenloqiEmail;
  end;
  {$IFDEF DEBUG}
  // DEBUG builds only: passes Str to the TraceLog routine with 0 as the
  // second argument.
  procedure TCnThreadPool.Trace(const Str: string);
  begin
    TraceLog(Str, 0);
  end;
  {$ENDIF DEBUG}
  var
    // Scratch record for the OS version query below.
    OSVer: TOSVersionInfo;

  initialization
    // Set FOSIsWin9x: True only when the version query succeeds and reports
    // the Win9x platform id.
    OSVer.dwOSVersionInfoSize := SizeOf(OSVer);
    if GetVersionEx(OSVer) then
      FOSIsWin9x := OSVer.dwPlatformId = VER_PLATFORM_WIN32_WINDOWS
    else
      FOSIsWin9x := False;
  {$IFDEF DEBUG}
    // DEBUG builds route trace output to SimpleTrace.
    TraceLog := SimpleTrace;
  {$ENDIF DEBUG}

  finalization

  end.