|
|
@@ -318,9 +318,9 @@ int dblayer_db_uses_logging(DB_ENV *db_env) {
|
|
|
return db_uses_feature(db_env, DB_INIT_LOG);
|
|
|
}
|
|
|
|
|
|
-/* this flag use if user remotely turned batching off */
|
|
|
+/* this flag is used if user remotely turned batching off */
|
|
|
+#define FLUSH_REMOTEOFF 0
|
|
|
|
|
|
-#define FLUSH_REMOTEOFF -1
|
|
|
/* routine that allows batch value to be changed remotely:
|
|
|
|
|
|
1. value = 0 turns batching off
|
|
|
@@ -337,51 +337,90 @@ dblayer_set_batch_transactions(void *arg, void *value, char *errorbuf, int phase
|
|
|
|
|
|
if (apply) {
|
|
|
if(phase == CONFIG_PHASE_STARTUP) {
|
|
|
- trans_batch_limit=val;
|
|
|
- } else if(trans_batch_limit != FLUSH_REMOTEOFF ) {
|
|
|
- if((val == 0) && (log_flush_thread)) {
|
|
|
- log_flush_thread=PR_FALSE;
|
|
|
+ trans_batch_limit = val;
|
|
|
+ } else {
|
|
|
+ if(val == 0){
|
|
|
+ if(log_flush_thread){
|
|
|
+ PR_Lock(sync_txn_log_flush);
|
|
|
+ }
|
|
|
trans_batch_limit = FLUSH_REMOTEOFF;
|
|
|
- } else if(val > 0) {
|
|
|
+ if(log_flush_thread){
|
|
|
+ log_flush_thread = PR_FALSE;
|
|
|
+ PR_Unlock(sync_txn_log_flush);
|
|
|
+ }
|
|
|
+ } else if(val > 0) {
|
|
|
+ if(trans_batch_limit == FLUSH_REMOTEOFF){
|
|
|
+ /* this requires a server restart to take effect */
|
|
|
+ LDAPDebug(LDAP_DEBUG_ANY,"dblayer_set_batch_transactions: enabling batch transactions "
|
|
|
+ "requires a server restart.\n",0,0,0);
|
|
|
+ } else if (!log_flush_thread){
|
|
|
+ /* we are already disabled, log a reminder of that fact. */
|
|
|
+ LDAPDebug(LDAP_DEBUG_ANY,"dblayer_set_batch_transactions: batch transactions was "
|
|
|
+ "previously disabled, this update requires a server restart.\n",0,0,0);
|
|
|
+ }
|
|
|
trans_batch_limit=val;
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
return retval;
|
|
|
}
|
|
|
+
|
|
|
int
|
|
|
dblayer_set_batch_txn_min_sleep(void *arg, void *value, char *errorbuf, int phase, int apply) {
|
|
|
int val = (int)((uintptr_t)value);
|
|
|
int retval = LDAP_SUCCESS;
|
|
|
|
|
|
if (apply) {
|
|
|
- if(phase == CONFIG_PHASE_STARTUP) {
|
|
|
- trans_batch_txn_min_sleep=val;
|
|
|
- } else if(trans_batch_txn_min_sleep != FLUSH_REMOTEOFF ) {
|
|
|
- if((val == 0) && (log_flush_thread)) {
|
|
|
- log_flush_thread=PR_FALSE;
|
|
|
+ if(phase == CONFIG_PHASE_STARTUP || phase == CONFIG_PHASE_INITIALIZATION) {
|
|
|
+ trans_batch_txn_min_sleep = val;
|
|
|
+ } else {
|
|
|
+ if(val == 0){
|
|
|
+ if(log_flush_thread){
|
|
|
+ PR_Lock(sync_txn_log_flush);
|
|
|
+ }
|
|
|
trans_batch_txn_min_sleep = FLUSH_REMOTEOFF;
|
|
|
- } else if(val > 0) {
|
|
|
- trans_batch_txn_min_sleep=val;
|
|
|
+ if(log_flush_thread){
|
|
|
+ log_flush_thread = PR_FALSE;
|
|
|
+ PR_Unlock(sync_txn_log_flush);
|
|
|
+ }
|
|
|
+ } else if(val > 0) {
|
|
|
+ if(trans_batch_txn_min_sleep == FLUSH_REMOTEOFF || !log_flush_thread){
|
|
|
+ /* this really has no effect until batch transactions are enabled */
|
|
|
+ LDAPDebug(LDAP_DEBUG_ANY,"dblayer_set_batch_txn_min_sleep: Warning batch transactions "
|
|
|
+ "is not enabled.\n",0,0,0);
|
|
|
+ }
|
|
|
+ trans_batch_txn_min_sleep = val;
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
return retval;
|
|
|
}
|
|
|
+
|
|
|
int
|
|
|
dblayer_set_batch_txn_max_sleep(void *arg, void *value, char *errorbuf, int phase, int apply) {
|
|
|
int val = (int)((uintptr_t)value);
|
|
|
int retval = LDAP_SUCCESS;
|
|
|
|
|
|
if (apply) {
|
|
|
- if(phase == CONFIG_PHASE_STARTUP) {
|
|
|
- trans_batch_txn_max_sleep=val;
|
|
|
- } else if(trans_batch_txn_max_sleep != FLUSH_REMOTEOFF ) {
|
|
|
- if((val == 0) && (log_flush_thread)) {
|
|
|
- log_flush_thread=PR_FALSE;
|
|
|
+ if(phase == CONFIG_PHASE_STARTUP || phase == CONFIG_PHASE_INITIALIZATION) {
|
|
|
+ trans_batch_txn_max_sleep = val;
|
|
|
+ } else {
|
|
|
+ if(val == 0){
|
|
|
+ if(log_flush_thread){
|
|
|
+ PR_Lock(sync_txn_log_flush);
|
|
|
+ }
|
|
|
trans_batch_txn_max_sleep = FLUSH_REMOTEOFF;
|
|
|
- } else if(val > 0) {
|
|
|
- trans_batch_txn_max_sleep=val;
|
|
|
+ if(log_flush_thread){
|
|
|
+ log_flush_thread = PR_FALSE;
|
|
|
+ PR_Unlock(sync_txn_log_flush);
|
|
|
+ }
|
|
|
+ } else if(val > 0) {
|
|
|
+ if(trans_batch_txn_max_sleep == FLUSH_REMOTEOFF || !log_flush_thread){
|
|
|
+ /* this really has no effect until batch transactions are enabled */
|
|
|
+ LDAPDebug(LDAP_DEBUG_ANY,"dblayer_set_batch_txn_max_sleep: Warning batch transactions "
|
|
|
+ "is not enabled.\n",0,0,0);
|
|
|
+ }
|
|
|
+ trans_batch_txn_max_sleep = val;
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
@@ -3663,7 +3702,7 @@ int dblayer_txn_commit_ext(struct ldbminfo *li, back_txn *txn, PRBool use_lock)
|
|
|
priv->dblayer_env &&
|
|
|
priv->dblayer_enable_transactions)
|
|
|
{
|
|
|
- txn_id = db_txn->id(db_txn);
|
|
|
+ txn_id = db_txn->id(db_txn);
|
|
|
return_value = TXN_COMMIT(db_txn, 0);
|
|
|
/* if we were given a transaction, and it is the same as the
|
|
|
current transaction in progress, pop it off the stack
|
|
|
@@ -3677,36 +3716,45 @@ int dblayer_txn_commit_ext(struct ldbminfo *li, back_txn *txn, PRBool use_lock)
|
|
|
txn->back_txn_txn = NULL;
|
|
|
}
|
|
|
if ((priv->dblayer_durable_transactions) && use_lock ) {
|
|
|
- if(trans_batch_limit > 0) {
|
|
|
- /* let log_flush thread do the flushing */
|
|
|
- PR_Lock(sync_txn_log_flush);
|
|
|
- txn_batch_slot = trans_batch_count++;
|
|
|
- txn_log_flush_pending[txn_batch_slot] = txn_id;
|
|
|
- LDAPDebug(LDAP_DEBUG_BACKLDBM, "txn_commit (befor notify): batchcount: %d, txn_in_progress: %d, curr_txn: %x\n", trans_batch_count, txn_in_progress_count, txn_id);
|
|
|
- /* the log flush thread will periodically flush the txn log,
|
|
|
- * but in two cases it should be notified to do it immediately:
|
|
|
- * - the batch limit is passed
|
|
|
- * - there is no other outstanding txn
|
|
|
- */
|
|
|
- if (trans_batch_count > trans_batch_limit ||
|
|
|
- trans_batch_count == txn_in_progress_count)
|
|
|
- PR_NotifyCondVar(sync_txn_log_do_flush);
|
|
|
- /* we need to wait until the txn has been flushed before continuing
|
|
|
- * and returning success to the client, nit to vialate durability
|
|
|
- * PR_WaitCondvar releases and reaquires the lock
|
|
|
- */
|
|
|
- while (txn_log_flush_pending[txn_batch_slot] == txn_id)
|
|
|
- PR_WaitCondVar(sync_txn_log_flush_done, PR_INTERVAL_NO_TIMEOUT);
|
|
|
- txn_in_progress_count--;
|
|
|
- LDAPDebug(LDAP_DEBUG_BACKLDBM, "txn_commit (before unlock): batchcount: %d, txn_in_progress: %d, curr_txn %x\n", trans_batch_count, txn_in_progress_count, txn_id);
|
|
|
- PR_Unlock(sync_txn_log_flush);
|
|
|
+ if(trans_batch_limit > 0 && log_flush_thread) {
|
|
|
+ /* let log_flush thread do the flushing */
|
|
|
+ PR_Lock(sync_txn_log_flush);
|
|
|
+ txn_batch_slot = trans_batch_count++;
|
|
|
+ txn_log_flush_pending[txn_batch_slot] = txn_id;
|
|
|
+ LDAPDebug(LDAP_DEBUG_BACKLDBM, "txn_commit (before notify): batchcount: %d, "
|
|
|
+ "txn_in_progress: %d, curr_txn: %x\n", trans_batch_count,
|
|
|
+ txn_in_progress_count, txn_id);
|
|
|
+ /*
|
|
|
+ * The log flush thread will periodically flush the txn log,
|
|
|
+ * but in two cases it should be notified to do it immediately:
|
|
|
+ * - the batch limit is passed
|
|
|
+ * - there is no other outstanding txn
|
|
|
+ */
|
|
|
+ if (trans_batch_count > trans_batch_limit ||
|
|
|
+ trans_batch_count == txn_in_progress_count)
|
|
|
+ {
|
|
|
+ PR_NotifyCondVar(sync_txn_log_do_flush);
|
|
|
+ }
|
|
|
+ /*
|
|
|
+ * We need to wait until the txn has been flushed before continuing
|
|
|
+ * and returning success to the client, nit to vialate durability
|
|
|
+ * PR_WaitCondvar releases and reaquires the lock
|
|
|
+ */
|
|
|
+ while (txn_log_flush_pending[txn_batch_slot] == txn_id){
|
|
|
+ PR_WaitCondVar(sync_txn_log_flush_done, PR_INTERVAL_NO_TIMEOUT);
|
|
|
+ }
|
|
|
+ txn_in_progress_count--;
|
|
|
+ LDAPDebug(LDAP_DEBUG_BACKLDBM, "txn_commit (before unlock): batchcount: %d, "
|
|
|
+ "txn_in_progress: %d, curr_txn %x\n", trans_batch_count,
|
|
|
+ txn_in_progress_count, txn_id);
|
|
|
+ PR_Unlock(sync_txn_log_flush);
|
|
|
} else if(trans_batch_limit == FLUSH_REMOTEOFF) { /* user remotely turned batching off */
|
|
|
LOG_FLUSH(priv->dblayer_env->dblayer_DB_ENV,0);
|
|
|
- }
|
|
|
- }
|
|
|
- if(use_lock) slapi_rwlock_unlock(priv->dblayer_env->dblayer_env_lock);
|
|
|
- } else
|
|
|
- {
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if(use_lock)
|
|
|
+ slapi_rwlock_unlock(priv->dblayer_env->dblayer_env_lock);
|
|
|
+ } else {
|
|
|
return_value = 0;
|
|
|
}
|
|
|
|
|
|
@@ -4466,14 +4514,14 @@ dblayer_start_log_flush_thread(dblayer_private *priv)
|
|
|
int max_threads = config_get_threadnumber();
|
|
|
|
|
|
if ((priv->dblayer_durable_transactions) &&
|
|
|
- (priv->dblayer_enable_transactions) && (trans_batch_limit > 0)) {
|
|
|
- log_flush_thread=PR_TRUE;
|
|
|
- /* initialize the synchronization objects for the log_flush and worker threads */
|
|
|
- sync_txn_log_flush = PR_NewLock();
|
|
|
- sync_txn_log_flush_done = PR_NewCondVar (sync_txn_log_flush);
|
|
|
- sync_txn_log_do_flush = PR_NewCondVar (sync_txn_log_flush);
|
|
|
- txn_log_flush_pending = (int*)slapi_ch_malloc(max_threads*sizeof(int));
|
|
|
-
|
|
|
+ (priv->dblayer_enable_transactions) && (trans_batch_limit > 0))
|
|
|
+ {
|
|
|
+ /* initialize the synchronization objects for the log_flush and worker threads */
|
|
|
+ sync_txn_log_flush = PR_NewLock();
|
|
|
+ sync_txn_log_flush_done = PR_NewCondVar (sync_txn_log_flush);
|
|
|
+ sync_txn_log_do_flush = PR_NewCondVar (sync_txn_log_flush);
|
|
|
+ txn_log_flush_pending = (int*)slapi_ch_malloc(max_threads*sizeof(int));
|
|
|
+ log_flush_thread = PR_TRUE;
|
|
|
if (NULL == PR_CreateThread (PR_USER_THREAD,
|
|
|
(VFP) (void *) log_flush_threadmain, priv,
|
|
|
PR_PRIORITY_NORMAL, PR_GLOBAL_THREAD,
|
|
|
@@ -4515,49 +4563,60 @@ static int log_flush_threadmain(void *param)
|
|
|
/* LK this is only needed if online change of
|
|
|
* of txn config is supported ???
|
|
|
*/
|
|
|
- while ((!priv->dblayer_stop_threads) && (log_flush_thread))
|
|
|
- {
|
|
|
- if (priv->dblayer_enable_transactions)
|
|
|
- {
|
|
|
- if (trans_batch_limit > 0) {
|
|
|
- /* synchronize flushing thread with workers */
|
|
|
- PR_Lock(sync_txn_log_flush);
|
|
|
- LDAPDebug(LDAP_DEBUG_BACKLDBM, "log_flush_threadmain (in loop): batchcount: %d, txn_in_progress: %d\n", trans_batch_count, txn_in_progress_count, 0);
|
|
|
- /* if here, do flush the txn logs if any of the following conditions are met
|
|
|
- * - batch limit exceeded
|
|
|
- * - no more active transaction, no need to wait
|
|
|
- * - do_flush indicate that the max waiting interval is exceeded
|
|
|
- */
|
|
|
- if(trans_batch_count >= trans_batch_limit || trans_batch_count >= txn_in_progress_count || do_flush) {
|
|
|
- LDAPDebug(LDAP_DEBUG_BACKLDBM, "log_flush_threadmain (working): batchcount: %d, txn_in_progress: %d\n", trans_batch_count, txn_in_progress_count, 0);
|
|
|
- LOG_FLUSH(priv->dblayer_env->dblayer_DB_ENV,0);
|
|
|
- for (i=0;i<trans_batch_count;i++)
|
|
|
- txn_log_flush_pending[i] = 0;
|
|
|
- trans_batch_count = 0;
|
|
|
- last_flush = PR_IntervalNow();
|
|
|
- do_flush = 0;
|
|
|
- LDAPDebug(LDAP_DEBUG_BACKLDBM, "log_flush_threadmain (before notify): batchcount: %d, txn_in_progress: %d\n", trans_batch_count, txn_in_progress_count, 0);
|
|
|
- PR_NotifyAllCondVar(sync_txn_log_flush_done);
|
|
|
- }
|
|
|
- /* wait until flushing conditions are met */
|
|
|
- while ( trans_batch_count == 0 ||
|
|
|
- ( trans_batch_count < trans_batch_limit &&
|
|
|
- trans_batch_count < txn_in_progress_count)) {
|
|
|
- if (priv->dblayer_stop_threads) break;
|
|
|
- if (PR_IntervalNow() - last_flush > interval_flush) {
|
|
|
- do_flush = 1;
|
|
|
- break;
|
|
|
- }
|
|
|
- PR_WaitCondVar(sync_txn_log_do_flush, interval_wait);
|
|
|
- }
|
|
|
- PR_Unlock(sync_txn_log_flush);
|
|
|
- LDAPDebug(LDAP_DEBUG_BACKLDBM, "log_flush_threadmain (wakeup): batchcount: %d, txn_in_progress: %d\n", trans_batch_count, txn_in_progress_count, 0);
|
|
|
+ while ((!priv->dblayer_stop_threads) && (log_flush_thread)){
|
|
|
+ if (priv->dblayer_enable_transactions){
|
|
|
+ if (trans_batch_limit > 0) {
|
|
|
+ /* synchronize flushing thread with workers */
|
|
|
+ PR_Lock(sync_txn_log_flush);
|
|
|
+ if(!log_flush_thread){
|
|
|
+ /* batch transactions was disabled while waiting for the lock */
|
|
|
+ PR_Unlock(sync_txn_log_flush);
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ LDAPDebug(LDAP_DEBUG_BACKLDBM, "log_flush_threadmain (in loop): batchcount: %d, "
|
|
|
+ "txn_in_progress: %d\n", trans_batch_count, txn_in_progress_count, 0);
|
|
|
+ /*
|
|
|
+ * if here, do flush the txn logs if any of the following conditions are met
|
|
|
+ * - batch limit exceeded
|
|
|
+ * - no more active transaction, no need to wait
|
|
|
+ * - do_flush indicate that the max waiting interval is exceeded
|
|
|
+ */
|
|
|
+ if(trans_batch_count >= trans_batch_limit || trans_batch_count >= txn_in_progress_count || do_flush) {
|
|
|
+ LDAPDebug(LDAP_DEBUG_BACKLDBM, "log_flush_threadmain (working): batchcount: %d, "
|
|
|
+ "txn_in_progress: %d\n", trans_batch_count, txn_in_progress_count, 0);
|
|
|
+ LOG_FLUSH(priv->dblayer_env->dblayer_DB_ENV,0);
|
|
|
+ for (i=0;i<trans_batch_count;i++){
|
|
|
+ txn_log_flush_pending[i] = 0;
|
|
|
+ }
|
|
|
+ trans_batch_count = 0;
|
|
|
+ last_flush = PR_IntervalNow();
|
|
|
+ do_flush = 0;
|
|
|
+ LDAPDebug(LDAP_DEBUG_BACKLDBM, "log_flush_threadmain (before notify): batchcount: %d, "
|
|
|
+ "txn_in_progress: %d\n", trans_batch_count, txn_in_progress_count, 0);
|
|
|
+ PR_NotifyAllCondVar(sync_txn_log_flush_done);
|
|
|
+ }
|
|
|
+ /* wait until flushing conditions are met */
|
|
|
+ while ( trans_batch_count == 0 ||
|
|
|
+ ( trans_batch_count < trans_batch_limit &&
|
|
|
+ trans_batch_count < txn_in_progress_count))
|
|
|
+ {
|
|
|
+ if (priv->dblayer_stop_threads)
|
|
|
+ break;
|
|
|
+ if (PR_IntervalNow() - last_flush > interval_flush) {
|
|
|
+ do_flush = 1;
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ PR_WaitCondVar(sync_txn_log_do_flush, interval_wait);
|
|
|
+ }
|
|
|
+ PR_Unlock(sync_txn_log_flush);
|
|
|
+ LDAPDebug(LDAP_DEBUG_BACKLDBM, "log_flush_threadmain (wakeup): batchcount: %d, "
|
|
|
+ "txn_in_progress: %d\n", trans_batch_count, txn_in_progress_count, 0);
|
|
|
} else {
|
|
|
- DS_Sleep(interval_def);
|
|
|
- }
|
|
|
+ DS_Sleep(interval_def);
|
|
|
+ }
|
|
|
} else {
|
|
|
- DS_Sleep(interval_def);
|
|
|
- }
|
|
|
+ DS_Sleep(interval_def);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
DECR_THREAD_COUNT(priv);
|