Forráskód Böngészése

Ticket 49008 v2: aborted operation can leave RUV in incorrect state

    Bug description:
    If a plugin operation succeeded, but the operation itself fails and is aborted the RUV is in an incorrect state (rolled up to the succesful plugin op)

    Fix Decription:
    Introduce a "primary_csn", this is the csn of the main operation, either a client operation or a replicated operation.
    csns generated by internal operations, eg by plugins are secondary csn.

    Maintain the primary csn in thread local data, like it is used for the agreement name (or txn stack): prim_csn.

    Extend the data structure of the pending list to keep prim_csn for each inserted csn

    If a csn is created or received check prim_csn: if it exists use it, if it doesn't exist set it

    when inserting a csn to the pending list pass the prim_csn

    when cancelling a csn, if it is the prim_csn also cancell all secondary csns

    when committing a csn,

    if it is not the primary csn, do nothing

    if it is the prim_csn trigger the pending list rollup, stop at the first not committed csn

    if the RID of the prim_csn is not the local RID also rollup the pending list for the local RID.

    Reviewed by: Thierry, Thanks
Ludwig Krispenz 8 éve
szülő
commit
0c7ef560ae

+ 66 - 9
ldap/servers/plugins/replication/csnpl.c

@@ -24,8 +24,9 @@ struct csnpl
 
 
 typedef struct _csnpldata
 typedef struct _csnpldata
 {
 {
-	PRBool		committed;  /* True if CSN committed */
-	CSN			*csn;       /* The actual CSN */
+	PRBool	committed;  /* True if CSN committed */
+	CSN	*csn;       /* The actual CSN */
+	const CSN *prim_csn;  /* The primary CSN of an operation consising of multiple sub ops*/
 } csnpldata;
 } csnpldata;
 
 
 /* forward declarations */
 /* forward declarations */
@@ -103,7 +104,7 @@ void csnplFree (CSNPL **csnpl)
  *          1 if the csn has already been seen
  *          1 if the csn has already been seen
  *         -1 for any other kind of errors
  *         -1 for any other kind of errors
  */
  */
-int csnplInsert (CSNPL *csnpl, const CSN *csn)
+int csnplInsert (CSNPL *csnpl, const CSN *csn, const CSN *prim_csn)
 {
 {
 	int rc;
 	int rc;
 	csnpldata *csnplnode;
 	csnpldata *csnplnode;
@@ -131,6 +132,7 @@ int csnplInsert (CSNPL *csnpl, const CSN *csn)
 	csnplnode = (csnpldata *)slapi_ch_malloc(sizeof(csnpldata));
 	csnplnode = (csnpldata *)slapi_ch_malloc(sizeof(csnpldata));
 	csnplnode->committed = PR_FALSE;
 	csnplnode->committed = PR_FALSE;
 	csnplnode->csn = csn_dup(csn);
 	csnplnode->csn = csn_dup(csn);
+	csnplnode->prim_csn = prim_csn;
 	csn_as_string(csn, PR_FALSE, csn_str);
 	csn_as_string(csn, PR_FALSE, csn_str);
 	rc = llistInsertTail (csnpl->csnList, csn_str, csnplnode);
 	rc = llistInsertTail (csnpl->csnList, csn_str, csnplnode);
 
 
@@ -186,6 +188,57 @@ int csnplRemove (CSNPL *csnpl, const CSN *csn)
 	return 0;
 	return 0;
 }
 }
 
 
