eventq.c 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492
  1. /** BEGIN COPYRIGHT BLOCK
  2. * Copyright (C) 2001 Sun Microsystems, Inc. Used by permission.
  3. * Copyright (C) 2005 Red Hat, Inc.
  4. * All rights reserved.
  5. *
  6. * License: GPL (version 3 or any later version).
  7. * See LICENSE for details.
  8. * END COPYRIGHT BLOCK **/
  9. #ifdef HAVE_CONFIG_H
  10. # include <config.h>
  11. #endif
/* ********************************************************
eventq.c - Event queue/scheduling system.

There are 4 publicly-accessible entry points:

slapi_eq_once(): cause an event to happen exactly once
slapi_eq_repeat(): cause an event to happen repeatedly
slapi_eq_cancel(): cancel a pending event
slapi_eq_get_arg(): return the argument of a pending event

There are also three entry points used by the server itself:
eq_init(), which must be called early in startup to initialize
the event queue system (events can then be queued, but do not
fire yet); eq_start(), which starts the thread that fires
events; and eq_stop(), which shuts the system down.
*********************************************************** */
  23. #include "slap.h"
  24. #include "prlock.h"
  25. #include "prcvar.h"
  26. #include "prinit.h"
  27. /*
  28. * Private definition of slapi_eq_context. Only this
  29. * module (eventq.c) should know about the layout of
  30. * this structure.
  31. */
  32. typedef struct _slapi_eq_context {
  33. time_t ec_when;
  34. time_t ec_interval;
  35. slapi_eq_fn_t ec_fn;
  36. void *ec_arg;
  37. Slapi_Eq_Context ec_id;
  38. struct _slapi_eq_context *ec_next;
  39. } slapi_eq_context;
  40. /*
  41. * Definition of the event queue.
  42. */
  43. typedef struct _event_queue {
  44. PRLock *eq_lock;
  45. PRCondVar *eq_cv;
  46. slapi_eq_context *eq_queue;
  47. } event_queue;
  48. /*
  49. * The event queue itself.
  50. */
  51. static event_queue eqs = {0};
  52. static event_queue *eq = &eqs;
  53. /*
  54. * Thread ID of the main thread loop
  55. */
  56. static PRThread *eq_loop_tid = NULL;
  57. /*
  58. * Flags used to control startup/shutdown of the event queue
  59. */
  60. static int eq_running = 0;
  61. static int eq_stopped = 0;
  62. static int eq_initialized = 0;
  63. PRLock *ss_lock = NULL;
  64. PRCondVar *ss_cv = NULL;
  65. PRCallOnceType init_once = {0};
  66. /* Forward declarations */
  67. static slapi_eq_context *eq_new(slapi_eq_fn_t fn, void *arg,
  68. time_t when, unsigned long interval);
  69. static void eq_enqueue(slapi_eq_context *newec);
  70. static slapi_eq_context *eq_dequeue(time_t now);
  71. static PRStatus eq_create(void);
  72. /* ******************************************************** */
  73. /*
  74. * slapi_eq_once: cause an event to happen exactly once.
  75. *
  76. * Arguments:
  77. * fn: the function to call
  78. * arg: an argument to pass to the called function
  79. * when: the time that the function should be called
  80. * Returns:
  81. * slapi_eq_context - a handle to an opaque object which
  82. * the caller can use to refer to this particular scheduled
  83. * event.
  84. */
  85. Slapi_Eq_Context
  86. slapi_eq_once(slapi_eq_fn_t fn, void *arg, time_t when)
  87. {
  88. slapi_eq_context *tmp;
  89. PR_ASSERT(eq_initialized);
  90. if (!eq_stopped) {
  91. Slapi_Eq_Context id;
  92. tmp = eq_new(fn, arg, when, 0UL);
  93. id = tmp->ec_id;
  94. eq_enqueue(tmp);
  95. /* After this point, <tmp> may have */
  96. /* been freed, depending on the thread */
  97. /* scheduling. Too bad */
  98. slapi_log_error(SLAPI_LOG_HOUSE, NULL,
  99. "added one-time event id %p at time %ld\n",
  100. id, when);
  101. return(id);
  102. }
  103. return NULL; /* JCM - Not sure if this should be 0 or something else. */
  104. }
  105. /*
  106. * slapi_eq_repeat: cause an event to happen repeatedly.
  107. *
  108. * Arguments:
  109. * fn: the function to call
  110. * arg: an argument to pass to the called function
  111. * when: the time that the function should first be called
  112. * interval: the amount of time (in milliseconds) between
  113. * successive calls to the function
  114. * Returns:
  115. * slapi_eq_context - a handle to an opaque object which
  116. * the caller can use to refer to this particular scheduled
  117. */
  118. Slapi_Eq_Context
  119. slapi_eq_repeat(slapi_eq_fn_t fn, void *arg, time_t when, unsigned long interval)
  120. {
  121. slapi_eq_context *tmp ;
  122. PR_ASSERT(eq_initialized);
  123. if (!eq_stopped) {
  124. tmp = eq_new(fn, arg, when, interval);
  125. eq_enqueue(tmp);
  126. slapi_log_error(SLAPI_LOG_HOUSE, NULL,
  127. "added repeating event id %p at time %ld, interval %lu\n",
  128. tmp->ec_id, when, interval);
  129. return(tmp->ec_id);
  130. }
  131. return NULL; /* JCM - Not sure if this should be 0 or something else. */
  132. }
  133. /*
  134. * slapi_eq_cancel: cancel a pending event.
  135. * Arguments:
  136. * ctx: the context of the event which should be de-scheduled
  137. */
  138. int
  139. slapi_eq_cancel(Slapi_Eq_Context ctx)
  140. {
  141. slapi_eq_context **p, *tmp = NULL;
  142. int found = 0;
  143. PR_ASSERT(eq_initialized);
  144. if (!eq_stopped) {
  145. PR_Lock(eq->eq_lock);
  146. p = &(eq->eq_queue);
  147. while (!found && *p != NULL) {
  148. if ((*p)->ec_id == ctx) {
  149. tmp = *p;
  150. *p = (*p)->ec_next;
  151. slapi_ch_free((void**)&tmp);
  152. found = 1;
  153. } else {
  154. p = &((*p)->ec_next);
  155. }
  156. }
  157. PR_Unlock(eq->eq_lock);
  158. }
  159. slapi_log_error(SLAPI_LOG_HOUSE, NULL,
  160. "cancellation of event id %p requested: %s\n",
  161. ctx, found ? "cancellation succeeded" : "event not found");
  162. return found;
  163. }
  164. /*
  165. * Construct a new ec structure
  166. */
  167. static slapi_eq_context *
  168. eq_new(slapi_eq_fn_t fn, void *arg, time_t when, unsigned long interval)
  169. {
  170. slapi_eq_context *retptr = (slapi_eq_context *)slapi_ch_calloc(1, sizeof(slapi_eq_context));
  171. time_t now;
  172. retptr->ec_fn = fn;
  173. retptr->ec_arg = arg;
  174. now = current_time();
  175. retptr->ec_when = when < now ? now : when;
  176. retptr->ec_interval = interval == 0UL ? 0UL : (interval + 999) / 1000;
  177. retptr->ec_id = (Slapi_Eq_Context)retptr;
  178. return retptr;
  179. }
  180. /*
  181. * Add a new event to the event queue.
  182. */
  183. static void
  184. eq_enqueue(slapi_eq_context *newec)
  185. {
  186. slapi_eq_context **p;
  187. PR_ASSERT(NULL != newec);
  188. PR_Lock(eq->eq_lock);
  189. /* Insert <newec> in order (sorted by start time) in the list */
  190. for (p = &(eq->eq_queue); *p != NULL; p = &((*p)->ec_next)) {
  191. if ((*p)->ec_when > newec->ec_when) {
  192. break;
  193. }
  194. }
  195. if (NULL != *p) {
  196. newec->ec_next = *p;
  197. } else {
  198. newec->ec_next = NULL;
  199. }
  200. *p = newec;
  201. PR_NotifyCondVar(eq->eq_cv); /* wake up scheduler thread */
  202. PR_Unlock(eq->eq_lock);
  203. }
  204. /*
  205. * If there is an event in the queue scheduled at time
  206. * <now> or before, dequeue it and return a pointer
  207. * to it. Otherwise, return NULL.
  208. */
  209. static slapi_eq_context *
  210. eq_dequeue(time_t now)
  211. {
  212. slapi_eq_context *retptr = NULL;
  213. PR_Lock(eq->eq_lock);
  214. if (NULL != eq->eq_queue && eq->eq_queue->ec_when <= now) {
  215. retptr = eq->eq_queue;
  216. eq->eq_queue = retptr->ec_next;
  217. }
  218. PR_Unlock(eq->eq_lock);
  219. return retptr;
  220. }
  221. /*
  222. * Call all events which are due to run.
  223. * Note that if we've missed a schedule
  224. * opportunity, we don't try to catch up
  225. * by calling the function repeatedly.
  226. */
  227. static void
  228. eq_call_all()
  229. {
  230. slapi_eq_context *p;
  231. while ((p = eq_dequeue(current_time())) != NULL) {
  232. /* Call the scheduled function */
  233. p->ec_fn(p->ec_when, p->ec_arg);
  234. slapi_log_error(SLAPI_LOG_HOUSE, NULL,
  235. "Event id %p called at %ld (scheduled for %ld)\n",
  236. p->ec_id, current_time(), p->ec_when);
  237. if (0UL != p->ec_interval) {
  238. /* This is a repeating event. Requeue it. */
  239. do {
  240. p->ec_when += p->ec_interval;
  241. } while (p->ec_when < current_time());
  242. eq_enqueue(p);
  243. } else {
  244. slapi_ch_free((void **)&p);
  245. }
  246. }
  247. }
  248. /*
  249. * The main event queue loop.
  250. */
  251. #define WORK_AVAILABLE ((NULL != eq->eq_queue) && (eq->eq_queue->ec_when <= current_time()))
  252. static void
  253. eq_loop(void *arg)
  254. {
  255. while (eq_running) {
  256. PRIntervalTime timeout;
  257. int until;
  258. PR_Lock(eq->eq_lock);
  259. while (!WORK_AVAILABLE) {
  260. if (!eq_running) {
  261. PR_Unlock(eq->eq_lock);
  262. goto bye;
  263. }
  264. /* Compute new timeout */
  265. if (NULL != eq->eq_queue) {
  266. until = eq->eq_queue->ec_when - current_time();
  267. timeout = PR_SecondsToInterval(until);
  268. } else {
  269. timeout = PR_INTERVAL_NO_TIMEOUT;
  270. }
  271. PR_WaitCondVar(eq->eq_cv, timeout);
  272. }
  273. /* There is some work to do */
  274. PR_Unlock(eq->eq_lock);
  275. eq_call_all();
  276. }
  277. bye:
  278. eq_stopped = 1;
  279. PR_Lock(ss_lock);
  280. PR_NotifyAllCondVar(ss_cv);
  281. PR_Unlock(ss_lock);
  282. }
  283. /*
  284. * Allocate and initialize the event queue structures.
  285. */
  286. static PRStatus
  287. eq_create(void)
  288. {
  289. PR_ASSERT(NULL == eq->eq_lock);
  290. if ((eq->eq_lock = PR_NewLock()) == NULL) {
  291. slapi_log_error(SLAPI_LOG_FATAL, NULL, "eq_start PR_NewLock failed\n");
  292. exit(1);
  293. }
  294. if ((eq->eq_cv = PR_NewCondVar(eq->eq_lock)) == NULL) {
  295. slapi_log_error(SLAPI_LOG_FATAL, NULL, "eq_start PR_NewCondVar failed\n");
  296. exit(1);
  297. }
  298. if ((ss_lock = PR_NewLock()) == NULL) {
  299. slapi_log_error(SLAPI_LOG_FATAL, NULL, "eq_start PR_NewLock failed\n");
  300. exit(1);
  301. }
  302. if ((ss_cv = PR_NewCondVar(ss_lock)) == NULL) {
  303. slapi_log_error(SLAPI_LOG_FATAL, NULL, "eq_start PR_NewCondVar failed\n");
  304. exit(1);
  305. }
  306. eq->eq_queue = NULL;
  307. eq_initialized = 1;
  308. return PR_SUCCESS;
  309. }
  310. /*
  311. * eq_start: start the event queue system.
  312. *
  313. * This should be called exactly once. It will start a
  314. * thread which wakes up periodically and schedules events.
  315. */
  316. void
  317. eq_start()
  318. {
  319. PR_ASSERT(eq_initialized);
  320. eq_running = 1;
  321. if ((eq_loop_tid = PR_CreateThread(PR_USER_THREAD, (VFP)eq_loop,
  322. NULL, PR_PRIORITY_NORMAL, PR_GLOBAL_THREAD, PR_JOINABLE_THREAD,
  323. SLAPD_DEFAULT_THREAD_STACKSIZE)) == NULL) {
  324. slapi_log_error(SLAPI_LOG_FATAL, NULL, "eq_init PR_CreateThread failed\n");
  325. exit(1);
  326. }
  327. slapi_log_error(SLAPI_LOG_HOUSE, NULL, "event queue services have started\n");
  328. }
  329. /*
  330. * eq_init: initialize the event queue system.
  331. *
  332. * This function should be called early in server startup.
  333. * Once it has been called, the event queue will queue
  334. * events, but will not fire any events. Once all of the
  335. * server plugins have been started, the eq_start()
  336. * function should be called, and events will then start
  337. * to fire.
  338. */
  339. void
  340. eq_init()
  341. {
  342. if (!eq_initialized) {
  343. if (PR_SUCCESS != PR_CallOnce(&init_once, eq_create)) {
  344. slapi_log_error(SLAPI_LOG_FATAL, NULL, "eq_init - eq_create failed\n");
  345. }
  346. }
  347. }
  348. /*
  349. * eq_stop: shut down the event queue system.
  350. * Does not return until event queue is fully
  351. * shut down.
  352. */
  353. void
  354. eq_stop()
  355. {
  356. slapi_eq_context *p, *q;
  357. if ( NULL == eq || NULL == eq->eq_lock ) { /* never started */
  358. eq_stopped = 1;
  359. return;
  360. }
  361. eq_stopped = 0;
  362. eq_running = 0;
  363. /*
  364. * Signal the eq thread function to stop, and wait until
  365. * it acknowledges by setting eq_stopped.
  366. */
  367. while (!eq_stopped) {
  368. PR_Lock(eq->eq_lock);
  369. PR_NotifyAllCondVar(eq->eq_cv);
  370. PR_Unlock(eq->eq_lock);
  371. PR_Lock(ss_lock);
  372. PR_WaitCondVar(ss_cv, PR_MillisecondsToInterval(100));
  373. PR_Unlock(ss_lock);
  374. }
  375. (void)PR_JoinThread(eq_loop_tid);
  376. /*
  377. * XXXggood we don't free the actual event queue data structures.
  378. * This is intentional, to allow enqueueing/cancellation of events
  379. * even after event queue services have shut down (these are no-ops).
  380. * The downside is that the event queue can't be stopped and restarted
  381. * easily.
  382. */
  383. PR_Lock(eq->eq_lock);
  384. p = eq->eq_queue;
  385. while (p != NULL) {
  386. q = p->ec_next;
  387. slapi_ch_free((void**)&p);
  388. /* Some ec_arg could get leaked here in shutdown (e.g., replica_name)
  389. * This can be fixed by specifying a flag when the context is queued.
  390. * [After 6.2]
  391. */
  392. p = q;
  393. }
  394. PR_Unlock(eq->eq_lock);
  395. slapi_log_error(SLAPI_LOG_HOUSE, NULL, "event queue services have shut down\n");
  396. }
  397. /*
  398. * return arg (ec_arg) only if the context is in the event queue
  399. */
  400. void *
  401. slapi_eq_get_arg ( Slapi_Eq_Context ctx )
  402. {
  403. slapi_eq_context **p;
  404. PR_ASSERT(eq_initialized);
  405. if (!eq_stopped) {
  406. PR_Lock(eq->eq_lock);
  407. p = &(eq->eq_queue);
  408. while (p && *p != NULL) {
  409. if ((*p)->ec_id == ctx) {
  410. PR_Unlock(eq->eq_lock);
  411. return (*p)->ec_arg;
  412. } else {
  413. p = &((*p)->ec_next);
  414. }
  415. }
  416. PR_Unlock(eq->eq_lock);
  417. }
  418. return NULL;
  419. }