|
|
@@ -58,10 +58,8 @@
|
|
|
#include <netinet/tcp.h> /* for TCP_CORK */
|
|
|
#endif
|
|
|
|
|
|
-
|
|
|
+typedef Connection work_q_item;
|
|
|
static void connection_threadmain( void );
|
|
|
-static void add_pb( Slapi_PBlock * );
|
|
|
-static Slapi_PBlock *get_pb( void );
|
|
|
static void connection_add_operation(Connection* conn, Operation *op);
|
|
|
static void connection_free_private_buffer(Connection *conn);
|
|
|
static void op_copy_identity(Connection *conn, Operation *op);
|
|
|
@@ -70,28 +68,89 @@ static int is_ber_too_big(const Connection *conn, ber_len_t ber_len);
|
|
|
static void log_ber_too_big_error(const Connection *conn,
|
|
|
ber_len_t ber_len, ber_len_t maxbersize);
|
|
|
|
|
|
+static PRStack *op_stack; /* stack of Slapi_Operation * objects so we don't have to malloc/free every time */
|
|
|
+static PRInt32 op_stack_size; /* size of op_stack */
|
|
|
+
|
|
|
+struct Slapi_op_stack {
|
|
|
+ PRStackElem stackelem; /* must be first in struct for PRStack to work */
|
|
|
+ Slapi_Operation *op;
|
|
|
+};
|
|
|
+
|
|
|
+static void add_work_q( work_q_item *, struct Slapi_op_stack * );
|
|
|
+static work_q_item *get_work_q( struct Slapi_op_stack ** );
|
|
|
+
|
|
|
/*
|
|
|
- * We maintain a global work queue of Slapi_PBlock's that have not yet
|
|
|
+ * We maintain a global work queue of items that have not yet
|
|
|
* been handed off to an operation thread.
|
|
|
*/
|
|
|
-struct Slapi_PBlock_q
|
|
|
-{
|
|
|
- Slapi_PBlock *pb;
|
|
|
- struct Slapi_PBlock_q *next_pb;
|
|
|
- int pb_fd;
|
|
|
+struct Slapi_work_q {
|
|
|
+ PRStackElem stackelem; /* must be first in struct for PRStack to work */
|
|
|
+ work_q_item *work_item;
|
|
|
+ struct Slapi_op_stack *op_stack_obj;
|
|
|
+ struct Slapi_work_q *next_work_item;
|
|
|
};
|
|
|
|
|
|
-static struct Slapi_PBlock_q *first_pb= NULL; /* global work queue head */
|
|
|
-static struct Slapi_PBlock_q *last_pb= NULL; /* global work queue tail */
|
|
|
-static PRLock *pb_q_lock=NULL; /* protects first_pb & last_pb */
|
|
|
-static PRCondVar *pb_q_cv; /* used by operation threads to wait for work - when there is a op pblock in the queue waiting to be processed */
|
|
|
-static PRInt32 pb_q_size; /* size of pb_q */
|
|
|
-static PRInt32 pb_q_size_max; /* high water mark of pb_q_size */
|
|
|
-#define PB_Q_EMPTY (pb_q_size == 0)
|
|
|
+static struct Slapi_work_q *head_work_q= NULL; /* global work queue head */
|
|
|
+static struct Slapi_work_q *tail_work_q= NULL; /* global work queue tail */
|
|
|
+static PRLock *work_q_lock=NULL; /* protects head_conn_q and tail_conn_q */
|
|
|
+static PRCondVar *work_q_cv; /* used by operation threads to wait for work - when there is a conn in the queue waiting to be processed */
|
|
|
+static PRInt32 work_q_size; /* size of conn_q */
|
|
|
+static PRInt32 work_q_size_max; /* high water mark of work_q_size */
|
|
|
+#define WORK_Q_EMPTY (work_q_size == 0)
|
|
|
+static PRStack *work_q_stack; /* stack of work_q structs so we don't have to malloc/free every time */
|
|
|
+static PRInt32 work_q_stack_size; /* size of work_q_stack */
|
|
|
+static PRInt32 work_q_stack_size_max; /* max size of work_q_stack */
|
|
|
static PRInt32 op_shutdown= 0; /* if non-zero, server is shutting down */
|
|
|
|
|
|
#define LDAP_SOCKET_IO_BUFFER_SIZE 512 /* Size of the buffer we give to the I/O system for reads */
|
|
|
|
|
|
+static struct Slapi_work_q *
|
|
|
+create_work_q()
|
|
|
+{
|
|
|
+ struct Slapi_work_q *work_q = (struct Slapi_work_q *)PR_StackPop(work_q_stack);
|
|
|
+ if (!work_q) {
|
|
|
+ work_q = (struct Slapi_work_q *)slapi_ch_malloc(sizeof(struct Slapi_work_q));
|
|
|
+ } else {
|
|
|
+ PR_AtomicDecrement(&work_q_stack_size);
|
|
|
+ }
|
|
|
+ return work_q;
|
|
|
+}
|
|
|
+
|
|
|
+static void
|
|
|
+destroy_work_q(struct Slapi_work_q **work_q)
|
|
|
+{
|
|
|
+ if (work_q && *work_q) {
|
|
|
+ PR_StackPush(work_q_stack, (PRStackElem *)*work_q);
|
|
|
+ PR_AtomicIncrement(&work_q_stack_size);
|
|
|
+ if (work_q_stack_size > work_q_stack_size_max) {
|
|
|
+ work_q_stack_size_max = work_q_stack_size;
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+static struct Slapi_op_stack *
|
|
|
+connection_get_operation(void)
|
|
|
+{
|
|
|
+ struct Slapi_op_stack *stack_obj = (struct Slapi_op_stack *)PR_StackPop(op_stack);
|
|
|
+ if (!stack_obj) {
|
|
|
+ stack_obj = (struct Slapi_op_stack *)slapi_ch_malloc(sizeof(struct Slapi_op_stack));
|
|
|
+ stack_obj->op = operation_new( plugin_build_operation_action_bitmap( 0,
|
|
|
+ plugin_get_server_plg() ));
|
|
|
+ } else {
|
|
|
+ PR_AtomicDecrement(&op_stack_size);
|
|
|
+ operation_init(stack_obj->op,
|
|
|
+ plugin_build_operation_action_bitmap( 0, plugin_get_server_plg() ));
|
|
|
+ }
|
|
|
+ return stack_obj;
|
|
|
+}
|
|
|
+
|
|
|
+static void
|
|
|
+connection_done_operation(Connection *conn, struct Slapi_op_stack *stack_obj)
|
|
|
+{
|
|
|
+ operation_done(&(stack_obj->op), conn);
|
|
|
+ PR_StackPush(op_stack, (PRStackElem *)stack_obj);
|
|
|
+ PR_AtomicIncrement(&op_stack_size);
|
|
|
+}
|
|
|
|
|
|
/*
|
|
|
* We really are done with this connection. Get rid of everything.
|
|
|
@@ -404,21 +463,25 @@ init_op_threads()
|
|
|
int max_threads = config_get_threadnumber();
|
|
|
/* Initialize the locks and cv */
|
|
|
|
|
|
- if ((pb_q_lock = PR_NewLock()) == NULL ) {
|
|
|
+ if ((work_q_lock = PR_NewLock()) == NULL ) {
|
|
|
errorCode = PR_GetError();
|
|
|
LDAPDebug( LDAP_DEBUG_ANY,
|
|
|
- "init_op_threads: PR_NewLock failed for pb_q_lock, " SLAPI_COMPONENT_NAME_NSPR " error %d (%s)\n",
|
|
|
+ "init_op_threads: PR_NewLock failed for work_q_lock, " SLAPI_COMPONENT_NAME_NSPR " error %d (%s)\n",
|
|
|
errorCode, slapd_pr_strerror(errorCode), 0 );
|
|
|
exit(-1);
|
|
|
}
|
|
|
|
|
|
- if ((pb_q_cv = PR_NewCondVar( pb_q_lock )) == NULL) {
|
|
|
+ if ((work_q_cv = PR_NewCondVar( work_q_lock )) == NULL) {
|
|
|
errorCode = PR_GetError();
|
|
|
- LDAPDebug( LDAP_DEBUG_ANY, "init_op_threads: PR_NewCondVar failed for pb_q_cv, " SLAPI_COMPONENT_NAME_NSPR " error %d (%s)\n",
|
|
|
+ LDAPDebug( LDAP_DEBUG_ANY, "init_op_threads: PR_NewCondVar failed for work_q_cv, " SLAPI_COMPONENT_NAME_NSPR " error %d (%s)\n",
|
|
|
errorCode, slapd_pr_strerror(errorCode), 0 );
|
|
|
exit(-1);
|
|
|
}
|
|
|
|
|
|
+ work_q_stack = PR_CreateStack("connection_work_q");
|
|
|
+
|
|
|
+ op_stack = PR_CreateStack("connection_operation");
|
|
|
+
|
|
|
/* start the operation threads */
|
|
|
for (i=0; i < max_threads; i++) {
|
|
|
PR_SetConcurrency(4);
|
|
|
@@ -1678,7 +1741,7 @@ connection_free_private_buffer(Connection *conn)
|
|
|
|
|
|
|
|
|
/* Connection status values returned by
|
|
|
- connection_wait_for_new_pb(), connection_read_operation(), etc. */
|
|
|
+ connection_wait_for_new_work(), connection_read_operation(), etc. */
|
|
|
|
|
|
#define CONN_FOUND_WORK_TO_DO 0
|
|
|
#define CONN_SHUTDOWN 1
|
|
|
@@ -1691,43 +1754,51 @@ connection_free_private_buffer(Connection *conn)
|
|
|
#define CONN_TURBO_PERCENTILE 50 /* proportion of threads allowed to be in turbo mode */
|
|
|
#define CONN_TURBO_HYSTERESIS 0 /* avoid flip flopping in and out of turbo mode */
|
|
|
|
|
|
-int connection_wait_for_new_pb(Slapi_PBlock **ppb, PRIntervalTime interval)
|
|
|
+void connection_make_new_pb(Slapi_PBlock *pb, Connection *conn)
|
|
|
+{
|
|
|
+ struct Slapi_op_stack *stack_obj = NULL;
|
|
|
+ /* we used to malloc/free the pb for each operation - now, just use a local stack pb
|
|
|
+ * in connection_threadmain, and just clear it out
|
|
|
+ */
|
|
|
+ /* *ppb = (Slapi_PBlock *) slapi_ch_calloc( 1, sizeof(Slapi_PBlock) ); */
|
|
|
+ /* *ppb = slapi_pblock_new(); */
|
|
|
+ pb->pb_conn = conn;
|
|
|
+ stack_obj = connection_get_operation();
|
|
|
+ pb->pb_op = stack_obj->op;
|
|
|
+ pb->op_stack_elem = stack_obj;
|
|
|
+ connection_add_operation( conn, pb->pb_op );
|
|
|
+}
|
|
|
+
|
|
|
+int connection_wait_for_new_work(Slapi_PBlock *pb, PRIntervalTime interval)
|
|
|
{
|
|
|
int ret = CONN_FOUND_WORK_TO_DO;
|
|
|
+ work_q_item *wqitem = NULL;
|
|
|
+ struct Slapi_op_stack *op_stack_obj = NULL;
|
|
|
|
|
|
- PR_Lock( pb_q_lock );
|
|
|
+ PR_Lock( work_q_lock );
|
|
|
|
|
|
- while( !op_shutdown && PB_Q_EMPTY ) {
|
|
|
- PR_WaitCondVar( pb_q_cv, interval );
|
|
|
+ while( !op_shutdown && WORK_Q_EMPTY ) {
|
|
|
+ PR_WaitCondVar( work_q_cv, interval );
|
|
|
}
|
|
|
|
|
|
if ( op_shutdown ) {
|
|
|
- LDAPDebug0Args( LDAP_DEBUG_ANY, "connection_wait_for_new_pb: shutdown\n" );
|
|
|
+ LDAPDebug0Args( LDAP_DEBUG_TRACE, "connection_wait_for_new_work: shutdown\n" );
|
|
|
ret = CONN_SHUTDOWN;
|
|
|
- } else if ( NULL == ( *ppb = get_pb() ) ) {
|
|
|
+ } else if ( NULL == ( wqitem = get_work_q( &op_stack_obj ) ) ) {
|
|
|
/* not sure how this can happen */
|
|
|
- LDAPDebug0Args( LDAP_DEBUG_ANY, "connection_wait_for_new_pb: pb is null\n" );
|
|
|
+ LDAPDebug0Args( LDAP_DEBUG_TRACE, "connection_wait_for_new_work: no work to do\n" );
|
|
|
ret = CONN_NOWORK;
|
|
|
+ } else {
|
|
|
+ /* make new pb */
|
|
|
+ pb->pb_conn = (Connection *)wqitem;
|
|
|
+ pb->op_stack_elem = op_stack_obj;
|
|
|
+ pb->pb_op = op_stack_obj->op;
|
|
|
}
|
|
|
|
|
|
- PR_Unlock( pb_q_lock );
|
|
|
+ PR_Unlock( work_q_lock );
|
|
|
return ret;
|
|
|
}
|
|
|
|
|
|
-void connection_make_new_pb(Slapi_PBlock **ppb, Connection *conn)
|
|
|
-{
|
|
|
- /* In the classic case, the pb is made in connection_activity() and then
|
|
|
- queued. get_pb() dequeues it. So we can just make it ourselves here */
|
|
|
-
|
|
|
- /* *ppb = (Slapi_PBlock *) slapi_ch_calloc( 1, sizeof(Slapi_PBlock) ); */
|
|
|
- *ppb = slapi_pblock_new();
|
|
|
- (*ppb)->pb_conn = conn;
|
|
|
- (*ppb)->pb_op = operation_new( plugin_build_operation_action_bitmap( 0,
|
|
|
- plugin_get_server_plg() ));
|
|
|
- connection_add_operation( conn, (*ppb)->pb_op );
|
|
|
-}
|
|
|
-
|
|
|
-
|
|
|
/*
|
|
|
* Utility function called by connection_read_operation(). This is a
|
|
|
* small wrapper on top of libldap's ber_get_next_buffer_ext().
|
|
|
@@ -2145,7 +2216,8 @@ void connection_enter_leave_turbo(Connection *conn, int current_turbo_flag, int
|
|
|
static void
|
|
|
connection_threadmain()
|
|
|
{
|
|
|
- Slapi_PBlock *pb = NULL;
|
|
|
+ Slapi_PBlock local_pb;
|
|
|
+ Slapi_PBlock *pb = &local_pb;
|
|
|
/* wait forever for new pb until one is available or shutdown */
|
|
|
PRIntervalTime interval = PR_INTERVAL_NO_TIMEOUT; /* PR_SecondsToInterval(10); */
|
|
|
Connection *conn = NULL;
|
|
|
@@ -2163,6 +2235,7 @@ connection_threadmain()
|
|
|
SIGNAL( SIGPIPE, SIG_IGN );
|
|
|
#endif
|
|
|
|
|
|
+ pblock_init(pb);
|
|
|
while (1) {
|
|
|
int is_timedout = 0;
|
|
|
time_t curtime = 0;
|
|
|
@@ -2174,12 +2247,12 @@ connection_threadmain()
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
- if (!thread_turbo_flag && (NULL == pb) && !more_data) {
|
|
|
+ if (!thread_turbo_flag && !more_data) {
|
|
|
/* If more data is left from the previous connection_read_operation,
|
|
|
we should finish the op now. Client might be thinking it's
|
|
|
done sending the request and wait for the response forever.
|
|
|
[blackflag 624234] */
|
|
|
- ret = connection_wait_for_new_pb(&pb,interval);
|
|
|
+ ret = connection_wait_for_new_work(pb,interval);
|
|
|
switch (ret) {
|
|
|
case CONN_NOWORK:
|
|
|
PR_ASSERT(interval != PR_INTERVAL_NO_TIMEOUT); /* this should never happen with PR_INTERVAL_NO_TIMEOUT */
|
|
|
@@ -2201,7 +2274,7 @@ connection_threadmain()
|
|
|
default:
|
|
|
break;
|
|
|
}
|
|
|
- } else if (NULL == pb) {
|
|
|
+ } else {
|
|
|
|
|
|
/* The turbo mode may cause threads starvation.
|
|
|
Do a yield here to reduce the starving
|
|
|
@@ -2210,7 +2283,7 @@ connection_threadmain()
|
|
|
|
|
|
PR_Lock(conn->c_mutex);
|
|
|
/* Make our own pb in turbo mode */
|
|
|
- connection_make_new_pb(&pb,conn);
|
|
|
+ connection_make_new_pb(pb,conn);
|
|
|
if (connection_call_io_layer_callbacks(conn)) {
|
|
|
LDAPDebug0Args( LDAP_DEBUG_ANY, "Error: could not add/remove IO layers from connection\n" );
|
|
|
}
|
|
|
@@ -2245,9 +2318,9 @@ connection_threadmain()
|
|
|
}
|
|
|
|
|
|
/* turn off turbo mode immediately if any pb waiting in global queue */
|
|
|
- if (thread_turbo_flag && !PB_Q_EMPTY) {
|
|
|
+ if (thread_turbo_flag && !WORK_Q_EMPTY) {
|
|
|
thread_turbo_flag = 0;
|
|
|
- LDAPDebug2Args(LDAP_DEBUG_CONNS,"conn %" NSPRIu64 " leaving turbo mode - pb_q is not empty %d\n",conn->c_connid,pb_q_size);
|
|
|
+ LDAPDebug2Args(LDAP_DEBUG_CONNS,"conn %" NSPRIu64 " leaving turbo mode - pb_q is not empty %d\n",conn->c_connid,work_q_size);
|
|
|
}
|
|
|
#endif
|
|
|
|
|
|
@@ -2333,12 +2406,7 @@ connection_threadmain()
|
|
|
done:
|
|
|
if (doshutdown) {
|
|
|
PR_Lock(conn->c_mutex);
|
|
|
- connection_remove_operation( conn, op );
|
|
|
- /* destroying the pblock will cause destruction of the operation
|
|
|
- * so this must happend before releasing the connection
|
|
|
- */
|
|
|
- slapi_pblock_destroy( pb );
|
|
|
- pb = NULL;
|
|
|
+ connection_remove_operation_ext(pb, conn, op);
|
|
|
connection_make_readable_nolock(conn);
|
|
|
conn->c_threadnumber--;
|
|
|
connection_release_nolock(conn);
|
|
|
@@ -2362,14 +2430,15 @@ done:
|
|
|
PR_Lock( conn->c_mutex );
|
|
|
connection_release_nolock (conn); /* psearch acquires ref to conn - release this one now */
|
|
|
PR_Unlock( conn->c_mutex );
|
|
|
+ /* ps_add makes a shallow copy of the pb - so we
|
|
|
+ * can't free it or init it here - just memset it to 0
|
|
|
+ * ps_send_results will call connection_remove_operation_ext to free it
|
|
|
+ */
|
|
|
+ memset(pb, 0, sizeof(*pb));
|
|
|
} else {
|
|
|
/* delete from connection operation queue & decr refcnt */
|
|
|
PR_Lock( conn->c_mutex );
|
|
|
- connection_remove_operation( conn, op );
|
|
|
- /* destroying the pblock will cause destruction of the operation
|
|
|
- * so this must happend before releasing the connection
|
|
|
- */
|
|
|
- slapi_pblock_destroy( pb );
|
|
|
+ connection_remove_operation_ext( pb, conn, op );
|
|
|
|
|
|
/* If we're in turbo mode, we keep our reference to the connection alive */
|
|
|
if (!more_data) {
|
|
|
@@ -2402,7 +2471,6 @@ done:
|
|
|
}
|
|
|
PR_Unlock( conn->c_mutex );
|
|
|
}
|
|
|
- pb = NULL;
|
|
|
} /* while (1) */
|
|
|
}
|
|
|
|
|
|
@@ -2410,7 +2478,7 @@ done:
|
|
|
int
|
|
|
connection_activity(Connection *conn)
|
|
|
{
|
|
|
- Slapi_PBlock *pb;
|
|
|
+ struct Slapi_op_stack *op_stack_obj;
|
|
|
|
|
|
if (connection_acquire_nolock (conn) == -1) {
|
|
|
LDAPDebug(LDAP_DEBUG_CONNS,
|
|
|
@@ -2421,14 +2489,14 @@ connection_activity(Connection *conn)
|
|
|
return (-1);
|
|
|
}
|
|
|
|
|
|
- connection_make_new_pb(&pb, conn);
|
|
|
-
|
|
|
/* set these here so setup_pr_read_pds will not add this conn back to the poll array */
|
|
|
conn->c_gettingber = 1;
|
|
|
conn->c_threadnumber++;
|
|
|
- /* Add pb to the end of the work queue. */
|
|
|
- /* have to do this last - add_pb will signal waiters in connection_wait_for_new_pb */
|
|
|
- add_pb( pb );
|
|
|
+ op_stack_obj = connection_get_operation();
|
|
|
+ connection_add_operation(conn, op_stack_obj->op);
|
|
|
+ /* Add conn to the end of the work queue. */
|
|
|
+ /* have to do this last - add_work_q will signal waiters in connection_wait_for_new_work */
|
|
|
+ add_work_q( (work_q_item *)conn, op_stack_obj );
|
|
|
|
|
|
if (! config_check_referral_mode()) {
|
|
|
slapi_counter_increment(ops_initiated);
|
|
|
@@ -2437,65 +2505,67 @@ connection_activity(Connection *conn)
|
|
|
return 0;
|
|
|
}
|
|
|
|
|
|
-/* add_pb(): will add a pb to the end of the global work queue. The work queue
|
|
|
- is implemented as a singal link list. */
|
|
|
+/* add_work_q(): will add a work_q_item to the end of the global work queue. The work queue
|
|
|
+ is implemented as a single link list. */
|
|
|
|
|
|
static void
|
|
|
-add_pb( Slapi_PBlock *pb)
|
|
|
+add_work_q( work_q_item *wqitem, struct Slapi_op_stack *op_stack_obj )
|
|
|
{
|
|
|
- struct Slapi_PBlock_q *new_pb=NULL;
|
|
|
+ struct Slapi_work_q *new_work_q=NULL;
|
|
|
|
|
|
- LDAPDebug( LDAP_DEBUG_TRACE, "add_pb \n", 0, 0, 0 );
|
|
|
+ LDAPDebug( LDAP_DEBUG_TRACE, "add_work_q \n", 0, 0, 0 );
|
|
|
|
|
|
- new_pb = (struct Slapi_PBlock_q *) slapi_ch_malloc ( sizeof( struct Slapi_PBlock_q ));
|
|
|
- new_pb->pb = pb;
|
|
|
- new_pb->next_pb =NULL;
|
|
|
+ new_work_q = create_work_q();
|
|
|
+ new_work_q->work_item = wqitem;
|
|
|
+ new_work_q->op_stack_obj = op_stack_obj;
|
|
|
+ new_work_q->next_work_item =NULL;
|
|
|
|
|
|
- PR_Lock( pb_q_lock );
|
|
|
- if (last_pb == NULL) {
|
|
|
- last_pb = new_pb;
|
|
|
- first_pb = new_pb;
|
|
|
+ PR_Lock( work_q_lock );
|
|
|
+ if (tail_work_q == NULL) {
|
|
|
+ tail_work_q = new_work_q;
|
|
|
+ head_work_q = new_work_q;
|
|
|
}
|
|
|
else {
|
|
|
- last_pb->next_pb = new_pb;
|
|
|
- last_pb = new_pb;
|
|
|
+ tail_work_q->next_work_item = new_work_q;
|
|
|
+ tail_work_q = new_work_q;
|
|
|
}
|
|
|
- PR_AtomicIncrement( &pb_q_size ); /* increment q size */
|
|
|
- if ( pb_q_size > pb_q_size_max ) {
|
|
|
- pb_q_size_max = pb_q_size;
|
|
|
+ PR_AtomicIncrement( &work_q_size ); /* increment q size */
|
|
|
+ if ( work_q_size > work_q_size_max ) {
|
|
|
+ work_q_size_max = work_q_size;
|
|
|
}
|
|
|
- PR_NotifyCondVar( pb_q_cv ); /* notify waiters in connection_wait_for_new_pb */
|
|
|
- PR_Unlock( pb_q_lock );
|
|
|
+ PR_NotifyCondVar( work_q_cv ); /* notify waiters in connection_wait_for_new_work */
|
|
|
+ PR_Unlock( work_q_lock );
|
|
|
}
|
|
|
|
|
|
-/* get_pb(): will get a pb from the beginning of the work queue, return NULL if
|
|
|
- the queue is empty. This should only be called from connection_wait_for_new_pb
|
|
|
- with the pb_q_lock held */
|
|
|
+/* get_work_q(): will get a work_q_item from the beginning of the work queue, return NULL if
|
|
|
+ the queue is empty. This should only be called from connection_wait_for_new_work
|
|
|
+ with the work_q_lock held */
|
|
|
|
|
|
-static Slapi_PBlock *
|
|
|
-get_pb()
|
|
|
+static work_q_item *
|
|
|
+get_work_q(struct Slapi_op_stack **op_stack_obj)
|
|
|
{
|
|
|
- struct Slapi_PBlock_q *tmp = NULL;
|
|
|
- Slapi_PBlock *pb;
|
|
|
+ struct Slapi_work_q *tmp = NULL;
|
|
|
+ work_q_item *wqitem;
|
|
|
|
|
|
- LDAPDebug0Args( LDAP_DEBUG_TRACE, "get_pb \n" );
|
|
|
- if (first_pb == NULL) {
|
|
|
- LDAPDebug0Args( LDAP_DEBUG_TRACE, "get_pb: the work queue is empty.\n" );
|
|
|
+ LDAPDebug0Args( LDAP_DEBUG_TRACE, "get_work_q \n" );
|
|
|
+ if (head_work_q == NULL) {
|
|
|
+ LDAPDebug0Args( LDAP_DEBUG_TRACE, "get_work_q: the work queue is empty.\n" );
|
|
|
return NULL;
|
|
|
}
|
|
|
|
|
|
- tmp = first_pb;
|
|
|
- if ( first_pb == last_pb ) {
|
|
|
- last_pb = NULL;
|
|
|
+ tmp = head_work_q;
|
|
|
+ if ( head_work_q == tail_work_q ) {
|
|
|
+ tail_work_q = NULL;
|
|
|
}
|
|
|
- first_pb = tmp->next_pb;
|
|
|
+ head_work_q = tmp->next_work_item;
|
|
|
|
|
|
- pb = tmp->pb;
|
|
|
- /* Free the memory used by the pb found. */
|
|
|
- slapi_ch_free ((void **)&tmp);
|
|
|
- PR_AtomicDecrement( &pb_q_size ); /* decrement q size */
|
|
|
+ wqitem = tmp->work_item;
|
|
|
+ *op_stack_obj = tmp->op_stack_obj;
|
|
|
+ PR_AtomicDecrement( &work_q_size ); /* decrement q size */
|
|
|
+ /* Free the memory used by the item found. */
|
|
|
+ destroy_work_q(&tmp);
|
|
|
|
|
|
- return (pb);
|
|
|
+ return (wqitem);
|
|
|
}
|
|
|
#endif /* LDAP_IOCP */
|
|
|
|
|
|
@@ -2519,9 +2589,9 @@ op_thread_cleanup()
|
|
|
"slapd shutting down - signaling operation threads\n", 0, 0, 0);
|
|
|
|
|
|
PR_AtomicIncrement(&op_shutdown);
|
|
|
- PR_Lock( pb_q_lock );
|
|
|
- PR_NotifyAllCondVar ( pb_q_cv ); /* tell any thread waiting in connection_wait_for_new_pb to shutdown */
|
|
|
- PR_Unlock( pb_q_lock );
|
|
|
+ PR_Lock( work_q_lock );
|
|
|
+ PR_NotifyAllCondVar ( work_q_cv ); /* tell any thread waiting in connection_wait_for_new_work to shutdown */
|
|
|
+ PR_Unlock( work_q_lock );
|
|
|
#ifdef _WIN32
|
|
|
LDAPDebug( LDAP_DEBUG_ANY,
|
|
|
"slapd shutting down - waiting for %d threads to terminate\n",
|
|
|
@@ -2579,6 +2649,14 @@ connection_remove_operation( Connection *conn, Operation *op )
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+void
|
|
|
+connection_remove_operation_ext( Slapi_PBlock *pb, Connection *conn, Operation *op )
|
|
|
+{
|
|
|
+ connection_remove_operation(conn, op);
|
|
|
+ connection_done_operation(conn, pb->op_stack_elem);
|
|
|
+ pb->pb_op = NULL;
|
|
|
+ slapi_pblock_init(pb);
|
|
|
+}
|
|
|
|
|
|
/*
|
|
|
* Return a non-zero value if any operations are pending on conn.
|