+int csnplRemoveAll (CSNPL *csnpl, const CSN *csn)
+{
+	csnpldata *data;
+	void *iterator;
+
+	slapi_rwlock_wrlock (csnpl->csnLock);
+	data = (csnpldata *)llistGetFirst(csnpl->csnList, &iterator);
+	while (NULL != data)
+	{
+		if (csn_is_equal(data->csn, csn) ||
+		    csn_is_equal(data->prim_csn, csn)) {
+			csnpldata_free(&data);
+			data = (csnpldata *)llistRemoveCurrentAndGetNext(csnpl->csnList, &iterator);
+		} else {
+			data = (csnpldata *)llistGetNext (csnpl->csnList, &iterator);
+		}
+	}
+#ifdef DEBUG
+    _csnplDumpContentNoLock(csnpl, "csnplRemoveAll");
+#endif
+	slapi_rwlock_unlock (csnpl->csnLock);
+	return 0;
+}
+
+
+int csnplCommitAll (CSNPL *csnpl, const CSN *csn)
+{
+	csnpldata *data;
+	void *iterator;
+	char csn_str[CSN_STRSIZE];
+
+	csn_as_string(csn, PR_FALSE, csn_str);
+	slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name,
+		            "csnplCommitALL: committing all csns for csn %s\n", csn_str);
+	slapi_rwlock_wrlock (csnpl->csnLock);
+	data = (csnpldata *)llistGetFirst(csnpl->csnList, &iterator);
+	while (NULL != data)
+	{
+		csn_as_string(data->csn, PR_FALSE, csn_str);
+		slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name,
+				"csnplCommitALL: processing data csn %s\n", csn_str);
+		if (csn_is_equal(data->csn, csn) ||
+		    csn_is_equal(data->prim_csn, csn)) {
+			data->committed = PR_TRUE;
+		}
+		data = (csnpldata *)llistGetNext (csnpl->csnList, &iterator);
+	}
+	slapi_rwlock_unlock (csnpl->csnLock);
+	return 0;
+}
+
 int csnplCommit (CSNPL *csnpl, const CSN *csn)
 int csnplCommit (CSNPL *csnpl, const CSN *csn)
 {
 {
 	csnpldata *data;
 	csnpldata *data;
@@ -276,13 +329,12 @@ csnplRollUp(CSNPL *csnpl, CSN **first_commited)
 	  *first_commited = NULL;
 	  *first_commited = NULL;
 	}
 	}
 	data = (csnpldata *)llistGetFirst(csnpl->csnList, &iterator);
 	data = (csnpldata *)llistGetFirst(csnpl->csnList, &iterator);
-	while (NULL != data)
+	while (NULL != data && data->committed)
 	{
 	{
 		if (NULL != largest_committed_csn && freeit)
 		if (NULL != largest_committed_csn && freeit)
 		{
 		{
 			csn_free(&largest_committed_csn);
 			csn_free(&largest_committed_csn);
 		}
 		}
-		if (data->committed) {
 			freeit = PR_TRUE;
 			freeit = PR_TRUE;
 			largest_committed_csn = data->csn; /* Save it */
 			largest_committed_csn = data->csn; /* Save it */
 			if (first_commited && (*first_commited == NULL)) {
 			if (first_commited && (*first_commited == NULL)) {
@@ -294,9 +346,6 @@ csnplRollUp(CSNPL *csnpl, CSN **first_commited)
 			data->csn = NULL;
 			data->csn = NULL;
 			csnpldata_free(&data);
 			csnpldata_free(&data);
 			data = (csnpldata *)llistRemoveCurrentAndGetNext(csnpl->csnList, &iterator);
 			data = (csnpldata *)llistRemoveCurrentAndGetNext(csnpl->csnList, &iterator);
-		} else {
-			data = (csnpldata *)llistGetNext (csnpl->csnList, &iterator);
-		}
 	} 
 	} 
 
 
 #ifdef DEBUG
 #ifdef DEBUG
@@ -326,6 +375,7 @@ static void _csnplDumpContentNoLock(CSNPL *csnpl, const char *caller)
     csnpldata *data;
     csnpldata *data;
     void *iterator;
     void *iterator;
     char csn_str[CSN_STRSIZE];
     char csn_str[CSN_STRSIZE];
+    char primcsn_str[CSN_STRSIZE];
     
     
     data = (csnpldata *)llistGetFirst(csnpl->csnList, &iterator);
     data = (csnpldata *)llistGetFirst(csnpl->csnList, &iterator);
 	if (data) {
 	if (data) {
@@ -334,11 +384,18 @@ static void _csnplDumpContentNoLock(CSNPL *csnpl, const char *caller)
 	}
 	}
     while (data)
     while (data)
     {
     {
-        slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name, "%s, %s\n",                        
+        slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name, "%s,(prim %s), %s\n",
                         csn_as_string(data->csn, PR_FALSE, csn_str),
                         csn_as_string(data->csn, PR_FALSE, csn_str),
+			data->prim_csn ? csn_as_string(data->prim_csn, PR_FALSE, primcsn_str) : " ",
                         data->committed ? "committed" : "not committed");
                         data->committed ? "committed" : "not committed");
         data = (csnpldata *)llistGetNext (csnpl->csnList, &iterator);
         data = (csnpldata *)llistGetNext (csnpl->csnList, &iterator);
     }
     }
 }
 }
 #endif
 #endif
 
 
