zmqapi.pas 69 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
72278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521252225232524252525262527252825292530253125322533253425352536253725382539254025412542254325442545254625472548254925502551255225532554255525562557255825592560256125622563256425652566256725682569257025712572257325742575257625772578257925802581258225832584258525862587258825892590259125922593259425952596259725982599260026012602260326042605260626072608260926102611261226132614261526162617261826192620262126222623262426252626262726282629263026312632263326342635263626372638263926402641264226432644264526462647264826492650265126522653265426552656265726582659266026612662266326642665266626672668266926702671267226732674267526762677267826792680268126822683268426852686268726882689269026912692269326942695269626972698269927002701270227032704270527062707270827092710271127122713271427152716271727182719272027212722272327242725272627272728272927302731273227332734273527362737273827392740274127422743274427452746274727482749275027512752275327542755275627572758275927602761276227632764276527662767276827692770277127722773277427752776277
72778277927802781278227832784278527862787278827892790279127922793279427952796279727982799280028012802280328042805
  1. {
  2. Copyright (c) 2012 Varga Balázs (bb.varga@gmail.com)
  3. This file is part of 0MQ Delphi binding
  4. 0MQ Delphi binding is free software; you can redistribute it and/or
  5. modify it under the terms of the GNU Lesser General Public License as
  6. published by the Free Software Foundation; either version 3 of the
  7. License, or (at your option) any later version.
  8. 0MQ Delphi binding is distributed in the hope that it will be useful,
  9. but WITHOUT ANY WARRANTY; without even the implied warranty of
  10. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  11. GNU Lesser General Public License for more details.
  12. You should have received a copy of the GNU Lesser General Public License
  13. along with this program. If not, see <http://www.gnu.org/licenses/>.
  14. }
  15. unit zmqapi;
  16. {$ifdef FPC}
  17. {$mode delphi}{$H+}
  18. {$endif}
  19. {$I zmq.inc}
  20. interface
  21. uses
  22. {$ifdef UNIX}
  23. BaseUnix,
  24. {$else}
  25. Windows,
  26. {$endif}
  27. Classes
  28. , SysUtils
  29. , zmq
  30. ;
  // Error-number constants exposed by this unit.
  31. const
  32. ZMQEAGAIN = 11; // NOTE(review): presumably the EAGAIN errno value - confirm it holds on every target platform
  33. {$ifdef UNIX}
  34. ZMQEINTR = ESysEINTR; // declared on UNIX only; ESysEINTR presumably comes from BaseUnix (in the uses clause for UNIX)
  35. {$endif}
  36. type
  37. {$ifdef zmq3}
  // Kinds of socket monitor events; only declared when zmq3 is defined.
  38. TZMQMonitorEvent = (
  39. meConnected,
  40. meConnectDelayed,
  41. meConnectRetried,
  42. meListening,
  43. meBindFailed,
  44. meAccepted,
  45. meAcceptFailed,
  46. meClosed,
  47. meCloseFailed,
  48. meDisconnected
  49. );
  // Set of monitor events; this is the type of the events parameter of
  // TZMQSocket.RegisterMonitor.
  50. TZMQMonitorEvents = set of TZMQMonitorEvent;
  51. const
  // Set containing every TZMQMonitorEvent member; used as the default value
  // of the events parameter of TZMQSocket.RegisterMonitor.
  52. cZMQMonitorEventsAll = [ meConnected,
  53. meConnectDelayed,
  54. meConnectRetried,
  55. meListening,
  56. meBindFailed,
  57. meAccepted,
  58. meAcceptFailed,
  59. meClosed,
  60. meCloseFailed,
  61. meDisconnected
  62. ];
  // Re-open the type section that the const block above interrupted.
  63. type
  64. {$endif}
  // Declares UInt64 as an alias of the signed Int64 type.
  // NOTE(review): values above High(Int64) cannot be represented through this
  // alias; presumably a workaround for compilers without a native UInt64 - confirm.
  65. UInt64 = Int64;
  // Exception class for ZeroMQ errors; stores the numeric error code next to
  // the message text obtained from zmq_strerror.
  66. EZMQException = class( Exception )
  67. private
  68. errnum: Integer; // error number this exception was created for
  69. public
  70. constructor Create; overload; // takes the error number from zmq_errno
  71. constructor Create( lerrn: Integer ); overload; // takes the error number from the caller
  72. property Num: Integer read errnum; // read-only access to the stored error number
  73. end;
  74. TZMQContext = class; // forward declaration; the class body is declared later in this type section
  75. TZMQSocket = class; // forward declaration; the class body is declared later in this type section
  // Flags for the public TZMQSocket.send overloads. The first member is named
  // sfDontWait when zmq3 is defined and sfNoBlock otherwise.
  76. TZMQSendFlag = ( {$ifdef zmq3}sfDontWait{$else}sfNoBlock{$endif}, sfSndMore );
  77. TZMQSendFlags = set of TZMQSendFlag;
  // Flags for the public TZMQSocket.recv overloads. The single member is named
  // rfDontWait when zmq3 is defined and rfNoBlock otherwise.
  78. TZMQRecvFlag = ( {$ifdef zmq3}rfDontWait{$else}rfNoBlock{$endif} );
  79. TZMQRecvFlags = set of TZMQRecvFlag;
  // Selector type for TZMQFrame.getProperty / setProperty (declared under zmq3).
  80. TZMQMessageProperty = ( mpMore );
  // A single message frame: a class wrapper around one zmq_msg_t record.
  // The constructors initialise the record through zmq_msg_init* and the
  // destructor closes it through zmq_msg_close.
  81. TZMQFrame = class
  82. private
  83. fMessage: zmq_msg_t; // the wrapped libzmq message record
  // Accessors backing the asInteger / asHexString properties.
  84. function getAsInteger: Integer;
  85. procedure setAsInteger(const Value: Integer);
  86. function getAsHexString: AnsiString;
  87. procedure setAsHexString(const Value: AnsiString);
  // Receives the return code of a libzmq call; a code of 0 is treated as success.
  88. procedure CheckResult( rc: Integer );
  89. {$ifdef zmq3}
  90. function getProperty( prop: TZMQMessageProperty ): Integer;
  91. procedure setProperty( prop: TZMQMessageProperty; value: Integer );
  92. {$endif}
  // Accessors backing the asUtf8String property.
  93. function getAsUtf8String: Utf8String;
  94. procedure setAsUtf8String(const Value: Utf8String);
  95. public
  96. constructor create; overload; // initialises via zmq_msg_init
  97. constructor create( size: size_t ); overload; // initialises via zmq_msg_init_size
  98. constructor create( data: Pointer; size: size_t; ffn: free_fn; hint: Pointer = nil ); overload; // initialises via zmq_msg_init_data
  99. destructor Destroy; override; // closes the message via zmq_msg_close
  // The rebuild overloads take the same parameter lists as the constructors.
  // NOTE(review): presumably they re-initialise the existing frame - confirm in the implementation.
  100. procedure rebuild; overload;
  101. procedure rebuild( size: size_t ); overload;
  102. procedure rebuild( data: Pointer; size: size_t; ffn: free_fn; hint: Pointer = nil ); overload;
  103. procedure move( msg: TZMQFrame );
  104. procedure copy( msg: TZMQFrame );
  105. function data: Pointer;
  106. function size: size_t;
  107. {$ifdef zmq3}
  108. function more: Boolean;
  109. {$endif}
  110. function dup: TZMQFrame;
  111. // convert the data into a readable string.
  112. function dump: Utf8String;
  113. // copy the whole content of the stream to the message.
  114. procedure LoadFromStream( strm: TStream );
  115. procedure SaveToStream( strm: TStream );
  // Typed views of the frame content, implemented by the private accessors above.
  116. property asUtf8String: Utf8String read getAsUtf8String write setAsUtf8String;
  117. property asHexString: AnsiString read getAsHexString write setAsHexString;
  118. property asInteger: Integer read getAsInteger write setAsInteger;
  119. end;
  120. // Multipart message: an ordered list of TZMQFrame objects with a cursor.
  121. TZMQMsg = class
  122. private
  123. msgs: TList; // list holding the frames
  124. csize: Cardinal; // NOTE(review): presumably the running content size reported by content_size - confirm
  125. cursor: Integer; // current position used by first / next / last
  126. function getItem(indx: Integer): TZMQFrame; // getter of the default item[] property
  127. protected
  128. public
  129. constructor create;
  130. destructor Destroy; override;
  131. // Return size of message, i.e. number of frames (0 or more).
  132. function size: Integer;
  133. // Return the content size of the message.
  // NOTE(review): presumably the total byte size of all frames (see csize) - confirm against the implementation.
  134. function content_size: Integer;
  135. // Push frame to the front of the message, i.e. before all other frames.
  136. // Message takes ownership of frame, will destroy it when message is sent.
  137. // Set the cursor to 0
  138. // Returns 0 on success, -1 on error.
  139. function push( msg: TZMQFrame ): Integer;
  140. function pushstr( str: Utf8String ): Integer;
  141. // Remove first frame from message, if any. Returns frame, or NULL. Caller
  142. // now owns frame and must destroy it when finished with it.
  143. // Set the cursor to 0
  144. function pop: TZMQFrame;
  145. function popstr: Utf8String;
  146. function popint: Integer;
  147. // Add frame to the end of the message, i.e. after all other frames.
  148. // Message takes ownership of frame, will destroy it when message is sent.
  149. // Set the cursor to 0
  150. // Returns 0 on success
  151. function add( msg: TZMQFrame ): Integer;
  152. function addstr( msg: Utf8String ): Integer;
  153. function addint( msg: Integer ): Integer;
  154. // Push frame plus empty frame to front of message, before first frame.
  155. // Message takes ownership of frame, will destroy it when message is sent.
  156. procedure wrap( msg: TZMQFrame );
  157. // Pop frame off front of message, caller now owns frame
  158. // If next frame is empty, pops and destroys that empty frame.
  159. function unwrap: TZMQFrame;
  160. // Remove specified frame from list, if present. Does not destroy frame.
  161. // Set the cursor to 0
  162. procedure remove( msg: TZMQFrame );
  163. // Set cursor to first frame in message. Returns frame, or NULL.
  164. function first: TZMQFrame;
  165. // Return the next frame. If there are no more frames, returns NULL. To move
  166. // to the first frame call zmsg_first(). Advances the cursor.
  167. function next: TZMQFrame;
  168. // Return the last frame. If there are no frames, returns NULL.
  169. // Set the cursor to the last
  170. function last: TZMQFrame;
  171. // Create copy of message, as new message object
  172. function dup: TZMQMsg;
  173. // Dump the message as a string.
  174. function dump: Utf8String;
  175. function saveasHex: Utf8String;
  176. procedure loadfromHex( data: Utf8String );
  177. procedure Clear;
  // Default indexed property: msg[indx] reads a frame through getItem.
  178. property item[indx: Integer]: TZMQFrame read getItem; default;
  179. end;
  // Socket kinds accepted by TZMQContext.Socket and reported by TZMQSocket.SocketType.
  180. TZMQSocketType = ( stPair, stPub, stSub, stReq, stRep, stDealer,
  181. stRouter, stPull, stPush, stXPub, stXSub );
  // Poll event flags and the corresponding set type used for requested and
  // returned events (see TZMQPollItem, TZMQPoller).
  182. TZMQPollEvent = ( pePollIn, pePollOut, pePollErr );
  183. TZMQPollEvents = set of TZMQPollEvent;
  // Monitor-related types; only declared when zmq3 is defined.
  184. {$ifdef zmq3}
  185. TZMQKeepAlive = ( kaDefault, kaFalse, kaTrue ); // value type of TZMQSocket.KeepAlive
  // Event record handed to a TZMQMonitorProc. The variant part declares which
  // extra field accompanies which event kind.
  186. TZMQEvent = record
  187. event: TZMQMonitorEvent;
  188. addr: AnsiString;
  189. case TZMQMonitorEvent of
  190. meConnected,
  191. meListening,
  192. meAccepted,
  193. meClosed,
  194. meDisconnected:
  195. (
  196. fd: Integer; // NOTE(review): presumably the file descriptor involved - confirm
  197. );
  198. meConnectDelayed,
  199. meBindFailed,
  200. meAcceptFailed,
  201. meCloseFailed:
  202. (
  203. err: Integer; // NOTE(review): presumably the error code of the failed operation - confirm
  204. );
  205. meConnectRetried: ( //connect_retried
  206. interval: Integer; // NOTE(review): presumably the retry interval - confirm units
  207. );
  208. end;
  // Method-pointer callback type taking one TZMQEvent (see TZMQSocket.RegisterMonitor).
  209. TZMQMonitorProc = procedure( event: TZMQEvent ) of object;
  210. PZMQMonitorRec = ^TZMQMonitorRec;
  // Record referenced by TZMQSocket.fMonitorRec.
  // NOTE(review): presumably the state shared with the monitor thread - confirm.
  211. TZMQMonitorRec = record
  212. terminated: Boolean;
  213. context: TZMQContext;
  214. addr: AnsiString;
  215. proc: TZMQMonitorProc;
  216. end;
  217. {$endif}
  // Class wrapper around a ZeroMQ socket. Holds the raw socket pointer and the
  // owning TZMQContext, and exposes socket options as typed properties.
  // Several members change name or type depending on whether zmq3 is defined.
  218. TZMQSocket = class
  219. // low level
  220. protected
  221. fSocket: Pointer; // raw socket pointer, exposed read-only as SocketPtr
  222. fContext: TZMQContext; // owning context, exposed read-only as Context
  223. private
  224. fRaiseEAgain: Boolean; // backing field of the RaiseEAgain property
  225. {$ifdef zmq3}
  226. fAcceptFilter: TStringList;
  227. fMonitorRec: PZMQMonitorRec;
  228. fMonitorThread: THandle;
  229. {$endif}
  230. procedure close;
  // Raw option accessors taking an untyped buffer and its length.
  231. procedure setSockOpt( option: Integer; optval: Pointer; optvallen: size_t );
  232. procedure getSockOpt( option: Integer; optval: Pointer; var optvallen: size_t );
  // Private send/recv overloads taking the flags as a plain Integer.
  233. function send( var msg: TZMQFrame; flags: Integer = 0 ): Integer; overload;
  234. function recv( var msg: TZMQFrame; flags: Integer = 0 ): Integer; overload;
  235. public
  236. procedure bind( addr: AnsiString );
  237. procedure connect( addr: AnsiString );
  238. {$ifdef zmq3}
  239. procedure unbind( addr: AnsiString );
  240. procedure disconnect( addr: AnsiString );
  241. {$endif}
  242. // helpers
  243. private
  244. function CheckResult( rc: Integer ): Integer;
  // Typed convenience wrappers for Int64 and Integer valued options.
  245. function getSockOptInt64( option: Integer ): Int64;
  246. function getSockOptInteger( option: Integer ): Integer;
  247. procedure setSockOptInt64( option: Integer; const Value: Int64 );
  248. procedure setSockOptInteger( option: Integer; const Value: Integer );
  249. public
  250. constructor Create;
  251. destructor Destroy; override;
  // Option getters; these back the properties declared near the end of the class.
  252. function getSocketType: TZMQSocketType;
  253. function getrcvMore: Boolean;
  254. function getRcvTimeout: Integer;
  255. function getSndTimeout: Integer;
  256. function getAffinity: UInt64;
  257. function getIdentity: ShortString;
  258. function getRate: {$ifdef zmq3}Integer{$else}int64{$endif};
  259. function getRecoveryIvl: {$ifdef zmq3}Integer{$else}int64{$endif};
  260. function getSndBuf: {$ifdef zmq3}Integer{$else}UInt64{$endif};
  261. function getRcvBuf: {$ifdef zmq3}Integer{$else}UInt64{$endif};
  262. function getLinger: Integer;
  263. function getReconnectIvl: Integer;
  264. function getReconnectIvlMax: Integer;
  265. function getBacklog: Integer;
  266. function getFD: Pointer;
  267. function getEvents: TZMQPollEvents;
  268. function getHWM: {$ifdef zmq3}Integer{$else}UInt64{$endif};
  // Accessors that exist only when zmq3 is defined.
  269. {$ifdef zmq3}
  270. function getSndHWM: Integer;
  271. function getRcvHWM: Integer;
  272. procedure setSndHWM( const Value: Integer );
  273. procedure setRcvHWM( const Value: Integer );
  274. procedure setMaxMsgSize( const Value: Int64 );
  275. function getMaxMsgSize: Int64;
  276. function getMulticastHops: Integer;
  277. procedure setMulticastHops( const Value: Integer );
  278. function getIPv4Only: Boolean;
  279. procedure setIPv4Only( const Value: Boolean );
  280. function getLastEndpoint: AnsiString;
  281. function getKeepAlive: TZMQKeepAlive;
  282. procedure setKeepAlive( const Value: TZMQKeepAlive );
  283. function getKeepAliveIdle: Integer;
  284. procedure setKeepAliveIdle( const Value: Integer );
  285. function getKeepAliveCnt: Integer;
  286. procedure setKeepAliveCnt( const Value: Integer );
  287. function getKeepAliveIntvl: Integer;
  288. procedure setKeepAliveIntvl( const Value: Integer );
  289. function getAcceptFilter( indx: Integer ): AnsiString;
  290. procedure setAcceptFilter( indx: Integer; const Value: AnsiString );
  291. procedure setRouterMandatory( const Value: Boolean );
  // Accessors that exist only when zmq3 is NOT defined.
  292. {$else}
  293. function getSwap: Int64;
  294. function getRecoveryIvlMSec: Int64;
  295. function getMCastLoop: Int64;
  296. procedure setSwap( const Value: Int64 );
  297. procedure setRecoveryIvlMSec( const Value: Int64 );
  298. procedure setMCastLoop( const Value: Int64 );
  299. {$endif}
  // Option setters available in both configurations.
  300. procedure setHWM( const Value: {$ifdef zmq3}Integer{$else}UInt64{$endif} );
  301. procedure setRcvTimeout( const Value: Integer );
  302. procedure setSndTimeout( const Value: Integer );
  303. procedure setAffinity( const Value: UInt64 );
  304. procedure setIdentity( const Value: ShortString );
  305. procedure setRate( const Value: {$ifdef zmq3}Integer{$else}int64{$endif} );
  306. procedure setRecoveryIvl( const Value: {$ifdef zmq3}Integer{$else}int64{$endif} );
  307. procedure setSndBuf( const Value: {$ifdef zmq3}Integer{$else}UInt64{$endif} );
  308. procedure setRcvBuf( const Value: {$ifdef zmq3}Integer{$else}UInt64{$endif} );
  309. procedure setLinger( const Value: Integer );
  310. procedure setReconnectIvl( const Value: Integer );
  311. procedure setReconnectIvlMax( const Value: Integer );
  312. procedure setBacklog( const Value: Integer );
  313. procedure Subscribe( filter: AnsiString );
  314. procedure unSubscribe( filter: AnsiString );
  // Public send overloads: a single frame, a stream, a string, a multipart
  // message, an array of strings, or a TStrings list.
  315. function send( var msg: TZMQFrame; flags: TZMQSendFlags = [] ): Integer; overload;
  316. function send( strm: TStream; size: Integer; flags: TZMQSendFlags = [] ): Integer; overload;
  317. function send( msg: Utf8String; flags: TZMQSendFlags = [] ): Integer; overload;
  318. function send( var msgs: TZMQMsg; dontwait: Boolean = false ): Integer; overload;
  319. function send( msg: Array of Utf8String; dontwait: Boolean = false ): Integer; overload;
  320. function send( msg: TStrings; dontwait: Boolean = false ): Integer; overload;
  321. {$ifdef zmq3}
  322. function sendBuffer( const Buffer; len: Size_t; flags: TZMQSendFlags = [] ): Integer;
  323. {$endif}
  // Public recv overloads mirroring the send overloads.
  324. function recv( msg: TZMQFrame; flags: TZMQRecvFlags = [] ): Integer; overload;
  325. function recv( strm: TStream; flags: TZMQRecvFlags = [] ): Integer; overload;
  326. function recv( var msg: Utf8String; flags: TZMQRecvFlags = [] ): Integer; overload;
  327. function recv( var msgs: TZMQMsg; flags: TZMQRecvFlags = [] ): Integer; overload;
  328. function recv( msg: TStrings; flags: TZMQRecvFlags = [] ): Integer; overload;
  329. {$ifdef zmq3}
  330. function recvBuffer( var Buffer; len: size_t; flags: TZMQRecvFlags = [] ): Integer;
  // Monitoring: proc receives TZMQEvent records; events defaults to all event kinds.
  331. procedure RegisterMonitor( proc: TZMQMonitorProc; events: TZMQMonitorEvents = cZMQMonitorEventsAll );
  332. procedure DeRegisterMonitor;
  333. {$endif}
  // Properties mapping onto the getters/setters declared above.
  334. property SocketType: TZMQSocketType read getSocketType;
  335. property RcvMore: Boolean read getRcvMore;
  336. {$ifdef zmq3}
  337. property SndHWM: Integer read getSndHWM write setSndHwm;
  338. property RcvHWM: Integer read getRcvHWM write setRcvHwm;
  339. property MaxMsgSize: Int64 read getMaxMsgSize write setMaxMsgSize;
  340. property MulticastHops: Integer read getMulticastHops write setMulticastHops;
  341. property IPv4Only: Boolean read getIPv4Only write setIPv4Only;
  342. property LastEndpoint: AnsiString read getLastEndpoint;
  343. property KeepAlive: TZMQKeepAlive read getKeepAlive write setKeepAlive;
  344. property KeepAliveIdle: Integer read getKeepAliveIdle write setKeepAliveIdle;
  345. property KeepAliveCnt: Integer read getKeepAliveCnt write setKeepAliveCnt;
  346. property KeepAliveIntvl: Integer read getKeepAliveIntvl write setKeepAliveIntvl;
  347. procedure AddAcceptFilter( addr: AnsiString );
  348. property AcceptFilter[indx: Integer]: AnsiString read getAcceptFilter write setAcceptFilter;
  349. property RouterMandatory: Boolean write setRouterMandatory; // write-only: no getter is declared
  350. {$else}
  351. property Swap: Int64 read getSwap write setSwap;
  352. property RecoveryIvlMSec: Int64 read getRecoveryIvlMSec write setRecoveryIvlMSec;
  353. property MCastLoop: Int64 read getMCastLoop write setMCastLoop;
  354. {$endif}
  355. property HWM: {$ifdef zmq3}Integer{$else}UInt64{$endif} read getHWM write setHWM;
  356. property RcvTimeout: Integer read getRcvTimeout write setRcvTimeout;
  357. property SndTimeout: Integer read getSndTimeout write setSndTimeout;
  358. property Affinity: UInt64 read getAffinity write setAffinity;
  359. property Identity: ShortString read getIdentity write setIdentity;
  360. property Rate: {$ifdef zmq3}Integer{$else}int64{$endif} read getRate write setRate;
  361. property RecoveryIvl: {$ifdef zmq3}Integer{$else}int64{$endif} read getRecoveryIvl write setRecoveryIvl;
  362. property SndBuf: {$ifdef zmq3}Integer{$else}UInt64{$endif} read getSndBuf write setSndBuf;
  363. property RcvBuf: {$ifdef zmq3}Integer{$else}UInt64{$endif} read getRcvBuf write setRcvBuf;
  364. property Linger: Integer read getLinger write setLinger;
  365. property ReconnectIvl: Integer read getReconnectIvl write setReconnectIvl;
  366. property ReconnectIvlMax: Integer read getReconnectIvlMax write setReconnectIvlMax;
  367. property Backlog: Integer read getBacklog write setBacklog;
  368. property FD: Pointer read getFD;
  369. property Events: TZMQPollEvents read getEvents;
  370. property Context: TZMQContext read fContext;
  371. property SocketPtr: Pointer read fSocket;
  // NOTE(review): presumably controls whether an EAGAIN result raises an exception - confirm in CheckResult.
  372. property RaiseEAgain: Boolean read fRaiseEAgain write fRaiseEAgain;
  373. end;
  // Class wrapper around a ZeroMQ context. Holds the raw context pointer and a
  // list of sockets, and creates TZMQSocket instances through Socket().
  374. TZMQContext = class
  375. private
  376. fContext: Pointer; // raw context pointer, exposed read-only as ContextPtr
  377. fSockets: TList; // NOTE(review): presumably the sockets created through this context - confirm
  378. fLinger: Integer; // backing field of the Linger property
  379. {$ifdef zmq3}
  // Generic integer option accessors plus the typed accessors built on them (zmq3 only).
  380. function getOption( option: Integer ): Integer;
  381. procedure setOption( option, optval: Integer );
  382. function getIOThreads: Integer;
  383. procedure setIOThreads( const Value: Integer );
  384. function getMaxSockets: Integer;
  385. procedure setMaxSockets( const Value: Integer );
  386. {$endif}
  387. protected
  388. fTerminated: Boolean; // backing field of the Terminated property
  389. fMainThread: Boolean;
  // Protected constructor taking an existing context; see the public Shadow function.
  390. constructor createShadow( context: TZMQContext );
  391. procedure CheckResult( rc: Integer );
  392. procedure RemoveSocket( lSocket: TZMQSocket );
  393. public
  // The io_threads parameter exists only when zmq3 is NOT defined.
  394. constructor create{$ifndef zmq3}( io_threads: Integer = 1 ){$endif};
  395. destructor Destroy; override;
  396. function Shadow: TZMQContext;
  397. function Socket( stype: TZMQSocketType ): TZMQSocket;
  398. procedure Terminate;
  399. property ContextPtr: Pointer read fContext;
  400. // A value < -1 means the linger is not changed on destroy.
  401. property Linger: Integer read fLinger write fLinger;
  402. property Terminated: Boolean read fTerminated;
  403. {$ifdef zmq3}
  404. property IOThreads: Integer read getIOThreads write setIOThreads;
  405. property MaxSockets: Integer read getMaxSockets write setMaxSockets;
  406. {$endif}
  407. end;
  408. type
  409. TZMQFree = zmq.free_fn; // alias of the zmq unit's free_fn type
  // One poll entry: the socket, the events asked for, and the events returned.
  410. TZMQPollItem = record
  411. socket: TZMQSocket;
  412. events: TZMQPollEvents;
  413. revents: TZMQPollEvents;
  414. end;
  415. TZMQPollItemA = array of TZMQPollItem; // dynamic array of poll entries, used by ZMQPoll
  // Method-pointer callback types used by TZMQPoller.onEvent and TZMQPoller.onException.
  416. TZMQPollEventProc = procedure( socket: TZMQSocket; event: TZMQPollEvents ) of object;
  417. TZMQExceptionProc = procedure( exception: Exception ) of object;
  // TThread descendant that polls a set of registered sockets and reports
  // results through the onEvent / onTimeOut / onException callbacks.
  418. TZMQPoller = class( TThread )
  419. private
  420. fContext: TZMQContext;
  421. fOwnContext: Boolean; // NOTE(review): presumably true when the poller created fContext itself - confirm
  422. sPair: TZMQSocket; // NOTE(review): presumably an internal socket used to talk to the poller thread - confirm
  423. fAddr: AnsiString;
  // Parallel arrays: the raw libzmq poll items and the matching socket objects.
  424. fPollItem: array of zmq.pollitem_t;
  425. fPollSocket: array of TZMQSocket;
  426. fPollItemCapacity,
  427. fPollItemCount: Integer;
  428. fTimeOut: Integer;
  429. fPollNumber: Integer; // backing field of the PollNumber property
  430. cs: TRTLCriticalSection;
  431. fSync: Boolean; // set from the lSync constructor parameter (presumably - confirm)
  // Backing fields of the callback properties.
  432. fonException: TZMQExceptionProc;
  433. fonTimeOut: TNotifyEvent;
  434. fonEvent: TZMQPollEventProc;
  435. function getPollItem(indx: Integer): TZMQPollItem;
  436. procedure CheckResult( rc: Integer );
  437. procedure AddToPollItems( socket: TZMQSocket; events: TZMQPollEvents );
  438. procedure DelFromPollItems( socket: TZMQSocket; events: TZMQPollEvents; indx: Integer );
  439. function getPollResult(indx: Integer): TZMQPollItem;
  440. protected
  441. procedure Execute; override; // thread body
  442. public
  // lContext defaults to nil. NOTE(review): presumably the poller then creates its own context - confirm.
  443. constructor Create( lSync: Boolean = false; lContext: TZMQContext = nil );
  444. destructor Destroy; override;
  445. procedure Register( socket: TZMQSocket; events: TZMQPollEvents; bWait: Boolean = false );
  446. procedure Deregister( socket: TZMQSocket; events: TZMQPollEvents; bWait: Boolean = false );
  447. procedure setPollNumber( const Value: Integer; bWait: Boolean = false );
  448. function poll( timeout: Longint = -1; lPollNumber: Integer = -1 ): Integer;
  449. property pollResult[indx: Integer]: TZMQPollItem read getPollResult;
  450. property PollNumber: Integer read fPollNumber;
  451. property PollItem[indx: Integer]: TZMQPollItem read getPollItem;
  452. property onEvent: TZMQPollEventProc read fonEvent write fonEvent;
  453. property onException: TZMQExceptionProc read fonException write fonException;
  454. property onTimeOut: TNotifyEvent read fonTimeOut write fonTimeOut;
  455. end;
  // Device kinds accepted by the ZMQDevice procedure.
  456. TZMQDevice = ( dStreamer, dForwarder, dQueue );
  // A socket paired with a set of poll events, plus its dynamic-array type.
  457. TZMQPollRec = record
  458. socket: TZMQSocket;
  459. events: TZMQPollEvents;
  460. end;
  461. TZMQPollRecA = array of TZMQPollRec;
  // Unit-level routines.
  // ZMQPoll has two overloads: one takes a dynamic array of poll items, the
  // other a single item passed by reference. timeout defaults to -1 in both.
  462. function ZMQPoll( var pia: TZMQPollItemA; piaSize: Integer = -1; timeout: Integer = -1 ): Integer; overload;
  463. function ZMQPoll( var pia: TZMQPollItem; piaSize: Integer = 1; timeout: Integer = -1 ): Integer; overload;
  464. {$ifdef zmq3}
  465. procedure ZMQProxy( frontend, backend, capture: TZMQSocket );
  466. {$endif}
  467. procedure ZMQDevice( device: TZMQDevice; insocket, outsocket: TZMQSocket );
  // Returns the version numbers through the three var parameters.
  468. procedure ZMQVersion(var major, minor, patch: Integer);
  469. procedure ZMQTerminate;
  470. var
  // Unit-level flag, initialised to false.
  // NOTE(review): presumably set by ZMQTerminate - confirm in the implementation.
  471. ZMQTerminated: Boolean = false;
  472. type
  473. // Thread related functions.
  // Callback signatures for TZMQThread. The *Meth types are method pointers
  // ("of object"); the *Proc types are plain procedure pointers. The attached
  // variants additionally receive a Pipe socket.
  474. TDetachedThreadMeth = procedure( args: Pointer; context: TZMQContext ) of object;
  475. TAttachedThreadMeth = procedure( args: Pointer; Context: TZMQContext; Pipe: TZMQSocket ) of object;
  476. TDetachedThreadProc = procedure( args: Pointer; context: TZMQContext );
  477. TAttachedThreadProc = procedure( args: Pointer; Context: TZMQContext; Pipe: TZMQSocket );
  // TThread descendant that runs one of the four callback kinds declared above,
  // selected by which constructor is used.
  478. TZMQThread = class( TThread )
  479. private
  480. //attached thread pipe
  481. fPipe: TZMQSocket;
  482. // attached thread pipe in the new thread.
  483. thrPipe: TZMQSocket;
  // At most one of these callbacks is supplied per constructor call.
  484. fDetachedMeth: TDetachedThreadMeth;
  485. fAttachedMeth: TAttachedThreadMeth;
  486. fDetachedProc: TDetachedThreadProc;
  487. fAttachedProc: TAttachedThreadProc;
  488. fContext: TZMQContext; // exposed read-only as Context
  489. fArgs: Pointer; // exposed read-only as Args
  490. public
  491. constructor Create( lArgs: Pointer; ctx: TZMQContext );
  // The attached variants take a context; the detached variants do not.
  492. constructor CreateAttached( lAttachedMeth: TAttachedThreadMeth; ctx: TZMQContext; lArgs: Pointer );
  493. constructor CreateDetached( lDetachedMeth: TDetachedThreadMeth; lArgs: Pointer );
  494. constructor CreateAttachedProc( lAttachedProc: TAttachedThreadProc; ctx: TZMQContext; lArgs: Pointer );
  495. constructor CreateDetachedProc( lDetachedProc: TDetachedThreadProc; lArgs: Pointer );
  496. destructor Destroy; override;
  497. protected
  498. procedure Execute; override; // thread body
  499. procedure DoExecute; virtual; // overridable hook; NOTE(review): presumably called from Execute - confirm
  500. public
  501. property Pipe: TZMQSocket read fPipe;
  502. property Args: Pointer read fArgs;
  503. property Context: TZMQContext read fContext;
  504. end;
  505. implementation
  var
    // Implementation-private state.
    // NOTE(review): presumably the list of live contexts and the lock that
    // guards it - confirm against the code that uses them.
    contexts: TList;
    cs: TRTLCriticalSection;

  {$ifndef UNIX}
  // Forward declaration; the handler body appears later in the unit.
  function console_handler( dwCtrlType: DWORD ): BOOL; stdcall; forward;
  {$endif}
  512. { EZMQException }
  // Builds the exception from the current zmq_errno value; the message text
  // comes from zmq_strerror.
  constructor EZMQException.Create;
  var
    errText: AnsiString;
  begin
    errnum := zmq_errno;
    errText := AnsiString( zmq_strerror( errnum ) );
    inherited Create( String( errText ) );
  end;
  // Builds the exception for an explicit error number; the message text comes
  // from zmq_strerror.
  constructor EZMQException.Create( lerrn: Integer );
  var
    errText: AnsiString;
  begin
    errnum := lerrn;
    errText := AnsiString( zmq_strerror( errnum ) );
    inherited Create( String( errText ) );
  end;
  { TZMQFrame }
  // Initialises an empty message via zmq_msg_init; raises on failure.
  constructor TZMQFrame.Create;
  var
    rc: Integer;
  begin
    rc := zmq_msg_init( fMessage );
    CheckResult( rc );
  end;
  // Initialises a message of the given size via zmq_msg_init_size; raises on
  // failure.
  constructor TZMQFrame.Create( size: size_t );
  var
    rc: Integer;
  begin
    rc := zmq_msg_init_size( fMessage, size );
    CheckResult( rc );
  end;
  // Initialises a message around a caller-supplied buffer via
  // zmq_msg_init_data, passing ffn and hint through unchanged.
  constructor TZMQFrame.Create( data: Pointer; size: size_t;
    ffn: free_fn; hint: Pointer );
  var
    rc: Integer;
  begin
    rc := zmq_msg_init_data( fMessage, data, size, ffn, hint );
    CheckResult( rc );
  end;
  // Closes the underlying message and then runs the inherited destructor.
  // CheckResult raises if zmq_msg_close reports an error.
  destructor TZMQFrame.Destroy;
  var
    rc: Integer;
  begin
    rc := zmq_msg_close( fMessage );
    CheckResult( rc );
    inherited;
  end;
  // Validates a return code: 0 is success, -1 raises an EZMQException built
  // from zmq_errno, and any other value raises with a fixed message.
  procedure TZMQFrame.CheckResult( rc: Integer );
  begin
    case rc of
      0:
        ; // ok
      -1:
        raise EZMQException.Create;
    else
      raise EZMQException.Create('Function result is not 0, or -1!');
    end;
  end;
  // Closes the current message and re-initialises it as an empty one.
  procedure TZMQFrame.rebuild;
  var
    rc: Integer;
  begin
    rc := zmq_msg_close( fMessage );
    CheckResult( rc );
    rc := zmq_msg_init( fMessage );
    CheckResult( rc );
  end;
  // Closes the current message and re-initialises it with the given size.
  procedure TZMQFrame.rebuild( size: size_t );
  var
    rc: Integer;
  begin
    rc := zmq_msg_close( fMessage );
    CheckResult( rc );
    rc := zmq_msg_init_size( fMessage, size );
    CheckResult( rc );
  end;
  // Closes the current message and re-initialises it around a caller-supplied
  // buffer, passing ffn and hint through to zmq_msg_init_data.
  procedure TZMQFrame.rebuild( data: Pointer; size: size_t; ffn: free_fn; hint: Pointer = nil );
  var
    rc: Integer;
  begin
    rc := zmq_msg_close( fMessage );
    CheckResult( rc );
    rc := zmq_msg_init_data( fMessage, data, size, ffn, hint );
    CheckResult( rc );
  end;
  // Calls zmq_msg_move with this frame's message as the first argument and
  // msg's message as the second; raises on failure.
  procedure TZMQFrame.move( msg: TZMQFrame );
  var
    rc: Integer;
  begin
    rc := zmq_msg_move( fMessage, msg.fMessage );
    CheckResult( rc );
  end;
  // Calls zmq_msg_copy with this frame's message as the first argument and
  // msg's message as the second; raises on failure.
  procedure TZMQFrame.copy( msg: TZMQFrame );
  var
    rc: Integer;
  begin
    rc := zmq_msg_copy( fMessage, msg.fMessage );
    CheckResult( rc );
  end;
  // Returns the pointer reported by zmq_msg_data for this frame.
  function TZMQFrame.data: Pointer;
  begin
    Result := zmq_msg_data( fMessage );
  end;
  // Returns the size reported by zmq_msg_size for this frame.
  function TZMQFrame.size: size_t;
  begin
    Result := zmq_msg_size( fMessage );
  end;
  585. {$ifdef zmq3}
  // Reads a message property through zmq_msg_get.
  // Returns the property value; raises EZMQException (built from zmq_errno)
  // when zmq_msg_get reports -1.
  // The earlier version raised on every path, including success, so a value
  // could never be returned.
  function TZMQFrame.getProperty( prop: TZMQMessageProperty ): Integer;
  begin
    result := zmq_msg_get( fMessage, Byte( prop ) );
    if result = -1 then
      raise EZMQException.Create;
  end;
  // Writes a message property through zmq_msg_set; raises on failure.
  procedure TZMQFrame.setProperty( prop: TZMQMessageProperty; value: Integer );
  var
    rc: Integer;
  begin
    rc := zmq_msg_set( fMessage, Byte( prop ), value );
    CheckResult( rc );
  end;
  // Maps zmq_msg_more to a Boolean: 0 is False, 1 is True, and anything else
  // raises an EZMQException.
  function TZMQFrame.more: Boolean;
  begin
    case zmq_msg_more( fMessage ) of
      0: Result := False;
      1: Result := True;
    else
      raise EZMQException.Create( 'zmq_msg_more return value undefined!' );
    end;
  end;
  609. {$endif}
  // Returns a new frame of the same size holding a byte copy of this frame's
  // data. The caller owns the returned frame.
  function TZMQFrame.dup: TZMQFrame;
  var
    len: size_t;
  begin
    len := size;
    Result := TZMQFrame.create( len );
    System.Move( data^, Result.data^, len );
  end;
  // Renders the frame for display: an empty frame gives '', a frame whose
  // first byte is #0 is rendered as hex, and anything else is returned as its
  // UTF-8 string value.
  function TZMQFrame.dump: Utf8String;
  var
    hex: Utf8String;
    len: Integer;
  begin
    // not complete.
    len := size;
    if len = 0 then
    begin
      Result := '';
      Exit;
    end;

    if AnsiChar(data^) <> #0 then
    begin
      Result := asUtf8String;
      Exit;
    end;

    // Two hex digits per byte.
    SetLength( hex, len * 2 );
    BinToHex( data, PAnsiChar(hex), len );
    Result := hex;
  end;
  // Returns the frame's bytes as hex text, two characters per byte.
  function TZMQFrame.getAsHexString: AnsiString;
  var
    len: size_t;
  begin
    len := size;
    SetLength( Result, len * 2 );
    BinToHex( data, PAnsiChar(Result), len );
  end;
  // Reinterprets the start of the frame's data as an Integer.
  // No size check is made; the caller must ensure the frame holds at least
  // SizeOf(Integer) bytes.
  function TZMQFrame.getAsInteger: Integer;
  begin
    Result := PInteger( data )^;
  end;
  // Copies the frame's bytes into an AnsiString and returns that as a
  // Utf8String.
  function TZMQFrame.getAsUtf8String: Utf8String;
  var
    raw: AnsiString;
  begin
    SetString( raw, PAnsiChar(data), size );
    Result := raw;
  end;
  // Rebuilds the frame to Length(Value) div 2 bytes and fills it by decoding
  // Value as hex text.
  procedure TZMQFrame.setAsHexString( const Value: AnsiString );
  var
    byteCount: Integer;
  begin
    byteCount := Length( Value ) div 2;
    rebuild( byteCount );
    HexToBin( PAnsiChar( value ), data, byteCount );
  end;
  // Rebuilds the frame to SizeOf(Integer) bytes and stores Value in it.
  procedure TZMQFrame.setAsInteger( const Value: Integer );
  var
    byteCount: Integer;
  begin
    byteCount := SizeOf( Value );
    rebuild( byteCount );
    PInteger( data )^ := Value;
  end;
  // Rebuilds the frame to Length(Value) bytes and copies the string's bytes
  // into it. An empty string yields a zero-length frame.
  procedure TZMQFrame.setAsUtf8String( const Value: Utf8String );
  var
    iSize: Integer;
  begin
    iSize := Length( Value );
    rebuild( iSize );
    // Value[1] does not exist for an empty string (range error under $R+),
    // so only touch the characters when there is something to copy.
    if iSize > 0 then
      System.Move( Value[1], data^, iSize );
  end;
  // Rewinds the stream and reads its whole content into the frame, rebuilding
  // the frame first when its size differs from the stream size.
  procedure TZMQFrame.LoadFromStream( strm: TStream );
  begin
    strm.Position := 0;
    if not ( strm.size = size ) then
      rebuild( strm.Size );
    strm.ReadBuffer( data^, strm.Size );
  end;
  // Writes the frame's bytes to the stream at its current position.
  procedure TZMQFrame.SaveToStream( strm: TStream );
  begin
    strm.WriteBuffer( data^, size );
  end;
  683. { TZMQMsg }
  // Creates an empty message: no frames, zero content size, cursor at 0.
  constructor TZMQMsg.create;
  begin
    cursor := 0;
    csize := 0;
    msgs := TList.Create;
  end;
  // Frees every frame still held (via Clear), then the list itself.
  destructor TZMQMsg.Destroy;
  begin
    Clear;
    msgs.Free;
    inherited;
  end;
  // Number of frames currently held.
  function TZMQMsg.size: Integer;
  begin
    Result := msgs.Count;
  end;
  // Running total of frame sizes, as tracked in csize.
  function TZMQMsg.content_size: Integer;
  begin
    Result := csize;
  end;
  // Inserts msg at the front, adds its size to the content size and resets
  // the cursor. Returns 0 on success and -1 if anything raised.
  function TZMQMsg.push( msg: TZMQFrame ): Integer;
  begin
    Result := -1;
    try
      msgs.Insert( 0, msg );
      csize := csize + msg.size;
      cursor := 0;
      Result := 0;
    except
      // Exceptions are swallowed and reported through the return value.
      Result := -1;
    end;
  end;
  // Wraps str in a new frame and pushes it to the front; returns push's
  // result.
  function TZMQMsg.pushstr( str: Utf8String ): Integer;
  var
    newFrame: TZMQFrame;
  begin
    newFrame := TZMQFrame.create;
    newFrame.asUtf8String := str;
    Result := push( newFrame );
  end;
  // Removes and returns the first frame, or nil when the message is empty.
  // Ownership of the returned frame passes to the caller.
  function TZMQMsg.pop: TZMQFrame;
  begin
    Result := nil;
    if size <= 0 then
      Exit;

    Result := msgs[0];
    csize := csize - Result.size;
    msgs.Delete( 0 );
    cursor := 0;
  end;
  // Pops the first frame, returns its content as a Utf8String and frees the
  // frame. The popped frame is dereferenced without a nil check, so the
  // message must not be empty.
  function TZMQMsg.popstr: Utf8String;
  var
    head: TZMQFrame;
  begin
    head := pop;
    try
      Result := head.asUtf8String;
    finally
      head.Free;
    end;
  end;
  // Pops the first frame, returns its content as an Integer and frees the
  // frame. The popped frame is dereferenced without a nil check, so the
  // message must not be empty.
  function TZMQMsg.popint: Integer;
  var
    head: TZMQFrame;
  begin
    head := pop;
    try
      Result := head.asInteger;
    finally
      head.Free;
    end;
  end;
  // Appends msg at the end, adds its size to the content size and resets the
  // cursor. Returns 0 on success and -1 if anything raised.
  function TZMQMsg.add( msg: TZMQFrame ): Integer;
  begin
    Result := -1;
    try
      msgs.Add( msg );
      csize := csize + msg.size;
      cursor := 0;
      Result := 0;
    except
      // Exceptions are swallowed and reported through the return value.
      Result := -1;
    end;
  end;
  // Wraps msg in a new frame and appends it; returns add's result.
  function TZMQMsg.addstr( msg: Utf8String ): Integer;
  var
    newFrame: TZMQFrame;
  begin
    newFrame := TZMQFrame.create;
    newFrame.asUtf8String := msg;
    Result := add( newFrame );
  end;
  // Wraps an Integer in a new frame and appends it; returns add's result.
  function TZMQMsg.addint( msg: Integer ): Integer;
  var
    newFrame: TZMQFrame;
  begin
    newFrame := TZMQFrame.create( sizeOf( Integer ) );
    newFrame.asInteger := msg;
    Result := add( newFrame );
  end;
  // Pushes a zero-length frame and then msg, so msg ends up first with the
  // empty frame right behind it.
  procedure TZMQMsg.wrap( msg: TZMQFrame );
  var
    delimiter: TZMQFrame;
  begin
    delimiter := TZMQFrame.create( 0 );
    push( delimiter );
    push( msg );
  end;
  // Pops and returns the first frame; if the next frame is zero-length it is
  // popped and freed as well.
  function TZMQMsg.unwrap: TZMQFrame;
  begin
    Result := pop;
    if size = 0 then
      Exit;
    if Item[0].size = 0 then
      pop.Free;
  end;
  // Detaches msg from the message without freeing it, subtracting its size
  // from the content size and resetting the cursor. Does nothing when msg is
  // not in the list.
  procedure TZMQMsg.remove( msg: TZMQFrame );
  var
    i: Integer;
  begin
    i := msgs.IndexOf( msg );
    // IndexOf reports -1 for "not found" and 0 for the first frame; the
    // earlier "i > 0" test made the first frame impossible to remove.
    if i >= 0 then
    begin
      csize := csize - Item[i].size;
      msgs.Delete( i );
      cursor := 0;
    end;
  end;
  // Returns the first frame and sets the cursor to 1, or returns nil and
  // sets the cursor to 0 when the message is empty.
  function TZMQMsg.first: TZMQFrame;
  begin
    if size <= 0 then
    begin
      cursor := 0;
      Result := nil;
      Exit;
    end;
    cursor := 1;
    Result := msgs[0];
  end;
  // Returns the frame at the cursor and advances the cursor, or returns nil
  // (cursor unchanged) once the cursor is past the last frame.
  function TZMQMsg.next: TZMQFrame;
  begin
    Result := nil;
    if cursor >= size then
      Exit;
    Result := msgs[cursor];
    inc( cursor );
  end;
  // Returns the last frame (nil when empty) and sets the cursor to the frame
  // count.
  function TZMQMsg.last: TZMQFrame;
  var
    count: Integer;
  begin
    count := size;
    if count > 0 then
      Result := msgs[count - 1]
    else
      Result := nil;
    cursor := count;
  end;
  // Returns a deep copy of this message: every frame is duplicated byte for
  // byte, and the content size and cursor are carried over. The caller owns
  // the result.
  //
  // Frames are visited by index rather than with first/next. The earlier
  // version moved this message's own cursor to the end while copying and
  // then stored that moved value in the copy.
  function TZMQMsg.dup: TZMQMsg;
  var
    msg,
    msgnew: TZMQFrame;
    i,
    iSize: Integer;
  begin
    result := TZMQMsg.create;
    try
      for i := 0 to size - 1 do
      begin
        msg := Item[i];
        iSize := msg.size;
        msgnew := TZMQFrame.create( iSize );
        // System.Move is available on every platform, so no ifdef is needed.
        if iSize > 0 then
          System.Move( msg.data^, msgnew.data^, iSize );
        result.add( msgnew );
      end;
      result.csize := csize;
      result.cursor := cursor;
    except
      // Do not leak the half-built copy when a frame cannot be duplicated.
      result.Free;
      raise;
    end;
  end;
  // Frees every held frame, empties the list and resets the content size and
  // cursor.
  procedure TZMQMsg.Clear;
  var
    idx: Integer;
  begin
    for idx := 0 to size - 1 do
      Item[idx].Free;
    msgs.Clear;
    cursor := 0;
    csize := 0;
  end;
  // Returns the frame stored at position indx in the list.
  function TZMQMsg.getItem( indx: Integer ): TZMQFrame;
  begin
    Result := msgs[indx];
  end;
  // Concatenates the dump of every frame, separated by CR+LF.
  function TZMQMsg.dump: Utf8String;
  var
    idx: Integer;
  begin
    Result := '';
    for idx := 0 to size - 1 do
    begin
      // The separator goes between frames, never before the first one.
      if idx <> 0 then
        Result := Result + #13 + #10;
      Result := Result + item[idx].dump;
    end;
  end;
  // Returns the hex form of every frame, one frame per line (CR+LF between
  // lines, none after the last). An empty message yields ''.
  function TZMQMsg.saveasHex: Utf8String;
  var
    i: Integer;
  begin
    // A string Result is not guaranteed to start out empty, so it must be
    // cleared before anything is appended to it.
    result := '';
    for i := 0 to size - 1 do
    begin
      result := result + item[i].asHexString;
      if i < size - 1 then
        result := result + #13 + #10;
    end;
  end;
  // Replaces the message content with frames decoded from hex text, one
  // frame per line of data.
  procedure TZMQMsg.loadfromHex( data: Utf8String );
  var
    lines: TStringList;
    lineNo: Integer;
    newFrame: TZMQFrame;
  begin
    Clear;
    lines := TStringList.Create;
    try
      lines.Text := data;
      for lineNo := 0 to lines.Count - 1 do
      begin
        newFrame := TZMQFrame.create;
        newFrame.asHexString := lines[lineNo];
        add( newFrame );
      end;
    finally
      lines.Free;
    end;
  end;
  914. { TZMQSocket }
  // Sets the defaults: EAGAIN is not raised as an exception and, under zmq3,
  // the accept-filter list is created and no monitor is registered.
  constructor TZMQSocket.Create;
  begin
    {$ifdef zmq3}
    fMonitorRec := nil;
    fAcceptFilter := TStringList.Create;
    {$endif}
    fRaiseEAgain := False;
  end;
  // Tears the socket down: deregisters any monitor (zmq3), closes the
  // socket, removes it from its context and frees the accept-filter list.
  destructor TZMQSocket.destroy;
  begin
    {$ifdef zmq3}
    if Assigned( fMonitorRec ) then
      DeRegisterMonitor;
    {$endif}

    close;
    fContext.RemoveSocket( Self );

    {$ifdef zmq3}
    fAcceptFilter.Free;
    {$endif}
    inherited;
  end;
  // Closes the underlying socket if there is one and clears the stored
  // pointer. Calling it with no socket open does nothing.
  procedure TZMQSocket.close;
  begin
    if SocketPtr <> nil then
    begin
      CheckResult( zmq_close( SocketPtr ) );
      fSocket := nil;
    end;
  end;
  // Validates a return code and passes it back unchanged.
  // 0 is success. -1 raises an EZMQException for zmq_errno unless the error
  // is ZMQEAGAIN and fRaiseEAgain is off. Any other value raises with a
  // fixed message.
  function TZMQSocket.CheckResult( rc: Integer ): Integer;
  var
    errn: Integer;
  begin
    Result := rc;
    if rc = 0 then
      Exit;

    if rc <> -1 then
      raise EZMQException.Create('Function result is not 0, or -1!');

    errn := zmq_errno;
    if fRaiseEAgain or ( errn <> ZMQEAGAIN ) then
      raise EZMQException.Create( errn );
  end;
  // Raw option setter: forwards to zmq_setsockopt and checks the result.
  procedure TZMQSocket.setSockOpt( option: Integer; optval: Pointer;
    optvallen: size_t );
  var
    rc: Integer;
  begin
    rc := zmq_setsockopt( SocketPtr, option, optval, optvallen );
    CheckResult( rc );
  end;
  // Raw option getter: forwards to zmq_getsockopt and checks the result.
  // optvallen is passed by reference to zmq_getsockopt.
  procedure TZMQSocket.getSockOpt( option: Integer; optval: Pointer; var optvallen: size_t );
  var
    rc: Integer;
  begin
    rc := zmq_getsockopt( SocketPtr, option, optval, optvallen );
    CheckResult( rc );
  end;
  // Calls zmq_bind with the given address; raises on failure.
  procedure TZMQSocket.bind( addr: AnsiString );
  var
    rc: Integer;
  begin
    rc := zmq_bind( SocketPtr, PAnsiChar( addr ) );
    CheckResult( rc );
  end;
  // Calls zmq_connect with the given address; raises on failure.
  procedure TZMQSocket.connect( addr: AnsiString );
  var
    rc: Integer;
  begin
    rc := zmq_connect( SocketPtr, PAnsiChar( addr ) );
    CheckResult( rc );
  end;
  974. {$ifdef zmq3}
  // Calls zmq_unbind with the given address; raises on failure.
  procedure TZMQSocket.unbind( addr: AnsiString );
  var
    rc: Integer;
  begin
    rc := zmq_unbind( SocketPtr, PAnsiChar( addr ) );
    CheckResult( rc );
  end;
  // Calls zmq_disconnect with the given address; raises on failure.
  procedure TZMQSocket.disconnect( addr: AnsiString );
  var
    rc: Integer;
  begin
    rc := zmq_disconnect( SocketPtr, PAnsiChar( addr ) );
    CheckResult( rc );
  end;
  983. {$endif}
  // Reads a socket option into an Int64-sized buffer and returns it.
  function TZMQSocket.getSockOptInt64( option: Integer ): Int64;
  var
    len: size_t;
  begin
    len := SizeOf( Int64 );
    getSockOpt( option, @Result, len );
  end;
  // Reads a socket option into an Integer-sized buffer and returns it.
  function TZMQSocket.getSockOptInteger( option: Integer ): Integer;
  var
    len: size_t;
  begin
    len := SizeOf( Integer );
    getSockOpt( option, @Result, len );
  end;
  // Writes an Int64 value to the given socket option.
  procedure TZMQSocket.setSockOptInt64( option: Integer; const Value: Int64 );
  begin
    setSockOpt( option, @Value, SizeOf( Value ) );
  end;
  // Writes an Integer value to the given socket option.
  procedure TZMQSocket.setSockOptInteger( option: Integer; const Value: Integer );
  begin
    setSockOpt( option, @Value, SizeOf( Value ) );
  end;
  // Reads ZMQ_TYPE and casts the integer to TZMQSocketType.
  function TZMQSocket.getSocketType: TZMQSocketType;
  var
    raw: Integer;
  begin
    raw := getSockOptInteger( ZMQ_TYPE );
    Result := TZMQSocketType( raw );
  end;
  // True when ZMQ_RCVMORE reads as 1. The option is read as an Integer under
  // zmq3 and as an Int64 otherwise.
  function TZMQSocket.getRcvMore: Boolean;
  begin
    {$ifdef zmq3}
    Result := ( getSockOptInteger( ZMQ_RCVMORE ) = 1 );
    {$else}
    Result := ( getSockOptInt64( ZMQ_RCVMORE ) = 1 );
    {$endif}
  end;
  // Reads the ZMQ_RCVTIMEO option.
  function TZMQSocket.getRcvTimeout: Integer;
  begin
    Result := getSockOptInteger( ZMQ_RCVTIMEO );
  end;
  // Reads the ZMQ_SNDTIMEO option.
  function TZMQSocket.getSndTimeout: Integer;
  begin
    Result := getSockOptInteger( ZMQ_SNDTIMEO );
  end;
  // Reads the ZMQ_AFFINITY option (fetched as Int64, returned as UInt64).
  function TZMQSocket.getAffinity: UInt64;
  begin
    Result := getSockOptInt64( ZMQ_AFFINITY );
  end;
  // Reads ZMQ_IDENTITY into a ShortString buffer (up to 255 bytes) and trims
  // it to the length reported back by getSockOpt.
  function TZMQSocket.getIdentity: ShortString;
  var
    ident: ShortString;
    len: size_t;
  begin
    len := 255;
    getSockOpt( ZMQ_IDENTITY, @ident[1], len );
    SetLength( ident, len );
    Result := ident;
  end;
  // Reads the ZMQ_RATE option; Integer under zmq3, Int64 otherwise.
  function TZMQSocket.getRate: {$ifdef zmq3}Integer{$else}int64{$endif};
  begin
    {$ifdef zmq3}
    Result := getSockOptInteger( ZMQ_RATE );
    {$else}
    Result := getSockOptInt64( ZMQ_RATE );
    {$endif}
  end;
  // Reads the ZMQ_RECOVERY_IVL option; Integer under zmq3, Int64 otherwise.
  function TZMQSocket.getRecoveryIVL: {$ifdef zmq3}Integer{$else}int64{$endif};
  begin
    {$ifdef zmq3}
    Result := getSockOptInteger( ZMQ_RECOVERY_IVL );
    {$else}
    Result := getSockOptInt64( ZMQ_RECOVERY_IVL );
    {$endif}
  end;
  // Reads the ZMQ_SNDBUF option; Integer under zmq3, 64-bit otherwise.
  function TZMQSocket.getSndBuf: {$ifdef zmq3}Integer{$else}UInt64{$endif};
  begin
    {$ifdef zmq3}
    Result := getSockOptInteger( ZMQ_SNDBUF );
    {$else}
    Result := getSockOptInt64( ZMQ_SNDBUF );
    {$endif}
  end;
  // Reads the ZMQ_RCVBUF option; Integer under zmq3, 64-bit otherwise.
  function TZMQSocket.getRcvBuf: {$ifdef zmq3}Integer{$else}UInt64{$endif};
  begin
    {$ifdef zmq3}
    Result := getSockOptInteger( ZMQ_RCVBUF );
    {$else}
    Result := getSockOptInt64( ZMQ_RCVBUF );
    {$endif}
  end;
  // Reads the ZMQ_LINGER option.
  function TZMQSocket.getLinger: Integer;
  begin
    Result := getSockOptInteger( ZMQ_LINGER );
  end;
  // Reads the ZMQ_RECONNECT_IVL option.
  function TZMQSocket.getReconnectIvl: Integer;
  begin
    Result := getSockOptInteger( ZMQ_RECONNECT_IVL );
  end;
  // Reads the ZMQ_RECONNECT_IVL_MAX option.
  function TZMQSocket.getReconnectIvlMax: Integer;
  begin
    Result := getSockOptInteger( ZMQ_RECONNECT_IVL_MAX );
  end;
  // Reads the ZMQ_BACKLOG option.
  function TZMQSocket.getBacklog: Integer;
  begin
    Result := getSockOptInteger( ZMQ_BACKLOG );
  end;
  // Reads the ZMQ_FD option into a pointer-sized result.
  function TZMQSocket.getFD: Pointer;
  var
    len: size_t;
  begin
    // Not sure this works, haven't tested.
    len := SizeOf( Pointer );
    getSockOpt( ZMQ_FD, @Result, len );
  end;
  // Reads ZMQ_EVENTS into a Cardinal and converts its low byte to a
  // TZMQPollEvents set.
  function TZMQSocket.getEvents: TZMQPollEvents;
  var
    raw: Cardinal;
    len: size_t;
  begin
    len := SizeOf( raw );
    getSockOpt( ZMQ_EVENTS, @raw, len );
    Result := TZMQPollEvents( Byte(raw) );
  end;
  // High-water mark. Under zmq3 this returns RcvHWM; otherwise it reads the
  // ZMQ_HWM option.
  function TZMQSocket.getHWM: {$ifdef zmq3}Integer{$else}UInt64{$endif};
  begin
    {$ifdef zmq3}
    // warning deprecated.
    Result := RcvHWM;
    {$else}
    Result := getSockOptInt64( ZMQ_HWM );
    {$endif}
  end;
  1120. {$ifdef zmq3}
  // Reads the ZMQ_SNDHWM option.
  function TZMQSocket.getSndHWM: Integer;
  begin
    Result := getSockOptInteger( ZMQ_SNDHWM );
  end;
  // Reads the ZMQ_RCVHWM option.
  function TZMQSocket.getRcvHWM: Integer;
  begin
    Result := getSockOptInteger( ZMQ_RCVHWM );
  end;
  // Writes the ZMQ_SNDHWM option.
  procedure TZMQSocket.setSndHWM( const Value: Integer );
  begin
    setSockOptInteger( ZMQ_SNDHWM, Value );
  end;
  // Writes the ZMQ_RCVHWM option.
  procedure TZMQSocket.setRcvHWM( const Value: Integer );
  begin
    setSockOptInteger( ZMQ_RCVHWM, Value );
  end;
  // Writes the ZMQ_MAXMSGSIZE option (Int64).
  procedure TZMQSocket.setMaxMsgSize( const Value: Int64 );
  begin
    setSockOptInt64( ZMQ_MAXMSGSIZE, Value );
  end;
  // Reads the ZMQ_MAXMSGSIZE option (Int64).
  function TZMQSocket.getMaxMsgSize: Int64;
  begin
    Result := getSockOptInt64( ZMQ_MAXMSGSIZE );
  end;
  // Reads the ZMQ_MULTICAST_HOPS option.
  function TZMQSocket.getMulticastHops: Integer;
  begin
    Result := getSockOptInteger( ZMQ_MULTICAST_HOPS );
  end;
  // Writes the ZMQ_MULTICAST_HOPS option.
  procedure TZMQSocket.setMulticastHops( const Value: Integer );
  begin
    setSockOptInteger( ZMQ_MULTICAST_HOPS, Value );
  end;
  // True when the ZMQ_IPV4ONLY option is non-zero.
  function TZMQSocket.getIPv4Only: Boolean;
  var
    raw: Integer;
  begin
    raw := getSockOptInteger( ZMQ_IPV4ONLY );
    Result := raw <> 0;
  end;
  // Writes the ZMQ_IPV4ONLY option using the Boolean's ordinal value.
  procedure TZMQSocket.setIPv4Only( const Value: Boolean );
  begin
    setSockOptInteger( ZMQ_IPV4ONLY, Ord( Value ) );
  end;
  // Reads ZMQ_LAST_ENDPOINT into a ShortString buffer (up to 255 bytes) and
  // trims it to one less than the reported length.
  // NOTE(review): the dropped byte is presumably the terminating NUL -
  // confirm against the zmq_getsockopt documentation.
  function TZMQSocket.getLastEndpoint: AnsiString;
  var
    endpoint: ShortString;
    len: size_t;
  begin
    len := 255;
    getSockOpt( ZMQ_LAST_ENDPOINT, @endpoint[1], len );
    SetLength( endpoint, len - 1);
    Result := endpoint;
  end;
  // Reads ZMQ_TCP_KEEPALIVE and maps it to TZMQKeepAlive by adding 1 to the
  // raw option value.
  function TZMQSocket.getKeepAlive: TZMQKeepAlive;
  var
    raw: Integer;
  begin
    raw := getSockOptInteger( ZMQ_TCP_KEEPALIVE );
    Result := TZMQKeepAlive( raw + 1 );
  end;
  // Writes ZMQ_TCP_KEEPALIVE as the enum's ordinal minus 1 (the inverse of
  // the mapping used when reading the option).
  procedure TZMQSocket.setKeepAlive( const Value: TZMQKeepAlive );
  var
    raw: Integer;
  begin
    raw := Byte(Value) - 1;
    setSockOptInteger( ZMQ_TCP_KEEPALIVE, raw );
  end;
  // Reads the ZMQ_TCP_KEEPALIVE_IDLE option.
  function TZMQSocket.getKeepAliveIdle: Integer;
  begin
    Result := getSockOptInteger( ZMQ_TCP_KEEPALIVE_IDLE );
  end;
  // Writes the ZMQ_TCP_KEEPALIVE_IDLE option.
  procedure TZMQSocket.setKeepAliveIdle( const Value: Integer );
  begin
    setSockOptInteger( ZMQ_TCP_KEEPALIVE_IDLE, Value );
  end;
  // Reads the ZMQ_TCP_KEEPALIVE_CNT option.
  function TZMQSocket.getKeepAliveCnt: Integer;
  begin
    Result := getSockOptInteger( ZMQ_TCP_KEEPALIVE_CNT );
  end;
  // Writes the ZMQ_TCP_KEEPALIVE_CNT option.
  procedure TZMQSocket.setKeepAliveCnt( const Value: Integer );
  begin
    setSockOptInteger( ZMQ_TCP_KEEPALIVE_CNT, Value );
  end;
  // Reads the ZMQ_TCP_KEEPALIVE_INTVL option.
  function TZMQSocket.getKeepAliveIntvl: Integer;
  begin
    Result := getSockOptInteger( ZMQ_TCP_KEEPALIVE_INTVL );
  end;
  // Writes the ZMQ_TCP_KEEPALIVE_INTVL option.
  procedure TZMQSocket.setKeepAliveIntvl( const Value: Integer );
  begin
    setSockOptInteger( ZMQ_TCP_KEEPALIVE_INTVL, Value );
  end;
  // Applies addr as a ZMQ_TCP_ACCEPT_FILTER and, once the option call has
  // succeeded, records it in the local filter list. An exception from the
  // option call propagates to the caller and nothing is recorded.
  procedure TZMQSocket.AddAcceptFilter( addr: AnsiString );
  begin
    // The former try/except only re-raised, so it is dropped here.
    setSockOpt( ZMQ_TCP_ACCEPT_FILTER, @addr[1], Length( addr ) );
    fAcceptFilter.Add( addr );
  end;
  // Returns the recorded accept filter at indx; raises EZMQException when
  // indx is outside the list.
  function TZMQSocket.getAcceptFilter( indx: Integer ): AnsiString;
  var
    inRange: Boolean;
  begin
    inRange := ( indx >= 0 ) and ( indx < fAcceptFilter.Count );
    if not inRange then
      raise EZMQException.Create( '[getAcceptFilter] Index out of bounds.' );
    Result := fAcceptFilter[indx];
  end;
  // Replaces the accept filter at indx with Value.
  // The socket's filter set is reset (nil/0) and every recorded filter is
  // re-applied, with Value taking the place of entry indx. If applying Value
  // fails, the previous entry is re-applied instead. The number of the last
  // EZMQException seen is re-raised after the loop has finished.
  // Raises EZMQException when indx is outside the list.
  procedure TZMQSocket.setAcceptFilter( indx: Integer; const Value: AnsiString );
  var
    i,num: Integer;
    filter: AnsiString;
  begin
    num := 0;
    // The message used to name getAcceptFilter, which pointed at the wrong
    // accessor.
    if ( indx < 0 ) or ( indx >= fAcceptFilter.Count ) then
      raise EZMQException.Create( '[setAcceptFilter] Index out of bounds.' );

    setSockOpt( ZMQ_TCP_ACCEPT_FILTER, nil, 0 );
    for i := 0 to fAcceptFilter.Count - 1 do
    begin
      // Work on an AnsiString copy of the list entry: on Unicode compilers
      // the TStringList item is UTF-16, so its address must not be handed to
      // the library as single-byte text.
      filter := AnsiString( fAcceptFilter[i] );
      try
        if i <> indx then
          setSockOpt( ZMQ_TCP_ACCEPT_FILTER, @filter[1], Length( filter ) )
        else begin
          setSockOpt( ZMQ_TCP_ACCEPT_FILTER, @Value[1], Length( Value ) );
          fAcceptFilter[i] := Value;
        end;
      except
        on e: EZMQException do
        begin
          num := e.Num;
          // Fall back to the previous filter for the entry being replaced.
          if i = indx then
            setSockOpt( ZMQ_TCP_ACCEPT_FILTER, @filter[1], Length( filter ) )
        end else
          raise;
      end;
    end;

    if num <> 0 then
      raise EZMQException.Create( num );
  end;
  // Writes ZMQ_ROUTER_MANDATORY as 1 for True and 0 for False.
  procedure TZMQSocket.setRouterMandatory( const Value: Boolean );
  begin
    setSockOptInteger( ZMQ_ROUTER_MANDATORY, Ord( Value ) );
  end;
  1258. {$else}
  // Reads the ZMQ_SWAP option (Int64).
  function TZMQSocket.getSwap: Int64;
  begin
    Result := getSockOptInt64( ZMQ_SWAP );
  end;
  // Reads the ZMQ_RECOVERY_IVL_MSEC option (Int64).
  function TZMQSocket.getRecoveryIVLMSec: Int64;
  begin
    Result := getSockOptInt64( ZMQ_RECOVERY_IVL_MSEC );
  end;
  // Reads the ZMQ_MCAST_LOOP option (Int64).
  function TZMQSocket.getMCastLoop: Int64;
  begin
    Result := getSockOptInt64( ZMQ_MCAST_LOOP );
  end;
  // Writes the ZMQ_SWAP option (Int64).
  procedure TZMQSocket.setSwap( const Value: Int64 );
  begin
    setSockOptInt64( ZMQ_SWAP, Value );
  end;
  // Writes the ZMQ_RECOVERY_IVL_MSEC option (Int64).
  procedure TZMQSocket.setRecoveryIvlMSec( const Value: Int64 );
  begin
    setSockOptInt64( ZMQ_RECOVERY_IVL_MSEC, Value );
  end;
  // Writes the ZMQ_MCAST_LOOP option (Int64).
  procedure TZMQSocket.setMCastLoop( const Value: Int64 );
  begin
    setSockOptInt64( ZMQ_MCAST_LOOP, Value );
  end;
  1283. {$endif}
  // High-water mark setter. Under zmq3 the value is applied to both SndHWM
  // and RcvHWM; otherwise it is written to the ZMQ_HWM option.
  procedure TZMQSocket.setHWM( const Value: {$ifdef zmq3}Integer{$else}UInt64{$endif} );
  begin
    {$ifdef zmq3}
    SndHWM := Value;
    RcvHWM := Value;
    {$else}
    setSockOptInt64( ZMQ_HWM, Value );
    {$endif}
  end;
  // Writes the ZMQ_AFFINITY option as a 64-bit value.
  procedure TZMQSocket.setAffinity( const Value: UInt64 );
  begin
    setSockOptInt64( ZMQ_AFFINITY, Value );
  end;
  // Writes the ZMQ_IDENTITY option from the characters of Value.
  procedure TZMQSocket.setIdentity( const Value: ShortString );
  begin
    setSockOpt( ZMQ_IDENTITY, @Value[1], Length( Value ) );
  end;
  // Writes the ZMQ_RCVTIMEO option.
  procedure TZMQSocket.setRcvTimeout( const Value: Integer );
  begin
    setSockOptInteger( ZMQ_RCVTIMEO, Value );
  end;
  // Writes the ZMQ_SNDTIMEO option.
  procedure TZMQSocket.setSndTimeout( const Value: Integer );
  begin
    setSockOptInteger( ZMQ_SNDTIMEO, Value );
  end;
  // Writes the ZMQ_RATE option; Integer under zmq3, Int64 otherwise.
  procedure TZMQSocket.setRate( const Value: {$ifdef zmq3}Integer{$else}int64{$endif} );
  begin
    {$ifdef zmq3}
    setSockOptInteger( ZMQ_RATE, Value );
    {$else}
    setSockOptInt64( ZMQ_RATE, Value );
    {$endif}
  end;
  // Writes the ZMQ_RECOVERY_IVL option; Integer under zmq3, Int64 otherwise.
  procedure TZMQSocket.setRecoveryIvl( const Value: {$ifdef zmq3}Integer{$else}int64{$endif} );
  begin
    {$ifdef zmq3}
    setSockOptInteger( ZMQ_RECOVERY_IVL, Value );
    {$else}
    setSockOptInt64( ZMQ_RECOVERY_IVL, Value );
    {$endif}
  end;
  // Writes the ZMQ_SNDBUF option; Integer under zmq3, 64-bit otherwise.
  procedure TZMQSocket.setSndBuf( const Value: {$ifdef zmq3}Integer{$else}UInt64{$endif} );
  begin
    {$ifdef zmq3}
    setSockOptInteger( ZMQ_SNDBUF, Value );
    {$else}
    setSockOptInt64( ZMQ_SNDBUF, Value );
    {$endif}
  end;
  // Writes the ZMQ_RCVBUF option; Integer under zmq3, 64-bit otherwise.
  procedure TZMQSocket.setRcvBuf( const Value: {$ifdef zmq3}Integer{$else}UInt64{$endif} );
  begin
    {$ifdef zmq3}
    setSockOptInteger( ZMQ_RCVBUF, Value );
    {$else}
    setSockOptInt64( ZMQ_RCVBUF, Value );
    {$endif}
  end;
  // Writes the ZMQ_LINGER option.
  procedure TZMQSocket.setLinger( const Value: Integer );
  begin
    setSockOptInteger( ZMQ_LINGER, Value );
  end;
  // Writes the ZMQ_RECONNECT_IVL option.
  procedure TZMQSocket.setReconnectIvl( const Value: Integer );
  begin
    setSockOptInteger( ZMQ_RECONNECT_IVL, Value );
  end;
  // Writes the ZMQ_RECONNECT_IVL_MAX option.
  procedure TZMQSocket.setReconnectIvlMax( const Value: Integer );
  begin
    setSockOptInteger( ZMQ_RECONNECT_IVL_MAX, Value );
  end;
  // Writes the ZMQ_BACKLOG option.
  procedure TZMQSocket.setBacklog( const Value: Integer );
  begin
    setSockOptInteger( ZMQ_BACKLOG, Value );
  end;
  // Sets ZMQ_SUBSCRIBE with the given filter; an empty filter is passed as
  // nil with length 0.
  procedure TZMQSocket.subscribe( filter: AnsiString );
  begin
    if filter <> '' then
      setSockOpt( ZMQ_SUBSCRIBE, @filter[1], Length( filter ) )
    else
      setSockOpt( ZMQ_SUBSCRIBE, nil, 0 );
  end;
  // Sets ZMQ_UNSUBSCRIBE with the given filter; an empty filter is passed as
  // nil with length 0.
  procedure TZMQSocket.unSubscribe( filter: AnsiString );
  begin
    if filter <> '' then
      setSockOpt( ZMQ_UNSUBSCRIBE, @filter[1], Length( filter ) )
    else
      setSockOpt( ZMQ_UNSUBSCRIBE, nil, 0 );
  end;
  1371. {$ifdef zmq3}
  // Sends len bytes from Buffer with zmq_send and returns its result.
  // A result below -1 raises. A result of -1 raises for zmq_errno unless the
  // error is ZMQEAGAIN and fRaiseEAgain is off, in which case -1 is returned.
  function TZMQSocket.sendBuffer( const Buffer; len: Size_t; flags: TZMQSendFlags = [] ): Integer;
  var
    errn: Integer;
  begin
    Result := zmq_send( SocketPtr, Buffer, len, Byte( flags ) );
    if Result >= 0 then
      Exit;

    if Result < -1 then
      raise EZMQException.Create('zmq_send return value less than -1.');

    // Result = -1
    errn := zmq_errno;
    if fRaiseEAgain or ( errn <> ZMQEAGAIN ) then
      raise EZMQException.Create( errn );
  end;
  1386. {$endif}
  // Sends a single frame. On success the frame is freed, msg is set to nil
  // and the return value is the number of bytes in the frame. On failure the
  // result is -1 and the frame is left with the caller; an exception is
  // raised unless the error is ZMQEAGAIN and fRaiseEAgain is off.
  function TZMQSocket.send( var msg: TZMQFrame; flags: Integer = 0 ): Integer;
  var
    errn: Integer;
  begin
  {$ifdef zmq3}
    Result := zmq_sendmsg( SocketPtr, msg.fMessage, flags );
    //result := zmq_msg_send( msg.fMessage, SocketPtr, flags );
    if Result >= 0 then
      FreeAndNil( msg )
    else if Result = -1 then
    begin
      errn := zmq_errno;
      if fRaiseEAgain or ( errn <> ZMQEAGAIN ) then
        raise EZMQException.Create( errn );
    end else
      raise EZMQException.Create('zmq_sendmsg return value less than -1.');
  {$else}
    // Capture the size first: the frame no longer exists after a
    // successful send.
    Result := msg.size;
    try
      if CheckResult( zmq_send( SocketPtr, msg.fMessage, flags ) ) <> 0 then
        Result := -1
      else
        FreeAndNil( msg );
    except
      on e: Exception do
      begin
        Result := -1;
        raise;
      end;
    end;
  {$endif}
  end;
  // Sends a single frame, blocking or not depending on flags. The flag set
  // is converted to its byte value and handed to the Integer-flags overload.
  function TZMQSocket.send( var msg: TZMQFrame; flags: TZMQSendFlags = [] ): Integer;
  var
    rawFlags: Byte;
  begin
    rawFlags := Byte( flags );
    Result := send( msg, rawFlags );
  end;
  // Reads size bytes from the stream's current position into a new frame and
  // sends it, blocking or not depending on flags. The frame is freed here if
  // the send did not consume it.
  function TZMQSocket.send( strm: TStream; size: Integer; flags: TZMQSendFlags = [] ): Integer;
  var
    payload: TZMQFrame;
  begin
    payload := TZMQFrame.Create( size );
    try
      strm.Read( payload.data^, size );
      Result := send( payload, flags );
    finally
      // Free is a no-op on nil, which is the state after a successful send.
      payload.Free;
    end;
  end;
  // Wraps a string in a new frame and sends it, blocking or not depending on
  // flags. The frame is freed here if the send did not consume it.
  function TZMQSocket.send( msg: Utf8String; flags: TZMQSendFlags = [] ): Integer;
  var
    payload: TZMQFrame;
  begin
    payload := TZMQFrame.create;
    try
      payload.asUtf8String := msg;
      Result := send( payload, flags );
    finally
      // Free is a no-op on nil, which is the state after a successful send.
      payload.Free;
    end;
  end;
  // Sends a multipart message frame by frame; every frame except the last is
  // sent with sfSndMore. dontwait selects the non-blocking send flag.
  //
  // Returns the number of frames sent. On full success msgs is freed and set
  // to nil. If a frame cannot be sent the result is -1, msgs is kept, and the
  // unsent frame is put back at the front of msgs, so nothing is leaked and
  // the caller still holds the remaining frames. The same applies when the
  // frame-level send raises; the exception is then re-raised.
  function TZMQSocket.send( var msgs: TZMQMsg; dontwait: Boolean = false ): Integer;
  var
    flags: TZMQSendFlags;
    frame: TZMQFrame;
    rc: Integer;
  begin
    Result := 0;
    if dontwait then
      flags := [{$ifdef zmq3}sfDontWait{$else}sfNoBlock{$endif}]
    else
      flags := [];

    while msgs.size > 0 do
    begin
      frame := msgs.pop;
      try
        if msgs.size = 0 then
          rc := send( frame, flags )
        else
          rc := send( frame, flags + [sfSndMore] );
      except
        // The frame was popped but not sent: give it back before
        // propagating, otherwise it would be orphaned.
        if frame <> nil then
          msgs.push( frame );
        raise;
      end;

      if rc = -1 then
      begin
        // A failed frame-level send leaves the frame alive; return it to
        // the message instead of dropping the only reference to it.
        if frame <> nil then
          msgs.push( frame );
        result := -1;
        break;
      end else
        inc( result )
    end;

    if result <> -1 then
      FreeAndNil( msgs );
  end;
  // Sends the given strings as one multipart message, blocking or not
  // depending on dontwait. Returns the number of frames sent on success and
  // -1 otherwise; the underlying send may also raise.
  function TZMQSocket.send( msg: Array of Utf8String; dontwait: Boolean = false ): Integer;
  var
    parts: TZMQMsg;
    idx: Integer;
  begin
    parts := TZMQMsg.create;
    try
      // addstr creates one frame per string and appends it.
      for idx := Low( msg ) to High( msg ) do
        parts.addstr( msg[idx] );
      Result := send( parts, dontwait );
    finally
      // Free is a no-op on nil, which is the state after a successful send.
      parts.Free;
    end;
  end;
  // Sends every line of a TStrings as one multipart message, blocking or not
  // depending on dontwait. Returns the result of the multipart send.
  function TZMQSocket.send( msg: TStrings; dontwait: Boolean = false ): Integer;
  var
    parts: TZMQMsg;
    idx: Integer;
  begin
    parts := TZMQMsg.create;
    try
      // addstr creates one frame per string and appends it.
      for idx := 0 to msg.Count - 1 do
        parts.addstr( msg[idx] );
      Result := send( parts, dontwait );
    finally
      // Free is a no-op on nil, which is the state after a successful send.
      parts.Free;
    end;
  end;
  1531. {$ifdef zmq3}
  // Receives into Buffer (at most len bytes) with zmq_recv and returns its
  // result. A result below -1 raises. A result of -1 raises for zmq_errno
  // unless the error is ZMQEAGAIN and fRaiseEAgain is off, in which case -1
  // is returned.
  function TZMQSocket.recvBuffer( var Buffer; len: size_t; flags: TZMQRecvFlags = [] ): Integer;
  var
    errn: Integer;
  begin
    Result := zmq_recv( SocketPtr, Buffer, len, Byte( flags ) );
    if Result >= 0 then
      Exit;

    if Result < -1 then
      raise EZMQException.Create('zmq_recv return value less than -1.');

    // Result = -1
    errn := zmq_errno;
    if fRaiseEAgain or ( errn <> ZMQEAGAIN ) then
      raise EZMQException.Create( errn );
  end;
  1546. procedure MonitorProc( ZMQMonitorRec: PZMQMonitorRec );
  1547. var
  1548. socket: TZMQSocket;
  1549. msg: TZMQFrame;
  1550. msgsize: Integer;
  1551. event: zmq_event_t;
  1552. zmqEvent: TZMQEvent;
  1553. i: Integer;
  1554. begin
  1555. socket := ZMQMonitorRec.context.Socket( stPair );
  1556. socket.RcvTimeout := 100; // 1 sec.
  1557. socket.connect( ZMQMonitorRec.Addr );
  1558. msg := TZMQFrame.create;
  1559. while not ZMQMonitorRec.Terminated do
  1560. begin
  1561. try
  1562. msgsize := socket.recv( msg, [] );
  1563. if msgsize > -1 then
  1564. begin
  1565. {$ifdef UNIX}
  1566. Move( msg.data^, event, SizeOf(event) );
  1567. {$else}
  1568. CopyMemory( @event, msg.data, SizeOf(event) );
  1569. {$endif}
  1570. i := 0;
  1571. while event.event <> 0 do
  1572. begin
  1573. event.event := event.event shr 1;
  1574. inc( i );
  1575. end;
  1576. zmqEvent.event := TZMQMonitorEvent( i - 1 );
  1577. zmqEvent.addr := event.addr;
  1578. zmqEvent.fd := event.fd;
  1579. ZMQMonitorRec.proc( zmqEvent );
  1580. msg.rebuild;
  1581. end;
  1582. except
  1583. on e: EZMQException do
  1584. if e.Num <> ZMQEAGAIN then
  1585. raise;
  1586. end;
  1587. end;
  1588. msg.Free;
  1589. socket.Free;
  1590. end;
  1591. procedure TZMQSocket.RegisterMonitor( proc: TZMQMonitorProc; events: TZMQMonitorEvents = cZMQMonitorEventsAll );
  1592. var
  1593. {$ifdef UNIX}
  1594. tid: QWord;
  1595. {$else}
  1596. tid: Cardinal;
  1597. {$endif}
  1598. begin
  1599. if fMonitorRec <> nil then
  1600. DeRegisterMonitor;
  1601. New( fMonitorRec );
  1602. fMonitorRec.Terminated := False;
  1603. fMonitorRec.context := fContext;
  1604. fMonitorRec.Addr := 'inproc://monitor.' + IntToHex( Integer( SocketPtr ),8 );
  1605. fMonitorRec.Proc := proc;
  1606. CheckResult( zmq_socket_monitor( SocketPtr,
  1607. PAnsiChar( AnsiString( fMonitorRec.Addr ) ), Word( events ) ) );
  1608. fMonitorThread := BeginThread( nil, 0, @MonitorProc, fMonitorRec, 0, tid );
  1609. sleep(1);
  1610. end;
  1611. procedure TZMQSocket.DeRegisterMonitor;
  1612. var
  1613. rc: Cardinal;
  1614. begin
  1615. {$ifdef UNIX}
  1616. raise Exception.Create(Self.ClassName+'.DeRegisterMonitor not implemented');
  1617. { TODO : implement equivalent to WaitForSingleObject like pthread_join() ? }
  1618. {$else}
  1619. if fMonitorRec <> nil then
  1620. begin
  1621. fMonitorRec.Terminated := True;
  1622. rc := WaitForSingleObject( fMonitorThread, INFINITE );
  1623. if rc = WAIT_FAILED then
  1624. raise Exception.Create( 'error in WaitForSingleObject for Monitor Thread' );
  1625. CheckResult( zmq_socket_monitor( SocketPtr, nil ,0 ) );
  1626. Dispose( fMonitorRec );
  1627. fMonitorRec := nil;
  1628. end;
  1629. {$endif}
  1630. end;
  1631. {$endif}
  1632. function TZMQSocket.recv( var msg: TZMQFrame; flags: Integer = 0 ): Integer;
  1633. var
  1634. errn: Integer;
  1635. begin
  1636. if msg = nil then
  1637. msg := TZMQFrame.Create;
  1638. if msg.size > 0 then
  1639. msg.rebuild;
  1640. {$ifdef zmq3}
  1641. result := zmq_recvmsg( SocketPtr, msg.fMessage, flags );
  1642. // result := zmq_msg_recv( msg.fMessage, SocketPtr, flags );
  1643. if result < -1 then
  1644. raise EZMQException.Create('zmq_recvmsg return value less than -1.')
  1645. else if result = -1 then
  1646. begin
  1647. errn := zmq_errno;
  1648. if ( errn <> ZMQEAGAIN ) or fRaiseEAgain then
  1649. raise EZMQException.Create( errn );
  1650. end;
  1651. {$else}
  1652. result := -1;
  1653. if CheckResult( zmq_recv( SocketPtr, msg.fMessage, flags ) ) = 0 then
  1654. result := msg.size;
  1655. {$endif}
  1656. end;
  1657. function TZMQSocket.recv( msg: TZMQFrame; flags: TZMQRecvFlags = [] ): Integer;
  1658. begin
  1659. result := recv( msg, Byte( flags ) );
  1660. end;
  1661. function TZMQSocket.recv( strm: TStream; flags: TZMQRecvFlags = [] ): Integer;
  1662. var
  1663. frame: TZMQFrame;
  1664. begin
  1665. frame := TZMQFrame.Create;
  1666. try
  1667. result := recv( frame, flags );
  1668. strm.Write( frame.data^, result );
  1669. finally
  1670. frame.Free;
  1671. end;
  1672. end;
  1673. function TZMQSocket.recv( var msg: Utf8String; flags: TZMQRecvFlags = [] ): Integer;
  1674. var
  1675. frame: TZMQFrame;
  1676. begin
  1677. frame := TZMQFrame.Create;
  1678. try
  1679. Result := recv( frame, flags );
  1680. msg := frame.asUtf8String;
  1681. finally
  1682. frame.Free;
  1683. end;
  1684. end;
  1685. function TZMQSocket.recv( var msgs: TZMQMsg; flags: TZMQRecvFlags = [] ): Integer;
  1686. var
  1687. msg: TZMQFrame;
  1688. bRcvMore: Boolean;
  1689. rc: Integer;
  1690. begin
  1691. if msgs = nil then
  1692. msgs := TZMQMsg.Create;
  1693. bRcvMore := True;
  1694. result := 0;
  1695. while bRcvMore do
  1696. begin
  1697. msg := TZMQFrame.create;
  1698. rc := recv( msg, flags );
  1699. if rc <> -1 then
  1700. begin
  1701. msgs.Add( msg );
  1702. inc( result );
  1703. end else
  1704. begin
  1705. result := -1;
  1706. msg.Free;
  1707. break;
  1708. end;
  1709. bRcvMore := RcvMore;
  1710. end;
  1711. end;
  1712. // receive multipart message. the result is the number of messages received.
  1713. function TZMQSocket.recv( msg: TStrings; flags: TZMQRecvFlags = [] ): Integer;
  1714. var
  1715. msgs: TZMQMsg;
  1716. i: Integer;
  1717. begin
  1718. msgs := TZMQMsg.Create;
  1719. try
  1720. result := recv( msgs, flags );
  1721. for i := 0 to result - 1 do
  1722. msg.Add( msgs[i].asUtf8String );
  1723. finally
  1724. msgs.Free;
  1725. end;
  1726. end;
  1727. { TZMQContext }
  1728. constructor TZMQContext.create{$ifndef zmq3}( io_threads: Integer ){$endif};
  1729. begin
  1730. fTerminated := false;
  1731. fMainThread := true;
  1732. contexts.Add( Self );
  1733. {$ifdef zmq3}
  1734. fContext := zmq_ctx_new;
  1735. {$else}
  1736. fContext := zmq_init( io_threads );
  1737. {$endif}
  1738. fLinger := -2;
  1739. //fLinger := 0;
  1740. if ContextPtr = nil then
  1741. raise EZMQException.Create;
  1742. fSockets := TList.Create;
  1743. end;
  1744. constructor TZMQContext.createShadow( context: TZMQContext );
  1745. begin
  1746. fTerminated := false;
  1747. fMainThread := false;
  1748. contexts.Add( Self );
  1749. fContext := context.ContextPtr;
  1750. fLinger := context.Linger;
  1751. fSockets := TList.Create;
  1752. end;
  1753. destructor TZMQContext.destroy;
  1754. var
  1755. i: Integer;
  1756. begin
  1757. if fLinger >= -1 then
  1758. for i:= 0 to fSockets.Count - 1 do
  1759. TZMQSocket(fSockets[i]).Linger := Linger;
  1760. while fSockets.Count > 0 do
  1761. TZMQSocket(fSockets[0]).Free;
  1762. if ( fContext <> nil ) and fMainThread then
  1763. begin
  1764. {$ifdef zmq3}
  1765. CheckResult( zmq_ctx_destroy( ContextPtr ) );
  1766. {$else}
  1767. CheckResult( zmq_term( ContextPtr ) );
  1768. {$endif}
  1769. end;
  1770. fContext := nil;
  1771. fSockets.Free;
  1772. contexts.Delete( contexts.IndexOf(Self) );
  1773. inherited;
  1774. end;
  1775. procedure TZMQContext.Terminate;
  1776. var
  1777. p: Pointer;
  1778. begin
  1779. if not Terminated then
  1780. begin
  1781. fTerminated := true;
  1782. {$ifndef unix}
  1783. p := ContextPtr;
  1784. fContext := nil;
  1785. if fMainThread then
  1786. begin
  1787. {$ifdef zmq3}
  1788. CheckResult( zmq_ctx_destroy( p ) );
  1789. {$else}
  1790. CheckResult( zmq_term( p ) );
  1791. {$endif}
  1792. end;
  1793. {$endif}
  1794. end;
  1795. end;
  1796. procedure TZMQContext.CheckResult( rc: Integer );
  1797. begin
  1798. if rc = 0 then
  1799. begin
  1800. // ok
  1801. end else
  1802. if rc = -1 then
  1803. begin
  1804. raise EZMQException.Create;
  1805. end else
  1806. raise EZMQException.Create('Function result is not 0, or -1!');
  1807. end;
  1808. {$ifdef zmq3}
  1809. function TZMQContext.getOption( option: Integer ): Integer;
  1810. begin
  1811. result := zmq_ctx_get( ContextPtr, option );
  1812. if result = -1 then
  1813. raise EZMQException.Create
  1814. else if result < -1 then
  1815. raise EZMQException.Create('Function result is less than -1!');
  1816. end;
  1817. procedure TZMQContext.setOption( option, optval: Integer );
  1818. begin
  1819. CheckResult( zmq_ctx_set( ContextPtr, option, optval ) );
  1820. end;
  1821. function TZMQContext.getIOThreads: Integer;
  1822. begin
  1823. result := getOption( ZMQ_IO_THREADS );
  1824. end;
  1825. procedure TZMQContext.setIOThreads( const Value: Integer );
  1826. begin
  1827. setOption( ZMQ_IO_THREADS, Value );
  1828. end;
  1829. function TZMQContext.getMaxSockets: Integer;
  1830. begin
  1831. result := getOption( ZMQ_MAX_SOCKETS );
  1832. end;
  1833. procedure TZMQContext.setMaxSockets( const Value: Integer );
  1834. begin
  1835. setOption( ZMQ_MAX_SOCKETS, Value );
  1836. end;
  1837. {$endif}
  1838. function TZMQContext.Shadow: TZMQContext;
  1839. begin
  1840. result := TZMQContext.createShadow( self );
  1841. end;
  1842. function TZMQContext.Socket( stype: TZMQSocketType ): TZMQSocket;
  1843. begin
  1844. EnterCriticalSection( cs );
  1845. try
  1846. result := TZMQSocket.Create;
  1847. result.fSocket := zmq_socket( ContextPtr, Byte( stype ) );
  1848. if result.fSocket = nil then
  1849. begin
  1850. result.Free;
  1851. result := nil;
  1852. raise EZMQException.Create;
  1853. end;
  1854. result.fContext := self;
  1855. fSockets.Add( result );
  1856. finally
  1857. LeaveCriticalSection( cs );
  1858. end;
  1859. end;
  1860. procedure TZMQContext.RemoveSocket( lSocket: TZMQSocket );
  1861. var
  1862. i: Integer;
  1863. begin
  1864. EnterCriticalSection( cs );
  1865. try
  1866. i := fSockets.IndexOf( lSocket );
  1867. if i < 0 then
  1868. raise EZMQException.Create( 'Socket not in context' );
  1869. fSockets.Delete( i );
  1870. finally
  1871. LeaveCriticalSection( cs );
  1872. end;
  1873. end;
  1874. const
  1875. cZMQPoller_Register = 'reg';
  1876. cZMQPoller_SyncRegister = 'syncreg';
  1877. cZMQPoller_DeRegister = 'dereg';
  1878. cZMQPoller_SyncDeRegister = 'syncdereg';
  1879. cZMQPoller_Terminate = 'term';
  1880. cZMQPoller_PollNumber = 'pollno';
  1881. cZMQPoller_SyncPollNumber = 'syncpollno';
  1882. { TZMQPoller }
  1883. constructor TZMQPoller.Create( lSync: Boolean = false; lContext: TZMQContext = nil );
  1884. begin
  1885. fSync := lSync;
  1886. {$ifdef UNIX}
  1887. InitCriticalSection( cs );
  1888. {$else}
  1889. InitializeCriticalSection( cs );
  1890. {$endif}
  1891. fonException := nil;
  1892. if not fSync then
  1893. begin
  1894. fOwnContext := lContext = nil;
  1895. if fOwnContext then
  1896. fContext := TZMQContext.create
  1897. else
  1898. fContext := lContext;
  1899. fAddr := 'inproc://poller' + IntToHex( Integer( Self ), 8 );
  1900. sPair := fContext.Socket( stPair );
  1901. sPair.bind( fAddr );
  1902. end;
  1903. fPollItemCapacity := 10;
  1904. fPollItemCount := 0;
  1905. fPollNumber := 0;
  1906. SetLength( fPollItem, fPollItemCapacity );
  1907. SetLength( fPollSocket, fPollItemCapacity );
  1908. fTimeOut := -1;
  1909. inherited Create( fSync );
  1910. end;
  1911. destructor TZMQPoller.Destroy;
  1912. begin
  1913. if not fSync then
  1914. begin
  1915. sPair.send( cZMQPoller_Terminate );
  1916. sPair.Free;
  1917. if fOwnContext then
  1918. fContext.Free;
  1919. end;
  1920. {$ifdef UNIX}
  1921. DoneCriticalSection( cs );
  1922. {$else}
  1923. DeleteCriticalSection( cs );
  1924. {$endif}
  1925. inherited;
  1926. end;
  1927. procedure TZMQPoller.CheckResult( rc: Integer );
  1928. begin
  1929. if rc = -1 then
  1930. raise EZMQException.Create else
  1931. if rc < -1 then
  1932. raise EZMQException.Create('Function result is less than -1!');
  1933. end;
  1934. procedure TZMQPoller.AddToPollItems( socket: TZMQSocket; events: TZMQPollEvents );
  1935. begin
  1936. EnterCriticalSection( cs );
  1937. try
  1938. if fPollItemCapacity = fPollItemCount then
  1939. begin
  1940. fPollItemCapacity := fPollItemCapacity + 10;
  1941. SetLength( fPollItem, fPollItemCapacity );
  1942. SetLength( fPollSocket, fPollItemCapacity );
  1943. end;
  1944. fPollSocket[fPollItemCount] := socket;
  1945. fPollItem[fPollItemCount].socket := socket.SocketPtr;
  1946. fPollItem[fPollItemCount].fd := 0;
  1947. fPollItem[fPollItemCount].events := Byte( events );
  1948. fPollItem[fPollItemCount].revents := 0;
  1949. fPollItemCount := fPollItemCount + 1;
  1950. fPollNumber := fPollItemCount;
  1951. finally
  1952. LeaveCriticalSection( cs );
  1953. end;
  1954. end;
  1955. procedure TZMQPoller.DelFromPollItems( socket: TZMQSocket; events: TZMQPollEvents; indx: Integer );
  1956. var
  1957. i: Integer;
  1958. begin
  1959. EnterCriticalSection( cs );
  1960. try
  1961. fPollItem[indx].events := fPollItem[indx].events and not Byte( events );
  1962. if fPollItem[indx].events = 0 then
  1963. begin
  1964. for i := indx to fPollItemCount - 2 do
  1965. begin
  1966. fPollItem[i] := fPollItem[i + 1];
  1967. fPollSocket[i] := fPollSocket[i + 1];
  1968. end;
  1969. Dec( fPollItemCount );
  1970. end;
  1971. finally
  1972. LeaveCriticalSection( cs );
  1973. end;
  1974. end;
  1975. function TZMQPoller.getPollItem( indx: Integer ): TZMQPollItem;
  1976. begin
  1977. EnterCriticalSection( cs );
  1978. try
  1979. result.socket := fPollSocket[indx];
  1980. Byte(result.events) := fPollItem[indx].events;
  1981. Byte(result.revents) := fPollItem[indx].revents;
  1982. finally
  1983. LeaveCriticalSection( cs );
  1984. end;
  1985. end;
  1986. type
  1987. TTempRec = record
  1988. socket: TZMQSocket;
  1989. events: TZMQPollEvents;
  1990. reg, // true if reg, false if dereg.
  1991. sync: Boolean; // if true, socket should send back a message
  1992. end;
  1993. procedure TZMQPoller.Execute;
  1994. var
  1995. sPairThread: TZMQSocket;
  1996. rc: Integer;
  1997. i,j: Integer;
  1998. pes: TZMQPollEvents;
  1999. msg: TStringList;
  2000. reglist: Array of TTempRec;
  2001. reglistcap,
  2002. reglistcount: Integer;
  2003. procedure AddToRegList( so: TZMQSocket; ev: TZMQPollEvents; reg: Boolean; sync: Boolean );
  2004. begin
  2005. if reglistcap = reglistcount then
  2006. begin
  2007. reglistcap := reglistcap + 10;
  2008. SetLength( reglist, reglistcap );
  2009. end;
  2010. reglist[reglistcount].socket := so;
  2011. reglist[reglistcount].events := ev;
  2012. reglist[reglistcount].reg := reg;
  2013. reglist[reglistcount].sync := sync;
  2014. inc( reglistcount );
  2015. end;
  2016. begin
  2017. reglistcap := 10;
  2018. reglistcount := 0;
  2019. SetLength( reglist, reglistcap );
  2020. sPairThread := fContext.Socket( stPair );
  2021. sPairThread.connect( fAddr );
  2022. fPollItemCount := 1;
  2023. fPollNumber := 1;
  2024. fPollSocket[0] := sPairThread;
  2025. fPollItem[0].socket := sPairThread.SocketPtr;
  2026. fPollItem[0].fd := 0;
  2027. pes := [pePollIn];
  2028. fPollItem[0].events := Byte( pes );
  2029. fPollItem[0].revents := 0;
  2030. msg := TStringList.Create;
  2031. while not Terminated do
  2032. try
  2033. rc := zmq_poll( fPollItem[0], fPollNumber, fTimeOut );
  2034. CheckResult( rc );
  2035. if rc = 0 then
  2036. begin
  2037. if Assigned( fonTimeOut ) then
  2038. fonTimeOut( self );
  2039. end else
  2040. begin
  2041. for i := 0 to fPollNumber - 1 do
  2042. if fPollItem[i].revents > 0 then
  2043. begin
  2044. if i = 0 then
  2045. begin
  2046. // control messages.
  2047. msg.Clear;
  2048. fPollSocket[0].recv( msg );
  2049. if ( msg[0] = cZMQPoller_Register ) or
  2050. ( msg[0] = cZMQPoller_SyncRegister )then
  2051. begin
  2052. Byte(pes) := StrToInt( msg[2] );
  2053. AddToRegList( TZMQSocket( StrToInt( msg[1] ) ), pes, True,
  2054. msg[0] = cZMQPoller_SyncRegister );
  2055. end else
  2056. if ( msg[0] = cZMQPoller_DeRegister ) or
  2057. ( msg[0] = cZMQPoller_SyncDeRegister ) then
  2058. begin
  2059. Byte(pes) := StrToInt( msg[2] );
  2060. AddToRegList( TZMQSocket( StrToInt( msg[1] ) ), pes, False,
  2061. msg[0] = cZMQPoller_SyncDeRegister );
  2062. end else
  2063. if ( msg[0] = cZMQPoller_PollNumber ) or
  2064. ( msg[0] = cZMQPoller_SyncPollNumber ) then
  2065. begin
  2066. fPollNumber := StrToInt( msg[1] );
  2067. if msg[0] = cZMQPoller_SyncPollNumber then
  2068. sPairThread.send('');
  2069. end;
  2070. if msg[0] = cZMQPoller_Terminate then
  2071. Terminate;
  2072. end else
  2073. if Assigned( fOnEvent ) then
  2074. begin
  2075. Byte(pes) := fPollItem[i].revents;
  2076. fOnEvent( fPollSocket[i], pes );
  2077. end;
  2078. end;
  2079. if reglistcount > 0 then
  2080. begin
  2081. for i := 0 to reglistcount - 1 do
  2082. begin
  2083. j := 1;
  2084. while ( j < fPollItemCount ) and ( fPollSocket[j] <> reglist[i].socket ) do
  2085. inc( j );
  2086. if j < fPollItemCount then
  2087. begin
  2088. if reglist[i].reg then
  2089. begin
  2090. fPollItem[j].events := fPollItem[j].events or Byte( reglist[i].events );
  2091. end else
  2092. DelFromPollItems( reglist[i].socket, reglist[i].events, j );
  2093. end else
  2094. begin
  2095. if reglist[i].reg then
  2096. AddToPollItems( reglist[i].socket, reglist[i].events )
  2097. //else
  2098. //warn not found, but want to delete.
  2099. end;
  2100. if reglist[i].sync then
  2101. sPairThread.send( '' );
  2102. end;
  2103. reglistcount := 0;
  2104. end;
  2105. end;
  2106. except
  2107. on e: Exception do
  2108. begin
  2109. if ( e is EZMQException ) and
  2110. ( EZMQException(e).Num = ETERM ) then
  2111. Terminate;
  2112. if Assigned( fOnException ) then
  2113. fOnException( e );
  2114. end;
  2115. end;
  2116. msg.Free;
  2117. sPairThread.Free;
  2118. end;
  2119. procedure TZMQPoller.Register( socket: TZMQSocket; events: TZMQPollEvents; bWait: Boolean = false );
  2120. var
  2121. s: Utf8String;
  2122. begin
  2123. if fSync then
  2124. AddToPollItems( socket, events )
  2125. else
  2126. begin
  2127. if bWait then
  2128. s := cZMQPoller_SyncRegister
  2129. else
  2130. s := cZMQPoller_Register;
  2131. sPair.send( [ s, IntToStr( Integer(socket) ), IntToStr( Byte( events ) )] );
  2132. if bWait then
  2133. sPair.recv( s );
  2134. end;
  2135. end;
  2136. procedure TZMQPoller.DeRegister( socket: TZMQSocket; events: TZMQPollEvents; bWait: Boolean = false );
  2137. var
  2138. s: Utf8String;
  2139. i: Integer;
  2140. begin
  2141. if fSync then
  2142. begin
  2143. i := 0;
  2144. while ( i < fPollItemCount ) and ( fPollSocket[i] <> socket ) do
  2145. inc( i );
  2146. if i = fPollItemCount then
  2147. raise EZMQException.Create( 'socket not in pollitems!' );
  2148. DelFromPollItems( socket, events, i );
  2149. end else begin
  2150. if bWait then
  2151. s := cZMQPoller_SyncDeregister
  2152. else
  2153. s := cZMQPoller_Deregister;
  2154. sPair.send( [ s, IntToStr( Integer(socket) ), IntToStr( Byte( events ) )] );
  2155. if bWait then
  2156. sPair.recv( s );
  2157. end;
  2158. end;
  2159. procedure TZMQPoller.setPollNumber( const Value: Integer; bWait: Boolean = false );
  2160. var
  2161. s: Utf8String;
  2162. begin
  2163. if fSync then
  2164. fPollNumber := Value
  2165. else begin
  2166. if bWait then
  2167. s := cZMQPoller_PollNumber
  2168. else
  2169. s := cZMQPoller_SyncPollNumber;
  2170. sPair.send( [ s, IntToStr( Value ) ] );
  2171. if bWait then
  2172. sPair.recv( s );
  2173. end;
  2174. end;
