|
|
@@ -77,7 +77,7 @@ static char *get_replgen_from_berval(const struct berval *bval);
|
|
|
static const char * const prefix_replicageneration = "{replicageneration}";
|
|
|
static const char * const prefix_ruvcsn = "{replica "; /* intentionally missing '}' */
|
|
|
|
|
|
-static int ruv_update_ruv_element (RUV *ruv, RUVElement *replica, const CSN *csn, const char *replica_purl, PRBool isLocal);
|
|
|
+static int ruv_update_ruv_element (RUV *ruv, RUVElement *replica, const CSNPL_CTX *prim_csn, const char *replica_purl, PRBool isLocal);
|
|
|
|
|
|
/* API implementation */
|
|
|
|
|
|
@@ -1599,13 +1599,13 @@ ruv_dump(const RUV *ruv, char *ruv_name, PRFileDesc *prFile)
|
|
|
|
|
|
/* this function notifies the ruv that there are operations in progress so that
|
|
|
they can be added to the pending list for the appropriate client. */
|
|
|
-int ruv_add_csn_inprogress (RUV *ruv, const CSN *csn)
|
|
|
+int ruv_add_csn_inprogress (void *repl, RUV *ruv, const CSN *csn)
|
|
|
{
|
|
|
RUVElement* replica;
|
|
|
char csn_str[CSN_STRSIZE];
|
|
|
int rc = RUV_SUCCESS;
|
|
|
int rid = csn_get_replicaid (csn);
|
|
|
- CSN *prim_csn;
|
|
|
+ CSNPL_CTX *prim_csn;
|
|
|
|
|
|
PR_ASSERT (ruv && csn);
|
|
|
|
|
|
@@ -1645,8 +1645,13 @@ int ruv_add_csn_inprogress (RUV *ruv, const CSN *csn)
|
|
|
}
|
|
|
prim_csn = get_thread_primary_csn();
|
|
|
if (prim_csn == NULL) {
|
|
|
- set_thread_primary_csn(csn);
|
|
|
+ set_thread_primary_csn(csn, (Replica *)repl);
|
|
|
prim_csn = get_thread_primary_csn();
|
|
|
+ } else {
|
|
|
+ /* the prim csn data already exist, need to check if
|
|
|
+ * current replica is already present
|
|
|
+ */
|
|
|
+ add_replica_to_primcsn(prim_csn, (Replica *)repl);
|
|
|
}
|
|
|
rc = csnplInsert (replica->csnpl, csn, prim_csn);
|
|
|
if (rc == 1) /* we already seen this csn */
|
|
|
@@ -1656,7 +1661,7 @@ int ruv_add_csn_inprogress (RUV *ruv, const CSN *csn)
|
|
|
"The csn %s has already be seen - ignoring\n",
|
|
|
csn_as_string (csn, PR_FALSE, csn_str));
|
|
|
}
|
|
|
- set_thread_primary_csn(NULL);
|
|
|
+ set_thread_primary_csn(NULL, NULL);
|
|
|
rc = RUV_COVERS_CSN;
|
|
|
}
|
|
|
else if(rc != 0)
|
|
|
@@ -1681,11 +1686,13 @@ done:
|
|
|
return rc;
|
|
|
}
|
|
|
|
|
|
-int ruv_cancel_csn_inprogress (RUV *ruv, const CSN *csn, ReplicaId local_rid)
|
|
|
+int ruv_cancel_csn_inprogress (void *repl, RUV *ruv, const CSN *csn, ReplicaId local_rid)
|
|
|
{
|
|
|
- RUVElement* replica;
|
|
|
+ RUVElement* repl_ruv;
|
|
|
int rc = RUV_SUCCESS;
|
|
|
- CSN *prim_csn = NULL;
|
|
|
+ CSNPL_CTX *prim_csn = NULL;
|
|
|
+ Replica *repl_it;
|
|
|
+ size_t it;
|
|
|
|
|
|
|
|
|
PR_ASSERT (ruv && csn);
|
|
|
@@ -1693,29 +1700,44 @@ int ruv_cancel_csn_inprogress (RUV *ruv, const CSN *csn, ReplicaId local_rid)
|
|
|
prim_csn = get_thread_primary_csn();
|
|
|
/* locate ruvElement */
|
|
|
slapi_rwlock_wrlock (ruv->lock);
|
|
|
- replica = ruvGetReplica (ruv, csn_get_replicaid (csn));
|
|
|
- if (replica == NULL) {
|
|
|
+ repl_ruv = ruvGetReplica (ruv, csn_get_replicaid (csn));
|
|
|
+ if (repl_ruv == NULL) {
|
|
|
/* ONREPL - log error */
|
|
|
rc = RUV_NOTFOUND;
|
|
|
goto done;
|
|
|
}
|
|
|
- if (csn_is_equal(csn, prim_csn)) {
|
|
|
+ if (csn_primary(repl, csn, prim_csn)) {
|
|
|
/* the prim csn is cancelled, lets remove all dependent csns */
|
|
|
+ /* for the primary replica we can have modifications for two RIDS:
|
|
|
+ * - the local RID for direct or internal operations
|
|
|
+ * - a remote RID if the primary csn is for a replciated op.
|
|
|
+ */
|
|
|
ReplicaId prim_rid = csn_get_replicaid (csn);
|
|
|
- replica = ruvGetReplica (ruv, prim_rid);
|
|
|
- rc = csnplRemoveAll (replica->csnpl, prim_csn);
|
|
|
+ repl_ruv = ruvGetReplica (ruv, local_rid);
|
|
|
+ rc = csnplRemoveAll (repl_ruv->csnpl, prim_csn);
|
|
|
if (prim_rid != local_rid) {
|
|
|
+ repl_ruv = ruvGetReplica (ruv, prim_rid);
|
|
|
+ rc = csnplRemoveAll (repl_ruv->csnpl, prim_csn);
|
|
|
+ }
|
|
|
+
|
|
|
+ for (it=0; it<prim_csn->repl_cnt; it++) {
|
|
|
+ repl_it = prim_csn->sec_repl[it];
|
|
|
+ replica_lock_replica(repl_it);
|
|
|
+ local_rid = replica_get_rid(repl_it);
|
|
|
if( local_rid != READ_ONLY_REPLICA_ID) {
|
|
|
- replica = ruvGetReplica (ruv, local_rid);
|
|
|
- if (replica) {
|
|
|
- rc = csnplRemoveAll (replica->csnpl, prim_csn);
|
|
|
+ Object *ruv_obj = replica_get_ruv (repl_it);
|
|
|
+ RUV *ruv_it = object_get_data (ruv_obj);
|
|
|
+ repl_ruv = ruvGetReplica (ruv_it, local_rid);
|
|
|
+ if (repl_ruv) {
|
|
|
+ rc = csnplRemoveAll (repl_ruv->csnpl, prim_csn);
|
|
|
} else {
|
|
|
rc = RUV_NOTFOUND;
|
|
|
}
|
|
|
}
|
|
|
+ replica_unlock_replica(repl_it);
|
|
|
}
|
|
|
} else {
|
|
|
- rc = csnplRemove (replica->csnpl, csn);
|
|
|
+ rc = csnplRemove (repl_ruv->csnpl, csn);
|
|
|
}
|
|
|
if (rc != 0)
|
|
|
rc = RUV_NOTFOUND;
|
|
|
@@ -1727,86 +1749,100 @@ done:
|
|
|
return rc;
|
|
|
}
|
|
|
|
|
|
-int ruv_update_ruv (RUV *ruv, const CSN *csn, const char *replica_purl, ReplicaId local_rid)
|
|
|
+int ruv_update_ruv (RUV *ruv, const CSN *csn, const char *replica_purl, void *replica, ReplicaId local_rid)
|
|
|
{
|
|
|
int rc=RUV_SUCCESS;
|
|
|
- RUVElement *replica;
|
|
|
+ RUVElement *repl_ruv;
|
|
|
ReplicaId prim_rid;
|
|
|
+ Replica *repl_it = NULL;
|
|
|
+ size_t it = 0;
|
|
|
|
|
|
- CSN *prim_csn = get_thread_primary_csn();
|
|
|
+ CSNPL_CTX *prim_csn = get_thread_primary_csn();
|
|
|
|
|
|
- if (! csn_is_equal(csn, prim_csn)) {
|
|
|
+ if (! csn_primary(replica, csn, prim_csn)) {
|
|
|
/* not a primary csn, nothing to do */
|
|
|
return rc;
|
|
|
}
|
|
|
- slapi_rwlock_wrlock (ruv->lock);
|
|
|
+
|
|
|
+ /* first handle primary replica
|
|
|
+ * there can be two ruv elements affected
|
|
|
+ */
|
|
|
prim_rid = csn_get_replicaid (csn);
|
|
|
- replica = ruvGetReplica (ruv, local_rid);
|
|
|
- rc = ruv_update_ruv_element(ruv, replica, csn, replica_purl, PR_TRUE);
|
|
|
- if ( rc || local_rid == prim_rid) goto done;
|
|
|
- replica = ruvGetReplica (ruv, prim_rid);
|
|
|
- rc = ruv_update_ruv_element(ruv, replica, csn, replica_purl, PR_FALSE);
|
|
|
-done:
|
|
|
+ slapi_rwlock_wrlock (ruv->lock);
|
|
|
+ if ( local_rid != prim_rid) {
|
|
|
+ repl_ruv = ruvGetReplica (ruv, prim_rid);
|
|
|
+ rc = ruv_update_ruv_element(ruv, repl_ruv, prim_csn, replica_purl, PR_FALSE);
|
|
|
+ }
|
|
|
+ repl_ruv = ruvGetReplica (ruv, local_rid);
|
|
|
+ rc = ruv_update_ruv_element(ruv, repl_ruv, prim_csn, replica_purl, PR_TRUE);
|
|
|
slapi_rwlock_unlock (ruv->lock);
|
|
|
+ if (rc) return rc;
|
|
|
+
|
|
|
+ /* now handle secondary replicas */
|
|
|
+ for (it=0; it<prim_csn->repl_cnt; it++) {
|
|
|
+ repl_it = prim_csn->sec_repl[it];
|
|
|
+ replica_lock_replica(repl_it);
|
|
|
+ Object *ruv_obj = replica_get_ruv (repl_it);
|
|
|
+ RUV *ruv_it = object_get_data (ruv_obj);
|
|
|
+ slapi_rwlock_wrlock (ruv_it->lock);
|
|
|
+ repl_ruv = ruvGetReplica (ruv_it, replica_get_rid(repl_it));
|
|
|
+ rc = ruv_update_ruv_element(ruv_it, repl_ruv, prim_csn, replica_purl, PR_TRUE);
|
|
|
+ slapi_rwlock_unlock (ruv_it->lock);
|
|
|
+ replica_unlock_replica(repl_it);
|
|
|
+ if (rc) break;
|
|
|
+ }
|
|
|
return rc;
|
|
|
}
|
|
|
+
|
|
|
static int
|
|
|
-ruv_update_ruv_element (RUV *ruv, RUVElement *replica, const CSN *csn, const char *replica_purl, PRBool isLocal)
|
|
|
+ruv_update_ruv_element (RUV *ruv, RUVElement *replica, const CSNPL_CTX *prim_csn, const char *replica_purl, PRBool isLocal)
|
|
|
{
|
|
|
int rc=RUV_SUCCESS;
|
|
|
char csn_str[CSN_STRSIZE];
|
|
|
CSN *max_csn;
|
|
|
CSN *first_csn = NULL;
|
|
|
|
|
|
- if (replica == NULL)
|
|
|
- {
|
|
|
+ if (replica == NULL) {
|
|
|
/* we should have a ruv element at this point because it would have
|
|
|
been added by ruv_add_inprogress function */
|
|
|
slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name, "ruv_update_ruv - "
|
|
|
- "Can't locate RUV element for replica %d\n", csn_get_replicaid (csn));
|
|
|
+ "Can't locate RUV element for replica %d\n", csn_get_replicaid (prim_csn->prim_csn));
|
|
|
goto done;
|
|
|
}
|
|
|
|
|
|
- if (csnplCommitAll(replica->csnpl, csn) != 0)
|
|
|
- {
|
|
|
- slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name, "ruv_update_ruv - Cannot commit csn %s\n",
|
|
|
- csn_as_string(csn, PR_FALSE, csn_str));
|
|
|
+ if (csnplCommitAll(replica->csnpl, prim_csn) != 0) {
|
|
|
+ slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name, "ruv_update_ruv - Cannot commit csn %s\n",
|
|
|
+ csn_as_string(prim_csn->prim_csn, PR_FALSE, csn_str));
|
|
|
rc = RUV_CSNPL_ERROR;
|
|
|
goto done;
|
|
|
- }
|
|
|
- else
|
|
|
- {
|
|
|
+ } else {
|
|
|
if (slapi_is_loglevel_set(SLAPI_LOG_REPL)) {
|
|
|
slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name, "ruv_update_ruv - "
|
|
|
- "Successfully committed csn %s\n", csn_as_string(csn, PR_FALSE, csn_str));
|
|
|
+ "Successfully committed csn %s\n", csn_as_string(prim_csn->prim_csn, PR_FALSE, csn_str));
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- if ((max_csn = csnplRollUp(replica->csnpl, &first_csn)) != NULL)
|
|
|
- {
|
|
|
-#ifdef DEBUG
|
|
|
- slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name, "ruv_update_ruv - Rolled up to csn %s\n",
|
|
|
- csn_as_string(max_csn, PR_FALSE, csn_str)); /* XXXggood remove debugging */
|
|
|
-#endif
|
|
|
+ if ((max_csn = csnplRollUp(replica->csnpl, &first_csn)) != NULL) {
|
|
|
+ slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name, "ruv_update_ruv - Rolled up to csn %s\n",
|
|
|
+ csn_as_string(max_csn, PR_FALSE, csn_str)); /* XXXggood remove debugging */
|
|
|
/* replica object sets min csn for local replica */
|
|
|
- if (!isLocal && replica->min_csn == NULL) {
|
|
|
- /* bug 559223 - it seems that, under huge stress, a server might pass
|
|
|
- * through this code when more than 1 change has already been sent and commited into
|
|
|
- * the pending lists... Therefore, as we are trying to set the min_csn ever
|
|
|
- * generated by this replica, we need to set the first_csn as the min csn in the
|
|
|
- * ruv */
|
|
|
- set_min_csn_nolock(ruv, first_csn, replica_purl);
|
|
|
- }
|
|
|
- /* only update the max_csn in the RUV if it is greater than the existing one */
|
|
|
- rc = set_max_csn_nolock_ext(ruv, max_csn, replica_purl, PR_TRUE /* must be greater */);
|
|
|
- /* It is possible that first_csn points to max_csn.
|
|
|
- We need to free it once */
|
|
|
- if (max_csn != first_csn) {
|
|
|
- csn_free(&first_csn);
|
|
|
- }
|
|
|
- csn_free(&max_csn);
|
|
|
- }
|
|
|
-
|
|
|
+ if (!isLocal && replica->min_csn == NULL) {
|
|
|
+ /* bug 559223 - it seems that, under huge stress, a server might pass
|
|
|
+ * through this code when more than 1 change has already been sent and commited into
|
|
|
+ * the pending lists... Therefore, as we are trying to set the min_csn ever
|
|
|
+ * generated by this replica, we need to set the first_csn as the min csn in the
|
|
|
+ * ruv */
|
|
|
+ set_min_csn_nolock(ruv, first_csn, replica_purl);
|
|
|
+ }
|
|
|
+ /* only update the max_csn in the RUV if it is greater than the existing one */
|
|
|
+ rc = set_max_csn_nolock_ext(ruv, max_csn, replica_purl, PR_TRUE /* must be greater */);
|
|
|
+ /* It is possible that first_csn points to max_csn.
|
|
|
+ We need to free it once */
|
|
|
+ if (max_csn != first_csn) {
|
|
|
+ csn_free(&first_csn);
|
|
|
+ }
|
|
|
+ csn_free(&max_csn);
|
|
|
+ }
|
|
|
done:
|
|
|
|
|
|
return rc;
|