+/* wrapper around csn_free, to satisfy NSPR thread context API */
+void
+csnplFreeCSN (void *arg)
+{
+	csn_free((CSN **)&arg);
+}

+ 4 - 1
ldap/servers/plugins/replication/csnpl.h

@@ -22,10 +22,13 @@ typedef struct csnpl CSNPL;
 
 
 CSNPL* csnplNew(void);
 CSNPL* csnplNew(void);
 void csnplFree (CSNPL **csnpl);
 void csnplFree (CSNPL **csnpl);
-int csnplInsert (CSNPL *csnpl, const CSN *csn);
+int csnplInsert (CSNPL *csnpl, const CSN *csn, const CSN *prim_csn);
 int csnplRemove (CSNPL *csnpl, const CSN *csn);
 int csnplRemove (CSNPL *csnpl, const CSN *csn);
+int csnplRemoveAll (CSNPL *csnpl, const CSN *csn);
+int csnplCommitAll (CSNPL *csnpl, const CSN *csn);
 CSN* csnplGetMinCSN (CSNPL *csnpl, PRBool *committed);
 CSN* csnplGetMinCSN (CSNPL *csnpl, PRBool *committed);
 int csnplCommit (CSNPL *csnpl, const CSN *csn);
 int csnplCommit (CSNPL *csnpl, const CSN *csn);
 CSN *csnplRollUp(CSNPL *csnpl, CSN ** first);
 CSN *csnplRollUp(CSNPL *csnpl, CSN ** first);
 void csnplDumpContent(CSNPL *csnpl, const char *caller); 
 void csnplDumpContent(CSNPL *csnpl, const char *caller); 
+
 #endif
 #endif

+ 2 - 0
ldap/servers/plugins/replication/repl5.h

@@ -232,6 +232,8 @@ int multimaster_be_betxnpostop_modify (Slapi_PBlock *pb);
 extern int repl5_is_betxn;
 extern int repl5_is_betxn;
 char* get_thread_private_agmtname(void);
 char* get_thread_private_agmtname(void);
 void  set_thread_private_agmtname (const char *agmtname);
 void  set_thread_private_agmtname (const char *agmtname);
+void  set_thread_primary_csn (const CSN *prim_csn);
+CSN*  get_thread_primary_csn(void);
 void* get_thread_private_cache(void);
 void* get_thread_private_cache(void);
 void  set_thread_private_cache (void *buf);
 void  set_thread_private_cache (void *buf);
 char* get_repl_session_id (Slapi_PBlock *pb, char *id, CSN **opcsn);
 char* get_repl_session_id (Slapi_PBlock *pb, char *id, CSN **opcsn);

+ 22 - 0
ldap/servers/plugins/replication/repl5_init.c

@@ -136,6 +136,7 @@ static int multimaster_started_flag = 0;
 /* Thread private data and interface */
 /* Thread private data and interface */
 static PRUintn thread_private_agmtname;	/* thread private index for logging*/
 static PRUintn thread_private_agmtname;	/* thread private index for logging*/
 static PRUintn thread_private_cache;
 static PRUintn thread_private_cache;
+static PRUintn thread_primary_csn;
 
 
 char*
 char*
 get_thread_private_agmtname()
 get_thread_private_agmtname()
@@ -153,6 +154,26 @@ set_thread_private_agmtname(const char *agmtname)
 		PR_SetThreadPrivate(thread_private_agmtname, (void *)agmtname);
 		PR_SetThreadPrivate(thread_private_agmtname, (void *)agmtname);
 }
 }
 
 