/// If the second parameter (lPollNumber) is specified, then only the first
/// lPollNumber sockets are polled.
  2177. function TZMQPoller.poll( timeout: Integer = -1; lPollNumber: Integer = -1 ): Integer;
  2178. var
  2179. pc, i: Integer;
  2180. begin
  2181. if not fSync then
  2182. raise EZMQException.Create('Poller hasn''t created in Synchronous mode');
  2183. if fPollItemCount = 0 then
  2184. raise EZMQException.Create( 'Nothing to poll!' );
  2185. if lPollNumber = -1 then
  2186. pc := fPollItemCount
  2187. else
  2188. if ( lpollNumber > -1 ) and ( lpollNumber <= fPollItemCount ) then
  2189. pc := lpollNumber
  2190. else
  2191. raise EZMQException.Create( 'wrong pollCount parameter.' );
  2192. {$ifndef zmq3}
  2193. if timeout <> -1 then
  2194. timeout := timeout * 1000;
  2195. {$endif}
  2196. for i := 0 to fPollItemCount - 1 do
  2197. fPollItem[i].revents := 0;
  2198. result := zmq_poll( fPollItem[0], pc, timeout );
  2199. if result < 0 then
  2200. raise EZMQException.Create
  2201. end;
  2202. function TZMQPoller.getPollResult( indx: Integer ): TZMQPollItem;
  2203. var
  2204. i,j: Integer;
  2205. begin
  2206. if not fSync then
  2207. raise EZMQException.Create('Poller created in Synchronous mode');
  2208. i := 0;
  2209. j := -1;
  2210. while ( i < fPollItemCount) and ( j < indx ) do
  2211. begin
  2212. if ( fPollItem[i].revents and fPollItem[i].events ) > 0 then
  2213. inc( j );
  2214. if j < indx then
  2215. inc( i );
  2216. end;
  2217. result.socket := fPollSocket[i];
  2218. Byte(result.events) := fPollItem[i].revents;
  2219. end;
  2220. function ZMQPoll( var pia: TZMQPollItemA; piaSize: Integer = -1; timeout: Integer = -1 ): Integer;
  2221. var
  2222. PollItem: array of zmq.pollitem_t;
  2223. i,l,n: Integer;
  2224. begin
  2225. l := Length( pia );
  2226. if l = 0 then
  2227. raise EZMQException.Create( 'Nothing to poll!' );
  2228. SetLength( PollItem, l );
  2229. try
  2230. for i := 0 to l - 1 do
  2231. begin
  2232. PollItem[i].socket := pia[i].Socket.SocketPtr;
  2233. PollItem[i].fd := 0;
  2234. PollItem[i].events := Byte( pia[i].events );
  2235. PollItem[i].revents := 0;
  2236. end;
  2237. if piaSize = -1 then
  2238. n := l
  2239. else
  2240. n := piaSize;
  2241. result := zmq_poll( PollItem[0], n, timeout );
  2242. if result < 0 then
  2243. raise EZMQException.Create;
  2244. for i := 0 to l - 1 do
  2245. Byte(pia[i].revents) := PollItem[i].revents;
  2246. finally
  2247. PollItem := nil;
  2248. end;
  2249. end;
  2250. function ZMQPoll( var pia: TZMQPollItem; piaSize: Integer = 1; timeout: Integer = -1 ): Integer; overload;
  2251. var
  2252. PollItem: zmq.pollitem_t;
  2253. begin
  2254. PollItem.socket := pia.Socket.SocketPtr;
  2255. PollItem.fd := 0;
  2256. PollItem.events := Byte( pia.events );
  2257. PollItem.revents := 0;
  2258. result := zmq_poll( PollItem, piaSize, timeout );
  2259. if result < 0 then
  2260. raise EZMQException.Create;
  2261. Byte(pia.revents) := PollItem.revents;
  2262. end;
  2263. // Thread related functions.
  2264. procedure ZMQProxy( frontend, backend, capture: TZMQSocket );
  2265. var
  2266. p: Pointer;
  2267. begin
  2268. if capture <> nil then
  2269. p := capture.SocketPtr
  2270. else
  2271. p := nil;
  2272. {$ifdef zmq3}
  2273. if zmq_proxy( frontend.SocketPtr, backend.SocketPtr, p ) <> -1 then
  2274. raise EZMQException.Create( 'Proxy does not return -1' );
  2275. {$endif}
  2276. //raise EZMQException.Create;
  2277. end;
  2278. procedure ZMQDevice( device: TZMQDevice; insocket, outsocket: TZMQSocket );
  2279. begin
  2280. if zmq_device( Ord( device ), insocket.SocketPtr, outsocket.SocketPtr ) <> -1 then
  2281. raise EZMQException.Create( 'Device does not return -1' );
  2282. end;
  2283. procedure ZMQVersion(var major, minor, patch: Integer);
  2284. begin
  2285. zmq_version( major, minor, patch );
  2286. end;
  2287. {$ifdef UNIX}
  2288. procedure InterruptContexts;
  2289. var
  2290. i: Integer;
  2291. begin
  2292. ZMQTerminated := true;
  2293. for i := 0 to contexts.Count - 1 do
  2294. TZMQContext(contexts[i]).Terminate;
  2295. end;
  2296. procedure HandleSignal(signum: longint; si: psiginfo; sc: PSigcontext); cdecl;
  2297. begin
  2298. InterruptContexts;
  2299. Writeln('zmqapi handling signal: ' + IntToStr(signum));
  2300. end;
  2301. procedure InstallSigHandler(sig: cint); cdecl;
  2302. var
  2303. k : integer;
  2304. oa, na : PSigActionRec;
  2305. begin
  2306. new(na);
  2307. new(oa);
  2308. na^.sa_handler := @HandleSignal;
  2309. fillchar(na^.sa_mask,sizeof(na^.sa_mask),#0);
  2310. na^.sa_flags := 0;
  2311. na^.sa_restorer := nil;
  2312. k := fpSigaction(sig,na,oa);
  2313. if k<>0 then
  2314. begin
  2315. Writeln('signal handler install error '+IntToStr(k)+' '+IntToStr(fpgeterrno));
  2316. halt(1);
  2317. end;
  2318. Freemem(oa);
  2319. Freemem(na);
  2320. end;
  2321. {$else}
  2322. {
  2323. This function is called when a CTRL_C_EVENT received, important that this
  2324. function is executed in a separate thread, because Terminate terminates the
  2325. context, which blocks until there are open sockets.
  2326. }
  2327. function console_handler( dwCtrlType: DWORD ): BOOL;
  2328. var
  2329. i: Integer;
  2330. begin
  2331. if CTRL_C_EVENT = dwCtrlType then
  2332. begin
  2333. ZMQTerminated := true;
  2334. for i := contexts.Count - 1 downto 0 do
  2335. TZMQContext(contexts[i]).Terminate;
  2336. result := True;
  2337. // if I set to True than the app won't exit,
  2338. // but it's not the solution.
  2339. // ZMQTerminate;
  2340. end else begin
  2341. result := False;
  2342. end;
  2343. end;
  2344. {$endif}
  2345. procedure ZMQTerminate;
  2346. begin
  2347. {$ifndef UNIX}
  2348. GenerateConsoleCtrlEvent( CTRL_C_EVENT, 0 );
  2349. {$endif}
  2350. end;
  2351. { TZMQThread }
  2352. constructor TZMQThread.Create( lArgs: Pointer; ctx: TZMQContext );
  2353. begin
  2354. inherited Create( true );
  2355. fArgs := lArgs;
  2356. if ctx = nil then
  2357. fContext := TZMQContext.Create
  2358. else begin
  2359. fContext := ctx.Shadow;
  2360. fPipe := Context.Socket( stPair );
  2361. fPipe.bind( Format( 'inproc://zmqthread-pipe-%p', [@fPipe] ) );
  2362. end;
  2363. end;
  2364. constructor TZMQThread.CreateAttached( lAttachedMeth: TAttachedThreadMeth; ctx: TZMQContext;
  2365. lArgs: Pointer);
  2366. begin
  2367. Create( lArgs, ctx );
  2368. fAttachedMeth := lAttachedMeth;
  2369. end;
  2370. constructor TZMQThread.CreateDetached( lDetachedMeth: TDetachedThreadMeth; lArgs: Pointer);
  2371. begin
  2372. Create( lArgs, nil );
  2373. fDetachedMeth := lDetachedMeth;
  2374. end;
  2375. constructor TZMQThread.CreateAttachedProc( lAttachedProc: TAttachedThreadProc; ctx: TZMQContext; lArgs: Pointer );
  2376. begin
  2377. Create( lArgs, ctx );
  2378. fAttachedProc := lAttachedProc;
  2379. end;
  2380. constructor TZMQThread.CreateDetachedProc( lDetachedProc: TDetachedThreadProc; lArgs: Pointer );
  2381. begin
  2382. Create( lArgs, nil );
  2383. fDetachedProc := lDetachedProc;
  2384. end;
  2385. destructor TZMQThread.Destroy;
  2386. begin
  2387. if Context <> nil then
  2388. Context.Free;
  2389. inherited;
  2390. end;
  2391. procedure TZMQThread.DoExecute;
  2392. begin
  2393. if Assigned( fAttachedMeth ) then
  2394. fAttachedMeth( fArgs, Context, thrPipe )
  2395. else
  2396. if Assigned( fDetachedMeth ) then
  2397. fDetachedMeth( fArgs, Context )
  2398. else
  2399. if Assigned( fAttachedProc ) then
  2400. fAttachedProc( fArgs, Context, thrPipe )
  2401. else
  2402. if Assigned( fDetachedProc ) then
  2403. fDetachedProc( fArgs, Context );
  2404. end;
  2405. procedure TZMQThread.Execute;
  2406. begin
  2407. if Assigned( fAttachedProc ) or Assigned( fAttachedMeth ) then
  2408. begin // attached thread
  2409. thrPipe := Context.Socket( stPair );
  2410. thrPipe.connect( Format( 'inproc://zmqthread-pipe-%p', [@fPipe] ) );
  2411. end;
  2412. DoExecute;
  2413. end;
  2414. initialization
  2415. {$ifdef UNIX}
  2416. InitCriticalSection( cs );
  2417. {$else}
  2418. InitializeCriticalSection( cs );
  2419. {$endif}
  2420. contexts := TList.Create;
  2421. {$ifdef UNIX}
  2422. { TODO : Signal handling should normally be installed at application level, not in library }
  2423. InstallSigHandler(SIGTERM);
  2424. InstallSigHandler(SIGINT);
  2425. {$else}
  2426. Windows.SetConsoleCtrlHandler( @console_handler, True );
  2427. {$endif}
  2428. finalization
  2429. contexts.Free;
  2430. {$ifdef UNIX}
  2431. DoneCriticalSection( cs );
  2432. {$else}
  2433. DeleteCriticalSection( cs );
  2434. {$endif}
  2435. end.