Browse Source

Ticket 368 - Make the cleanAllRUV task one step

Bug Description:  The first version of this fix required a second releaseRUV step.

Fix Description:  Created a new "monitoring" thread that checks all the replicas RUV's
                  to see if the rid was cleaned.  Once they are cleaned, we automatically
                  release the RID for reuse.

                  I also refined the logging so it easy to track the status of the task.

https://fedorahosted.org/389/ticket/368

reviewed by: richm (Thanks Rich!)
Mark Reynolds 13 years ago
parent
commit
507b593fa4

+ 10 - 0
ldap/servers/plugins/replication/repl5.h

@@ -439,6 +439,9 @@ long conn_get_timeout(Repl_Connection *conn);
 void conn_set_agmt_changed(Repl_Connection *conn);
 ConnResult conn_read_result(Repl_Connection *conn, int *message_id);
 ConnResult conn_read_result_ex(Repl_Connection *conn, char **retoidp, struct berval **retdatap, LDAPControl ***returned_controls, int send_msgid, int *resp_msgid, int noblock);
+LDAP * conn_get_ldap(Repl_Connection *conn);
+void conn_lock(Repl_Connection *conn);
+void conn_unlock(Repl_Connection *conn);
 
 /* In repl5_protocol.c */
 typedef struct repl_protocol Repl_Protocol;
@@ -598,9 +601,16 @@ void set_released_rid(int rid);
 int is_released_rid(int rid);
 int is_already_released_rid();
 void delete_released_rid();
+void replica_cleanallruv_monitor_thread(void *arg);
 
 #define ALREADY_RELEASED -1
 
+typedef struct _cleanruv_data
+{
+	Object *repl_obj;
+	ReplicaId rid;
+} cleanruv_data;
+
 /* replutil.c */
 LDAPControl* create_managedsait_control ();
 LDAPControl* create_backend_control(Slapi_DN *sdn);

+ 26 - 0
ldap/servers/plugins/replication/repl5_connection.c

@@ -1710,6 +1710,16 @@ conn_get_timeout(Repl_Connection *conn)
 	return retval;
 }
 
+LDAP *
+conn_get_ldap(Repl_Connection *conn)
+{
+	if(conn){
+		return conn->ld;
+	} else {
+		return NULL;
+	}
+}
+
 void conn_set_agmt_changed(Repl_Connection *conn)
 {
 	PR_ASSERT(NULL != conn);
@@ -1908,3 +1918,19 @@ repl5_debug_timeout_callback(time_t when, void *arg)
 		"repl5_debug_timeout_callback: set debug level to %d at %ld\n",
 		s_debug_level, when);
 }
+
+void
+conn_lock(Repl_Connection *conn)
+{
+	if(conn){
+		PR_Lock(conn->lock);
+	}
+}
+
+void
+conn_unlock(Repl_Connection *conn)
+{
+	if(conn){
+		PR_Unlock(conn->lock);
+	}
+}

+ 223 - 34
ldap/servers/plugins/replication/repl5_replica_config.c

@@ -81,12 +81,12 @@ static int replica_execute_cl2ldif_task (Object *r, char *returntext);
 static int replica_execute_ldif2cl_task (Object *r, char *returntext);
 static int replica_execute_cleanruv_task (Object *r, ReplicaId rid, char *returntext);
 static int replica_execute_cleanall_ruv_task (Object *r, ReplicaId rid, char *returntext);
-static int replica_execute_release_ruv_task(Object *r, ReplicaId rid, char *returntext);
+static int replica_execute_release_ruv_task(Object *r, ReplicaId rid);
 static struct berval *create_ruv_payload(char *value);
 static int replica_cleanup_task (Object *r, const char *task_name, char *returntext, int apply_mods);
 static int replica_task_done(Replica *replica);
-												
 static multimaster_mtnode_extension * _replica_config_get_mtnode_ext (const Slapi_Entry *e);
