|
|
@@ -317,7 +317,7 @@ static int _cl5CheckMissingCSN (const CSN *minCsn, const RUV *supplierRUV, CL5DB
|
|
|
static int _cl5TrimInit ();
|
|
|
static void _cl5TrimCleanup ();
|
|
|
static int _cl5TrimMain (void *param);
|
|
|
-static void _cl5DoTrimming (ReplicaId rid);
|
|
|
+static void _cl5DoTrimming ();
|
|
|
static void _cl5CompactDBs();
|
|
|
static void _cl5PurgeRID(Object *obj, ReplicaId cleaned_rid);
|
|
|
static int _cl5PurgeGetFirstEntry (Object *obj, CL5Entry *entry, void **iterator, DB_TXN *txnid, int rid, DBT *key);
|
|
|
@@ -3447,43 +3447,37 @@ static int _cl5TrimMain (void *param)
|
|
|
return 0;
|
|
|
}
|
|
|
|
|
|
-/* We remove an entry if it has been replayed to all consumers and
|
|
|
- and the number of entries in the changelog is larger than maxEntries
|
|
|
- or age of the entry is larger than maxAge.
|
|
|
- Also we can't purge entries which correspond to max csns in the
|
|
|
- supplier's ruv. Here is a example where we can get into trouble:
|
|
|
- The server is setup with time based trimming and no consumer's
|
|
|
- At some point all the entries are trimmed from the changelog.
|
|
|
- At a later point a consumer is added and initialized online
|
|
|
- Then a change is made on the supplier.
|
|
|
- To update the consumer, the supplier would attempt to locate
|
|
|
- the last change sent to the consumer in the changelog and will
|
|
|
- fail because the change was removed.
|
|
|
-
|
|
|
+/*
|
|
|
+ * We remove an entry if it has been replayed to all consumers and the number
|
|
|
+ * of entries in the changelog is larger than maxEntries or age of the entry
|
|
|
+ * is larger than maxAge. Also we can't purge entries which correspond to max
|
|
|
+ * csns in the supplier's ruv. Here is a example where we can get into trouble:
|
|
|
+ *
|
|
|
+ * The server is setup with time based trimming and no consumer's
|
|
|
+ * At some point all the entries are trimmed from the changelog.
|
|
|
+ * At a later point a consumer is added and initialized online.
|
|
|
+ * Then a change is made on the supplier.
|
|
|
+ * To update the consumer, the supplier would attempt to locate the last
|
|
|
+ * change sent to the consumer in the changelog and will fail because the
|
|
|
+ * change was removed.
|
|
|
*/
|
|
|
-
|
|
|
-static void _cl5DoTrimming (ReplicaId rid)
|
|
|
+static void _cl5DoTrimming ()
|
|
|
{
|
|
|
Object *obj;
|
|
|
long numToTrim;
|
|
|
|
|
|
PR_Lock (s_cl5Desc.dbTrim.lock);
|
|
|
|
|
|
- /* ONREPL We trim file by file which means that some files will be
|
|
|
- trimmed more often than other. We might have to fix that by, for
|
|
|
- example, randomizing starting point */
|
|
|
+ /*
|
|
|
+ * We are trimming all the changelogs. We trim file by file which
|
|
|
+ * means that some files will be trimmed more often than other. We
|
|
|
+ * might have to fix that by, for example, randomizing the starting
|
|
|
+ * point.
|
|
|
+ */
|
|
|
obj = objset_first_obj (s_cl5Desc.dbFiles);
|
|
|
- while (obj && (_cl5CanTrim ((time_t)0, &numToTrim) || rid))
|
|
|
+ while (obj && _cl5CanTrim ((time_t)0, &numToTrim))
|
|
|
{
|
|
|
- if (rid){
|
|
|
- /*
|
|
|
- * We are cleaning an invalid rid, and need to strip it
|
|
|
- * from the changelog.
|
|
|
- */
|
|
|
- _cl5PurgeRID (obj, rid);
|
|
|
- } else {
|
|
|
- _cl5TrimFile (obj, &numToTrim);
|
|
|
- }
|
|
|
+ _cl5TrimFile (obj, &numToTrim);
|
|
|
obj = objset_next_obj (s_cl5Desc.dbFiles, obj);
|
|
|
}
|
|
|
|
|
|
@@ -3495,6 +3489,43 @@ static void _cl5DoTrimming (ReplicaId rid)
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
+/*
|
|
|
+ * We are purging a changelog after a cleanAllRUV task. Find the specific
|
|
|
+ * changelog for the backend that is being cleaned, and purge all the records
|
|
|
+ * with the cleaned rid.
|
|
|
+ */
|
|
|
+static void _cl5DoPurging (Replica *replica)
|
|
|
+{
|
|
|
+ ReplicaId rid = replica_get_rid(replica);
|
|
|
+ const Slapi_DN *sdn = replica_get_root(replica);
|
|
|
+ const char *replName = replica_get_name(replica);
|
|
|
+ char *replGen = replica_get_generation(replica);
|
|
|
+ char *fileName;
|
|
|
+ Object *obj;
|
|
|
+
|
|
|
+ PR_Lock (s_cl5Desc.dbTrim.lock);
|
|
|
+ fileName = _cl5MakeFileName (replName, replGen);
|
|
|
+ obj = objset_find(s_cl5Desc.dbFiles, _cl5CompareDBFile, fileName);
|
|
|
+ if (obj) {
|
|
|
+ /* We found our changelog, now purge it */
|
|
|
+ _cl5PurgeRID (obj, rid);
|
|
|
+ object_release (obj);
|
|
|
+ slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl,
|
|
|
+ "Purged rid (%d) from suffix (%s)\n",
|
|
|
+ rid, slapi_sdn_get_dn(sdn));
|
|
|
+ } else {
|
|
|
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl,
|
|
|
+ "Purge rid (%d) failed to find changelog file (%s) for suffix (%s)\n",
|
|
|
+ rid, fileName, slapi_sdn_get_dn(sdn));
|
|
|
+ }
|
|
|
+ PR_Unlock (s_cl5Desc.dbTrim.lock);
|
|
|
+
|
|
|
+ slapi_ch_free_string(&replGen);
|
|
|
+ slapi_ch_free_string(&fileName);
|
|
|
+
|
|
|
+ return;
|
|
|
+}
|
|
|
+
|
|
|
/* clear free page files to reduce changelog */
|
|
|
static void
|
|
|
_cl5CompactDBs()
|
|
|
@@ -4072,23 +4103,25 @@ static PRBool _cl5CanTrim (time_t time, long *numToTrim)
|
|
|
{
|
|
|
*numToTrim = 0;
|
|
|
|
|
|
- if (s_cl5Desc.dbTrim.maxAge == 0 && s_cl5Desc.dbTrim.maxEntries == 0)
|
|
|
+ if (s_cl5Desc.dbTrim.maxAge == 0 && s_cl5Desc.dbTrim.maxEntries == 0) {
|
|
|
return PR_FALSE;
|
|
|
-
|
|
|
+ }
|
|
|
if (s_cl5Desc.dbTrim.maxAge == 0)
|
|
|
{
|
|
|
*numToTrim = cl5GetOperationCount (NULL) - s_cl5Desc.dbTrim.maxEntries;
|
|
|
return ( *numToTrim > 0 );
|
|
|
}
|
|
|
|
|
|
- if (s_cl5Desc.dbTrim.maxEntries > 0 &&
|
|
|
- (*numToTrim = cl5GetOperationCount (NULL) - s_cl5Desc.dbTrim.maxEntries) > 0)
|
|
|
- return PR_TRUE;
|
|
|
+ if (s_cl5Desc.dbTrim.maxEntries > 0 &&
|
|
|
+ (*numToTrim = cl5GetOperationCount (NULL) - s_cl5Desc.dbTrim.maxEntries) > 0) {
|
|
|
+ return PR_TRUE;
|
|
|
+ }
|
|
|
|
|
|
- if (time)
|
|
|
+ if (time) {
|
|
|
return (current_time () - time > s_cl5Desc.dbTrim.maxAge);
|
|
|
- else
|
|
|
- return PR_TRUE;
|
|
|
+ } else {
|
|
|
+ return PR_TRUE;
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
static int _cl5ReadRUV (const char *replGen, Object *obj, PRBool purge)
|
|
|
@@ -4101,7 +4134,6 @@ static int _cl5ReadRUV (const char *replGen, Object *obj, PRBool purge)
|
|
|
char *pos;
|
|
|
char *agmt_name;
|
|
|
|
|
|
-
|
|
|
PR_ASSERT (replGen && obj);
|
|
|
|
|
|
file = (CL5DBFile*)object_get_data (obj);
|
|
|
@@ -4109,13 +4141,12 @@ static int _cl5ReadRUV (const char *replGen, Object *obj, PRBool purge)
|
|
|
|
|
|
agmt_name = get_thread_private_agmtname();
|
|
|
|
|
|
- if (purge) /* read purge vector entry */
|
|
|
- key.data = _cl5GetHelperEntryKey (PURGE_RUV_TIME, csnStr);
|
|
|
- else /* read upper bound vector */
|
|
|
- key.data = _cl5GetHelperEntryKey (MAX_RUV_TIME, csnStr);
|
|
|
-
|
|
|
+ if (purge) { /* read purge vector entry */
|
|
|
+ key.data = _cl5GetHelperEntryKey (PURGE_RUV_TIME, csnStr);
|
|
|
+ } else { /* read upper bound vector */
|
|
|
+ key.data = _cl5GetHelperEntryKey (MAX_RUV_TIME, csnStr);
|
|
|
+ }
|
|
|
key.size = CSN_STRSIZE;
|
|
|
-
|
|
|
data.flags = DB_DBT_MALLOC;
|
|
|
|
|
|
rc = file->db->get(file->db, NULL/*txn*/, &key, &data, 0);
|
|
|
@@ -4125,13 +4156,13 @@ static int _cl5ReadRUV (const char *replGen, Object *obj, PRBool purge)
|
|
|
rc = _cl5ReadBervals (&vals, &pos, data.size);
|
|
|
slapi_ch_free (&(data.data));
|
|
|
if (rc != CL5_SUCCESS)
|
|
|
- goto done;
|
|
|
+ goto done;
|
|
|
|
|
|
- if (purge)
|
|
|
+ if (purge) {
|
|
|
rc = ruv_init_from_bervals(vals, &file->purgeRUV);
|
|
|
- else
|
|
|
+ } else {
|
|
|
rc = ruv_init_from_bervals(vals, &file->maxRUV);
|
|
|
-
|
|
|
+ }
|
|
|
if (rc != RUV_SUCCESS)
|
|
|
{
|
|
|
slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl,
|
|
|
@@ -4139,7 +4170,7 @@ static int _cl5ReadRUV (const char *replGen, Object *obj, PRBool purge)
|
|
|
"RUV error %d\n", agmt_name, purge? "purge" : "upper bound", rc);
|
|
|
|
|
|
rc = CL5_RUV_ERROR;
|
|
|
- goto done;
|
|
|
+ goto done;
|
|
|
}
|
|
|
|
|
|
/* delete the entry; it is re-added when file
|
|
|
@@ -4151,7 +4182,7 @@ static int _cl5ReadRUV (const char *replGen, Object *obj, PRBool purge)
|
|
|
|
|
|
case DB_NOTFOUND: /* RUV is lost - need to construct */
|
|
|
rc = _cl5ConstructRUV (replGen, obj, purge);
|
|
|
- goto done;
|
|
|
+ goto done;
|
|
|
|
|
|
default: slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl,
|
|
|
"%s: _cl5ReadRUV: failed to get purge RUV; "
|
|
|
@@ -6946,12 +6977,14 @@ cl5CleanRUV(ReplicaId rid){
|
|
|
slapi_rwlock_unlock (s_cl5Desc.stLock);
|
|
|
}
|
|
|
|
|
|
-void trigger_cl_purging(ReplicaId rid){
|
|
|
+/*
|
|
|
+ * Create a thread to purge a changelog of cleaned RIDs
|
|
|
+ */
|
|
|
+void trigger_cl_purging(Replica *replica){
|
|
|
PRThread *trim_tid = NULL;
|
|
|
|
|
|
- slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl, "trigger_cl_purging: rid (%d)\n",(int)rid);
|
|
|
trim_tid = PR_CreateThread(PR_USER_THREAD, (VFP)(void*)trigger_cl_purging_thread,
|
|
|
- (void *)&rid, PR_PRIORITY_NORMAL, PR_GLOBAL_THREAD,
|
|
|
+ (void *)replica, PR_PRIORITY_NORMAL, PR_GLOBAL_THREAD,
|
|
|
PR_UNJOINABLE_THREAD, DEFAULT_THREAD_STACKSIZE);
|
|
|
if (NULL == trim_tid){
|
|
|
slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl,
|
|
|
@@ -6963,19 +6996,32 @@ void trigger_cl_purging(ReplicaId rid){
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+/*
|
|
|
+ * Purge a changelog of entries that originated from a particular replica(rid)
|
|
|
+ */
|
|
|
void
|
|
|
trigger_cl_purging_thread(void *arg){
|
|
|
- ReplicaId rid = *(ReplicaId *)arg;
|
|
|
+ Replica *replica = (Replica *)arg;
|
|
|
|
|
|
- /* make sure we have a change log, and we aren't closing it */
|
|
|
- if(s_cl5Desc.dbState == CL5_STATE_CLOSED || s_cl5Desc.dbState == CL5_STATE_CLOSING){
|
|
|
+ /* Make sure we have a change log, and we aren't closing it */
|
|
|
+ if (replica == NULL ||
|
|
|
+ s_cl5Desc.dbState == CL5_STATE_CLOSED ||
|
|
|
+ s_cl5Desc.dbState == CL5_STATE_CLOSING) {
|
|
|
return;
|
|
|
}
|
|
|
+
|
|
|
+ /* Bump the changelog thread count */
|
|
|
if (CL5_SUCCESS != _cl5AddThread()) {
|
|
|
slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl,
|
|
|
- "trigger_cl_purging: failed to increment thread count "
|
|
|
+ "trigger_cl_purging: Abort - failed to increment thread count "
|
|
|
"NSPR error - %d\n", PR_GetError ());
|
|
|
+ return;
|
|
|
}
|
|
|
- _cl5DoTrimming(rid);
|
|
|
+
|
|
|
+ /* Purge the changelog */
|
|
|
+ _cl5DoPurging(replica);
|
|
|
_cl5RemoveThread();
|
|
|
+ slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl,
|
|
|
+ "trigger_cl_purging: purged changelog for (%s) rid (%d)\n",
|
|
|
+ slapi_sdn_get_dn(replica_get_root(replica)), replica_get_rid(replica));
|
|
|
}
|