+CSN*
+get_thread_primary_csn(void)
+{
+	CSN *prim_csn = NULL;
+	if (thread_primary_csn)
+		prim_csn = (CSN *)PR_GetThreadPrivate(thread_primary_csn);
+	return prim_csn;
+}
+void
+set_thread_primary_csn(const CSN *prim_csn)
+{
+	if (thread_primary_csn) {
+		if (prim_csn) {
+			PR_SetThreadPrivate(thread_primary_csn, (void *)csn_dup(prim_csn));
+		} else {
+			PR_SetThreadPrivate(thread_primary_csn, NULL);
+		}
+	}
+}
+
 void*
 void*
 get_thread_private_cache ()
 get_thread_private_cache ()
 {
 {
@@ -719,6 +740,7 @@ multimaster_start( Slapi_PBlock *pb )
 		/* Initialize thread private data for logging. Ignore if fails */
 		/* Initialize thread private data for logging. Ignore if fails */
 		PR_NewThreadPrivateIndex (&thread_private_agmtname, NULL);
 		PR_NewThreadPrivateIndex (&thread_private_agmtname, NULL);
 		PR_NewThreadPrivateIndex (&thread_private_cache, NULL);
 		PR_NewThreadPrivateIndex (&thread_private_cache, NULL);
+		PR_NewThreadPrivateIndex (&thread_primary_csn, csnplFreeCSN);
 
 
 		/* Decode the command line args to see if we're dumping to LDIF */
 		/* Decode the command line args to see if we're dumping to LDIF */
 		is_ldif_dump = check_for_ldif_dump(pb);
 		is_ldif_dump = check_for_ldif_dump(pb);

+ 24 - 16
ldap/servers/plugins/replication/repl5_plugins.c

@@ -1033,9 +1033,11 @@ static int
 write_changelog_and_ruv (Slapi_PBlock *pb)
 write_changelog_and_ruv (Slapi_PBlock *pb)
 {
 {
 	Slapi_Operation *op = NULL;
 	Slapi_Operation *op = NULL;
+	CSN *opcsn;
+	CSN *prim_csn;
 	int rc;
 	int rc;
 	slapi_operation_parameters *op_params = NULL;
 	slapi_operation_parameters *op_params = NULL;
-	Object *repl_obj;
+	Object *repl_obj = NULL;
 	int return_value = SLAPI_PLUGIN_SUCCESS;
 	int return_value = SLAPI_PLUGIN_SUCCESS;
 	Replica *r;
 	Replica *r;
 	Slapi_Backend *be;
 	Slapi_Backend *be;
@@ -1063,17 +1065,17 @@ write_changelog_and_ruv (Slapi_PBlock *pb)
 	{
 	{
 		return return_value;
 		return return_value;
 	}
 	}
+	/* we only log changes for operations applied to a replica */
+	repl_obj = replica_get_replica_for_op (pb);
+	if (repl_obj == NULL)
+		return return_value;
 
 
 	slapi_pblock_get(pb, SLAPI_RESULT_CODE, &rc);
 	slapi_pblock_get(pb, SLAPI_RESULT_CODE, &rc);
 	if (rc) { /* op failed - just return */
 	if (rc) { /* op failed - just return */
-		return return_value;
+		cancel_opcsn(pb);
+		goto common_return;
 	}
 	}
 
 
-	/* we only log changes for operations applied to a replica */
-	repl_obj = replica_get_replica_for_op (pb);
-	if (repl_obj == NULL)
-		return return_value;
- 
 	r = (Replica*)object_get_data (repl_obj);
 	r = (Replica*)object_get_data (repl_obj);
 	PR_ASSERT (r);
 	PR_ASSERT (r);
 
 
@@ -1108,7 +1110,7 @@ write_changelog_and_ruv (Slapi_PBlock *pb)
 
 
 			slapi_pblock_get (pb, SLAPI_OPERATION_PARAMETERS, &op_params);
 			slapi_pblock_get (pb, SLAPI_OPERATION_PARAMETERS, &op_params);
 			if (NULL == op_params) {
 			if (NULL == op_params) {
-				return return_value;
+				goto common_return;
 			}
 			}
 
 
 			/* need to set uniqueid operation parameter */
 			/* need to set uniqueid operation parameter */
@@ -1127,19 +1129,18 @@ write_changelog_and_ruv (Slapi_PBlock *pb)
 				slapi_pblock_get (pb, SLAPI_ENTRY_PRE_OP, &e);
 				slapi_pblock_get (pb, SLAPI_ENTRY_PRE_OP, &e);
 			}
 			}
 			if (NULL == e) {
 			if (NULL == e) {
-				return return_value;
+				goto common_return;
 			}
 			}
 			uniqueid = slapi_entry_get_uniqueid (e);
 			uniqueid = slapi_entry_get_uniqueid (e);
 			if (NULL == uniqueid) {
 			if (NULL == uniqueid) {
-				return return_value;
+				goto common_return;
 			}
 			}
 			op_params->target_address.uniqueid = slapi_ch_strdup (uniqueid);
 			op_params->target_address.uniqueid = slapi_ch_strdup (uniqueid);
 		} 
 		} 
 
 
 		if( op_params->csn && is_cleaned_rid(csn_get_replicaid(op_params->csn))){
 		if( op_params->csn && is_cleaned_rid(csn_get_replicaid(op_params->csn))){
 			/* this RID has been cleaned */
 			/* this RID has been cleaned */
-			object_release (repl_obj);
-			return return_value;
+			goto common_return;
 		}
 		}
 
 
 		/* we might have stripped all the mods - in that case we do not
 		/* we might have stripped all the mods - in that case we do not
@@ -1152,7 +1153,7 @@ write_changelog_and_ruv (Slapi_PBlock *pb)
 			{
 			{
 				slapi_log_err(SLAPI_LOG_CRIT, repl_plugin_name,
 				slapi_log_err(SLAPI_LOG_CRIT, repl_plugin_name,
 								"write_changelog_and_ruv - Skipped due to DISKFULL\n");
 								"write_changelog_and_ruv - Skipped due to DISKFULL\n");
-				return return_value;
+				goto common_return;
 			}
 			}
 			slapi_pblock_get(pb, SLAPI_TXN, &txn);
 			slapi_pblock_get(pb, SLAPI_TXN, &txn);
 			rc = cl5WriteOperationTxn(repl_name, repl_gen, op_params, 
 			rc = cl5WriteOperationTxn(repl_name, repl_gen, op_params, 
@@ -1188,7 +1189,6 @@ write_changelog_and_ruv (Slapi_PBlock *pb)
 	*/
 	*/
 	if (0 == return_value) {
 	if (0 == return_value) {
 		char csn_str[CSN_STRSIZE] = {'\0'};
 		char csn_str[CSN_STRSIZE] = {'\0'};
-		CSN *opcsn;
 		int rc;
 		int rc;
 		const char *dn = op_params ? REPL_GET_DN(&op_params->target_address) : "unknown";
 		const char *dn = op_params ? REPL_GET_DN(&op_params->target_address) : "unknown";
 		Slapi_DN *sdn = op_params ? (&op_params->target_address)->sdn : NULL;
 		Slapi_DN *sdn = op_params ? (&op_params->target_address)->sdn : NULL;
@@ -1220,7 +1220,15 @@ write_changelog_and_ruv (Slapi_PBlock *pb)
 		}
 		}
 	}
 	}
 
 