+int g_get_shutdown();
 
 /*
  * Note: internal add/modify/delete operations should not be run while
@@ -860,21 +860,6 @@ static int replica_execute_task (Object *r, const char *task_name, char *returnt
 		else
 			return LDAP_SUCCESS;
 	}
-	else if (strncasecmp (task_name, RELEASERUV, RELEASERUVLEN) == 0)
-	{
-		int temprid = atoi(&(task_name[RELEASERUVLEN]));
-		if (temprid <= 0 || temprid >= READ_ONLY_REPLICA_ID){
-			PR_snprintf(returntext, SLAPI_DSE_RETURNTEXT_SIZE, "Invalid replica id for task - %s", task_name);
-			slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name,"replica_execute_task: %s\n", returntext);
-			return LDAP_OPERATIONS_ERROR;
-		}
-		if (apply_mods)
-		{
-			return replica_execute_release_ruv_task(r, (ReplicaId)temprid, returntext);
-		}
-		else
-			return LDAP_SUCCESS;
-	}
 	else
 	{
         PR_snprintf(returntext, SLAPI_DSE_RETURNTEXT_SIZE, "unsupported replica task - %s", task_name);
@@ -1177,7 +1162,6 @@ replica_execute_cleanruv_task (Object *r, ReplicaId rid, char *returntext /* not
 	 *  Clean the changelog RUV's, and set the rids
 	 */
 	cl5CleanRUV(rid);
-	set_cleaned_rid(rid);
 	delete_released_rid();
 
 	if (rc != RUV_SUCCESS){
@@ -1191,19 +1175,22 @@ replica_execute_cleanruv_task (Object *r, ReplicaId rid, char *returntext /* not
 static int
 replica_execute_cleanall_ruv_task (Object *r, ReplicaId rid, char *returntext)
 {
+	PRThread *thread = NULL;
 	Repl_Connection *conn;
 	Replica *replica = (Replica*)object_get_data (r);
 	Object *agmt_obj;
 	Repl_Agmt *agmt;
 	ConnResult crc;
+	cleanruv_data *data = NULL;
 	const Slapi_DN *dn = NULL;
 	struct berval *payload = NULL;
 	char *ridstr = NULL;
 	int send_msgid = 0;
+	int agmt_count = 0;
 	int rc = 0;
 
-	slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, "cleanAllRUV_task: cleaning rid (%d)...\n",(int)rid);
-
+	slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "cleanAllRUV_task: cleaning rid (%d)...\n",(int)rid);
+	set_cleaned_rid(rid);
 	/*
 	 *  Create payload
 	 */
@@ -1245,8 +1232,9 @@ replica_execute_cleanall_ruv_task (Object *r, ReplicaId rid, char *returntext)
 				slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "cleanAllRUV_task: failed to send "
 					"cleanruv extended op to repl agmt (%s), error %d\n", slapi_sdn_get_dn(dn), crc);
 			} else {
-				slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, "cleanAllRUV_task: successfully sent "
+				slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "cleanAllRUV_task: successfully sent "
 					"cleanruv extended op to (%s)\n",slapi_sdn_get_dn(dn));
+				agmt_count++;
 			}
 			conn_start_linger(conn);
 		}
@@ -1270,17 +1258,218 @@ done:
 	 */
 	replica_execute_cleanruv_task (r, rid, returntext);
 
