Răsfoiți Sursa

Make all backend operations transaction aware

All backend functions will use the parent transaction (if any) to perform
all database read/write operations, and for the creation of nested
transactions.  The pseudocode looks like this:
begin operation
grab parent txn from pblock SLAPI_TXN and save to a local variable
use this parent txn to perform any searches or other ops before the
  nested txn is created (if created - search ops do not do this)
create the nested txn (if a write op) using the parent_txn
stash the new txn in the pblock SLAPI_TXN
call the betxnpreop plugins
perform the backend ops inside the txn
call the betxnpostop plugins
commit the nested txn
restore the saved parent_txn to the pblock SLAPI_TXN
upon failure, abort the nested txn and restore the saved parent_txn
  to the pblock SLAPI_TXN

There were a few api changes to allow the txn to be passed in wherever
it is needed.
The database lock was changed from a mutex to a monitor to allow plugins
to call other plugins inside the same parent transaction.
Reviewed by: nhosoi (Thanks!)
Rich Megginson 14 ani în urmă
părinte
comite
6120b3d50a

+ 4 - 2
ldap/servers/slapd/back-ldbm/back-ldbm.h

@@ -729,8 +729,10 @@ typedef struct ldbm_instance {
                                        * parent of the instance name dir */
     char *inst_parent_dir_name;       /* Absolute parent dir for this inst */
 
-    PRLock *inst_db_mutex;            /* Used to synchronize modify operations
-                                       * on this instance. */
+    PRMonitor *inst_db_mutex;            /* Used to synchronize write operations
+                                          * on this instance - the monitor is re-entrant
+                                          * which allows plugins in the same transaction
+                                          * to share the lock */
 
     dblayer_handle *inst_handle_head; /* These are used to maintain a list */
     dblayer_handle *inst_handle_tail; /* of open db handles for this instance */

+ 3 - 3
ldap/servers/slapd/back-ldbm/dblayer.c

@@ -639,7 +639,7 @@ int dblayer_terminate(struct ldbminfo *li)
          inst_obj = objset_next_obj(li->li_instance_set, inst_obj)) {
         inst = (ldbm_instance *)object_get_data(inst_obj);
         if (NULL != inst->inst_db_mutex) {
-            PR_DestroyLock(inst->inst_db_mutex);
+            PR_DestroyMonitor(inst->inst_db_mutex);
             inst->inst_db_mutex = NULL;
         }
         if (NULL != inst->inst_handle_list_mutex) {
@@ -3564,7 +3564,7 @@ void dblayer_lock_backend(backend *be)
     PR_ASSERT(NULL != inst);
     
     if (NULL != inst->inst_db_mutex) {
-        PR_Lock(inst->inst_db_mutex);
+        PR_EnterMonitor(inst->inst_db_mutex);
     }
 }
 
@@ -3577,7 +3577,7 @@ void dblayer_unlock_backend(backend *be)
     PR_ASSERT(NULL != inst);
     
     if (NULL != inst->inst_db_mutex) {
-        PR_Unlock(inst->inst_db_mutex);
+        PR_ExitMonitor(inst->inst_db_mutex);
     }
 }
 

+ 29 - 14
ldap/servers/slapd/back-ldbm/filterindex.c

@@ -71,7 +71,8 @@ keys2idl(
     const char  *indextype,
     Slapi_Value **ivals,
     int         *err,
-    int         *unindexed
+    int         *unindexed,
+    back_txn    *txn
 );
 
 IDList *