-	object_release (repl_obj);
+common_return:
+	opcsn = operation_get_csn(op);
+	prim_csn = get_thread_primary_csn();
+	if (csn_is_equal(opcsn, prim_csn)) {
+		set_thread_primary_csn(NULL);
+	}
+	if (repl_obj) {
+		object_release (repl_obj);
+	}
 	return return_value;
 	return return_value;
 }
 }
 
 
@@ -1417,7 +1425,7 @@ cancel_opcsn (Slapi_PBlock *pb)
 
 
             ruv_obj = replica_get_ruv (r);
             ruv_obj = replica_get_ruv (r);
             PR_ASSERT (ruv_obj);
             PR_ASSERT (ruv_obj);
-            ruv_cancel_csn_inprogress ((RUV*)object_get_data (ruv_obj), opcsn);
+            ruv_cancel_csn_inprogress ((RUV*)object_get_data (ruv_obj), opcsn, replica_get_rid(r));
             object_release (ruv_obj);
             object_release (ruv_obj);
         }
         }
 
 

+ 3 - 3
ldap/servers/plugins/replication/repl5_replica.c

@@ -895,7 +895,7 @@ replica_update_ruv(Replica *r, const CSN *updated_csn, const char *replica_purl)
 					}
 					}
 				}
 				}
 				/* Update max csn for local and remote replicas */
 				/* Update max csn for local and remote replicas */