-	if(rc == 0){
-		slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, "cleanAllRUV_task: operation successful\n");
+	if(rc == 0 && agmt_count > 0){  /* success, but we need to check our replicas */
+		/*
+		 *  Launch the cleanruv monitoring thread.  Once all the replicas are cleaned it will release the rid
+		 */
+		data = (cleanruv_data*)slapi_ch_calloc(1, sizeof(cleanruv_data));
+		if (data == NULL) {
+			slapi_log_error( SLAPI_LOG_FATAL, repl_plugin_name, "cleanAllRUV_task: failed to allocate "
+				"cleanruv_data.  Aborting task.\n");
+			return -1;
+		}
+		data->repl_obj = r;
+		data->rid = rid;
+
+		thread = PR_CreateThread(PR_USER_THREAD, replica_cleanallruv_monitor_thread,
+				(void *)data, PR_PRIORITY_NORMAL, PR_GLOBAL_THREAD,
+				PR_UNJOINABLE_THREAD, SLAPD_DEFAULT_THREAD_STACKSIZE);
+		if (thread == NULL) {
+			slapi_log_error( SLAPI_LOG_FATAL, repl_plugin_name, "cleanAllRUV_task: unable to create cleanAllRUV "
+				"monitoring thread.  Aborting task.\n");
+		}
+
+	} else if(r == 0){ /* success */
+		slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "cleanAllRUV_task: Successfully Finished.\n");
 	} else {
-		slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "cleanAllRUV_task: operation failed (%d)\n",rc);
+		slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "cleanAllRUV_task: Task failed (%d)\n",rc);
 	}
 
 	return rc;
 }
 
