Browse Source

Ticket #514 - investigate connection locking

https://fedorahosted.org/389/ticket/514
Reviewed by: mreynolds,nhosoi (Thanks!)
Branch: master
Fix Description: There were two locks involved for every operation - a lock
to protect the pblock queue and a lock and condition variable to protect
the counter variable.  This fix consolidates them into a single lock/cv for
the pblock queue and gets rid of the separate lock for the counter.  The
queue structures have been cleaned up and renamed work_q and work_q_size.
Worker threads now wait for new work as long as the server is not shutting
down and the work_q is empty.  In addition, the timeout interval for the
wait on the work_q cv has been changed from 10 seconds to "infinite"
(PR_INTERVAL_NO_TIMEOUT).
Platforms tested: RHEL6 x86_64
Flag Day: no
Doc impact: Yes
Rich Megginson 12 years ago
parent
commit
304502cb42
1 changed file with 149 additions and 179 deletions
  1. 149 179
      ldap/servers/slapd/connection.c

+ 149 - 179
ldap/servers/slapd/connection.c

@@ -84,10 +84,11 @@ struct Slapi_PBlock_q
 static struct Slapi_PBlock_q *first_pb= NULL;	/* global work queue head */
 static struct Slapi_PBlock_q *last_pb= NULL;	/* global work queue tail */
 static PRLock *pb_q_lock=NULL;			/* protects first_pb & last_pb */
-
-static PRCondVar *op_thread_cv;	/* used by operation threads to wait for work */
-static PRLock *op_thread_lock;	/* associated with op_thread_cv */
-static int op_shutdown= 0;		/* if non-zero, server is shutting down */
+static PRCondVar *pb_q_cv;	/* used by operation threads to wait for work - when there is a op pblock in the queue waiting to be processed */
+static PRInt32 pb_q_size;       /* size of pb_q */
+static PRInt32 pb_q_size_max;   /* high water mark of pb_q_size */
+#define PB_Q_EMPTY (pb_q_size == 0)
+static PRInt32 op_shutdown= 0;		/* if non-zero, server is shutting down */
 
 #define LDAP_SOCKET_IO_BUFFER_SIZE 512 /* Size of the buffer we give to the I/O system for reads */
 
@@ -403,38 +404,30 @@ init_op_threads()
 	int max_threads = config_get_threadnumber();
 	/* Initialize the locks and cv */
 
