| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492 |
- /** BEGIN COPYRIGHT BLOCK
- * Copyright (C) 2001 Sun Microsystems, Inc. Used by permission.
- * Copyright (C) 2005 Red Hat, Inc.
- * All rights reserved.
- *
- * License: GPL (version 3 or any later version).
- * See LICENSE for details.
- * END COPYRIGHT BLOCK **/
- #ifdef HAVE_CONFIG_H
- # include <config.h>
- #endif
/* ********************************************************
eventq.c - Event queue/scheduling system.
There are 4 publicly-accessible entry points:
slapi_eq_once(): cause an event to happen exactly once
slapi_eq_repeat(): cause an event to happen repeatedly
slapi_eq_cancel(): cancel a pending event
slapi_eq_get_arg(): return the argument of a pending event
There are also two initialization points which must be
called by the server: eq_init(), which sets up the event
queue so that events can be queued, and eq_start(), which
starts the thread that fires them. The entry point used
to shut down the system is eq_stop().
*********************************************************** */
- #include "slap.h"
- #include "prlock.h"
- #include "prcvar.h"
- #include "prinit.h"
- /*
- * Private definition of slapi_eq_context. Only this
- * module (eventq.c) should know about the layout of
- * this structure.
- */
- typedef struct _slapi_eq_context {
- time_t ec_when;
- time_t ec_interval;
- slapi_eq_fn_t ec_fn;
- void *ec_arg;
- Slapi_Eq_Context ec_id;
- struct _slapi_eq_context *ec_next;
- } slapi_eq_context;
- /*
- * Definition of the event queue.
- */
- typedef struct _event_queue {
- PRLock *eq_lock;
- PRCondVar *eq_cv;
- slapi_eq_context *eq_queue;
- } event_queue;
- /*
- * The event queue itself.
- */
- static event_queue eqs = {0};
- static event_queue *eq = &eqs;
- /*
- * Thread ID of the main thread loop
- */
- static PRThread *eq_loop_tid = NULL;
- /*
- * Flags used to control startup/shutdown of the event queue
- */
- static int eq_running = 0;
- static int eq_stopped = 0;
- static int eq_initialized = 0;
- PRLock *ss_lock = NULL;
- PRCondVar *ss_cv = NULL;
- PRCallOnceType init_once = {0};
- /* Forward declarations */
- static slapi_eq_context *eq_new(slapi_eq_fn_t fn, void *arg,
- time_t when, unsigned long interval);
- static void eq_enqueue(slapi_eq_context *newec);
- static slapi_eq_context *eq_dequeue(time_t now);
- static PRStatus eq_create(void);
- /* ******************************************************** */
- /*
- * slapi_eq_once: cause an event to happen exactly once.
- *
- * Arguments:
- * fn: the function to call
- * arg: an argument to pass to the called function
- * when: the time that the function should be called
- * Returns:
- * slapi_eq_context - a handle to an opaque object which
- * the caller can use to refer to this particular scheduled
- * event.
- */
- Slapi_Eq_Context
- slapi_eq_once(slapi_eq_fn_t fn, void *arg, time_t when)
- {
- slapi_eq_context *tmp;
- PR_ASSERT(eq_initialized);
- if (!eq_stopped) {
-
- Slapi_Eq_Context id;
- tmp = eq_new(fn, arg, when, 0UL);
- id = tmp->ec_id;
- eq_enqueue(tmp);
- /* After this point, <tmp> may have */
- /* been freed, depending on the thread */
- /* scheduling. Too bad */
- slapi_log_error(SLAPI_LOG_HOUSE, NULL,
- "added one-time event id %p at time %ld\n",
- id, when);
- return(id);
- }
- return NULL; /* JCM - Not sure if this should be 0 or something else. */
- }
-
- /*
- * slapi_eq_repeat: cause an event to happen repeatedly.
- *
- * Arguments:
- * fn: the function to call
- * arg: an argument to pass to the called function
- * when: the time that the function should first be called
- * interval: the amount of time (in milliseconds) between
- * successive calls to the function
- * Returns:
- * slapi_eq_context - a handle to an opaque object which
- * the caller can use to refer to this particular scheduled
- */
- Slapi_Eq_Context
- slapi_eq_repeat(slapi_eq_fn_t fn, void *arg, time_t when, unsigned long interval)
- {
- slapi_eq_context *tmp ;
- PR_ASSERT(eq_initialized);
- if (!eq_stopped) {
- tmp = eq_new(fn, arg, when, interval);
- eq_enqueue(tmp);
- slapi_log_error(SLAPI_LOG_HOUSE, NULL,
- "added repeating event id %p at time %ld, interval %lu\n",
- tmp->ec_id, when, interval);
- return(tmp->ec_id);
- }
- return NULL; /* JCM - Not sure if this should be 0 or something else. */
- }
- /*
- * slapi_eq_cancel: cancel a pending event.
- * Arguments:
- * ctx: the context of the event which should be de-scheduled
- */
- int
- slapi_eq_cancel(Slapi_Eq_Context ctx)
- {
- slapi_eq_context **p, *tmp = NULL;
- int found = 0;
- PR_ASSERT(eq_initialized);
- if (!eq_stopped) {
- PR_Lock(eq->eq_lock);
- p = &(eq->eq_queue);
- while (!found && *p != NULL) {
- if ((*p)->ec_id == ctx) {
- tmp = *p;
- *p = (*p)->ec_next;
- slapi_ch_free((void**)&tmp);
- found = 1;
- } else {
- p = &((*p)->ec_next);
- }
- }
- PR_Unlock(eq->eq_lock);
- }
- slapi_log_error(SLAPI_LOG_HOUSE, NULL,
- "cancellation of event id %p requested: %s\n",
- ctx, found ? "cancellation succeeded" : "event not found");
- return found;
- }
- /*
- * Construct a new ec structure
- */
- static slapi_eq_context *
- eq_new(slapi_eq_fn_t fn, void *arg, time_t when, unsigned long interval)
- {
- slapi_eq_context *retptr = (slapi_eq_context *)slapi_ch_calloc(1, sizeof(slapi_eq_context));
- time_t now;
- retptr->ec_fn = fn;
- retptr->ec_arg = arg;
- now = current_time();
- retptr->ec_when = when < now ? now : when;
- retptr->ec_interval = interval == 0UL ? 0UL : (interval + 999) / 1000;
- retptr->ec_id = (Slapi_Eq_Context)retptr;
- return retptr;
- }
- /*
- * Add a new event to the event queue.
- */
- static void
- eq_enqueue(slapi_eq_context *newec)
- {
- slapi_eq_context **p;
- PR_ASSERT(NULL != newec);
- PR_Lock(eq->eq_lock);
- /* Insert <newec> in order (sorted by start time) in the list */
- for (p = &(eq->eq_queue); *p != NULL; p = &((*p)->ec_next)) {
- if ((*p)->ec_when > newec->ec_when) {
- break;
- }
- }
- if (NULL != *p) {
- newec->ec_next = *p;
- } else {
- newec->ec_next = NULL;
- }
- *p = newec;
- PR_NotifyCondVar(eq->eq_cv); /* wake up scheduler thread */
- PR_Unlock(eq->eq_lock);
- }
- /*
- * If there is an event in the queue scheduled at time
- * <now> or before, dequeue it and return a pointer
- * to it. Otherwise, return NULL.
- */
- static slapi_eq_context *
- eq_dequeue(time_t now)
- {
- slapi_eq_context *retptr = NULL;
- PR_Lock(eq->eq_lock);
- if (NULL != eq->eq_queue && eq->eq_queue->ec_when <= now) {
- retptr = eq->eq_queue;
- eq->eq_queue = retptr->ec_next;
- }
- PR_Unlock(eq->eq_lock);
- return retptr;
- }
- /*
- * Call all events which are due to run.
- * Note that if we've missed a schedule
- * opportunity, we don't try to catch up
- * by calling the function repeatedly.
- */
- static void
- eq_call_all()
- {
- slapi_eq_context *p;
- while ((p = eq_dequeue(current_time())) != NULL) {
- /* Call the scheduled function */
- p->ec_fn(p->ec_when, p->ec_arg);
- slapi_log_error(SLAPI_LOG_HOUSE, NULL,
- "Event id %p called at %ld (scheduled for %ld)\n",
- p->ec_id, current_time(), p->ec_when);
- if (0UL != p->ec_interval) {
- /* This is a repeating event. Requeue it. */
- do {
- p->ec_when += p->ec_interval;
- } while (p->ec_when < current_time());
- eq_enqueue(p);
- } else {
- slapi_ch_free((void **)&p);
- }
- }
- }
- /*
- * The main event queue loop.
- */
- #define WORK_AVAILABLE ((NULL != eq->eq_queue) && (eq->eq_queue->ec_when <= current_time()))
- static void
- eq_loop(void *arg)
- {
- while (eq_running) {
- PRIntervalTime timeout;
- int until;
- PR_Lock(eq->eq_lock);
- while (!WORK_AVAILABLE) {
- if (!eq_running) {
- PR_Unlock(eq->eq_lock);
- goto bye;
- }
- /* Compute new timeout */
- if (NULL != eq->eq_queue) {
- until = eq->eq_queue->ec_when - current_time();
- timeout = PR_SecondsToInterval(until);
- } else {
- timeout = PR_INTERVAL_NO_TIMEOUT;
- }
- PR_WaitCondVar(eq->eq_cv, timeout);
- }
- /* There is some work to do */
- PR_Unlock(eq->eq_lock);
- eq_call_all();
- }
- bye:
- eq_stopped = 1;
- PR_Lock(ss_lock);
- PR_NotifyAllCondVar(ss_cv);
- PR_Unlock(ss_lock);
- }
- /*
- * Allocate and initialize the event queue structures.
- */
- static PRStatus
- eq_create(void)
- {
- PR_ASSERT(NULL == eq->eq_lock);
- if ((eq->eq_lock = PR_NewLock()) == NULL) {
- slapi_log_error(SLAPI_LOG_FATAL, NULL, "eq_start PR_NewLock failed\n");
- exit(1);
- }
- if ((eq->eq_cv = PR_NewCondVar(eq->eq_lock)) == NULL) {
- slapi_log_error(SLAPI_LOG_FATAL, NULL, "eq_start PR_NewCondVar failed\n");
- exit(1);
- }
- if ((ss_lock = PR_NewLock()) == NULL) {
- slapi_log_error(SLAPI_LOG_FATAL, NULL, "eq_start PR_NewLock failed\n");
- exit(1);
- }
- if ((ss_cv = PR_NewCondVar(ss_lock)) == NULL) {
- slapi_log_error(SLAPI_LOG_FATAL, NULL, "eq_start PR_NewCondVar failed\n");
- exit(1);
- }
- eq->eq_queue = NULL;
- eq_initialized = 1;
- return PR_SUCCESS;
- }
- /*
- * eq_start: start the event queue system.
- *
- * This should be called exactly once. It will start a
- * thread which wakes up periodically and schedules events.
- */
- void
- eq_start()
- {
- PR_ASSERT(eq_initialized);
- eq_running = 1;
- if ((eq_loop_tid = PR_CreateThread(PR_USER_THREAD, (VFP)eq_loop,
- NULL, PR_PRIORITY_NORMAL, PR_GLOBAL_THREAD, PR_JOINABLE_THREAD,
- SLAPD_DEFAULT_THREAD_STACKSIZE)) == NULL) {
- slapi_log_error(SLAPI_LOG_FATAL, NULL, "eq_init PR_CreateThread failed\n");
- exit(1);
- }
- slapi_log_error(SLAPI_LOG_HOUSE, NULL, "event queue services have started\n");
- }
- /*
- * eq_init: initialize the event queue system.
- *
- * This function should be called early in server startup.
- * Once it has been called, the event queue will queue
- * events, but will not fire any events. Once all of the
- * server plugins have been started, the eq_start()
- * function should be called, and events will then start
- * to fire.
- */
- void
- eq_init()
- {
- if (!eq_initialized) {
- if (PR_SUCCESS != PR_CallOnce(&init_once, eq_create)) {
- slapi_log_error(SLAPI_LOG_FATAL, NULL, "eq_init - eq_create failed\n");
- }
- }
- }
- /*
- * eq_stop: shut down the event queue system.
- * Does not return until event queue is fully
- * shut down.
- */
- void
- eq_stop()
- {
- slapi_eq_context *p, *q;
- if ( NULL == eq || NULL == eq->eq_lock ) { /* never started */
- eq_stopped = 1;
- return;
- }
- eq_stopped = 0;
- eq_running = 0;
- /*
- * Signal the eq thread function to stop, and wait until
- * it acknowledges by setting eq_stopped.
- */
- while (!eq_stopped) {
- PR_Lock(eq->eq_lock);
- PR_NotifyAllCondVar(eq->eq_cv);
- PR_Unlock(eq->eq_lock);
- PR_Lock(ss_lock);
- PR_WaitCondVar(ss_cv, PR_MillisecondsToInterval(100));
- PR_Unlock(ss_lock);
- }
- (void)PR_JoinThread(eq_loop_tid);
- /*
- * XXXggood we don't free the actual event queue data structures.
- * This is intentional, to allow enqueueing/cancellation of events
- * even after event queue services have shut down (these are no-ops).
- * The downside is that the event queue can't be stopped and restarted
- * easily.
- */
- PR_Lock(eq->eq_lock);
- p = eq->eq_queue;
- while (p != NULL) {
- q = p->ec_next;
- slapi_ch_free((void**)&p);
- /* Some ec_arg could get leaked here in shutdown (e.g., replica_name)
- * This can be fixed by specifying a flag when the context is queued.
- * [After 6.2]
- */
- p = q;
- }
- PR_Unlock(eq->eq_lock);
- slapi_log_error(SLAPI_LOG_HOUSE, NULL, "event queue services have shut down\n");
- }
- /*
- * return arg (ec_arg) only if the context is in the event queue
- */
- void *
- slapi_eq_get_arg ( Slapi_Eq_Context ctx )
- {
- slapi_eq_context **p;
- PR_ASSERT(eq_initialized);
- if (!eq_stopped) {
- PR_Lock(eq->eq_lock);
- p = &(eq->eq_queue);
- while (p && *p != NULL) {
- if ((*p)->ec_id == ctx) {
- PR_Unlock(eq->eq_lock);
- return (*p)->ec_arg;
- } else {
- p = &((*p)->ec_next);
- }
- }
- PR_Unlock(eq->eq_lock);
- }
- return NULL;
- }
|