+/*
+ *  After all the cleanAllRUV extended ops have been sent, we need to monitoring those replicas'
+ *  RUVs.  Once the rid is cleaned, then we need to release it, and push this "release"
+ *  to the other replicas
+ */
+void
+replica_cleanallruv_monitor_thread(void *arg)
+{
+	Object *agmt_obj;
+	LDAP *ld;
+	Repl_Connection *conn;
+	Repl_Agmt *agmt;
+	Replica *replica;;
+	cleanruv_data *data = arg;
+	LDAPMessage *result, *entry;
+	BerElement   *ber;
+	time_t start_time;
+	struct berval **vals;
+	char *rid_text;
+	char *attrs[2];
+	char *attr;
+	int replicas_cleaned = 0;
+	int found = 0;
+	int crc;
+	int rc = 0;
+	int i;
+
+	/*
+	 *  Initialize our settings
+	 */
+	attrs[0] = "nsds50ruv";
+	attrs[1] = NULL;
+	rid_text = slapi_ch_smprintf("{replica %d ldap", data->rid);
+	replica = (Replica*)object_get_data (data->repl_obj);
+	start_time = current_time();
+
+	slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "cleanAllRUV_task: Waiting for all the replicas to get cleaned...\n");
+
+	while(!g_get_shutdown())
+	{
+		DS_Sleep(PR_SecondsToInterval(10));
+		found = 0;
+		agmt_obj = agmtlist_get_first_agreement_for_replica (replica);
+		while (agmt_obj){
+			agmt = (Repl_Agmt*)object_get_data (agmt_obj);
+			if(!agmt_is_enabled(agmt)){
+				agmt_obj = agmtlist_get_next_agreement_for_replica (replica, agmt_obj);
+				continue;
+			}
+			/*
+			 *  Get the replication connection
+			 */
+			conn = (Repl_Connection *)agmt_get_connection(agmt);
+			crc = conn_connect(conn);
+			if (CONN_OPERATION_FAILED == crc ){
+				slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "cleanAllRUV_task: monitor thread failed to connect "
+					"to repl agreement connection (%s), error %d\n","", ACQUIRE_TRANSIENT_ERROR);
+				continue;
+			} else if (CONN_SSL_NOT_ENABLED == crc){
+				slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "cleanAllRUV_task: monitor thread failed to acquire "
+					"repl agmt connection (%s), error %d\n","", ACQUIRE_FATAL_ERROR);
+				continue;
+			}
+			/*
+			 *  Get the LDAP connection handle from the conn
+			 */
+			ld = conn_get_ldap(conn);
+			if(ld == NULL){
+				slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "cleanAllRUV_task: monitor thread failed to get LDAP "
+					"handle from the replication agmt (%s).  Moving on to the next agmt.\n",agmt_get_long_name(agmt));
+				continue;
+			}
+			/*
+			 *  Search this replica for its tombstone/ruv entry
+			 */
+			conn_cancel_linger(conn);
+			conn_lock(conn);
+			rc = ldap_search_ext_s(ld, slapi_sdn_get_dn(agmt_get_replarea(agmt)), LDAP_SCOPE_SUBTREE,
+				"(&(nsuniqueid=ffffffff-ffffffff-ffffffff-ffffffff)(objectclass=nstombstone))",
+				attrs, 0, NULL, NULL, NULL, 0, &result);
+			if(rc != LDAP_SUCCESS){
+				/*
+				 *  Couldn't contact ldap server, what should we do?
+				 *
+				 *  Skip it and move on!  It's the admin's job to make sure all the replicas are up and running
+				 */
+				slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "cleanAllRUV_task: monitor thread failed to contact "
+					"agmt (%s), moving on to the next agmt.\n", agmt_get_long_name(agmt));
+				conn_unlock(conn);
+				conn_start_linger(conn);
+				continue;
+			}
+			/*
+			 *  There is only one entry.  Check its "nsds50ruv" for our cleaned rid
+			 */
+			entry = ldap_first_entry( ld, result );
+			if ( entry != NULL ) {
+				for ( attr = ldap_first_attribute( ld, entry, &ber ); attr != NULL; attr = ldap_next_attribute( ld, entry, ber ) ){
+					/* make sure the attribute is nsds50ruv */
+					if(strcasecmp(attr,"nsds50ruv") != 0){
+						continue;
+					}
+					if ((vals = ldap_get_values_len( ld, entry, attr)) != NULL ) {
+						for ( i = 0; vals[i] && vals[i]->bv_val; i++ ) {
+							/* look for this replica */
+							if(strstr(rid_text, vals[i]->bv_val)){
+								/* rid has not been cleaned yet, start over */
+								found = 1;
+								break;
+							}
+						}
+						ldap_value_free_len(vals);
+					}
+					ldap_memfree( attr );
+				}
+				if ( ber != NULL ) {
+					ber_free( ber, 0 );
+				}
+			}
+			ldap_msgfree( result );
+			/*
+			 *  Unlock the connection, and start the linger timer
+			 */
+			conn_unlock(conn);
+			conn_start_linger(conn);
+
+			if(found){
+				/*
+				 *  If we've been trying to clean these replicas for over an hour, just quit
+				 */
+				if( (current_time() - start_time) > 3600){
+					slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "cleanAllRUV_task: timed out checking if replicas have "
+						"been cleaned.  The rid has not been released, you need to rerun the task.\n");
+					goto done;
+				}
+				/*
+				 *  a rid has not been cleaned yet, go back to sleep and check them again
+				 */
+				break;
+			}
+
+			agmt_obj = agmtlist_get_next_agreement_for_replica (replica, agmt_obj);
+		}
+		if(!found){
+			/*
+			 *  The replicas have been cleaned!  Next, release the rid
+			 */
+			replicas_cleaned = 1;
+			break;
+		}
+	} /* while */
+
+	/*
+	 *  If the replicas are cleaned, release the rid
+	 */
+	if(replicas_cleaned){
+		rc = replica_execute_release_ruv_task(data->repl_obj, data->rid);
+		if(rc == 0){
+			slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "cleanAllRUV_task: Successfully Finished.  All active "
+				"replicas have been cleaned.\n");
+		} else {
+			slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "cleanAllRUV_task: Failed: Replica ID was not released (%d)  "
+			"You will need to rerun the task.\n", rc);
+		}
+	} else {
+		/*
+		 *  Shutdown
+		 */
+		slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "cleanAllRUV_task: slapd shutting down, you will need to rerun the task.\n");
+	}
+
+done:
+	slapi_ch_free((void **)&rid_text);
+	slapi_ch_free((void **)&data);
+}
+
+/*
+ *  This function releases the cleaned rid so that it can be reused.
+ *  We send this operation to all the known active replicas.
+ */
 static int