-				rc = ruv_update_ruv (ruv, updated_csn, replica_purl, rid == r->repl_rid);
+				rc = ruv_update_ruv (ruv, updated_csn, replica_purl, r->repl_rid);
 				if (RUV_COVERS_CSN == rc)
 				if (RUV_COVERS_CSN == rc)
 				{
 				{
 					slapi_log_err(SLAPI_LOG_REPL,
 					slapi_log_err(SLAPI_LOG_REPL,
@@ -3618,7 +3618,7 @@ assign_csn_callback(const CSN *csn, void *data)
 	
 	
     if (NULL != r->min_csn_pl)
     if (NULL != r->min_csn_pl)
     {
     {
-        if (csnplInsert(r->min_csn_pl, csn) != 0)
+        if (csnplInsert(r->min_csn_pl, csn, NULL) != 0)
         {
         {
             char csn_str[CSN_STRSIZE]; /* For logging only */
             char csn_str[CSN_STRSIZE]; /* For logging only */
             /* Ack, we can't keep track of min csn. Punt. */
             /* Ack, we can't keep track of min csn. Punt. */
@@ -3666,7 +3666,7 @@ abort_csn_callback(const CSN *csn, void *data)
         }
         }
     }
     }
 
 
-    ruv_cancel_csn_inprogress (ruv, csn);
+    ruv_cancel_csn_inprogress (ruv, csn, replica_get_rid(r));
     replica_unlock(r->repl_lock);
     replica_unlock(r->repl_lock);
 
 
     object_release (ruv_obj);
     object_release (ruv_obj);

+ 55 - 19
ldap/servers/plugins/replication/repl5_ruv.c

@@ -77,6 +77,7 @@ static char *get_replgen_from_berval(const struct berval *bval);
 static const char * const prefix_replicageneration = "{replicageneration}";
 static const char * const prefix_replicageneration = "{replicageneration}";
 static const char * const prefix_ruvcsn = "{replica "; /* intentionally missing '}' */
 static const char * const prefix_ruvcsn = "{replica "; /* intentionally missing '}' */
 
 
+static int ruv_update_ruv_element (RUV *ruv, RUVElement *replica, const CSN *csn, const char *replica_purl, PRBool isLocal);
 
 
 /* API implementation */
 /* API implementation */
 
 
@@ -1604,6 +1605,7 @@ int ruv_add_csn_inprogress (RUV *ruv, const CSN *csn)
     char csn_str[CSN_STRSIZE];
     char csn_str[CSN_STRSIZE];
     int rc = RUV_SUCCESS;
     int rc = RUV_SUCCESS;
     int rid = csn_get_replicaid (csn);
     int rid = csn_get_replicaid (csn);
+    CSN *prim_csn;
 
 
     PR_ASSERT (ruv && csn);
     PR_ASSERT (ruv && csn);
 
 
@@ -1641,8 +1643,12 @@ int ruv_add_csn_inprogress (RUV *ruv, const CSN *csn)
         rc = RUV_COVERS_CSN;
         rc = RUV_COVERS_CSN;
         goto done;
         goto done;
     }
     }
-
-    rc = csnplInsert (replica->csnpl, csn);
+    prim_csn = get_thread_primary_csn();
+    if (prim_csn == NULL) {
+        set_thread_primary_csn(csn);
+        prim_csn = get_thread_primary_csn();
+    }
+    rc = csnplInsert (replica->csnpl, csn, prim_csn);
     if (rc == 1)    /* we already seen this csn */
     if (rc == 1)    /* we already seen this csn */
     {
     {
         if (slapi_is_loglevel_set(SLAPI_LOG_REPL)) {
         if (slapi_is_loglevel_set(SLAPI_LOG_REPL)) {
@@ -1650,6 +1656,7 @@ int ruv_add_csn_inprogress (RUV *ruv, const CSN *csn)
                             "The csn %s has already be seen - ignoring\n",
                             "The csn %s has already be seen - ignoring\n",
                             csn_as_string (csn, PR_FALSE, csn_str));
                             csn_as_string (csn, PR_FALSE, csn_str));
         }
         }
+        set_thread_primary_csn(NULL);
         rc = RUV_COVERS_CSN;    
         rc = RUV_COVERS_CSN;    
     }
     }
     else if(rc != 0)
     else if(rc != 0)
@@ -1674,24 +1681,36 @@ done:
     return rc;
     return rc;
 }
 }
 
 