@@ -100,11 +101,13 @@ filter_candidates(
     }
 
     if (li->li_use_vlv) {
+        back_txn      txn = {NULL};
         /* first, check to see if this particular filter node matches any
          * vlv indexes we're keeping.  if so, we can use that index
          * instead.
          */
-        result = vlv_find_index_by_filter(be, base, f);
+        slapi_pblock_get(pb, SLAPI_TXN, &txn.back_txn_txn);
+        result = vlv_find_index_by_filter_txn(be, base, f, &txn);
         if (result) {
             LDAPDebug( LDAP_DEBUG_TRACE, "<= filter_candidates %lu (vlv)\n",
                     (u_long)IDL_NIDS(result), 0, 0 );
@@ -197,6 +200,7 @@ ava_candidates(
     IDList        *idl = NULL;
     int           unindexed = 0;
     Slapi_Attr    sattr;
+    back_txn      txn = {NULL};
 
     LDAPDebug( LDAP_DEBUG_TRACE, "=> ava_candidates\n", 0, 0, 0 );
 
@@ -270,6 +274,7 @@ ava_candidates(
      * reset ivals[0]->bv.bv_val) or alloc an entirely new ivals array.
      */
 
+    slapi_pblock_get(pb, SLAPI_TXN, &txn.back_txn_txn);
     if(ftype==LDAP_FILTER_EQUALITY) {
         Slapi_Value tmp, *ptr[2], fake;
         char buf[1024];
@@ -283,7 +288,7 @@ ava_candidates(
         ivals=ptr;
 
         slapi_attr_assertion2keys_ava_sv( &sattr, &tmp, (Slapi_Value ***)&ivals, LDAP_FILTER_EQUALITY_FAST);
-        idl = keys2idl( be, type, indextype, ivals, err, &unindexed );
+        idl = keys2idl( be, type, indextype, ivals, err, &unindexed, &txn );
         if ( unindexed ) {
             unsigned int opnote = SLAPI_OP_NOTE_UNINDEXED;
             slapi_pblock_set( pb, SLAPI_OPERATION_NOTES, &opnote );
@@ -315,7 +320,7 @@ ava_candidates(
             idl = idl_allids( be );
             goto done;
         }
-        idl = keys2idl( be, type, indextype, ivals, err, &unindexed );
+        idl = keys2idl( be, type, indextype, ivals, err, &unindexed, &txn );
         if ( unindexed ) {
             unsigned int opnote = SLAPI_OP_NOTE_UNINDEXED;
             slapi_pblock_set( pb, SLAPI_OPERATION_NOTES, &opnote );
@@ -341,6 +346,7 @@ presence_candidates(
     char    *type;
     IDList  *idl;
     int     unindexed = 0;
+    back_txn      txn = {NULL};
 
     LDAPDebug( LDAP_DEBUG_TRACE, "=> presence_candidates\n", 0, 0, 0 );
 
@@ -349,8 +355,9 @@ presence_candidates(
             0, 0, 0 );
         return( NULL );
     }
+    slapi_pblock_get(pb, SLAPI_TXN, &txn.back_txn_txn);
     idl = index_read_ext( be, type, indextype_PRESENCE,
-                          NULL, NULL, err, &unindexed );
+                          NULL, &txn, err, &unindexed );
 
     if ( unindexed ) {
         unsigned int opnote = SLAPI_OP_NOTE_UNINDEXED;
@@ -366,7 +373,7 @@ presence_candidates(
         idl_free(idl);
         idl = index_range_read(pb, be, type, indextype_EQUALITY,
                                    SLAPI_OP_GREATER_OR_EQUAL,
-                                   NULL, NULL, 0, NULL, err);
+                                   NULL, NULL, 0, &txn, err);
     }
 
     LDAPDebug( LDAP_DEBUG_TRACE, "<= presence_candidates %lu\n",
@@ -386,7 +393,9 @@ extensible_candidates(
     Slapi_PBlock* pb = slapi_pblock_new();
     int mrOP = 0;
     Slapi_Operation *op = NULL;
+    back_txn txn = {NULL};
     LDAPDebug (LDAP_DEBUG_TRACE, "=> extensible_candidates\n", 0, 0, 0);
+    slapi_pblock_get(glob_pb, SLAPI_TXN, &txn.back_txn_txn);
     if ( ! slapi_mr_filter_index (f, pb) &&    !slapi_pblock_get (pb, SLAPI_PLUGIN_MR_QUERY_OPERATOR, &mrOP))
     {
         switch (mrOP)
@@ -453,10 +462,10 @@ extensible_candidates(
                         {
                             int unindexed = 0;
                             IDList* idl3 = (mrOP == SLAPI_OP_EQUAL) ?
-                                index_read_ext(be, mrTYPE, mrOID, *key, NULL,
+                                index_read_ext(be, mrTYPE, mrOID, *key, &txn,
                                                           err, &unindexed) :
                                 index_range_read (pb, be, mrTYPE, mrOID, mrOP,
-                                                  *key, NULL, 0, NULL, err);
+                                                  *key, NULL, 0, &txn, err);
                             if ( unindexed ) {
                                 unsigned int opnote = SLAPI_OP_NOTE_UNINDEXED;
                                 slapi_pblock_set( glob_pb,
@@ -534,9 +543,12 @@ range_candidates(
     IDList *idl = NULL;
     struct berval *low = NULL, *high = NULL;
     struct berval **lows = NULL, **highs = NULL;
+    back_txn txn = {NULL};
 
     LDAPDebug(LDAP_DEBUG_TRACE, "=> range_candidates attr=%s\n", type, 0, 0);
 
+    slapi_pblock_get(pb, SLAPI_TXN, &txn.back_txn_txn);
+
     if (low_val != NULL) {
         slapi_attr_assertion2keys_ava(sattr, low_val, &lows, LDAP_FILTER_EQUALITY);
         if (lows == NULL || *lows == NULL) {
@@ -562,15 +574,15 @@ range_candidates(
     if (low == NULL) {
         idl = index_range_read(pb, be, type, (char*)indextype_EQUALITY,
                                SLAPI_OP_LESS_OR_EQUAL,
-                               high, NULL, 0, NULL, err);
+                               high, NULL, 0, &txn, err);
     } else if (high == NULL) {
         idl = index_range_read(pb, be, type, (char*)indextype_EQUALITY,
                                SLAPI_OP_GREATER_OR_EQUAL,
-                               low, NULL, 0, NULL, err);
+                               low, NULL, 0, &txn, err);
     } else {
         idl = index_range_read(pb, be, type, (char*)indextype_EQUALITY,
                                SLAPI_OP_GREATER_OR_EQUAL,
-                               low, high, 1, NULL, err);
+                               low, high, 1, &txn, err);
     }
 
 done:
@@ -850,6 +862,7 @@ substring_candidates(
     int          unindexed = 0;
     unsigned int opnote = SLAPI_OP_NOTE_UNINDEXED;
     Slapi_Attr   sattr;
+    back_txn     txn = {NULL};
 
     LDAPDebug( LDAP_DEBUG_TRACE, "=> sub_candidates\n", 0, 0, 0 );
 
@@ -878,7 +891,8 @@ substring_candidates(
      * look up each key in the index, ANDing the resulting
      * IDLists together.
      */
-    idl = keys2idl( be, type, indextype_SUB, ivals, err, &unindexed );
+    slapi_pblock_get(pb, SLAPI_TXN, &txn.back_txn_txn);
+    idl = keys2idl( be, type, indextype_SUB, ivals, err, &unindexed, &txn );
     if ( unindexed ) {
         slapi_pblock_set( pb, SLAPI_OPERATION_NOTES, &opnote );
         pagedresults_set_unindexed( pb->pb_conn );
@@ -897,7 +911,8 @@ keys2idl(
     const char  *indextype,
     Slapi_Value **ivals,
     int         *err,
-    int         *unindexed
+    int         *unindexed,
+    back_txn    *txn
 )
 {
     IDList    *idl;
@@ -909,7 +924,7 @@ keys2idl(
     for ( i = 0; ivals[i] != NULL; i++ ) {
         IDList    *idl2;
 
-        idl2 = index_read_ext( be, type, indextype, slapi_value_get_berval(ivals[i]), NULL, err, unindexed );
+        idl2 = index_read_ext( be, type, indextype, slapi_value_get_berval(ivals[i]), txn, err, unindexed );
 
 #ifdef LDAP_DEBUG
         /* XXX if ( slapd_ldap_debug & LDAP_DEBUG_TRACE ) { XXX */

+ 3 - 3
ldap/servers/slapd/back-ldbm/instance.c

@@ -95,9 +95,9 @@ int ldbm_instance_create(backend *be, char *name)
     }
 
     /* Lock used to synchronize modify operations. */
-    inst->inst_db_mutex = PR_NewLock();
+    inst->inst_db_mutex = PR_NewMonitor();
     if (NULL == inst->inst_db_mutex) {
-        LDAPDebug(LDAP_DEBUG_ANY, "ldbm_instance_create: PR_NewLock failed\n",
+        LDAPDebug(LDAP_DEBUG_ANY, "ldbm_instance_create: PR_NewMonitor failed\n",
                   0, 0, 0);
         rc = -1;
         goto error;
@@ -385,7 +385,7 @@ ldbm_instance_destructor(void **arg)
     PR_DestroyLock(inst->inst_config_mutex);
     slapi_ch_free_string(&inst->inst_dir_name);
     slapi_ch_free_string(&inst->inst_parent_dir_name);
-    PR_DestroyLock(inst->inst_db_mutex);
+    PR_DestroyMonitor(inst->inst_db_mutex);
     PR_DestroyLock(inst->inst_handle_list_mutex);
     PR_DestroyLock(inst->inst_nextid_mutex);
     PR_DestroyCondVar(inst->inst_indexer_cv);

+ 17 - 5
ldap/servers/slapd/back-ldbm/ldbm_add.c

@@ -121,7 +121,7 @@ ldbm_back_add( Slapi_PBlock *pb )
 	slapi_pblock_get( pb, SLAPI_ADD_ENTRY, &e );
 	slapi_pblock_get( pb, SLAPI_REQUESTOR_ISROOT, &isroot );
 	slapi_pblock_get( pb, SLAPI_MANAGEDSAIT, &managedsait );
-	slapi_pblock_get( pb, SLAPI_PARENT_TXN, (void**)&parent_txn );
+	slapi_pblock_get( pb, SLAPI_TXN, (void**)&parent_txn );
 	slapi_pblock_get( pb, SLAPI_OPERATION, &operation );
 	slapi_pblock_get( pb, SLAPI_IS_REPLICATED_OPERATION, &is_replicated_operation );
 	slapi_pblock_get( pb, SLAPI_BACKEND, &be);
@@ -142,6 +142,9 @@ ldbm_back_add( Slapi_PBlock *pb )
 	slapi_entry_delete_values( e, numsubordinates, NULL );
 
 	dblayer_txn_init(li,&txn);
+	/* the calls to get_copy_of_entry require the parent txn if any
+	   so set txn to the parent_txn until we begin the child transaction */
+	txn.back_txn_txn = parent_txn;
 
 	/* The dblock serializes writes to the database,
 	 * which reduces deadlocking in the db code,
@@ -354,7 +357,7 @@ ldbm_back_add( Slapi_PBlock *pb )
 		 */
 		addr.dn = addr.udn = NULL;
 		addr.uniqueid = (char *)slapi_entry_get_uniqueid(e); /* jcm - cast away const */
-		tombstoneentry = find_entry2modify( pb, be, &addr, NULL );
+		tombstoneentry = find_entry2modify( pb, be, &addr, &txn );
 		if ( tombstoneentry==NULL )
 		{
 			ldap_result_code= -1;
@@ -647,10 +650,13 @@ ldbm_back_add( Slapi_PBlock *pb )
  	 * So, we believe that no code up till here actually added anything
 	 * to persistent store. From now on, we're transacted
 	 */
-	
+	txn.back_txn_txn = NULL; /* ready to create the child transaction */
 	for (retry_count = 0; retry_count < RETRY_TIMES; retry_count++) {
 		if (retry_count > 0) {
 			dblayer_txn_abort(li,&txn);
+			/* txn is no longer valid - reset slapi_txn to the parent */
+			txn.back_txn_txn = NULL;
+			slapi_pblock_set(pb, SLAPI_TXN, parent_txn);
 			/* We're re-trying */
 			LDAPDebug( LDAP_DEBUG_TRACE, "Add Retrying Transaction\n", 0, 0, 0 );
 #ifndef LDBM_NO_BACKOFF_DELAY
@@ -672,8 +678,8 @@ ldbm_back_add( Slapi_PBlock *pb )
 			goto error_return; 
 		}
 
-		/* stash the transaction */
-		slapi_pblock_set(pb, SLAPI_TXN, (void *)txn.back_txn_txn);
+		/* stash the transaction for plugins */
+		slapi_pblock_set(pb, SLAPI_TXN, txn.back_txn_txn);
 
 		/* call the transaction pre add plugins just after creating the transaction */
 		if ((retval = plugin_call_plugins(pb, SLAPI_PLUGIN_BE_TXN_PRE_ADD_FN))) {
@@ -904,6 +910,9 @@ ldbm_back_add( Slapi_PBlock *pb )
 	}
 
 	retval = dblayer_txn_commit(li,&txn);
+	/* after commit - txn is no longer valid - replace SLAPI_TXN with parent */
+	txn.back_txn_txn = NULL;
+	slapi_pblock_set(pb, SLAPI_TXN, parent_txn);
 	if (0 != retval)
 	{
 		ADD_SET_ERROR(ldap_result_code, LDAP_OPERATIONS_ERROR, retry_count);
@@ -950,6 +959,9 @@ diskfull_return:
 		/* It is safer not to abort when the transaction is not started. */
 		if (retry_count > 0) {
 			dblayer_txn_abort(li,&txn); /* abort crashes in case disk full */
+			/* txn is no longer valid - reset the txn pointer to the parent */
+			txn.back_txn_txn = NULL;
+			slapi_pblock_set(pb, SLAPI_TXN, parent_txn);
 		}
 		rc= SLAPI_FAIL_GENERAL;
 	}

+ 3 - 1
ldap/servers/slapd/back-ldbm/ldbm_bind.c

@@ -210,6 +210,7 @@ ldbm_back_bind( Slapi_PBlock *pb )
 	Slapi_Attr		*attr;
 	Slapi_Value **bvals;
 	entry_address *addr;
+	back_txn txn = {NULL};
 
 	/* get parameters */
 	slapi_pblock_get( pb, SLAPI_BACKEND, &be );
@@ -217,6 +218,7 @@ ldbm_back_bind( Slapi_PBlock *pb )
 	slapi_pblock_get( pb, SLAPI_TARGET_ADDRESS, &addr );
 	slapi_pblock_get( pb, SLAPI_BIND_METHOD, &method );
 	slapi_pblock_get( pb, SLAPI_BIND_CREDENTIALS, &cred );
+	slapi_pblock_get( pb, SLAPI_TXN, &txn.back_txn_txn );
 	
 	inst = (ldbm_instance *) be->be_instance_info;
 
@@ -229,7 +231,7 @@ ldbm_back_bind( Slapi_PBlock *pb )
 	 * find the target entry.  find_entry() takes care of referrals
 	 *   and sending errors if the entry does not exist.
 	 */
-	if (( e = find_entry( pb, be, addr, NULL /* no txn */ )) == NULL ) {
+	if (( e = find_entry( pb, be, addr, &txn )) == NULL ) {
 		return( SLAPI_BIND_FAIL );
 	}
 

+ 3 - 1
ldap/servers/slapd/back-ldbm/ldbm_compare.c

@@ -59,6 +59,7 @@ ldbm_back_compare( Slapi_PBlock *pb )
 	int result;
 	int ret = 0;
 	Slapi_DN *namespace_dn;
+	back_txn txn = {NULL};
 
 
 	slapi_pblock_get( pb, SLAPI_BACKEND, &be );
@@ -66,12 +67,13 @@ ldbm_back_compare( Slapi_PBlock *pb )
 	slapi_pblock_get( pb, SLAPI_TARGET_ADDRESS, &addr);
 	slapi_pblock_get( pb, SLAPI_COMPARE_TYPE, &type );
 	slapi_pblock_get( pb, SLAPI_COMPARE_VALUE, &bval );
+	slapi_pblock_get( pb, SLAPI_TXN, &txn.back_txn_txn );
 	
 	inst = (ldbm_instance *) be->be_instance_info;
 	/* get the namespace dn */
 	namespace_dn = (Slapi_DN*)slapi_be_getsuffix(be, 0);
 
-	if ( (e = find_entry( pb, be, addr, NULL )) == NULL ) {
+	if ( (e = find_entry( pb, be, addr, &txn )) == NULL ) {
 		return( -1 );	/* error result sent by find_entry() */
 	}
 

+ 5 - 2
ldap/servers/slapd/back-ldbm/ldbm_delete.c

@@ -99,7 +99,7 @@ ldbm_back_delete( Slapi_PBlock *pb )
 	slapi_pblock_get( pb, SLAPI_PLUGIN_PRIVATE, &li );
 	slapi_pblock_get( pb, SLAPI_DELETE_TARGET, &dn );
 	slapi_pblock_get( pb, SLAPI_TARGET_ADDRESS, &addr);
-	slapi_pblock_get( pb, SLAPI_PARENT_TXN, (void**)&parent_txn );
+	slapi_pblock_get( pb, SLAPI_TXN, (void**)&parent_txn );
 	slapi_pblock_get( pb, SLAPI_OPERATION, &operation );
 	slapi_pblock_get( pb, SLAPI_IS_REPLICATED_OPERATION, &is_replicated_operation );
 	
@@ -108,6 +108,9 @@ ldbm_back_delete( Slapi_PBlock *pb )
 
 	/* dblayer_txn_init needs to be called before "goto error_return" */
 	dblayer_txn_init(li,&txn);
+	/* the calls to perform searches require the parent txn if any
+	   so set txn to the parent_txn until we begin the child transaction */
+	txn.back_txn_txn = parent_txn;
 
 	if (pb->pb_conn)
 	{
@@ -165,7 +168,7 @@ ldbm_back_delete( Slapi_PBlock *pb )
 	}
 
 	/* find and lock the entry we are about to modify */
-	if ( (e = find_entry2modify( pb, be, addr, NULL )) == NULL )
+	if ( (e = find_entry2modify( pb, be, addr, &txn )) == NULL )
 	{
 		ldap_result_code= LDAP_NO_SUCH_OBJECT; 
 		/* retval is -1 */

+ 17 - 4
ldap/servers/slapd/back-ldbm/ldbm_modify.c

@@ -223,7 +223,7 @@ ldbm_back_modify( Slapi_PBlock *pb )
 	slapi_pblock_get( pb, SLAPI_PLUGIN_PRIVATE, &li );
 	slapi_pblock_get( pb, SLAPI_TARGET_ADDRESS, &addr );
 	slapi_pblock_get( pb, SLAPI_MODIFY_MODS, &mods );
-	slapi_pblock_get( pb, SLAPI_PARENT_TXN, (void**)&parent_txn );
+	slapi_pblock_get( pb, SLAPI_TXN, (void**)&parent_txn );
 
 	slapi_pblock_get( pb, SLAPI_OPERATION, &operation );
 	if (NULL == operation)
@@ -237,6 +237,9 @@ ldbm_back_modify( Slapi_PBlock *pb )
 	inst = (ldbm_instance *) be->be_instance_info;
 
 	dblayer_txn_init(li,&txn);
+	/* the calls to search for entries require the parent txn if any
+	   so set txn to the parent_txn until we begin the child transaction */
+	txn.back_txn_txn = parent_txn;
 	if (NULL == addr)
 	{
 		goto error_return;
@@ -263,7 +266,7 @@ ldbm_back_modify( Slapi_PBlock *pb )
 	}
 
 	/* find and lock the entry we are about to modify */
-	if ( (e = find_entry2modify( pb, be, addr, NULL )) == NULL ) {
+	if ( (e = find_entry2modify( pb, be, addr, &txn )) == NULL ) {
 		ldap_result_code= -1;
 		goto error_return;	  /* error result sent by find_entry2modify() */
 	}
@@ -399,10 +402,14 @@ ldbm_back_modify( Slapi_PBlock *pb )
 		}
 	}
 
+	txn.back_txn_txn = NULL; /* ready to create the child transaction */
 	for (retry_count = 0; retry_count < RETRY_TIMES; retry_count++) {
 
 		if (retry_count > 0) {
 			dblayer_txn_abort(li,&txn);
+			/* txn is no longer valid - reset slapi_txn to the parent */
+			txn.back_txn_txn = NULL;
+			slapi_pblock_set(pb, SLAPI_TXN, parent_txn);
 			LDAPDebug( LDAP_DEBUG_TRACE, "Modify Retrying Transaction\n", 0, 0, 0 );
 #ifndef LDBM_NO_BACKOFF_DELAY
 			{
@@ -422,8 +429,8 @@ ldbm_back_modify( Slapi_PBlock *pb )
 			goto error_return;
 		}
 
-		/* stash the transaction */
-		slapi_pblock_set(pb, SLAPI_TXN, (void *)txn.back_txn_txn);
+		/* stash the transaction for plugins */
+		slapi_pblock_set(pb, SLAPI_TXN, txn.back_txn_txn);
 
 		/* call the transaction pre modify plugins just after creating the transaction */
 		if ((retval = plugin_call_plugins(pb, SLAPI_PLUGIN_BE_TXN_PRE_MODIFY_FN))) {
@@ -557,6 +564,9 @@ ldbm_back_modify( Slapi_PBlock *pb )
 	}
 
 	retval = dblayer_txn_commit(li,&txn);
+	/* after commit - txn is no longer valid - replace SLAPI_TXN with parent */
+	txn.back_txn_txn = NULL;
+	slapi_pblock_set(pb, SLAPI_TXN, parent_txn);
 	if (0 != retval) {
 		if (LDBM_OS_ERR_IS_DISKFULL(retval)) disk_full = 1;
 		ldap_result_code= LDAP_OPERATIONS_ERROR;
@@ -599,6 +609,9 @@ error_return:
 		if (retry_count > 0) {
 			/* It is safer not to abort when the transaction is not started. */
 			dblayer_txn_abort(li,&txn); /* abort crashes in case disk full */
+			/* txn is no longer valid - reset the txn pointer to the parent */
+			txn.back_txn_txn = NULL;
+			slapi_pblock_set(pb, SLAPI_TXN, parent_txn);
 		}
 	    rc= SLAPI_FAIL_GENERAL;
 	}

+ 25 - 12
ldap/servers/slapd/back-ldbm/ldbm_modrdn.c

@@ -120,7 +120,7 @@ ldbm_back_modrdn( Slapi_PBlock *pb )
     slapi_pblock_get( pb, SLAPI_MODRDN_TARGET, &dn );
     slapi_pblock_get( pb, SLAPI_BACKEND, &be);
     slapi_pblock_get( pb, SLAPI_PLUGIN_PRIVATE, &li );
-    slapi_pblock_get( pb, SLAPI_PARENT_TXN, (void**)&parent_txn );
+    slapi_pblock_get( pb, SLAPI_TXN, (void**)&parent_txn );
     slapi_pblock_get( pb, SLAPI_REQUESTOR_ISROOT, &isroot );
     slapi_pblock_get( pb, SLAPI_OPERATION, &operation );
     slapi_pblock_get( pb, SLAPI_IS_REPLICATED_OPERATION, &is_replicated_operation );
@@ -129,6 +129,9 @@ ldbm_back_modrdn( Slapi_PBlock *pb )
 
     /* dblayer_txn_init needs to be called before "goto error_return" */
     dblayer_txn_init(li,&txn);
+    /* the calls to search for entries require the parent txn if any
+       so set txn to the parent_txn until we begin the child transaction */
+    txn.back_txn_txn = parent_txn;
 
     if (pb->pb_conn)
     {
@@ -313,7 +316,7 @@ ldbm_back_modrdn( Slapi_PBlock *pb )
     /* find and lock the entry we are about to modify */
     /* JCMREPL - Argh, what happens about the stinking referrals? */
     slapi_pblock_get (pb, SLAPI_TARGET_ADDRESS, &old_addr);
-    e = find_entry2modify( pb, be, old_addr, NULL );
+    e = find_entry2modify( pb, be, old_addr, &txn );
     if ( e == NULL )
     {
         ldap_result_code= -1;
@@ -336,14 +339,14 @@ ldbm_back_modrdn( Slapi_PBlock *pb )
     /* Fetch and lock the parent of the entry that is moving */
     oldparent_addr.dn = (char*)slapi_sdn_get_dn (&dn_parentdn);
     oldparent_addr.uniqueid = NULL;            
-    parententry = find_entry2modify_only( pb, be, &oldparent_addr, NULL );
+    parententry = find_entry2modify_only( pb, be, &oldparent_addr, &txn );
     modify_init(&parent_modify_context,parententry);
 
     /* Fetch and lock the new parent of the entry that is moving */            
     if(slapi_sdn_get_ndn(&dn_newsuperiordn)!=NULL)
     {
         slapi_pblock_get (pb, SLAPI_MODRDN_NEWSUPERIOR_ADDRESS, &newsuperior_addr);
-        newparententry = find_entry2modify_only( pb, be, newsuperior_addr, NULL);
+        newparententry = find_entry2modify_only( pb, be, newsuperior_addr, &txn );
         modify_init(&newparent_modify_context,newparententry);
     }
 
@@ -680,11 +683,15 @@ ldbm_back_modrdn( Slapi_PBlock *pb )
      * So, we believe that no code up till here actually added anything
      * to persistent store. From now on, we're transacted
      */
+	txn.back_txn_txn = NULL; /* ready to create the child transaction */
     for (retry_count = 0; retry_count < RETRY_TIMES; retry_count++)
     {
         if (retry_count > 0)
         {
             dblayer_txn_abort(li,&txn);
+            /* txn is no longer valid - reset slapi_txn to the parent */
+            txn.back_txn_txn = NULL;
+            slapi_pblock_set(pb, SLAPI_TXN, parent_txn);
             /* We're re-trying */
             LDAPDebug( LDAP_DEBUG_TRACE, "Modrdn Retrying Transaction\n", 0, 0, 0 );
         }
@@ -909,15 +916,18 @@ ldbm_back_modrdn( Slapi_PBlock *pb )
         modify_switch_entries( &newparent_modify_context,be);
     }
 
-	/* call the transaction post modrdn plugins just before the commit */
-	if ((retval = plugin_call_plugins(pb, SLAPI_PLUGIN_BE_TXN_POST_MODRDN_FN))) {
-		LDAPDebug1Arg( LDAP_DEBUG_ANY, "SLAPI_PLUGIN_BE_TXN_POST_MODRDN_FN plugin "
-					   "returned error code %d\n", retval );
-		slapi_pblock_get(pb, SLAPI_RESULT_CODE, &ldap_result_code);
-		goto error_return;
-	}
+    /* call the transaction post modrdn plugins just before the commit */
+    if ((retval = plugin_call_plugins(pb, SLAPI_PLUGIN_BE_TXN_POST_MODRDN_FN))) {
+        LDAPDebug1Arg( LDAP_DEBUG_ANY, "SLAPI_PLUGIN_BE_TXN_POST_MODRDN_FN plugin "
+                       "returned error code %d\n", retval );
+        slapi_pblock_get(pb, SLAPI_RESULT_CODE, &ldap_result_code);
+        goto error_return;
+    }
 
     retval = dblayer_txn_commit(li,&txn);
+    /* after commit - txn is no longer valid - replace SLAPI_TXN with parent */
+    txn.back_txn_txn = NULL;
+    slapi_pblock_set(pb, SLAPI_TXN, parent_txn);
     if (0 != retval)
     {
         if (LDBM_OS_ERR_IS_DISKFULL(retval)) disk_full = 1;
@@ -1047,6 +1057,9 @@ error_return:
         /* It is safer not to abort when the transaction is not started. */
         if (retry_count > 0) {
             dblayer_txn_abort(li,&txn); /* abort crashes in case disk full */
+            /* txn is no longer valid - reset the txn pointer to the parent */
+            txn.back_txn_txn = NULL;
+            slapi_pblock_set(pb, SLAPI_TXN, parent_txn);
         }
         retval= SLAPI_FAIL_GENERAL;
     }
@@ -1737,7 +1750,7 @@ moddn_get_children(back_txn *ptxn,
             if ( id!=NOID )
             {
                 int err= 0;
-                e = id2entry( be, id, NULL, &err );
+                e = id2entry( be, id, ptxn, &err );
                 if (e!=NULL)
                 {
                     /* The subtree search will have included the parent 

+ 18 - 10
ldap/servers/slapd/back-ldbm/ldbm_search.c

@@ -203,6 +203,7 @@ ldbm_back_search( Slapi_PBlock *pb )
     int lookup_returned_allids = 0;
     int backend_count = 1;
     static int print_once = 1;
+    back_txn txn = {NULL};
 
     slapi_pblock_get( pb, SLAPI_BACKEND, &be );
     slapi_pblock_get( pb, SLAPI_OPERATION, &operation);
@@ -212,7 +213,8 @@ ldbm_back_search( Slapi_PBlock *pb )
     slapi_pblock_get( pb, SLAPI_SEARCH_SCOPE, &scope );
     slapi_pblock_get( pb, SLAPI_REQCONTROLS, &controls );
     slapi_pblock_get( pb, SLAPI_BACKEND_COUNT, &backend_count );
-    
+    slapi_pblock_get( pb, SLAPI_TXN, &txn.back_txn_txn );
+
     inst = (ldbm_instance *) be->be_instance_info;
 
     slapi_sdn_init_dn_ndn_byref(&basesdn,base);  /* normalized by front end*/
@@ -401,7 +403,7 @@ ldbm_back_search( Slapi_PBlock *pb )
     }
     else
     {
-        if ( ( e = find_entry( pb, be, addr, NULL )) == NULL )
+        if ( ( e = find_entry( pb, be, addr, &txn )) == NULL )
         {
             /* error or referral sent by find_entry */
             return ldbm_back_search_cleanup(pb, li, sort_control, 
@@ -431,9 +433,9 @@ ldbm_back_search( Slapi_PBlock *pb )
         if ((NULL != controls) && (sort) && (vlv)) {
             /* This candidate list is for vlv, no need for sort only. */
             switch (vlv_search_build_candidate_list(pb, &basesdn, &vlv_rc,
-                                          sort_control,
-                                          (vlv ? &vlv_request_control : NULL),
-                                          &candidates, &vlv_response_control)) {
+                                                    sort_control,
+                                                    (vlv ? &vlv_request_control : NULL),
+                                                    &candidates, &vlv_response_control)) {
             case VLV_ACCESS_DENIED:
                 return ldbm_back_search_cleanup(pb, li, sort_control,
                                                 vlv_rc, "VLV Control",
@@ -638,9 +640,11 @@ ldbm_back_search( Slapi_PBlock *pb )
                 if (NULL != candidates && candidates->b_nids>0)
                 {
                     IDList *idl= NULL;
+                    back_txn txn = {NULL};
+                    slapi_pblock_get(pb, SLAPI_TXN, &txn.back_txn_txn);
                     vlv_response_control.result =
-                        vlv_trim_candidates(be, candidates, sort_control,
-                        &vlv_request_control, &idl, &vlv_response_control);
+                        vlv_trim_candidates_txn(be, candidates, sort_control,
+                        &vlv_request_control, &idl, &vlv_response_control, &txn);
                     if(vlv_response_control.result==0)
                     {
                         idl_free(candidates);
@@ -983,18 +987,20 @@ subtree_candidates(
      */
     if(candidates!=NULL && (idl_length(candidates)>FILTER_TEST_THRESHOLD)) {
         IDList *tmp = candidates, *descendants = NULL;
+        back_txn txn = {NULL};
 
+        slapi_pblock_get(pb, SLAPI_TXN, &txn.back_txn_txn);
         if (entryrdn_get_noancestorid()) {
             /* subtree-rename: on && no ancestorid */
             *err = entryrdn_get_subordinates(be,
                                          slapi_entry_get_sdn_const(e->ep_entry),
-                                         e->ep_id, &descendants, NULL);
+                                         e->ep_id, &descendants, &txn);
             idl_insert(&descendants, e->ep_id);
             candidates = idl_intersection(be, candidates, descendants);
             idl_free(tmp);
             idl_free(descendants);
         } else if (!has_tombstone_filter) {
-            *err = ldbm_ancestorid_read(be, NULL, e->ep_id, &descendants);
+            *err = ldbm_ancestorid_read(be, &txn, e->ep_id, &descendants);
             idl_insert(&descendants, e->ep_id);
             candidates = idl_intersection(be, candidates, descendants);
             idl_free(tmp);
@@ -1164,6 +1170,7 @@ ldbm_back_next_search_entry_ext( Slapi_PBlock *pb, int use_extension )
     char                   *target_uniqueid;
     int                    rc = 0; 
     int                    estimate = 0; /* estimated search result count */
+    back_txn               txn = {NULL};
 
     slapi_pblock_get( pb, SLAPI_BACKEND, &be );
     slapi_pblock_get( pb, SLAPI_PLUGIN_PRIVATE, &li );
@@ -1179,6 +1186,7 @@ ldbm_back_next_search_entry_ext( Slapi_PBlock *pb, int use_extension )
     slapi_pblock_get( pb, SLAPI_SEARCH_REFERRALS, &urls );
     slapi_pblock_get( pb, SLAPI_TARGET_UNIQUEID, &target_uniqueid );
     slapi_pblock_get( pb, SLAPI_SEARCH_RESULT_SET, &sr );
+    slapi_pblock_get( pb, SLAPI_TXN, &txn.back_txn_txn );
 
     if (NULL == sr) {
         goto bail;
@@ -1297,7 +1305,7 @@ ldbm_back_next_search_entry_ext( Slapi_PBlock *pb, int use_extension )
         ++sr->sr_lookthroughcount;    /* checked above */
 
         /* get the entry */
-        e = id2entry( be, id, NULL, &err );
+        e = id2entry( be, id, &txn, &err );
         if ( e == NULL )
         {
             if ( err != 0 && err != DB_NOTFOUND )

+ 4 - 2
ldap/servers/slapd/back-ldbm/misc.c

@@ -405,9 +405,11 @@ ldbm_txn_ruv_modify_context( Slapi_PBlock *pb, modify_context *mc )
     entry_address bentry_addr;
     IFP fn = NULL;
     int rc = 0;
+    back_txn txn = {NULL};
 
     slapi_pblock_get(pb, SLAPI_TXN_RUV_MODS_FN, (void *)&fn);
-
+    slapi_pblock_get(pb, SLAPI_TXN, &txn.back_txn_txn);
+    
     if (NULL == fn) {
         return (0);
     }
@@ -428,7 +430,7 @@ ldbm_txn_ruv_modify_context( Slapi_PBlock *pb, modify_context *mc )
 
     /* Note: if we find the bentry, it will stay locked until someone calls
      * modify_term on the mc we'll be associating the bentry with */
-    bentry = find_entry2modify_only( pb, be, &bentry_addr, NULL );
+    bentry = find_entry2modify_only( pb, be, &bentry_addr, &txn );
 
     if (NULL == bentry) {
 	/* Uh oh, we couldn't find and lock the RUV entry! */

+ 3 - 0
ldap/servers/slapd/back-ldbm/proto-back-ldbm.h

@@ -527,6 +527,7 @@ int vlv_search_build_candidate_list(Slapi_PBlock *pb, const Slapi_DN *base, int
 int vlv_update_index(struct vlvIndex* p, back_txn *txn, struct ldbminfo *li, Slapi_PBlock *pb, struct backentry* oldEntry, struct backentry* newEntry);
 int vlv_update_all_indexes(back_txn *txn, backend *be, Slapi_PBlock *pb, struct backentry* oldEntry, struct backentry* newEntry);
 int vlv_filter_candidates(backend *be, Slapi_PBlock *pb, const IDList *candidates, const Slapi_DN *base, int scope, Slapi_Filter *filter, IDList** filteredCandidates,int lookthrough_limit, time_t time_up);
+int vlv_trim_candidates_txn(backend *be, const IDList *candidates, const sort_spec* sort_control, const struct vlv_request *vlv_request_control, IDList** filteredCandidates,struct vlv_response *pResponse, back_txn *txn);
 int vlv_trim_candidates(backend *be, const IDList *candidates, const sort_spec* sort_control, const struct vlv_request *vlv_request_control, IDList** filteredCandidates,struct vlv_response *pResponse);
 int vlv_parse_request_control(backend *be, struct berval *vlv_spec_ber, struct vlv_request* vlvp);
 int vlv_make_response_control(Slapi_PBlock *pb, const struct vlv_response* vlvp);
@@ -535,6 +536,8 @@ void vlv_print_access_log(Slapi_PBlock *pb,struct vlv_request* vlvi, struct vlv_
 void vlv_grok_new_import_entry(const struct backentry *e, backend *be);
 IDList *vlv_find_index_by_filter(struct backend *be, const char *base, 
                  Slapi_Filter *f);
+IDList *vlv_find_index_by_filter_txn(struct backend *be, const char *base, 
+                 Slapi_Filter *f, back_txn *txn);
 int vlv_delete_search_entry(Slapi_PBlock *pb, Slapi_Entry* e, ldbm_instance *inst); 
 void vlv_acquire_lock(backend *be);
 void vlv_release_lock(backend *be);

+ 4 - 2
ldap/servers/slapd/back-ldbm/sort.c

@@ -625,9 +625,11 @@ static int compare_entries_sv(ID *id_a, ID *id_b, sort_spec *s,baggage_carrier *
 	backend *be = bc->be;
 	ldbm_instance *inst = (ldbm_instance *) be->be_instance_info;
 	int err;
+    back_txn txn = {NULL};
 
+    slapi_pblock_get(bc->pb, SLAPI_TXN, &txn.back_txn_txn);
 	*error = 1;
-	a = id2entry(be,*id_a,NULL,&err);
+	a = id2entry(be,*id_a,&txn,&err);
 	if (NULL == a) {
 		if (0 != err ) {
 			LDAPDebug(LDAP_DEBUG_TRACE,"compare_entries db err %d\n",err,0,0);
@@ -636,7 +638,7 @@ static int compare_entries_sv(ID *id_a, ID *id_b, sort_spec *s,baggage_carrier *
 		/* Best to log error and set some flag */
 		return 0;
 	}
-	b = id2entry(be,*id_b,NULL,&err);
+	b = id2entry(be,*id_b,&txn,&err);
 	if (NULL == b) {
 		if (0 != err ) {
 			LDAPDebug(LDAP_DEBUG_TRACE,"compare_entries db err %d\n",err,0,0);

+ 39 - 16
ldap/servers/slapd/back-ldbm/vlv.c

@@ -58,8 +58,8 @@
 #include "vlv_key.h"
 
 static PRUint32 vlv_trim_candidates_byindex(PRUint32 length, const struct vlv_request *vlv_request_control);
-static PRUint32 vlv_trim_candidates_byvalue(backend *be, const IDList *candidates, const sort_spec* sort_control, const struct vlv_request *vlv_request_control);
-static int vlv_build_candidate_list( backend *be, struct vlvIndex* p, const struct vlv_request *vlv_request_control, IDList** candidates, struct vlv_response *vlv_response_control, int is_srchlist_locked);
+static PRUint32 vlv_trim_candidates_byvalue(backend *be, const IDList *candidates, const sort_spec* sort_control, const struct vlv_request *vlv_request_control, back_txn *txn);
+static int vlv_build_candidate_list( backend *be, struct vlvIndex* p, const struct vlv_request *vlv_request_control, IDList** candidates, struct vlv_response *vlv_response_control, int is_srchlist_locked, back_txn *txn);
 
 /* New mutex for vlv locking
 Slapi_RWLock * vlvSearchList_lock=NULL;
@@ -1150,7 +1150,9 @@ vlv_search_build_candidate_list(Slapi_PBlock *pb, const Slapi_DN *base, int *vlv
 	backend *be;
 	int scope, rc=LDAP_SUCCESS;
 	char *fstr;
+    back_txn txn = {NULL};
 
+    slapi_pblock_get( pb, SLAPI_TXN, &txn.back_txn_txn );
 	slapi_pblock_get( pb, SLAPI_BACKEND, &be );
     slapi_pblock_get( pb, SLAPI_SEARCH_SCOPE, &scope );
     slapi_pblock_get( pb, SLAPI_SEARCH_STRFILTER, &fstr );
@@ -1164,7 +1166,8 @@ vlv_search_build_candidate_list(Slapi_PBlock *pb, const Slapi_DN *base, int *vlv
 	} else if((*vlv_rc=vlvIndex_accessallowed(pi, pb)) != LDAP_SUCCESS) {
 	    slapi_rwlock_unlock(be->vlvSearchList_lock);
 		rc = VLV_ACCESS_DENIED;
-	} else if ((*vlv_rc=vlv_build_candidate_list(be,pi,vlv_request_control,candidates,vlv_response_control, 1)) != LDAP_SUCCESS) {
+	} else if ((*vlv_rc=vlv_build_candidate_list(be,pi,vlv_request_control,candidates,
+                                                 vlv_response_control, 1, &txn)) != LDAP_SUCCESS) {
 		rc = VLV_BLD_LIST_FAILED;
 		vlv_response_control->result=*vlv_rc;
 	}
@@ -1187,7 +1190,7 @@ vlv_search_build_candidate_list(Slapi_PBlock *pb, const Slapi_DN *base, int *vlv
  
 
 static int
-vlv_build_candidate_list( backend *be, struct vlvIndex* p, const struct vlv_request *vlv_request_control, IDList** candidates, struct vlv_response *vlv_response_control, int is_srchlist_locked)
+vlv_build_candidate_list( backend *be, struct vlvIndex* p, const struct vlv_request *vlv_request_control, IDList** candidates, struct vlv_response *vlv_response_control, int is_srchlist_locked, back_txn *txn)
 {
     int return_value = LDAP_SUCCESS;
     DB *db = NULL;
@@ -1196,6 +1199,7 @@ vlv_build_candidate_list( backend *be, struct vlvIndex* p, const struct vlv_requ
     PRUint32 si = 0;       /* The Selected Index */
     PRUint32 length;
     int do_trim= 1;
+    DB_TXN *db_txn = NULL;
 
     LDAPDebug(LDAP_DEBUG_TRACE,
               "=> vlv_build_candidate_list: %s %s Using VLV Index %s\n",
@@ -1226,7 +1230,10 @@ vlv_build_candidate_list( backend *be, struct vlvIndex* p, const struct vlv_requ
     if (is_srchlist_locked) {
         slapi_rwlock_unlock(be->vlvSearchList_lock);
     }
-    err = db->cursor(db, 0 /* txn */, &dbc, 0);
+    if (txn) {
+        db_txn = txn->back_txn_txn;
+    }
+    err = db->cursor(db, db_txn, &dbc, 0);
     if (err != 0) {
         /* shouldn't happen */
         LDAPDebug(LDAP_DEBUG_ANY, "VLV: couldn't get cursor (err %d)\n",
@@ -1241,8 +1248,7 @@ vlv_build_candidate_list( backend *be, struct vlvIndex* p, const struct vlv_requ
             si = vlv_trim_candidates_byindex(length, vlv_request_control);
             break;
         case 1: /* byValue */
-            si = vlv_build_candidate_list_byvalue(p, dbc, length,
-                                                     vlv_request_control);
+            si = vlv_build_candidate_list_byvalue(p, dbc, length, vlv_request_control);
             if (si==length) {
                 do_trim = 0;
                 /* minimum idl_alloc size should be 1; 0 is considered ALLID */
@@ -1314,8 +1320,10 @@ vlv_filter_candidates(backend *be, Slapi_PBlock *pb, const IDList *candidates, c
         int done= 0;
         int counter= 0;
         ID id = NOID;
+        back_txn txn = {NULL};
     	idl_iterator current = idl_iterator_init(candidates);
         resultIdl= idl_alloc(candidates->b_nids);
+        slapi_pblock_get(pb, SLAPI_TXN, &txn.back_txn_txn);
         do
         {
         	id = idl_iterator_dereference_increment(&current, candidates);
@@ -1323,7 +1331,7 @@ vlv_filter_candidates(backend *be, Slapi_PBlock *pb, const IDList *candidates, c
         	{
                 int err= 0;
                 struct backentry *e= NULL;
-                e = id2entry( be, id, NULL, &err );
+                e = id2entry( be, id, &txn, &err );
             	if ( e == NULL )
             	{
                     /*
@@ -1393,7 +1401,7 @@ vlv_filter_candidates(backend *be, Slapi_PBlock *pb, const IDList *candidates, c
  *       other (80)
  */
 int
-vlv_trim_candidates(backend *be, const IDList *candidates, const sort_spec* sort_control, const struct vlv_request *vlv_request_control, IDList** trimmedCandidates,struct vlv_response *vlv_response_control)
+vlv_trim_candidates_txn(backend *be, const IDList *candidates, const sort_spec* sort_control, const struct vlv_request *vlv_request_control, IDList** trimmedCandidates,struct vlv_response *vlv_response_control, back_txn *txn)
 {
     IDList* resultIdl= NULL;
     int return_value= LDAP_SUCCESS;
@@ -1412,7 +1420,7 @@ vlv_trim_candidates(backend *be, const IDList *candidates, const sort_spec* sort
         si= vlv_trim_candidates_byindex(candidates->b_nids, vlv_request_control);
         break;
     case 1: /* byValue */
-        si= vlv_trim_candidates_byvalue(be, candidates, sort_control, vlv_request_control);
+        si= vlv_trim_candidates_byvalue(be, candidates, sort_control, vlv_request_control, txn);
         /* Don't bother sending results if the attribute value wasn't found */
         if(si==candidates->b_nids)
         {
@@ -1457,6 +1465,12 @@ vlv_trim_candidates(backend *be, const IDList *candidates, const sort_spec* sort
     return return_value;
 }
 
+int
+vlv_trim_candidates(backend *be, const IDList *candidates, const sort_spec* sort_control, const struct vlv_request *vlv_request_control, IDList** trimmedCandidates,struct vlv_response *vlv_response_control)
+{
+    return vlv_trim_candidates_txn(be, candidates, sort_control, vlv_request_control, trimmedCandidates, vlv_response_control, NULL);
+}
+
 /*
  * Work out the Selected Index given the length of the candidate list
  * and the request control from the client.
@@ -1523,7 +1537,7 @@ vlv_trim_candidates_byindex(PRUint32 length, const struct vlv_request *vlv_reque
  * Iterate over the Candidate ID List looking for an entry >= the provided attribute value.
  */
 static PRUint32
-vlv_trim_candidates_byvalue(backend *be, const IDList *candidates, const sort_spec* sort_control, const struct vlv_request *vlv_request_control)
+vlv_trim_candidates_byvalue(backend *be, const IDList *candidates, const sort_spec* sort_control, const struct vlv_request *vlv_request_control, back_txn *txn)
 {
     PRUint32 si= 0; /* The Selected Index */
     PRUint32 low= 0;
@@ -1592,7 +1606,7 @@ retry:
             current = (1 + low + high)/2;
         }
         id= candidates->b_ids[current];
-        e = id2entry( be, id, NULL, &err );
+        e = id2entry( be, id, txn, &err );
         if ( e == NULL )
         {
             int rval;
@@ -1942,8 +1956,8 @@ vlv_parse_request_control( backend *be, struct berval *vlv_spec_ber,struct vlv_r
  * has the same search base and search filter.
  * added read lock */
 
-IDList *vlv_find_index_by_filter(struct backend *be, const char *base, 
-				 Slapi_Filter *f)
+IDList *vlv_find_index_by_filter_txn(struct backend *be, const char *base, 
+                                     Slapi_Filter *f, back_txn *txn)
 {
     struct vlvSearch *t = NULL;
     struct vlvIndex *vi;
@@ -1954,6 +1968,11 @@ IDList *vlv_find_index_by_filter(struct backend *be, const char *base,
     DBC *dbc = NULL;
     IDList *idl;
     Slapi_Filter *vlv_f;
+    DB_TXN *db_txn = NULL;
+
+    if (txn) {
+        db_txn = txn->back_txn_txn;
+    }
 
 	slapi_sdn_init_dn_byref(&base_sdn, base);
 	slapi_rwlock_rdlock(be->vlvSearchList_lock);
@@ -1984,7 +2003,7 @@ IDList *vlv_find_index_by_filter(struct backend *be, const char *base,
 			if (dblayer_get_index_file(be, vi->vlv_attrinfo, &db, 0) == 0) {
 				length = vlvIndex_get_indexlength(vi, db, 0 /* txn */);
 				slapi_rwlock_unlock(be->vlvSearchList_lock);
-				err = db->cursor(db, 0 /* txn */, &dbc, 0);
+				err = db->cursor(db, db_txn, &dbc, 0);
 				if (err == 0) {
 					if (length == 0) /* 609377: index size could be 0 */
 					{
@@ -2015,7 +2034,11 @@ IDList *vlv_find_index_by_filter(struct backend *be, const char *base,
     return NULL;
 }
 
-
+IDList *vlv_find_index_by_filter(struct backend *be, const char *base, 
+				 Slapi_Filter *f)
+{
+    return vlv_find_index_by_filter_txn(be, base, f, NULL);
+}
 
 /* replace c with c2 in string -- probably exists somewhere but I can't find it slapi maybe? */
 

+ 3 - 1
ldap/servers/slapd/back-ldbm/vlv_srch.c

@@ -181,15 +181,17 @@ vlvSearch_init(struct vlvSearch* p, Slapi_PBlock *pb, const Slapi_Entry *e, ldbm
     	if ( !slapi_sdn_isempty(p->vlv_base)) {
             Slapi_Backend *oldbe = NULL;
             entry_address addr;
+            back_txn txn = {NULL};
 
             /* switch context to the target backend */
             slapi_pblock_get(pb, SLAPI_BACKEND, &oldbe);
             slapi_pblock_set(pb, SLAPI_BACKEND, inst->inst_be);
             slapi_pblock_set(pb, SLAPI_PLUGIN, inst->inst_be->be_database);
+            slapi_pblock_get(pb, SLAPI_TXN, &txn.back_txn_txn);
 
             addr.dn = (char*)slapi_sdn_get_ndn (p->vlv_base);
             addr.uniqueid = NULL;
-            e = find_entry( pb, inst->inst_be, &addr, NULL );
+            e = find_entry( pb, inst->inst_be, &addr, &txn );
             /* Check to see if the entry is absent. If it is, mark this search
              * as not initialized */
             if (NULL == e) {