-replica_execute_release_ruv_task(Object *r, ReplicaId rid, char *returntext)
+replica_execute_release_ruv_task(Object *r, ReplicaId rid)
 {
 	Repl_Connection *conn;
 	Replica *replica = (Replica*)object_get_data (r);
@@ -1293,7 +1482,7 @@ replica_execute_release_ruv_task(Object *r, ReplicaId rid, char *returntext)
 	int send_msgid = 0;
 	int rc = 0;
 
-	slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, "releaseRUV_task: releasing rid (%d)...\n", rid);
+	slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "cleanAllRUV_task: releasing rid (%d)...\n", rid);
 
 	/*
 	 * Set the released rid, and trigger cl trimmming
@@ -1339,18 +1528,18 @@ replica_execute_release_ruv_task(Object *r, ReplicaId rid, char *returntext)
 			conn_cancel_linger(conn);
 			crc = conn_send_extended_operation(conn, REPL_RELEASERUV_OID, payload, NULL, &send_msgid);
 			if (CONN_OPERATION_SUCCESS != crc){
-				slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "releaseRUV_task: failed to send "
-					"releaseruv extended op to repl agmt (%s), error %d\n", slapi_sdn_get_dn(dn), crc);
+				slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "cleanAllRUV_task: failed to send "
+					"releaseRUV extended op to repl agmt (%s), error %d\n", slapi_sdn_get_dn(dn), crc);
 				rc = LDAP_OPERATIONS_ERROR;
 			} else {
-				slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, "releaseRUV_task: successfully sent "
-					"extended op to (%s)\n",slapi_sdn_get_dn(dn));
+				slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "cleanAllRUV_task: successfully sent "
+					"releaseRUV extended op to (%s)\n",slapi_sdn_get_dn(dn));
 			}
 			conn_start_linger(conn);
 		}
 		if(crc){
-			slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "releaseRUV_task: replica (%s) has not "
-					"been cleaned.  You will need to rerun the RELEASERUV task on this replica\n",
+			slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "cleanAllRUV_task: replica (%s) has not "
+					"been released.  You will need to rerun the task\n",
 					slapi_sdn_get_dn(dn));
 			rc = crc;
 		}
@@ -1364,9 +1553,9 @@ done:
 	if(rc == 0){
 		set_released_rid(ALREADY_RELEASED);
 		delete_cleaned_rid();
-		slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, "releaseRUV_task: Successfully released rid (%d)\n", rid);
+		slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "cleanAllRUV_task: Successfully released rid (%d)\n", rid);
 	} else {
-		slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "releaseRUV_task: Failed to release rid (%d), error (%d)\n", rid, rc);
+		slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "cleanAllRUV_task: Failed to release rid (%d), error (%d)\n", rid, rc);
 	}
 
 	if(payload)

+ 57 - 34
ldap/servers/plugins/replication/repl_extop.c

@@ -1388,18 +1388,21 @@ free_and_return:
 int
 multimaster_extop_cleanruv(Slapi_PBlock *pb){
 	multimaster_mtnode_extension *mtnode_ext;
+	PRThread *thread = NULL;
 	Repl_Connection *conn;
 	const Slapi_DN *dn;
 	Replica *r = NULL;
 	Object *agmt_obj;
 	Repl_Agmt *agmt;
 	ConnResult crc;
+	cleanruv_data *data = NULL;
 	struct berval *extop_value;
 	char *extop_oid;
 	char *repl_root;
 	char *payload = NULL;
 	char *iter;
 	int send_msgid = 0;
+	int agmt_count = 0;
 	int rid = 0;
 	int rc = 0;
 
@@ -1411,41 +1414,39 @@ multimaster_extop_cleanruv(Slapi_PBlock *pb){
 		/* something is wrong, error out */
 		return -1;
 	}
-
 	/*
 	 *  Extract the rid and repl_root from the payload
 	 */
 	if(decode_cleanruv_payload(extop_value, &payload)){
-		slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "cleanruv_extop: failed to decode payload.  Aborting ext op\n");
+		slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "cleanAllRUV_extop: failed to decode payload.  Aborting ext op\n");
 		return -1;
 	}
 	rid = atoi(ldap_utf8strtok_r(payload, ":", &iter));
 	repl_root = ldap_utf8strtok_r(iter, ":", &iter);
 