-int ruv_cancel_csn_inprogress (RUV *ruv, const CSN *csn)
+int ruv_cancel_csn_inprogress (RUV *ruv, const CSN *csn, ReplicaId local_rid)
 {
 {
     RUVElement* replica;
     RUVElement* replica;
     int rc = RUV_SUCCESS;
     int rc = RUV_SUCCESS;
+    CSN *prim_csn = NULL;
+
 
 
     PR_ASSERT (ruv && csn);
     PR_ASSERT (ruv && csn);
 
 
+    prim_csn = get_thread_primary_csn();
     /* locate ruvElement */
     /* locate ruvElement */
     slapi_rwlock_wrlock (ruv->lock);
     slapi_rwlock_wrlock (ruv->lock);
     replica = ruvGetReplica (ruv, csn_get_replicaid (csn));
     replica = ruvGetReplica (ruv, csn_get_replicaid (csn));
-    if (replica == NULL)
-    {
+    if (replica == NULL) {
         /* ONREPL - log error */
         /* ONREPL - log error */
-        rc = RUV_NOTFOUND;
-        goto done;
-    } 
-
-    rc = csnplRemove (replica->csnpl, csn);
+	rc = RUV_NOTFOUND;
+	goto done;
+    }
+    if (csn_is_equal(csn, prim_csn)) {
+	/* the prim csn is cancelled, lets remove all dependent csns */
+	ReplicaId prim_rid = csn_get_replicaid (csn);
+	replica = ruvGetReplica (ruv, prim_rid);
+	rc = csnplRemoveAll (replica->csnpl, prim_csn);
+	if (prim_rid != local_rid) {
+		replica = ruvGetReplica (ruv, local_rid);
+		rc = csnplRemoveAll (replica->csnpl, prim_csn);
+	}
+    } else {
+	rc = csnplRemove (replica->csnpl, csn);
+    }
     if (rc != 0)
     if (rc != 0)
         rc = RUV_NOTFOUND;
         rc = RUV_NOTFOUND;
     else
     else
@@ -1702,19 +1721,37 @@ done:
     return rc;
     return rc;
 }
 }
 
 