-   if ((pb_q_lock = PR_NewLock()) == NULL ) {
-        errorCode = PR_GetError();
-        LDAPDebug( LDAP_DEBUG_ANY,
-		   "init_op_threads: PR_NewLock failed, " SLAPI_COMPONENT_NAME_NSPR " error %d (%s)\n",
-		   errorCode, slapd_pr_strerror(errorCode), 0 );
-        exit(-1);
-   }
-
-   if ((op_thread_lock = PR_NewLock()) == NULL ) {
-        errorCode = PR_GetError();
-        LDAPDebug( LDAP_DEBUG_ANY,
-		   "init_op_threads: PR_NewLock failed, " SLAPI_COMPONENT_NAME_NSPR " error %d (%s)\n",
-		   errorCode, slapd_pr_strerror(errorCode), 0 );
-        exit(-1);
-   }
-
-   if ((op_thread_cv = PR_NewCondVar( op_thread_lock )) == NULL) {
-        errorCode = PR_GetError();
-       	LDAPDebug( LDAP_DEBUG_ANY, "init_op_threads: PR_NewCondVar failed, " SLAPI_COMPONENT_NAME_NSPR " error %d (%s)\n",
-		   errorCode, slapd_pr_strerror(errorCode), 0 );
-       	exit(-1);
-   }
+	if ((pb_q_lock = PR_NewLock()) == NULL ) {
+		errorCode = PR_GetError();
+		LDAPDebug( LDAP_DEBUG_ANY,
+			"init_op_threads: PR_NewLock failed for pb_q_lock, " SLAPI_COMPONENT_NAME_NSPR " error %d (%s)\n",
+			errorCode, slapd_pr_strerror(errorCode), 0 );
+		exit(-1);
+	}
+
+	if ((pb_q_cv = PR_NewCondVar( pb_q_lock )) == NULL) {
+		errorCode = PR_GetError();
+		LDAPDebug( LDAP_DEBUG_ANY, "init_op_threads: PR_NewCondVar failed for pb_q_cv, " SLAPI_COMPONENT_NAME_NSPR " error %d (%s)\n",
+			errorCode, slapd_pr_strerror(errorCode), 0 );
+		exit(-1);
+	}
 
 	/* start the operation threads */
 	for (i=0; i < max_threads; i++) { 
 		PR_SetConcurrency(4);
 		if (PR_CreateThread (PR_USER_THREAD,
-                	(VFP) (void *) connection_threadmain, NULL,
-                	PR_PRIORITY_NORMAL, PR_GLOBAL_THREAD, 
-			PR_UNJOINABLE_THREAD, 
-			SLAPD_DEFAULT_THREAD_STACKSIZE
-			) == NULL ) {
+		                     (VFP) (void *) connection_threadmain, NULL,
+		                     PR_PRIORITY_NORMAL, PR_GLOBAL_THREAD,
+		                     PR_UNJOINABLE_THREAD,
+		                     SLAPD_DEFAULT_THREAD_STACKSIZE
+		) == NULL ) {
 			int prerr = PR_GetError();
 			LDAPDebug( LDAP_DEBUG_ANY, "PR_CreateThread failed, " SLAPI_COMPONENT_NAME_NSPR " error %d (%s)\n",
 				prerr, slapd_pr_strerror( prerr ), 0 );
@@ -1535,8 +1528,6 @@ static int finished_chomping(Connection *conn)
  * IO Completion Ports are not available on this platform.
  */
 
-static int counter= 0; /* JCM Dumb Name */
-
 /* The connection private structure for UNIX turbo mode */
 struct Conn_private
 {
@@ -1700,37 +1691,26 @@ connection_free_private_buffer(Connection *conn)
 #define CONN_TURBO_PERCENTILE 50 /* proportion of threads allowed to be in turbo mode */
 #define CONN_TURBO_HYSTERESIS 0 /* avoid flip flopping in and out of turbo mode */
 
-int connection_wait_for_new_pb(Slapi_PBlock	**ppb, PRIntervalTime	interval)
+int connection_wait_for_new_pb(Slapi_PBlock **ppb, PRIntervalTime interval)
 {
 	int ret = CONN_FOUND_WORK_TO_DO;
-	
-	PR_Lock( op_thread_lock );
-
-	/* While there is no operation to do... */
-	while( counter < 1) {
-		/* Check if we should shutdown. */ 
-		if (op_shutdown) {
-			PR_Unlock( op_thread_lock );
-			return CONN_SHUTDOWN;
-		}
-		PR_WaitCondVar( op_thread_cv, interval);
-	}
 
-	/* There is some work to do. */
-
-	counter--;
-	PR_Unlock( op_thread_lock );
+	PR_Lock( pb_q_lock );
 
-	/* Get the next operation from the work queue. */
+	while( !op_shutdown && PB_Q_EMPTY ) {
+		PR_WaitCondVar( pb_q_cv, interval );
+	}
 
-	*ppb = get_pb();
-	if (*ppb == NULL) {
-		LDAPDebug( LDAP_DEBUG_ANY, "pb is null \n", 0,  0, 0 );
-		PR_Lock( op_thread_lock );
-		counter++;
-		PR_Unlock( op_thread_lock );
+	if ( op_shutdown ) {
+		LDAPDebug0Args( LDAP_DEBUG_ANY, "connection_wait_for_new_pb: shutdown\n" );
+		ret = CONN_SHUTDOWN;
+	} else if ( NULL == ( *ppb = get_pb() ) ) {
+		/* not sure how this can happen */
+		LDAPDebug0Args( LDAP_DEBUG_ANY, "connection_wait_for_new_pb: pb is null\n" );
 		ret = CONN_NOWORK;
 	}
+
+	PR_Unlock( pb_q_lock );
 	return ret;
 }
 
@@ -2166,12 +2146,12 @@ static void
 connection_threadmain()
 {
 	Slapi_PBlock	*pb = NULL;
-	PRIntervalTime	interval = PR_SecondsToInterval(10);
+	/* wait forever for new pb until one is available or shutdown */
+	PRIntervalTime	interval = PR_INTERVAL_NO_TIMEOUT; /* PR_SecondsToInterval(10); */
 	Connection	*conn = NULL;
 	Operation	*op;
 	ber_tag_t	tag = 0;
 	int need_wakeup = 0;
-	int need_conn_release = 0;
 	int thread_turbo_flag = 0;
 	int ret = 0;
 	int more_data = 0;
@@ -2202,6 +2182,7 @@ connection_threadmain()
 			ret = connection_wait_for_new_pb(&pb,interval);
 			switch (ret) {
 				case CONN_NOWORK:
+					PR_ASSERT(interval != PR_INTERVAL_NO_TIMEOUT); /* this should never happen with PR_INTERVAL_NO_TIMEOUT */
 					continue;
 				case CONN_SHUTDOWN:
 					LDAPDebug( LDAP_DEBUG_TRACE, 
@@ -2264,9 +2245,9 @@ connection_threadmain()
 		}
 
 		/* turn off turbo mode immediately if any pb waiting in global queue */
-		if (thread_turbo_flag && (counter > 0)) {
+		if (thread_turbo_flag && !PB_Q_EMPTY) {
 			thread_turbo_flag = 0;
-			LDAPDebug(LDAP_DEBUG_CONNS,"conn %" NSPRIu64 " leaving turbo mode\n",conn->c_connid,0,0); 
+			LDAPDebug2Args(LDAP_DEBUG_CONNS,"conn %" NSPRIu64 " leaving turbo mode - pb_q is not empty %d\n",conn->c_connid,pb_q_size);
 		}
 #endif
 		
@@ -2313,13 +2294,16 @@ connection_threadmain()
 		 */
 		replication_connection = conn->c_isreplication_session;
 		if ((tag != LDAP_REQ_UNBIND) && !thread_turbo_flag && !more_data && !replication_connection) {
-			connection_make_readable(conn);
+			connection_make_readable_nolock(conn);
+			/* once the connection is readable, another thread may access conn,
+			 * so need locking from here on */
+			signal_listner();
 		}
 
 		/* are we in referral-only mode? */
 		if (config_check_referral_mode() && tag != LDAP_REQ_UNBIND) {
-		    referral_mode_reply(pb);
-		    goto done;
+			referral_mode_reply(pb);
+			goto done;
 		}
 
 		/* check if new password is required */
@@ -2347,6 +2331,21 @@ connection_threadmain()
 		connection_dispatch_operation(conn, op, pb);
 
 done:	
+		if (doshutdown) {
+			PR_Lock(conn->c_mutex);
+			connection_remove_operation( conn, op );
+			/* destroying the pblock will cause destruction of the operation
+			 * so this must happend before releasing the connection
+			 */
+			slapi_pblock_destroy( pb );
+			pb = NULL;
+			connection_make_readable_nolock(conn);
+			conn->c_threadnumber--;
+			connection_release_nolock(conn);
+			PR_Unlock(conn->c_mutex);
+			signal_listner();
+			return;
+		}
 		/*
 		 * done with this operation. delete it from the op
 		 * queue for this connection, delete the number of
@@ -2359,75 +2358,51 @@ done:
 		/* total number of ops for the server */
 		slapi_counter_increment(ops_completed);
 		/* If this op isn't a persistent search, remove it */
-		need_conn_release = 0;
-		if ( !( pb->pb_op->o_flags & OP_FLAG_PS )) {
-		    /* delete from connection operation queue & decr refcnt */
-		    PR_Lock( conn->c_mutex );
-		    connection_remove_operation( conn, op );
-		    /* destroying the pblock will cause destruction of the operation
-		     * so this must happend before releasing the connection
-		     */
-		    slapi_pblock_destroy( pb );
-
-		    /* If we're in turbo mode, we keep our reference to the connection 
-		       alive */
-		    if (!thread_turbo_flag && !more_data) {
-		        /*
-		         * Don't release the connection now.
-		         * But note down what to do.
-		         */
-		        need_conn_release = 1;
-		    }
-		    PR_Unlock( conn->c_mutex );
-		} else { /* the ps code acquires a ref to the conn - we need to release ours here */
-		    PR_Lock( conn->c_mutex );
-		    connection_release_nolock (conn);
-		    PR_Unlock( conn->c_mutex );
-		}
-		pb = NULL;
-		if (doshutdown) {
-			PR_Lock(conn->c_mutex);
-			connection_make_readable_nolock(conn);
-			conn->c_threadnumber--;
-			connection_release_nolock(conn);
-			PR_Unlock(conn->c_mutex);
-			signal_listner();
-			return;
-		}
-
-		if (!more_data) { /* no more data in the buffer */
-			if (!thread_turbo_flag) { /* Don't do this in turbo mode */
-				/* Since we didn't do so earlier, we need to make a 
-				 * replication connection readable again here */
-				PR_Lock( conn->c_mutex );
-				if (replication_connection || (1 == is_timedout)) {
-					connection_make_readable_nolock(conn);
-					need_wakeup = 1;
-				}
-				/* if the threadnumber of now below the maximum, wakeup
-				 * the listener thread so that we start polling on this 
-				 * connection again
-				 */
-				if (!need_wakeup) {
-					if (conn->c_threadnumber == config_get_maxthreadsperconn())
+		if ( pb->pb_op->o_flags & OP_FLAG_PS ) {
+			    PR_Lock( conn->c_mutex );
+			    connection_release_nolock (conn); /* psearch acquires ref to conn - release this one now */
+			    PR_Unlock( conn->c_mutex );
+		} else {
+			/* delete from connection operation queue & decr refcnt */
+			PR_Lock( conn->c_mutex );
+			connection_remove_operation( conn, op );
+			/* destroying the pblock will cause destruction of the operation
+			 * so this must happend before releasing the connection
+			 */
+			slapi_pblock_destroy( pb );
+
+			/* If we're in turbo mode, we keep our reference to the connection alive */
+			if (!more_data) {
+				if (!thread_turbo_flag) {
+					/*
+					 * Don't release the connection now.
+					 * But note down what to do.
+					 */
+					if (replication_connection || (1 == is_timedout)) {
+						connection_make_readable_nolock(conn);
 						need_wakeup = 1;
-					else
-						need_wakeup = 0;
-				}
-				conn->c_threadnumber--;
-				if (need_conn_release) {
+					}
+					if (!need_wakeup) {
+						if (conn->c_threadnumber == config_get_maxthreadsperconn())
+							need_wakeup = 1;
+						else
+							need_wakeup = 0;
+					}
+					conn->c_threadnumber--;
 					connection_release_nolock(conn);
-				}
-				PR_Unlock( conn->c_mutex );
-				/* Call signal_listner after releasing the 
-				 * connection if required. */
-				if (need_wakeup) {
+					/* Call signal_listner after releasing the
+					 * connection if required. */
+					if (need_wakeup) {
+						signal_listner();
+					}
+				} else if (1 == is_timedout) {
+					connection_make_readable_nolock(conn);
 					signal_listner();
 				}
-			} else if (1 == is_timedout) {
-				connection_make_readable(conn);
 			}
+			PR_Unlock( conn->c_mutex );
 		}
+		pb = NULL;
 	} /* while (1) */
 }
 
@@ -2437,33 +2412,27 @@ connection_activity(Connection *conn)
 {
 	Slapi_PBlock	*pb;
 
-	connection_make_new_pb(&pb, conn);
-	
-	/* Add pb to the end of the work queue.  */
-	add_pb( pb );
-
-	/* Check if exceed the max thread per connection.  If so, increment 
-	   c_pbwait.  Otherwise increment the counter and notify the cond. var. 
-	   there is work to do.   */
-
 	if (connection_acquire_nolock (conn) == -1) {
-	    LDAPDebug(LDAP_DEBUG_CONNS,
-		      "could not acquire lock in connection_activity as conn %" NSPRIu64 " closing fd=%d\n",
-		      conn->c_connid,conn->c_sd,0); 
-	    /* XXX how to handle this error? */
-	    /* MAB: 25 Jan 01: let's return on error and pray this won't leak */
-	    return (-1);
+		LDAPDebug(LDAP_DEBUG_CONNS,
+		          "could not acquire lock in connection_activity as conn %" NSPRIu64 " closing fd=%d\n",
+		          conn->c_connid,conn->c_sd,0);
+		/* XXX how to handle this error? */
+		/* MAB: 25 Jan 01: let's return on error and pray this won't leak */
+		return (-1);
 	}
+
+	connection_make_new_pb(&pb, conn);
+
+	/* set these here so setup_pr_read_pds will not add this conn back to the poll array */
 	conn->c_gettingber = 1;
 	conn->c_threadnumber++;
-    PR_Lock( op_thread_lock );
-    counter++;
-    PR_NotifyCondVar( op_thread_cv );
-    PR_Unlock( op_thread_lock );
-	
+	/* Add pb to the end of the work queue.  */
+	/* have to do this last - add_pb will signal waiters in connection_wait_for_new_pb */
+	add_pb( pb );
+
 	if (! config_check_referral_mode()) {
-	    slapi_counter_increment(ops_initiated);
-	    slapi_counter_increment(g_get_global_snmp_vars()->ops_tbl.dsInOps); 
+		slapi_counter_increment(ops_initiated);
+		slapi_counter_increment(g_get_global_snmp_vars()->ops_tbl.dsInOps);
 	}
 	return 0;
 }
@@ -2474,7 +2443,6 @@ connection_activity(Connection *conn)
 static void
 add_pb( Slapi_PBlock *pb)
 {
-
 	struct Slapi_PBlock_q	*new_pb=NULL;
 
 	LDAPDebug( LDAP_DEBUG_TRACE, "add_pb \n", 0,  0, 0 );
@@ -2492,25 +2460,27 @@ add_pb( Slapi_PBlock *pb)
 		last_pb->next_pb = new_pb;
 		last_pb = new_pb;
 	}
+	PR_AtomicIncrement( &pb_q_size ); /* increment q size */
+	if ( pb_q_size > pb_q_size_max ) {
+		pb_q_size_max = pb_q_size;
+	}
+	PR_NotifyCondVar( pb_q_cv ); /* notify waiters in connection_wait_for_new_pb */
 	PR_Unlock( pb_q_lock );
 }
 
-/* get_pb(): will get a pb from the begining of the work queue, return NULL if 
-	the queue is empty.*/
+/* get_pb(): will get a pb from the beginning of the work queue, return NULL if
+	the queue is empty.  This should only be called from connection_wait_for_new_pb
+	with the pb_q_lock held */
 
 static Slapi_PBlock *
 get_pb()
 {
-
 	struct Slapi_PBlock_q  *tmp = NULL;
 	Slapi_PBlock *pb;
 
-	LDAPDebug( LDAP_DEBUG_TRACE, "get_pb \n", 0,  0, 0 );
-	PR_Lock( pb_q_lock );
+	LDAPDebug0Args( LDAP_DEBUG_TRACE, "get_pb \n" );
 	if (first_pb == NULL) {
-		PR_Unlock( pb_q_lock );
-		LDAPDebug( LDAP_DEBUG_ANY, "get_pb: the work queue is empty.\n",
-			 0,  0, 0 );
+		LDAPDebug0Args( LDAP_DEBUG_TRACE, "get_pb: the work queue is empty.\n" );
 		return NULL;
 	}
 
@@ -2519,11 +2489,11 @@ get_pb()
 		last_pb = NULL;
 	}
 	first_pb = tmp->next_pb;
-	PR_Unlock( pb_q_lock );
 
 	pb = tmp->pb;
 	/* Free the memory used by the pb found. */
 	slapi_ch_free ((void **)&tmp);
+	PR_AtomicDecrement( &pb_q_size ); /* decrement q size */
 
 	return (pb);
 }
@@ -2540,28 +2510,28 @@ void
 op_thread_cleanup()
 {
 #ifdef _WIN32
-    int i;
-    PRIntervalTime    interval;
-    int max_threads = config_get_threadnumber();
-    interval = PR_SecondsToInterval(3); 
+	int i;
+	PRIntervalTime    interval;
+	int max_threads = config_get_threadnumber();
+	interval = PR_SecondsToInterval(3);
 #endif	
-    LDAPDebug( LDAP_DEBUG_ANY, 
-        "slapd shutting down - signaling operation threads\n", 0, 0, 0);
-    
-    PR_Lock( op_thread_lock );
-    op_shutdown = 1;
-    PR_NotifyAllCondVar ( op_thread_cv );
-    PR_Unlock( op_thread_lock );
+	LDAPDebug( LDAP_DEBUG_ANY,
+		"slapd shutting down - signaling operation threads\n", 0, 0, 0);
+
+	PR_AtomicIncrement(&op_shutdown);
+	PR_Lock( pb_q_lock );
+	PR_NotifyAllCondVar ( pb_q_cv ); /* tell any thread waiting in connection_wait_for_new_pb to shutdown */
+	PR_Unlock( pb_q_lock );
 #ifdef _WIN32 
-    LDAPDebug( LDAP_DEBUG_ANY,
-              "slapd shutting down - waiting for %d threads to terminate\n",
-              g_get_active_threadcnt(), 0, 0 );
-    /* kill off each worker waiting on GetQueuedCompletionStatus */
-    for ( i = 0; i < max_threads; ++ i )
-    {
-        PostQueuedCompletionStatus( completion_port, 0, COMPKEY_DIE ,0);
-    }
-    /* don't sleep: there's no reason to do so here DS_Sleep(interval); */ /* sleep 3 seconds */
+	LDAPDebug( LDAP_DEBUG_ANY,
+		"slapd shutting down - waiting for %d threads to terminate\n",
+		g_get_active_threadcnt(), 0, 0 );
+	/* kill off each worker waiting on GetQueuedCompletionStatus */
+	for ( i = 0; i < max_threads; ++ i )
+	{
+		PostQueuedCompletionStatus( completion_port, 0, COMPKEY_DIE ,0);
+	}
+	/* don't sleep: there's no reason to do so here DS_Sleep(interval); */ /* sleep 3 seconds */
 #endif
 }