-	slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, "cleanruv_extop: cleaning rid (%d)...\n",rid);
-
 	/*
 	 *  If we already cleaned this server, just return success
 	 */
 	if(is_cleaned_rid(rid)){
-		slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, "cleanruv_extop: rid (%d) has already been cleaned, skipping\n",rid);
+		slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, "cleanAllRUV_extop: rid (%d) has already been cleaned, skipping\n",rid);
 		return rc;
 	} else {
-		slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, "cleanruv_extop: cleaning rid (%d)...\n", rid);
+		slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "cleanAllRUV_extop: cleaning rid (%d)...\n", rid);
+		set_cleaned_rid(rid);
 	}
 
 	/*
 	 *  Get the node, so we can get the replica and its agreements
 	 */
 	if((mtnode_ext = replica_config_get_mtnode_by_dn(repl_root)) == NULL){
-		slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "cleanruv_extop: failed to get replication node "
+		slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "cleanAllRUV_extop: failed to get replication node "
 			"from (%s), aborting operation\n", repl_root);
 		return -1;
 	}
 	if (mtnode_ext->replica)
 		object_acquire (mtnode_ext->replica);
 	if (mtnode_ext->replica == NULL){
-		slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "cleanruv_extop: replica is missing from (%s), "
+		slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "cleanAllRUV_extop: replica is missing from (%s), "
 			"aborting operation\n",repl_root);
 		rc = LDAP_OPERATIONS_ERROR;
 		goto free_and_return;
@@ -1466,36 +1467,37 @@ multimaster_extop_cleanruv(Slapi_PBlock *pb){
 		conn = (Repl_Connection *)agmt_get_connection(agmt);
 		if(conn == NULL){
 			/* no connection for this agreement, move on to the next agmt */
-			slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "cleanruv_extop: the replica (%s), is "
+			slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "cleanAllRUV_extop: the replica (%s), is "
 				"missing the connection.  This replica will not be cleaned.\n", slapi_sdn_get_dn(dn));
 			agmt_obj = agmtlist_get_next_agreement_for_replica (r, agmt_obj);
 			continue;
 		}
 		crc = conn_connect(conn);
 		if (CONN_OPERATION_FAILED == crc ){
-			slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "cleanruv_extop: failed to connect "
+			slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "cleanAllRUV_extop: failed to connect "
 				"to repl agreement connection (%s), error %d\n",slapi_sdn_get_dn(dn), ACQUIRE_TRANSIENT_ERROR);
 			rc = LDAP_OPERATIONS_ERROR;
 		} else if (CONN_SSL_NOT_ENABLED == crc){
-			slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "cleanruv_extop: failed to acquire "
+			slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "cleanAllRUV_extop: failed to acquire "
 				"repl agmt connection (%s), error %d\n",slapi_sdn_get_dn(dn), ACQUIRE_FATAL_ERROR);
 			rc = LDAP_OPERATIONS_ERROR;
 		} else {
 			conn_cancel_linger(conn);
 			crc = conn_send_extended_operation(conn, REPL_CLEANRUV_OID, extop_value, NULL, &send_msgid);
 			if (CONN_OPERATION_SUCCESS != crc){
-				slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "cleanruv_extop: failed to send "
+				slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "cleanAllRUV_extop: failed to send "
 					"clean_ruv extended op to repl agmt (%s), error %d\n", slapi_sdn_get_dn(dn), crc);
 				rc = LDAP_OPERATIONS_ERROR;
 			} else {
 				/* success */
-				slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, "cleanruv_extop: successfully sent "
+				slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "cleanAllRUV_extop: successfully sent "
 					"extended op to (%s)\n",slapi_sdn_get_dn(dn) );