-int ruv_update_ruv (RUV *ruv, const CSN *csn, const char *replica_purl, PRBool isLocal)
+int ruv_update_ruv (RUV *ruv, const CSN *csn, const char *replica_purl, ReplicaId local_rid)
+{
+    int rc=RUV_SUCCESS;
+    RUVElement *replica;
+    ReplicaId prim_rid;
+
+    CSN *prim_csn = get_thread_primary_csn();
+
+    if (! csn_is_equal(csn, prim_csn)) {
+	/* not a primary csn, nothing to do */
+	return rc;
+    }
+    slapi_rwlock_wrlock (ruv->lock);
+    prim_rid = csn_get_replicaid (csn);
+    replica = ruvGetReplica (ruv, local_rid);
+    rc = ruv_update_ruv_element(ruv, replica, csn, replica_purl, PR_TRUE);
+    if ( rc || local_rid == prim_rid) goto done;
+    replica = ruvGetReplica (ruv, prim_rid);
+    rc = ruv_update_ruv_element(ruv, replica, csn, replica_purl, PR_FALSE);
+done:
+    slapi_rwlock_unlock (ruv->lock);
+    return rc;
+}
+static int
+ruv_update_ruv_element (RUV *ruv, RUVElement *replica, const CSN *csn, const char *replica_purl, PRBool isLocal)
 {
 {
     int rc=RUV_SUCCESS;
     int rc=RUV_SUCCESS;
     char csn_str[CSN_STRSIZE];
     char csn_str[CSN_STRSIZE];
     CSN *max_csn;
     CSN *max_csn;
     CSN *first_csn = NULL;
     CSN *first_csn = NULL;
-    RUVElement *replica;
     
     
-    PR_ASSERT (ruv && csn);
-
-    slapi_rwlock_wrlock (ruv->lock);
-
-    replica = ruvGetReplica (ruv, csn_get_replicaid (csn));
     if (replica == NULL)
     if (replica == NULL)
     {
     {
         /* we should have a ruv element at this point because it would have
         /* we should have a ruv element at this point because it would have
@@ -1724,7 +1761,7 @@ int ruv_update_ruv (RUV *ruv, const CSN *csn, const char *replica_purl, PRBool i
         goto done;
         goto done;
     } 
     } 
 
 
-	if (csnplCommit(replica->csnpl, csn) != 0)
+	if (csnplCommitAll(replica->csnpl, csn) != 0)
 	{
 	{
 		slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name, "ruv_update_ruv - Cannot commit csn %s\n",
 		slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name, "ruv_update_ruv - Cannot commit csn %s\n",
 			            csn_as_string(csn, PR_FALSE, csn_str));
 			            csn_as_string(csn, PR_FALSE, csn_str));
@@ -1765,7 +1802,6 @@ int ruv_update_ruv (RUV *ruv, const CSN *csn, const char *replica_purl, PRBool i
 	}
 	}
 
 
 done:
 done:
-    slapi_rwlock_unlock (ruv->lock);
 
 
     return rc;
     return rc;
 }
 }

+ 2 - 2
ldap/servers/plugins/replication/repl5_ruv.h

@@ -109,8 +109,8 @@ PRInt32 ruv_replica_count (const RUV *ruv);
 char **ruv_get_referrals(const RUV *ruv);
 char **ruv_get_referrals(const RUV *ruv);
 void ruv_dump(const RUV *ruv, char *ruv_name, PRFileDesc *prFile);
 void ruv_dump(const RUV *ruv, char *ruv_name, PRFileDesc *prFile);
 int ruv_add_csn_inprogress (RUV *ruv, const CSN *csn);
 int ruv_add_csn_inprogress (RUV *ruv, const CSN *csn);
-int ruv_cancel_csn_inprogress (RUV *ruv, const CSN *csn);
-int ruv_update_ruv (RUV *ruv, const CSN *csn, const char *replica_purl, PRBool isLocal);
+int ruv_cancel_csn_inprogress (RUV *ruv, const CSN *csn, ReplicaId rid);
+int ruv_update_ruv (RUV *ruv, const CSN *csn, const char *replica_purl, ReplicaId local_rid);
 int ruv_move_local_supplier_to_first(RUV *ruv, ReplicaId rid);
 int ruv_move_local_supplier_to_first(RUV *ruv, ReplicaId rid);
 int ruv_get_first_id_and_purl(RUV *ruv, ReplicaId *rid, char **replica_purl );
 int ruv_get_first_id_and_purl(RUV *ruv, ReplicaId *rid, char **replica_purl );
 int ruv_local_contains_supplier(RUV *ruv, ReplicaId rid);
 int ruv_local_contains_supplier(RUV *ruv, ReplicaId rid);

+ 15 - 0
ldap/servers/slapd/csn.c

@@ -268,6 +268,21 @@ csn_as_attr_option_string(CSNType t,const CSN *csn,char *ss)
 	return s;
 	return s;
 }
 }
 
 
+int
+csn_is_equal(const CSN *csn1, const CSN *csn2)
+{
+	int retval = 0;
+	if ((csn1 == NULL && csn2 == NULL) ||
+		(csn1 && csn2 &&
+		 csn1->tstamp == csn2->tstamp &&
+		 csn1->seqnum == csn2->seqnum &&
+		 csn1->rid == csn2->rid &&
+		 csn1->subseqnum == csn2->subseqnum)) {
+		retval = 1;
+	}
+	return retval;
+}
+
 int 
 int 
 csn_compare_ext(const CSN *csn1, const CSN *csn2, unsigned int flags)
 csn_compare_ext(const CSN *csn1, const CSN *csn2, unsigned int flags)
 {
 {

+ 2 - 0
ldap/servers/slapd/slapi-private.h

@@ -174,6 +174,7 @@ time_t csn_get_time(const CSN *csn);
 PRUint16 csn_get_seqnum(const CSN *csn);
 PRUint16 csn_get_seqnum(const CSN *csn);
 PRUint16 csn_get_subseqnum(const CSN *csn);
 PRUint16 csn_get_subseqnum(const CSN *csn);
 char *csn_as_string(const CSN *csn, PRBool replicaIdOrder, char *ss); /* WARNING: ss must be CSN_STRSIZE bytes, or NULL. */
 char *csn_as_string(const CSN *csn, PRBool replicaIdOrder, char *ss); /* WARNING: ss must be CSN_STRSIZE bytes, or NULL. */
+int csn_is_equal(const CSN *csn1, const CSN *csn2);
 int csn_compare(const CSN *csn1, const CSN *csn2);
 int csn_compare(const CSN *csn1, const CSN *csn2);
 int csn_compare_ext(const CSN *csn1, const CSN *csn2, unsigned int flags);
 int csn_compare_ext(const CSN *csn1, const CSN *csn2, unsigned int flags);
 #define CSN_COMPARE_SKIP_SUBSEQ 0x1
 #define CSN_COMPARE_SKIP_SUBSEQ 0x1
@@ -189,6 +190,7 @@ const CSN *csn_max(const CSN *csn1,const CSN *csn2);
    a csn from the set.*/
    a csn from the set.*/
 int csn_increment_subsequence (CSN *csn);
 int csn_increment_subsequence (CSN *csn);
 
 
+void csnplFreeCSN (void *arg);
 /*
 /*
  * csnset.c
  * csnset.c
  */
  */