|
|
@@ -219,9 +219,16 @@ static int dblayer_start_checkpoint_thread(struct ldbminfo *li);
|
|
|
static int dblayer_start_trickle_thread(struct ldbminfo *li);
|
|
|
static int dblayer_start_perf_thread(struct ldbminfo *li);
|
|
|
static int dblayer_start_txn_test_thread(struct ldbminfo *li);
|
|
|
-static int trans_batch_count=1;
|
|
|
+static int trans_batch_count=0;
|
|
|
static int trans_batch_limit=0;
|
|
|
+static int trans_batch_txn_min_sleep = 50; /* ms */
|
|
|
+static int trans_batch_txn_max_sleep = 50;
|
|
|
static PRBool log_flush_thread=PR_FALSE;
|
|
|
+static int txn_in_progress_count = 0;
|
|
|
+static int *txn_log_flush_pending = NULL;
|
|
|
+static PRLock *sync_txn_log_flush = NULL;
|
|
|
+static PRCondVar *sync_txn_log_flush_done = NULL;
|
|
|
+static PRCondVar *sync_txn_log_do_flush = NULL;
|
|
|
static int dblayer_db_remove_ex(dblayer_private_env *env, char const path[], char const dbName[], PRBool use_lock);
|
|
|
static void dblayer_init_pvt_txn();
|
|
|
static void dblayer_push_pvt_txn(back_txn *txn);
|
|
|
@@ -342,12 +349,59 @@ dblayer_set_batch_transactions(void *arg, void *value, char *errorbuf, int phase
|
|
|
}
|
|
|
return retval;
|
|
|
}
|
|
|
+int
|
|
|
+dblayer_set_batch_txn_min_sleep(void *arg, void *value, char *errorbuf, int phase, int apply) {
|
|
|
+ int val = (int)((uintptr_t)value);
|
|
|
+ int retval = LDAP_SUCCESS;
|
|
|
+
|
|
|
+ if (apply) {
|
|
|
+ if(phase == CONFIG_PHASE_STARTUP) {
|
|
|
+ trans_batch_txn_min_sleep=val;
|
|
|
+ } else if(trans_batch_txn_min_sleep != FLUSH_REMOTEOFF ) {
|
|
|
+ if((val == 0) && (log_flush_thread)) {
|
|
|
+ log_flush_thread=PR_FALSE;
|
|
|
+ trans_batch_txn_min_sleep = FLUSH_REMOTEOFF;
|
|
|
+ } else if(val > 0) {
|
|
|
+ trans_batch_txn_min_sleep=val;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return retval;
|
|
|
+}
|
|
|
+int
|
|
|
+dblayer_set_batch_txn_max_sleep(void *arg, void *value, char *errorbuf, int phase, int apply) {
|
|
|
+ int val = (int)((uintptr_t)value);
|
|
|
+ int retval = LDAP_SUCCESS;
|
|
|
+
|
|
|
+ if (apply) {
|
|
|
+ if(phase == CONFIG_PHASE_STARTUP) {
|
|
|
+ trans_batch_txn_max_sleep=val;
|
|
|
+ } else if(trans_batch_txn_max_sleep != FLUSH_REMOTEOFF ) {
|
|
|
+ if((val == 0) && (log_flush_thread)) {
|
|
|
+ log_flush_thread=PR_FALSE;
|
|
|
+ trans_batch_txn_max_sleep = FLUSH_REMOTEOFF;
|
|
|
+ } else if(val > 0) {
|
|
|
+ trans_batch_txn_max_sleep=val;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return retval;
|
|
|
+}
|
|
|
|
|
|
void *
|
|
|
dblayer_get_batch_transactions(void *arg) {
|
|
|
return (void *)((uintptr_t)trans_batch_limit);
|
|
|
}
|
|
|
|
|
|
+void *
|
|
|
+dblayer_get_batch_txn_min_sleep(void *arg) {
|
|
|
+ return (void *)((uintptr_t)trans_batch_txn_min_sleep);
|
|
|
+}
|
|
|
+
|
|
|
+void *
|
|
|
+dblayer_get_batch_txn_max_sleep(void *arg) {
|
|
|
+ return (void *)((uintptr_t)trans_batch_txn_max_sleep);
|
|
|
+}
|
|
|
|
|
|
/*
|
|
|
Threading: dblayer isolates upper layers from threading considerations
|
|
|
@@ -3507,10 +3561,17 @@ dblayer_txn_begin_ext(struct ldbminfo *li, back_txnid parent_txn, back_txn *txn,
|
|
|
{
|
|
|
/* this txn is now our current transaction for current operations
|
|
|
and new parent for any nested transactions created */
|
|
|
- dblayer_push_pvt_txn(&new_txn);
|
|
|
- if (txn) {
|
|
|
- txn->back_txn_txn = new_txn.back_txn_txn;
|
|
|
- }
|
|
|
+ if (use_lock && log_flush_thread) {
|
|
|
+ int txn_id = new_txn.back_txn_txn->id(new_txn.back_txn_txn);
|
|
|
+ PR_Lock(sync_txn_log_flush);
|
|
|
+ txn_in_progress_count++;
|
|
|
+ LDAPDebug(LDAP_DEBUG_BACKLDBM, "txn_begin: batchcount: %d, txn_in_progress: %d, curr_txn: %x\n", trans_batch_count, txn_in_progress_count, txn_id);
|
|
|
+ PR_Unlock(sync_txn_log_flush);
|
|
|
+ }
|
|
|
+ dblayer_push_pvt_txn(&new_txn);
|
|
|
+ if (txn) {
|
|
|
+ txn->back_txn_txn = new_txn.back_txn_txn;
|
|
|
+ }
|
|
|
}
|
|
|
} else
|
|
|
{
|
|
|
@@ -3554,6 +3615,8 @@ int dblayer_txn_commit_ext(struct ldbminfo *li, back_txn *txn, PRBool use_lock)
|
|
|
dblayer_private *priv = NULL;
|
|
|
DB_TXN *db_txn = NULL;
|
|
|
back_txn *cur_txn = NULL;
|
|
|
+ int txn_id = 0;
|
|
|
+ int txn_batch_slot = 0;
|
|
|
|
|
|
PR_ASSERT(NULL != li);
|
|
|
|
|
|
@@ -3576,6 +3639,7 @@ int dblayer_txn_commit_ext(struct ldbminfo *li, back_txn *txn, PRBool use_lock)
|
|
|
priv->dblayer_env &&
|
|
|
priv->dblayer_enable_transactions)
|
|
|
{
|
|
|
+ txn_id = db_txn->id(db_txn);
|
|
|
return_value = TXN_COMMIT(db_txn, 0);
|
|
|
/* if we were given a transaction, and it is the same as the
|
|
|
current transaction in progress, pop it off the stack
|
|
|
@@ -3590,17 +3654,33 @@ int dblayer_txn_commit_ext(struct ldbminfo *li, back_txn *txn, PRBool use_lock)
|
|
|
}
|
|
|
if ((priv->dblayer_durable_transactions) && use_lock ) {
|
|
|
if(trans_batch_limit > 0) {
|
|
|
- if(trans_batch_count % trans_batch_limit) {
|
|
|
- trans_batch_count++;
|
|
|
- } else {
|
|
|
- LOG_FLUSH(priv->dblayer_env->dblayer_DB_ENV,0);
|
|
|
- trans_batch_count=1;
|
|
|
- }
|
|
|
+ /* let log_flush thread do the flushing */
|
|
|
+ PR_Lock(sync_txn_log_flush);
|
|
|
+ txn_batch_slot = trans_batch_count++;
|
|
|
+ txn_log_flush_pending[txn_batch_slot] = txn_id;
|
|
|
+ LDAPDebug(LDAP_DEBUG_BACKLDBM, "txn_commit (befor notify): batchcount: %d, txn_in_progress: %d, curr_txn: %x\n", trans_batch_count, txn_in_progress_count, txn_id);
|
|
|
+ /* the log flush thread will periodically flush the txn log,
|
|
|
+ * but in two cases it should be notified to do it immediately:
|
|
|
+ * - the batch limit is passed
|
|
|
+ * - there is no other outstanding txn
|
|
|
+ */
|
|
|
+ if (trans_batch_count > trans_batch_limit ||
|
|
|
+ trans_batch_count == txn_in_progress_count)
|
|
|
+ PR_NotifyCondVar(sync_txn_log_do_flush);
|
|
|
+ /* we need to wait until the txn has been flushed before continuing
|
|
|
+ * and returning success to the client, nit to vialate durability
|
|
|
+ * PR_WaitCondvar releases and reaquires the lock
|
|
|
+ */
|
|
|
+ while (txn_log_flush_pending[txn_batch_slot] == txn_id)
|
|
|
+ PR_WaitCondVar(sync_txn_log_flush_done, PR_INTERVAL_NO_TIMEOUT);
|
|
|
+ txn_in_progress_count--;
|
|
|
+ LDAPDebug(LDAP_DEBUG_BACKLDBM, "txn_commit (before unlock): batchcount: %d, txn_in_progress: %d, curr_txn %x\n", trans_batch_count, txn_in_progress_count, txn_id);
|
|
|
+ PR_Unlock(sync_txn_log_flush);
|
|
|
} else if(trans_batch_limit == FLUSH_REMOTEOFF) { /* user remotely turned batching off */
|
|
|
LOG_FLUSH(priv->dblayer_env->dblayer_DB_ENV,0);
|
|
|
- }
|
|
|
- }
|
|
|
- if(use_lock) slapi_rwlock_unlock(priv->dblayer_env->dblayer_env_lock);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if(use_lock) slapi_rwlock_unlock(priv->dblayer_env->dblayer_env_lock);
|
|
|
} else
|
|
|
{
|
|
|
return_value = 0;
|
|
|
@@ -3663,6 +3743,13 @@ int dblayer_txn_abort_ext(struct ldbminfo *li, back_txn *txn, PRBool use_lock)
|
|
|
priv->dblayer_env &&
|
|
|
priv->dblayer_enable_transactions)
|
|
|
{
|
|
|
+ int txn_id = db_txn->id(db_txn);
|
|
|
+ if (log_flush_thread) {
|
|
|
+ PR_Lock(sync_txn_log_flush);
|
|
|
+ txn_in_progress_count--;
|
|
|
+ PR_Unlock(sync_txn_log_flush);
|
|
|
+ LDAPDebug(LDAP_DEBUG_BACKLDBM, "txn_abort : batchcount: %d, txn_in_progress: %d, curr_txn: %x\n", trans_batch_count, txn_in_progress_count, txn_id);
|
|
|
+ }
|
|
|
return_value = TXN_ABORT(db_txn);
|
|
|
/* if we were given a transaction, and it is the same as the
|
|
|
current transaction in progress, pop it off the stack
|
|
|
@@ -4336,10 +4423,17 @@ static int
|
|
|
dblayer_start_log_flush_thread(dblayer_private *priv)
|
|
|
{
|
|
|
int return_value = 0;
|
|
|
+ int max_threads = config_get_threadnumber();
|
|
|
|
|
|
if ((priv->dblayer_durable_transactions) &&
|
|
|
(priv->dblayer_enable_transactions) && (trans_batch_limit > 0)) {
|
|
|
log_flush_thread=PR_TRUE;
|
|
|
+ /* initialize the synchronization objects for the log_flush and worker threads */
|
|
|
+ sync_txn_log_flush = PR_NewLock();
|
|
|
+ sync_txn_log_flush_done = PR_NewCondVar (sync_txn_log_flush);
|
|
|
+ sync_txn_log_do_flush = PR_NewCondVar (sync_txn_log_flush);
|
|
|
+ txn_log_flush_pending = (int*)slapi_ch_malloc(max_threads*sizeof(int));
|
|
|
+
|
|
|
if (NULL == PR_CreateThread (PR_USER_THREAD,
|
|
|
(VFP) (void *) log_flush_threadmain, priv,
|
|
|
PR_PRIORITY_NORMAL, PR_GLOBAL_THREAD,
|
|
|
@@ -4365,28 +4459,65 @@ dblayer_start_log_flush_thread(dblayer_private *priv)
|
|
|
static int log_flush_threadmain(void *param)
|
|
|
{
|
|
|
dblayer_private *priv = NULL;
|
|
|
- PRIntervalTime interval;
|
|
|
+ PRIntervalTime interval_wait, interval_flush, interval_def;
|
|
|
+ PRIntervalTime last_flush;
|
|
|
+ int i;
|
|
|
+ int do_flush = 0;
|
|
|
|
|
|
PR_ASSERT(NULL != param);
|
|
|
priv = (dblayer_private *) param;
|
|
|
|
|
|
INCR_THREAD_COUNT(priv);
|
|
|
|
|
|
- interval = PR_MillisecondsToInterval(300);
|
|
|
+ interval_flush = PR_MillisecondsToInterval(trans_batch_txn_min_sleep);
|
|
|
+ interval_wait = PR_MillisecondsToInterval(trans_batch_txn_max_sleep);
|
|
|
+ interval_def = PR_MillisecondsToInterval(300); /*used while no txn or txn batching */
|
|
|
+ /* LK this is only needed if online change of
|
|
|
+ * of txn config is supported ???
|
|
|
+ */
|
|
|
while ((!priv->dblayer_stop_threads) && (log_flush_thread))
|
|
|
{
|
|
|
if (priv->dblayer_enable_transactions)
|
|
|
- {
|
|
|
- DB_CHECKPOINT_LOCK(1, priv->dblayer_env->dblayer_env_lock);
|
|
|
- if(trans_batch_limit > 0) {
|
|
|
- if(trans_batch_count > 1) {
|
|
|
- LOG_FLUSH(priv->dblayer_env->dblayer_DB_ENV,0);
|
|
|
- trans_batch_count=1;
|
|
|
- }
|
|
|
- }
|
|
|
- DB_CHECKPOINT_UNLOCK(1, priv->dblayer_env->dblayer_env_lock);
|
|
|
- }
|
|
|
- DS_Sleep(interval);
|
|
|
+ {
|
|
|
+ if (trans_batch_limit > 0) {
|
|
|
+ /* synchronize flushing thread with workers */
|
|
|
+ PR_Lock(sync_txn_log_flush);
|
|
|
+ LDAPDebug(LDAP_DEBUG_BACKLDBM, "log_flush_threadmain (in loop): batchcount: %d, txn_in_progress: %d\n", trans_batch_count, txn_in_progress_count, 0);
|
|
|
+ /* if here, do flush the txn logs if any of the following conditions are met
|
|
|
+ * - batch limit exceeded
|
|
|
+ * - no more active transaction, no need to wait
|
|
|
+ * - do_flush indicate that the max waiting interval is exceeded
|
|
|
+ */
|
|
|
+ if(trans_batch_count >= trans_batch_limit || trans_batch_count == txn_in_progress_count || do_flush) {
|
|
|
+ LDAPDebug(LDAP_DEBUG_BACKLDBM, "log_flush_threadmain (working): batchcount: %d, txn_in_progress: %d\n", trans_batch_count, txn_in_progress_count, 0);
|
|
|
+ LOG_FLUSH(priv->dblayer_env->dblayer_DB_ENV,0);
|
|
|
+ for (i=0;i<trans_batch_count;i++)
|
|
|
+ txn_log_flush_pending[i] = 0;
|
|
|
+ trans_batch_count = 0;
|
|
|
+ last_flush = PR_IntervalNow();
|
|
|
+ do_flush = 0;
|
|
|
+ LDAPDebug(LDAP_DEBUG_BACKLDBM, "log_flush_threadmain (before notify): batchcount: %d, txn_in_progress: %d\n", trans_batch_count, txn_in_progress_count, 0);
|
|
|
+ PR_NotifyAllCondVar(sync_txn_log_flush_done);
|
|
|
+ }
|
|
|
+ /* wait until flushing conditions are met */
|
|
|
+ while ( trans_batch_count == 0 ||
|
|
|
+ ( trans_batch_count < trans_batch_limit &&
|
|
|
+ trans_batch_count < txn_in_progress_count)) {
|
|
|
+ if (priv->dblayer_stop_threads) break;
|
|
|
+ if (PR_IntervalNow() - last_flush > interval_flush) {
|
|
|
+ do_flush = 1;
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ PR_WaitCondVar(sync_txn_log_do_flush, interval_wait);
|
|
|
+ }
|
|
|
+ PR_Unlock(sync_txn_log_flush);
|
|
|
+ LDAPDebug(LDAP_DEBUG_BACKLDBM, "log_flush_threadmain (wakeup): batchcount: %d, txn_in_progress: %d\n", trans_batch_count, txn_in_progress_count, 0);
|
|
|
+ } else {
|
|
|
+ DS_Sleep(interval_def);
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ DS_Sleep(interval_def);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
DECR_THREAD_COUNT(priv);
|
|
|
@@ -4580,6 +4711,11 @@ static int
|
|
|
dblayer_start_trickle_thread(struct ldbminfo *li)
|
|
|
{
|
|
|
int return_value = 0;
|
|
|
+ dblayer_private *priv = (dblayer_private*)li->li_dblayer_private;
|
|
|
+
|
|
|
+ if (priv->dblayer_trickle_percentage == 0)
|
|
|
+ return return_value;
|
|
|
+
|
|
|
if (NULL == PR_CreateThread (PR_USER_THREAD,
|
|
|
(VFP) (void *) trickle_threadmain, li,
|
|
|
PR_PRIORITY_NORMAL, PR_GLOBAL_THREAD,
|