+				agmt_count++;
 			}
 			conn_start_linger(conn);
 		}
 		if(crc != CONN_OPERATION_SUCCESS){
-			slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "cleanruv_extop: replica (%s) has not "
+			slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "cleanAllRUV_extop: replica (%s) has not "
 					        "been cleaned.  You will need to rerun the CLEANALLRUV task on this replica\n",
 					        slapi_sdn_get_dn(dn) );
 			rc = LDAP_OPERATIONS_ERROR;
@@ -1508,10 +1510,30 @@ multimaster_extop_cleanruv(Slapi_PBlock *pb){
 
 free_and_return:
 
-	if(rc == 0){
-		slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, "cleanruv_extop: cleaned rid (%d)\n", rid);
+	if(rc == 0 && agmt_count > 0){
+		/*
+		 *  Launch the cleanruv monitoring thread.  Once all the replicas are cleaned it will release the rid
+		 */
+		data = (cleanruv_data*)slapi_ch_calloc(1, sizeof(cleanruv_data));
+		if (data == NULL) {
+			slapi_log_error( SLAPI_LOG_FATAL, repl_plugin_name, "cleanAllRUV_extop: failed to allocate "
+				"cleanruv_Data\n");
+			return -1;
+		}
+		data->repl_obj = mtnode_ext->replica;
+		data->rid = rid;
+
+		thread = PR_CreateThread(PR_USER_THREAD, replica_cleanallruv_monitor_thread,
+				(void *)data, PR_PRIORITY_NORMAL, PR_GLOBAL_THREAD,
+				PR_UNJOINABLE_THREAD, SLAPD_DEFAULT_THREAD_STACKSIZE);
+		if (thread == NULL) {
+			slapi_log_error( SLAPI_LOG_FATAL, repl_plugin_name, "cleanAllRUV_extop: unable to create cleanAllRUV "
+				"monitoring thread.  Aborting task.\n");
+		}
+	} else if (rc == 0){
+		slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "cleanAllRUV_extop: Successfully Finished.\n");
 	} else {
-		slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "cleanruv_extop: failed to clean rid (%d), error (%d)\n",rid, rc);
+		slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "cleanALLRUV_extop: failed to clean rid (%d), error (%d)\n",rid, rc);
 	}
 
 	if (mtnode_ext->replica)
@@ -1558,34 +1580,34 @@ multimaster_extop_releaseruv(Slapi_PBlock *pb){
 	}
 
 	if(decode_cleanruv_payload(extop_value, &payload)){
-		slapi_log_error(SLAPI_LOG_FATAL,repl_plugin_name, "releaseruv_extop: failed to decode payload, aborting ext op\n");
+		slapi_log_error(SLAPI_LOG_FATAL,repl_plugin_name, "releaseRUV_extop: failed to decode payload, aborting ext op.\n");
 		return -1;
 	}
 	rid = atoi(ldap_utf8strtok_r(payload, ":", &iter));
 	repl_root = ldap_utf8strtok_r(iter, ":", &iter);
 
-	slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, "releaseruv_extop: releasing rid (%d)...\n",rid);
 	/*
 	 *  If we already released this ruv, just return.
 	 */
 	if(is_released_rid(rid) || is_already_released_rid()){
-		slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, "releaseruv_extop: rid (%d) has already been released, skipping\n",rid);
+		slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, "cleanAllRUV_extop: rid (%d) has already been released, skipping.\n",rid);
 		return 0;
 	} else {
 		/* set the released rid, and trigger trimming */
+		slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "cleanAllRUV_extop: releasing rid (%d)...\n", rid);
 		set_released_rid((int)rid);
 		trigger_cl_trimming();
 	}
 
 	if((mtnode_ext = replica_config_get_mtnode_by_dn(repl_root)) == NULL){
-		slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "releaseruv_extop: failed to get node "
+		slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "releaseRUV_extop: failed to get node "
 			"from replication root dn(%s), aborting operation.\n", repl_root);
 		return -1;
 	}
 	if (mtnode_ext->replica)
 		object_acquire (mtnode_ext->replica);
 	if (mtnode_ext->replica == NULL){
-		slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "releaseruv_extop: replica is missing from (%s), "
+		slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "releaseRUV_extop: replica is missing from (%s), "
 			"aborting operation.\n", repl_root);
 		rc = LDAP_OPERATIONS_ERROR;
 		goto free_and_return;
