|
|
@@ -39,6 +39,7 @@
|
|
|
#define DEFAULT_CLC_BUFFER_COUNT_MAX 0
|
|
|
#define DEFAULT_CLC_BUFFER_PAGE_COUNT 32
|
|
|
#define DEFAULT_CLC_BUFFER_PAGE_SIZE 1024
|
|
|
+#define WORK_CLC_BUFFER_PAGE_SIZE 8*DEFAULT_CLC_BUFFER_PAGE_SIZE
|
|
|
|
|
|
enum {
|
|
|
CLC_STATE_READY = 0, /* ready to iterate */
|
|
|
@@ -56,8 +57,9 @@ struct csn_seq_ctrl_block {
|
|
|
ReplicaId rid; /* RID this block serves */
|
|
|
CSN *consumer_maxcsn; /* Don't send CSN <= this */
|
|
|
CSN *local_maxcsn; /* Don't send CSN > this */
|
|
|
- CSN *prev_local_maxcsn; /* */
|
|
|
- int state; /* CLC_STATE_* */
|
|
|
+ CSN *prev_local_maxcsn; /* Copy of last state at buffer loading */
|
|
|
+ CSN *local_mincsn; /* Used to determin anchor csn*/
|
|
|
+ int state; /* CLC_STATE_* */
|
|
|
};
|
|
|
|
|
|
/*
|
|
|
@@ -70,6 +72,8 @@ struct clc_buffer {
|
|
|
ReplicaId buf_consumer_rid; /* help checking threshold csn */
|
|
|
const RUV *buf_consumer_ruv; /* used to skip change */
|
|
|
const RUV *buf_local_ruv; /* used to refresh local_maxcsn */
|
|
|
+ int buf_ignoreConsumerRID; /* how to handle updates from consumer */
|
|
|
+ int buf_load_cnt; /* number of loads for session */
|
|
|
|
|
|
/*
|
|
|
* fields for retriving data from DB
|
|
|
@@ -90,7 +94,6 @@ struct clc_buffer {
|
|
|
int buf_max_cscbs;
|
|
|
|
|
|
/* fields for debugging stat */
|
|
|
- int buf_load_cnt; /* number of loads for session */
|
|
|
int buf_record_cnt; /* number of changes for session */
|
|
|
int buf_record_skipped; /* number of changes skipped */
|
|
|
int buf_skipped_new_rid; /* number of changes skipped due to new_rid */
|
|
|
@@ -133,7 +136,8 @@ struct clc_pool {
|
|
|
static struct clc_pool *_pool = NULL; /* process's buffer pool */
|
|
|
|
|
|
/* static prototypes */
|
|
|
-static int clcache_adjust_anchorcsn ( CLC_Buffer *buf );
|
|
|
+static int clcache_initial_anchorcsn ( CLC_Buffer *buf, int *flag );
|
|
|
+static int clcache_adjust_anchorcsn ( CLC_Buffer *buf, int *flag );
|
|
|
static void clcache_refresh_consumer_maxcsns ( CLC_Buffer *buf );
|
|
|
static int clcache_refresh_local_maxcsns ( CLC_Buffer *buf );
|
|
|
static int clcache_skip_change ( CLC_Buffer *buf );
|
|
|
@@ -251,8 +255,23 @@ clcache_get_buffer ( CLC_Buffer **buf, DB *db, ReplicaId consumer_rid, const RUV
|
|
|
}
|
|
|
|
|
|
if ( NULL != *buf ) {
|
|
|
+ CSN *c_csn = NULL;
|
|
|
+ CSN *l_csn = NULL;
|
|
|
(*buf)->buf_consumer_ruv = consumer_ruv;
|
|
|
(*buf)->buf_local_ruv = local_ruv;
|
|
|
+ (*buf)->buf_load_flag = DB_MULTIPLE_KEY;
|
|
|
+ ruv_get_largest_csn_for_replica (consumer_ruv, consumer_rid, &c_csn);
|
|
|
+ ruv_get_largest_csn_for_replica (local_ruv, consumer_rid, &l_csn);
|
|
|
+ if (l_csn && csn_compare(l_csn, c_csn) > 0) {
|
|
|
+ /* the supplier has updates for the consumer RID and
|
|
|
+ * these updates are newer than on the consumer
|
|
|
+ */
|
|
|
+ (*buf)->buf_ignoreConsumerRID = 0;
|
|
|
+ } else {
|
|
|
+ (*buf)->buf_ignoreConsumerRID = 1;
|
|
|
+ }
|
|
|
+ csn_free(&c_csn);
|
|
|
+ csn_free(&l_csn);
|
|
|
}
|
|
|
else {
|
|
|
slapi_log_error ( SLAPI_LOG_FATAL, get_thread_private_agmtname(),
|
|
|
@@ -305,36 +324,25 @@ clcache_return_buffer ( CLC_Buffer **buf )
|
|
|
* historic reason.
|
|
|
*/
|
|
|
int
|
|
|
-clcache_load_buffer ( CLC_Buffer *buf, CSN *anchorcsn, int flag )
|
|
|
+clcache_load_buffer ( CLC_Buffer *buf, CSN **anchorCSN )
|
|
|
{
|
|
|
int rc = 0;
|
|
|
+ int flag = DB_NEXT;
|
|
|
|
|
|
+ if (anchorCSN) *anchorCSN = NULL;
|
|
|
clcache_refresh_local_maxcsns ( buf );
|
|
|
|
|
|
- /* Set the loading key */
|
|
|
- if ( anchorcsn ) {
|
|
|
+ if (buf->buf_load_cnt == 0 ) {
|
|
|
clcache_refresh_consumer_maxcsns ( buf );
|
|
|
- buf->buf_load_flag = DB_MULTIPLE_KEY;
|
|
|
- csn_as_string ( anchorcsn, 0, (char*)buf->buf_key.data );
|
|
|
- slapi_log_error ( SLAPI_LOG_REPL, buf->buf_agmt_name,
|
|
|
- "session start: anchorcsn=%s\n", (char*)buf->buf_key.data );
|
|
|
- }
|
|
|
- else if ( csn_get_time(buf->buf_current_csn) == 0 ) {
|
|
|
- /* time == 0 means this csn has never been set */
|
|
|
- rc = DB_NOTFOUND;
|
|
|
- }
|
|
|
- else if ( clcache_adjust_anchorcsn ( buf ) != 0 ) {
|
|
|
- rc = DB_NOTFOUND;
|
|
|
- }
|
|
|
- else {
|
|
|
- csn_as_string ( buf->buf_current_csn, 0, (char*)buf->buf_key.data );
|
|
|
- slapi_log_error ( SLAPI_LOG_REPL, buf->buf_agmt_name,
|
|
|
- "load next: anchorcsn=%s\n", (char*)buf->buf_key.data );
|
|
|
+ rc = clcache_initial_anchorcsn ( buf, &flag );
|
|
|
+ } else {
|
|
|
+ rc = clcache_adjust_anchorcsn ( buf, &flag );
|
|
|
}
|
|
|
|
|
|
if ( rc == 0 ) {
|
|
|
|
|
|
buf->buf_state = CLC_STATE_READY;
|
|
|
+ if (anchorCSN) *anchorCSN = buf->buf_current_csn;
|
|
|
rc = clcache_load_buffer_bulk ( buf, flag );
|
|
|
|
|
|
/* Reset some flag variables */
|
|
|
@@ -344,21 +352,15 @@ clcache_load_buffer ( CLC_Buffer *buf, CSN *anchorcsn, int flag )
|
|
|
buf->buf_cscbs[i]->state = CLC_STATE_READY;
|
|
|
}
|
|
|
}
|
|
|
- else if ( anchorcsn ) {
|
|
|
- /* Report error only when the missing is persistent */
|
|
|
- if ( buf->buf_missing_csn && csn_compare (buf->buf_missing_csn, anchorcsn) == 0 ) {
|
|
|
- if (!buf->buf_prev_missing_csn || csn_compare (buf->buf_prev_missing_csn, anchorcsn)) {
|
|
|
- slapi_log_error ( SLAPI_LOG_FATAL, buf->buf_agmt_name,
|
|
|
- "Can't locate CSN %s in the changelog (DB rc=%d). If replication stops, the consumer may need to be reinitialized.\n",
|
|
|
- (char*)buf->buf_key.data, rc );
|
|
|
- csn_dup_or_init_by_csn (&buf->buf_prev_missing_csn, anchorcsn);
|
|
|
- }
|
|
|
- }
|
|
|
- else {
|
|
|
- csn_dup_or_init_by_csn (&buf->buf_missing_csn, anchorcsn);
|
|
|
- }
|
|
|
+ else {
|
|
|
+ slapi_log_error ( SLAPI_LOG_FATAL, buf->buf_agmt_name,
|
|
|
+ "Can't locate CSN %s in the changelog (DB rc=%d). If replication stops, the consumer may need to be reinitialized.\n",
|
|
|
+ (char*)buf->buf_key.data, rc );
|
|
|
}
|
|
|
+ } else if (rc == CLC_STATE_DONE) {
|
|
|
+ rc = DB_NOTFOUND;
|
|
|
}
|
|
|
+
|
|
|
if ( rc != 0 ) {
|
|
|
slapi_log_error ( SLAPI_LOG_REPL, buf->buf_agmt_name,
|
|
|
"clcache_load_buffer: rc=%d\n", rc );
|
|
|
@@ -483,7 +485,7 @@ clcache_get_next_change ( CLC_Buffer *buf, void **key, size_t *keylen, void **da
|
|
|
* We're done with the current buffer. Now load the next chunk.
|
|
|
*/
|
|
|
if ( NULL == *key && CLC_STATE_READY == buf->buf_state ) {
|
|
|
- rc = clcache_load_buffer ( buf, NULL, DB_NEXT );
|
|
|
+ rc = clcache_load_buffer ( buf, NULL );
|
|
|
if ( 0 == rc && buf->buf_record_ptr ) {
|
|
|
DB_MULTIPLE_KEY_NEXT ( buf->buf_record_ptr, &buf->buf_data,
|
|
|
*key, *keylen, *data, *datalen );
|
|
|
@@ -521,7 +523,6 @@ clcache_refresh_consumer_maxcsns ( CLC_Buffer *buf )
|
|
|
int i;
|
|
|
|
|
|
for ( i = 0; i < buf->buf_num_cscbs; i++ ) {
|
|
|
- csn_free(&buf->buf_cscbs[i]->consumer_maxcsn);
|
|
|
ruv_get_largest_csn_for_replica (
|
|
|
buf->buf_consumer_ruv,
|
|
|
buf->buf_cscbs[i]->rid,
|
|
|
@@ -538,14 +539,11 @@ clcache_refresh_local_maxcsn ( const ruv_enum_data *rid_data, void *data )
|
|
|
int i;
|
|
|
|
|
|
rid = csn_get_replicaid ( rid_data->csn );
|
|
|
-
|
|
|
- /*
|
|
|
- * No need to create cscb for consumer's RID.
|
|
|
- * If RID==65535, the CSN is originated from a
|
|
|
- * legacy consumer. In this case the supplier
|
|
|
- * and the consumer may have the same RID.
|
|
|
+ /* we do not handle updates originated at the consumer if not required
|
|
|
+ * and we ignore RID which have been cleaned
|
|
|
*/
|
|
|
- if ( rid == buf->buf_consumer_rid && rid != MAX_REPLICA_ID )
|
|
|
+ if ( (rid == buf->buf_consumer_rid && buf->buf_ignoreConsumerRID) ||
|
|
|
+ is_cleaned_rid(rid) )
|
|
|
return rc;
|
|
|
|
|
|
for ( i = 0; i < buf->buf_num_cscbs; i++ ) {
|
|
|
@@ -564,9 +562,20 @@ clcache_refresh_local_maxcsn ( const ruv_enum_data *rid_data, void *data )
|
|
|
}
|
|
|
buf->buf_cscbs[i]->rid = rid;
|
|
|
buf->buf_num_cscbs++;
|
|
|
+ /* this is the first time we have a local change for the RID
|
|
|
+ * we need to check what the consumer knows about it.
|
|
|
+ */
|
|
|
+ ruv_get_largest_csn_for_replica (
|
|
|
+ buf->buf_consumer_ruv,
|
|
|
+ buf->buf_cscbs[i]->rid,
|
|
|
+ &buf->buf_cscbs[i]->consumer_maxcsn );
|
|
|
}
|
|
|
|
|
|
+ if (buf->buf_cscbs[i]->local_maxcsn)
|
|
|
+ csn_dup_or_init_by_csn ( &buf->buf_cscbs[i]->prev_local_maxcsn, buf->buf_cscbs[i]->local_maxcsn );
|
|
|
+
|
|
|
csn_dup_or_init_by_csn ( &buf->buf_cscbs[i]->local_maxcsn, rid_data->csn );
|
|
|
+ csn_dup_or_init_by_csn ( &buf->buf_cscbs[i]->local_mincsn, rid_data->min_csn );
|
|
|
|
|
|
if ( buf->buf_cscbs[i]->consumer_maxcsn &&
|
|
|
csn_compare (buf->buf_cscbs[i]->consumer_maxcsn, rid_data->csn) >= 0 ) {
|
|
|
@@ -580,88 +589,147 @@ clcache_refresh_local_maxcsn ( const ruv_enum_data *rid_data, void *data )
|
|
|
static int
|
|
|
clcache_refresh_local_maxcsns ( CLC_Buffer *buf )
|
|
|
{
|
|
|
- int i;
|
|
|
|
|
|
- for ( i = 0; i < buf->buf_num_cscbs; i++ ) {
|
|
|
- csn_dup_or_init_by_csn ( &buf->buf_cscbs[i]->prev_local_maxcsn,
|
|
|
- buf->buf_cscbs[i]->local_maxcsn );
|
|
|
- }
|
|
|
return ruv_enumerate_elements ( buf->buf_local_ruv, clcache_refresh_local_maxcsn, buf );
|
|
|
}
|
|
|
|
|
|
/*
|
|
|
* Algorithm:
|
|
|
*
|
|
|
- * 1. Snapshot local RUVs;
|
|
|
- * 2. Load buffer;
|
|
|
- * 3. Send to the consumer only those CSNs that are covered
|
|
|
- * by the RUVs snapshot taken in the first step;
|
|
|
- * All CSNs that are covered by the RUVs snapshot taken in the
|
|
|
- * first step are guaranteed in consecutive order for the respected
|
|
|
- * RIDs because of the the CSN pending list control;
|
|
|
- * A CSN that is not covered by the RUVs snapshot may be out of order
|
|
|
- * since it is possible that a smaller CSN might not have committed
|
|
|
- * yet by the time the buffer was loaded.
|
|
|
- * 4. Determine anchorcsn for each RID:
|
|
|
- *
|
|
|
- * Case| Local vs. Buffer | New Local | Next
|
|
|
- * | MaxCSN MaxCSN | MaxCSN | Anchor-CSN
|
|
|
- * ----+-------------------+-----------+----------------
|
|
|
- * 1 | Cl >= Cb | * | Cb
|
|
|
- * 2 | Cl < Cb | Cl | Cb
|
|
|
- * 3 | Cl < Cb | Cl2 | Cl
|
|
|
- *
|
|
|
- * 5. Determine anchorcsn for next load:
|
|
|
+ * 1. Determine anchorcsn for each RID:
|
|
|
+ * 2. Determine anchorcsn for next load:
|
|
|
* Anchor-CSN = min { all Next-Anchor-CSN, Buffer-MaxCSN }
|
|
|
*/
|
|
|
static int
|
|
|
-clcache_adjust_anchorcsn ( CLC_Buffer *buf )
|
|
|
+clcache_initial_anchorcsn ( CLC_Buffer *buf, int *flag )
|
|
|
{
|
|
|
PRBool hasChange = PR_FALSE;
|
|
|
struct csn_seq_ctrl_block *cscb;
|
|
|
int i;
|
|
|
+ CSN *anchorcsn = NULL;
|
|
|
|
|
|
if ( buf->buf_state == CLC_STATE_READY ) {
|
|
|
for ( i = 0; i < buf->buf_num_cscbs; i++ ) {
|
|
|
+ CSN *rid_anchor = NULL;
|
|
|
+ int rid_flag = DB_NEXT;
|
|
|
cscb = buf->buf_cscbs[i];
|
|
|
|
|
|
- if ( cscb->state == CLC_STATE_UP_TO_DATE )
|
|
|
- continue;
|
|
|
+ if (slapi_is_loglevel_set(SLAPI_LOG_REPL)) {
|
|
|
+ char prevmax[CSN_STRSIZE];
|
|
|
+ char local[CSN_STRSIZE];
|
|
|
+ char curr[CSN_STRSIZE];
|
|
|
+ char conmaxcsn[CSN_STRSIZE];
|
|
|
+ csn_as_string(cscb->prev_local_maxcsn, 0, prevmax);
|
|
|
+ csn_as_string(cscb->local_maxcsn, 0, local);
|
|
|
+ csn_as_string(buf->buf_current_csn, 0, curr);
|
|
|
+ csn_as_string(cscb->consumer_maxcsn, 0, conmaxcsn);
|
|
|
+ slapi_log_error(SLAPI_LOG_REPL, "clcache_initial_anchorcsn" ,
|
|
|
+ "%s - (cscb %d - state %d) - csnPrevMax (%s) "
|
|
|
+ "csnMax (%s) csnBuf (%s) csnConsumerMax (%s)\n",
|
|
|
+ buf->buf_agmt_name, i, cscb->state, prevmax, local,
|
|
|
+ curr, conmaxcsn);
|
|
|
+ }
|
|
|
|
|
|
- /*
|
|
|
- * Case 3 unsafe ruv change: next buffer load should start
|
|
|
- * from where the maxcsn in the old ruv was. Since each
|
|
|
- * cscb has remembered the maxcsn sent to the consumer,
|
|
|
- * CSNs that may be loaded again could easily be skipped.
|
|
|
- */
|
|
|
- if ( cscb->prev_local_maxcsn &&
|
|
|
- csn_compare (cscb->prev_local_maxcsn, buf->buf_current_csn) < 0 &&
|
|
|
- csn_compare (cscb->local_maxcsn, cscb->prev_local_maxcsn) != 0 ) {
|
|
|
+ if (cscb->consumer_maxcsn == NULL) {
|
|
|
+ /* the consumer hasn't seen changes for this RID */
|
|
|
+ rid_anchor = cscb->local_mincsn;
|
|
|
+ rid_flag = DB_SET;
|
|
|
+ } else if ( csn_compare (cscb->local_maxcsn, cscb->consumer_maxcsn) > 0 ) {
|
|
|
+ rid_anchor = cscb->consumer_maxcsn;
|
|
|
+ }
|
|
|
+
|
|
|
+ if (rid_anchor && (anchorcsn == NULL ||
|
|
|
+ ( csn_compare(rid_anchor, anchorcsn) < 0))) {
|
|
|
+ anchorcsn = rid_anchor;
|
|
|
+ *flag = rid_flag;
|
|
|
hasChange = PR_TRUE;
|
|
|
- cscb->state = CLC_STATE_READY;
|
|
|
- csn_init_by_csn ( buf->buf_current_csn, cscb->prev_local_maxcsn );
|
|
|
- csn_as_string ( cscb->prev_local_maxcsn, 0, (char*)buf->buf_key.data );
|
|
|
- slapi_log_error ( SLAPI_LOG_REPL, buf->buf_agmt_name,
|
|
|
- "adjust anchor csn upon %s\n",
|
|
|
- ( cscb->state == CLC_STATE_CSN_GT_RUV ? "out of sequence csn" : "unsafe ruv change") );
|
|
|
- continue;
|
|
|
}
|
|
|
|
|
|
- /*
|
|
|
- * check if there are still changes to send for this RID
|
|
|
- * Assume we had compared the local maxcsn and the consumer
|
|
|
- * max csn before this function was called and hence the
|
|
|
- * cscb->state had been set accordingly.
|
|
|
- */
|
|
|
- if ( hasChange == PR_FALSE &&
|
|
|
- csn_compare (cscb->local_maxcsn, buf->buf_current_csn) > 0 ) {
|
|
|
+
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ if ( !hasChange ) {
|
|
|
+ buf->buf_state = CLC_STATE_DONE;
|
|
|
+ } else {
|
|
|
+ csn_init_by_csn(buf->buf_current_csn, anchorcsn);
|
|
|
+ csn_as_string(buf->buf_current_csn, 0, (char *)buf->buf_key.data);
|
|
|
+ slapi_log_error(SLAPI_LOG_REPL, "clcache_initial_anchorcsn",
|
|
|
+ "anchor is now: %s\n", (char *)buf->buf_key.data);
|
|
|
+ }
|
|
|
+
|
|
|
+ return buf->buf_state;
|
|
|
+}
|
|
|
+
|
|
|
+static int
|
|
|
+clcache_adjust_anchorcsn ( CLC_Buffer *buf, int *flag )
|
|
|
+{
|
|
|
+ PRBool hasChange = PR_FALSE;
|
|
|
+ struct csn_seq_ctrl_block *cscb;
|
|
|
+ int i;
|
|
|
+ CSN *anchorcsn = NULL;
|
|
|
+
|
|
|
+ if ( buf->buf_state == CLC_STATE_READY ) {
|
|
|
+ for ( i = 0; i < buf->buf_num_cscbs; i++ ) {
|
|
|
+ CSN *rid_anchor = NULL;
|
|
|
+ int rid_flag = DB_NEXT;
|
|
|
+ cscb = buf->buf_cscbs[i];
|
|
|
+
|
|
|
+ if (slapi_is_loglevel_set(SLAPI_LOG_REPL)) {
|
|
|
+ char prevmax[CSN_STRSIZE];
|
|
|
+ char local[CSN_STRSIZE];
|
|
|
+ char curr[CSN_STRSIZE];
|
|
|
+ char conmaxcsn[CSN_STRSIZE];
|
|
|
+ csn_as_string(cscb->prev_local_maxcsn, 0, prevmax);
|
|
|
+ csn_as_string(cscb->local_maxcsn, 0, local);
|
|
|
+ csn_as_string(buf->buf_current_csn, 0, curr);
|
|
|
+ csn_as_string(cscb->consumer_maxcsn, 0, conmaxcsn);
|
|
|
+ slapi_log_error(SLAPI_LOG_REPL, "clcache_adjust_anchorcsn" ,
|
|
|
+ "%s - (cscb %d - state %d) - csnPrevMax (%s) "
|
|
|
+ "csnMax (%s) csnBuf (%s) csnConsumerMax (%s)\n",
|
|
|
+ buf->buf_agmt_name, i, cscb->state, prevmax, local,
|
|
|
+ curr, conmaxcsn);
|
|
|
+ }
|
|
|
+
|
|
|
+ if (csn_compare (cscb->local_maxcsn, cscb->prev_local_maxcsn) == 0 ||
|
|
|
+ csn_compare (cscb->prev_local_maxcsn, buf->buf_current_csn) > 0 ) {
|
|
|
+ if (csn_compare (cscb->local_maxcsn, cscb->consumer_maxcsn) > 0 ) {
|
|
|
+ rid_anchor = buf->buf_current_csn;
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ /* prev local max csn < csnBuffer AND different from local maxcsn */
|
|
|
+ if (cscb->prev_local_maxcsn == NULL) {
|
|
|
+ if (cscb->consumer_maxcsn == NULL) {
|
|
|
+ /* the consumer hasn't seen changes for this RID */
|
|
|
+ rid_anchor = cscb->local_mincsn;
|
|
|
+ rid_flag = DB_SET;
|
|
|
+ } else if ( csn_compare (cscb->local_maxcsn, cscb->consumer_maxcsn) > 0 ) {
|
|
|
+ rid_anchor = cscb->consumer_maxcsn;
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ /* csnPrevMaxSup > 0 */
|
|
|
+ rid_anchor = cscb->consumer_maxcsn;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ if (rid_anchor && (anchorcsn == NULL ||
|
|
|
+ ( csn_compare(rid_anchor, anchorcsn) < 0))) {
|
|
|
+ anchorcsn = rid_anchor;
|
|
|
+ *flag = rid_flag;
|
|
|
hasChange = PR_TRUE;
|
|
|
}
|
|
|
+
|
|
|
+
|
|
|
}
|
|
|
}
|
|
|
|
|
|
if ( !hasChange ) {
|
|
|
buf->buf_state = CLC_STATE_DONE;
|
|
|
+ } else {
|
|
|
+ csn_init_by_csn(buf->buf_current_csn, anchorcsn);
|
|
|
+ csn_as_string(buf->buf_current_csn, 0, (char *)buf->buf_key.data);
|
|
|
+ slapi_log_error(SLAPI_LOG_REPL, "clcache_adjust_anchorcsn",
|
|
|
+ "anchor is now: %s\n", (char *)buf->buf_key.data);
|
|
|
}
|
|
|
|
|
|
return buf->buf_state;
|
|
|
@@ -675,7 +743,6 @@ clcache_skip_change ( CLC_Buffer *buf )
|
|
|
int skip = 1;
|
|
|
int i;
|
|
|
char buf_cur_csn_str[CSN_STRSIZE];
|
|
|
- char oth_csn_str[CSN_STRSIZE];
|
|
|
|
|
|
do {
|
|
|
|
|
|
@@ -688,25 +755,14 @@ clcache_skip_change ( CLC_Buffer *buf )
|
|
|
* legacy consumer. In this case the supplier
|
|
|
* and the consumer may have the same RID.
|
|
|
*/
|
|
|
- if (rid == buf->buf_consumer_rid && rid != MAX_REPLICA_ID){
|
|
|
- CSN *cons_maxcsn = NULL;
|
|
|
-
|
|
|
- ruv_get_max_csn(buf->buf_consumer_ruv, &cons_maxcsn);
|
|
|
- if ( csn_compare ( buf->buf_current_csn, cons_maxcsn) > 0 ) {
|
|
|
- /*
|
|
|
- * The consumer must have been "restored" and needs this newer update.
|
|
|
- */
|
|
|
- skip = 0;
|
|
|
- } else if (slapi_is_loglevel_set(SLAPI_LOG_REPL)) {
|
|
|
+ if (rid == buf->buf_consumer_rid && buf->buf_ignoreConsumerRID){
|
|
|
+ if (slapi_is_loglevel_set(SLAPI_LOG_REPL)) {
|
|
|
csn_as_string(buf->buf_current_csn, 0, buf_cur_csn_str);
|
|
|
- csn_as_string(cons_maxcsn, 0, oth_csn_str);
|
|
|
slapi_log_error(SLAPI_LOG_REPL, buf->buf_agmt_name,
|
|
|
- "Skipping update because the changelog buffer current csn [%s] is "
|
|
|
- "less than or equal to the consumer max csn [%s]\n",
|
|
|
- buf_cur_csn_str, oth_csn_str);
|
|
|
+ "Skipping update because the consumer with Rid: [%d] is "
|
|
|
+ "ignored\n", rid);
|
|
|
buf->buf_skipped_csn_gt_cons_maxcsn++;
|
|
|
}
|
|
|
- csn_free(&cons_maxcsn);
|
|
|
break;
|
|
|
}
|
|
|
|
|
|
@@ -821,6 +877,7 @@ clcache_free_cscb ( struct csn_seq_ctrl_block ** cscb )
|
|
|
csn_free ( & (*cscb)->consumer_maxcsn );
|
|
|
csn_free ( & (*cscb)->local_maxcsn );
|
|
|
csn_free ( & (*cscb)->prev_local_maxcsn );
|
|
|
+ csn_free ( & (*cscb)->local_mincsn );
|
|
|
slapi_ch_free ( (void **) cscb );
|
|
|
}
|
|
|
|
|
|
@@ -1003,6 +1060,15 @@ clcache_cursor_get ( DBC *cursor, CLC_Buffer *buf, int flag )
|
|
|
{
|
|
|
int rc;
|
|
|
|
|
|
+ if (buf->buf_data.ulen > WORK_CLC_BUFFER_PAGE_SIZE) {
|
|
|
+ /*
|
|
|
+ * The buffer size had to be increased,
|
|
|
+ * reset it to a smaller working size,
|
|
|
+ * if not sufficient it will be increased again
|
|
|
+ */
|
|
|
+ buf->buf_data.ulen = WORK_CLC_BUFFER_PAGE_SIZE;
|
|
|
+ }
|
|
|
+
|
|
|
rc = cursor->c_get ( cursor,
|
|
|
& buf->buf_key,
|
|
|
& buf->buf_data,
|