@@ -1606,38 +1628,38 @@ multimaster_extop_releaseruv(Slapi_PBlock *pb){
 		conn = (Repl_Connection *)agmt_get_connection(agmt);
 		if(conn == NULL){
 			/* no connection for this agreement, log error, and move on */
-			slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "releaseruv_extop: the replica (%s), is "
-				"missing the connection.  This replica will not be cleaned.\n", slapi_sdn_get_dn(dn));
+			slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "cleanAllRUV_extop: the replica (%s), is "
+				"missing the connection.  This replica will not be released.\n", slapi_sdn_get_dn(dn));
 			agmt_obj = agmtlist_get_next_agreement_for_replica (r, agmt_obj);
 			continue;
 		}
 		crc = conn_connect(conn);
 		if (CONN_OPERATION_FAILED == crc ){
-			slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "releaseruv_extop: failed to connect "
+			slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "releaseRUV_extop: failed to connect "
 				"to repl agreement connection (%s), error %d\n",slapi_sdn_get_dn(dn), ACQUIRE_TRANSIENT_ERROR);
 			rc = LDAP_OPERATIONS_ERROR;
 		} else if (CONN_SSL_NOT_ENABLED == crc){
-			slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "releaseruv_extop: failed to acquire "
+			slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "releaseRUV_extop: failed to acquire "
 				"repl agmt connection (%s), error %d\n",slapi_sdn_get_dn(dn), ACQUIRE_FATAL_ERROR);
 			rc = LDAP_OPERATIONS_ERROR;
 		} else {
 			conn_cancel_linger(conn);
 			crc = conn_send_extended_operation(conn, REPL_RELEASERUV_OID, extop_value, NULL, &send_msgid);
 			if (CONN_OPERATION_SUCCESS != crc){
-				slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "releaseruv_extop: failed to send "
-					"releaseruv extended op to repl agmt (%s), error %d\n",	slapi_sdn_get_dn(dn), crc);
+				slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "cleanAllRUV_extop: failed to send "
+					"releaseRUV extended op to repl agmt (%s), error %d\n",	slapi_sdn_get_dn(dn), crc);
 				rc = LDAP_OPERATIONS_ERROR;
 			} else {
 				/* success */
-				slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, "releaseruv_extop: successfully sent "
-					"extended op to (%s)\n",slapi_sdn_get_dn(dn) );
+				slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "cleanAllRUV_extop: successfully sent "
+					"releaseRUV extended op to (%s)\n",slapi_sdn_get_dn(dn) );
 				rc = 0;
 			}
 			conn_start_linger(conn);
 		}
 		if(crc){
-			slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "releaseruv_extop: replica (%s) has not "
-				"been cleaned.  You will need to rerun the RELEASERUV task on this replica\n",
+			slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "cleanAllRUV_extop: replica (%s) has not "
+				"been released.  You will need to rerun the task.\n",
 				slapi_sdn_get_dn(dn) );
 		}
 		agmt_obj = agmtlist_get_next_agreement_for_replica (r, agmt_obj);
@@ -1650,9 +1672,10 @@ free_and_return:
 	if(rc == 0){
 		set_released_rid(ALREADY_RELEASED);
 		delete_cleaned_rid();
-		slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, "releaseruv_extop: released rid (%d) successfully\n", rid);
+		slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "cleanAllRUV_extop: Successfully released rid (%d)\n", rid);
 	} else {
-		slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "releaseruv_extop: failed to release rid(%d), error (%d), please retry the task\n",rid, rc);
+		slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "cleanAllRUV_extop: Failed to release rid(%d), error (%d), "
+			"please retry the task.\n",rid, rc);
 	}
 
 	if(mtnode_ext->replica)