浏览代码

Ticket 47767 - Nested tombstones become orphaned after purge

Bug Description:  If there are nested tombstone entries, the parents will
                  always be purged first, which leaves its child entries orphaned.

Fix Description:  When doing the tombstone purge, process the candidate list in
                  reverse order, which will remove the child entries before the
                  parent entries.

https://fedorahosted.org/389/ticket/47767

Reviewed by: nhosoi(Thanks!)
Mark Reynolds 11 年之前
父节点
当前提交
ce23d5d885

+ 6 - 2
ldap/servers/plugins/replication/repl5_replica.c

@@ -2972,7 +2972,10 @@ process_reap_entry (Slapi_Entry *entry, void *cb_data)
 							"%s\n", slapi_entry_get_dn(entry));
 		}
 	}
-	(*num_entriesp)++;
+	if(!is_ruv_tombstone_entry(entry)){
+		/* Don't update the count for the database tombstone entry */
+		(*num_entriesp)++;
+	}
 
 	return 0;
 }
@@ -3057,7 +3060,8 @@ _replica_reap_tombstones(void *arg)
 		slapi_search_internal_set_pb(pb, slapi_sdn_get_dn(replica->repl_root),
 									 LDAP_SCOPE_SUBTREE, "(objectclass=nstombstone)",
 									 attrs, 0, ctrls, NULL,
-									 repl_get_plugin_identity(PLUGIN_MULTIMASTER_REPLICATION), 0);
+									 repl_get_plugin_identity(PLUGIN_MULTIMASTER_REPLICATION),
+									 OP_FLAG_REVERSE_CANDIDATE_ORDER);
 
 		cb_data.rc = 0;
 		cb_data.num_entries = 0UL;

+ 6 - 0
ldap/servers/slapd/back-ldbm/idl_common.c

@@ -509,3 +509,9 @@ ID idl_iterator_dereference_increment(idl_iterator *i, const IDList *idl)
 	return t;
 }
 
+ID idl_iterator_dereference_decrement(idl_iterator *i, const IDList *idl)
+{
+	idl_iterator_decrement(i);
+	return idl_iterator_dereference(*i,idl);
+
+}

+ 7 - 1
ldap/servers/slapd/back-ldbm/ldbm_delete.c

@@ -98,6 +98,7 @@ ldbm_back_delete( Slapi_PBlock *pb )
 	int opreturn = 0;
 	int free_delete_existing_entry = 0;
 	int not_an_error = 0;
+	int updated_num = 0;
 
 	slapi_pblock_get( pb, SLAPI_BACKEND, &be);
 	slapi_pblock_get( pb, SLAPI_PLUGIN_PRIVATE, &li );
@@ -501,7 +502,8 @@ ldbm_back_delete( Slapi_PBlock *pb )
 						retval = -1;
 						goto error_return;
 					}
-
+					/* MARK */
+					updated_num = 1;
 					/*
 					 * Replication urp_post_delete will delete the parent entry
 					 * if it is a glue entry without any more children.
@@ -1294,5 +1296,9 @@ diskfull_return:
 		slapi_log_error (SLAPI_LOG_TRACE, "ldbm_back_delete", "leave conn=%" NSPRIu64 " op=%d\n",
 				(long long unsigned int)pb->pb_conn->c_connid, operation->o_opid);
 	}
+
+	if(!updated_num && ldap_result_code != 32){
+		slapi_log_error (SLAPI_LOG_FATAL,"MARK", "Failed to update numsubordinates\n");
+	}
 	return rc;
 }

+ 45 - 2
ldap/servers/slapd/back-ldbm/ldbm_search.c

@@ -1388,6 +1388,7 @@ ldbm_back_next_search_entry_ext( Slapi_PBlock *pb, int use_extension )
     int                    pr_idx = -1;
     Slapi_Connection       *conn;
     Slapi_Operation        *op;
+    int                    reverse_list = 0;
 
     slapi_pblock_get( pb, SLAPI_SEARCH_TARGET_SDN, &basesdn );
     if (NULL == basesdn) {
@@ -1415,6 +1416,18 @@ ldbm_back_next_search_entry_ext( Slapi_PBlock *pb, int use_extension )
     slapi_pblock_get( pb, SLAPI_CONNECTION, &conn );
     slapi_pblock_get( pb, SLAPI_OPERATION, &op );
 
+    if((reverse_list = operation_is_flag_set(op, OP_FLAG_REVERSE_CANDIDATE_ORDER))){
+        /*
+         * Start at the end of the list and work our way forward.  Since a single
+         * search can enter this function multiple times, we need to keep track
+         * of our state, and only initialize sr_current once.
+         */
+        if(!op->o_reverse_search_state){
+            sr->sr_current = sr->sr_candidates->b_nids;
+            op->o_reverse_search_state = REV_STARTED;
+        }
+    }
+
     if ( !txn.back_txn_txn ) {
         dblayer_txn_init( li, &txn );
         slapi_pblock_set( pb, SLAPI_TXN, txn.back_txn_txn );
@@ -1514,8 +1527,32 @@ ldbm_back_next_search_entry_ext( Slapi_PBlock *pb, int use_extension )
             goto bail;
         }
             
-        /* get the entry */
-        id = idl_iterator_dereference_increment(&(sr->sr_current), sr->sr_candidates);
+        /*
+         * Get the entry ID
+         */
+        if(reverse_list){
+            /*
+             * This is probably a tombstone reaping, we need to process in the candidate
+             * list in reserve order, or else we can orphan tombstone entries by removing
+             * it's parent tombstone entry first.
+             */
+            id = idl_iterator_dereference_decrement(&(sr->sr_current), sr->sr_candidates);
+            if((sr->sr_current == 0) && op->o_reverse_search_state != LAST_REV_ENTRY){
+                /*
+                 * We hit the last entry and we need to process it, but the decrement
+                 * function will keep returning the last entry.  So we need to mark that
+                 * we have hit the last entry so we know to stop on the next pass.
+                 */
+                op->o_reverse_search_state = LAST_REV_ENTRY;
+            } else if(op->o_reverse_search_state == LAST_REV_ENTRY){
+                /* we're done */
+                id = NOID;
+            }
+        } else {
+            /* Process the candidate list in the normal order. */
+            id = idl_iterator_dereference_increment(&(sr->sr_current), sr->sr_candidates);
+        }
+
         if ( id == NOID )
         {
             /* No more entries */
@@ -1526,6 +1563,7 @@ ldbm_back_next_search_entry_ext( Slapi_PBlock *pb, int use_extension )
             }
             slapi_pblock_set( pb, SLAPI_SEARCH_RESULT_ENTRY, NULL );
             delete_search_result_set(pb, &sr);
+            op->o_reverse_search_state = 0;
             rc = 0;
             goto bail;
         }
@@ -1731,7 +1769,12 @@ ldbm_back_next_search_entry_ext( Slapi_PBlock *pb, int use_extension )
           }
         }
     }
+
 bail:
+    if(rc){
+        op->o_reverse_search_state = 0;
+    }
+
     return rc;
 }
 

+ 1 - 0
ldap/servers/slapd/back-ldbm/proto-back-ldbm.h

@@ -277,6 +277,7 @@ idl_iterator idl_iterator_increment(idl_iterator *i);
 idl_iterator idl_iterator_decrement(idl_iterator *i);
 ID idl_iterator_dereference(idl_iterator i, const IDList *idl);
 ID idl_iterator_dereference_increment(idl_iterator *i, const IDList *idl);
+ID idl_iterator_dereference_decrement(idl_iterator *i, const IDList *idl);
 size_t idl_sizeof(IDList *idl);
 int idl_store_block(backend *be,DB *db,DBT *key,IDList *idl,DB_TXN *txn,struct attrinfo *a);
 void idl_set_tune(int val);

+ 1 - 0
ldap/servers/slapd/operation.c

@@ -181,6 +181,7 @@ operation_init(Slapi_Operation *o, int flags)
 		o->o_connid = 0;
 		o->o_next = NULL;
 		o->o_flags= flags;
+		o->o_reverse_search_state = 0;
 		if ( config_get_accesslog_level() & LDAP_DEBUG_TIMING ) {
 			o->o_interval = PR_IntervalNow();
 		} else {

+ 1 - 0
ldap/servers/slapd/slap.h

@@ -1397,6 +1397,7 @@ typedef struct op {
 	struct slapi_operation_parameters o_params;
 	struct slapi_operation_results o_results;
 	int o_pagedresults_sizelimit;
+	int o_reverse_search_state;
 } Operation;
 
 /*

+ 4 - 4
ldap/servers/slapd/slapi-plugin.h

@@ -217,10 +217,10 @@ NSPR_API(PRUint32) PR_fprintf(struct PRFileDesc* fd, const char *fmt, ...)
 #define SLAPI_ATTR_FLAG_DSA_OPERATION	0x2000	/* USAGE dSAOperation */
 
 /* operation flags */
-#define SLAPI_OP_FLAG_INTERNAL		0x00020 /* An operation generated by the core server or a plugin. */
-#define SLAPI_OP_FLAG_NEVER_CHAIN	0x00800 /* Do not chain the operation */	
-#define SLAPI_OP_FLAG_NO_ACCESS_CHECK  	0x10000 /* Do not check for access control - bypass them */
-#define SLAPI_OP_FLAG_BYPASS_REFERRALS  0x40000 /* Useful for performing internal operations on read-only replica */
+#define SLAPI_OP_FLAG_INTERNAL		0x000020 /* An operation generated by the core server or a plugin. */
+#define SLAPI_OP_FLAG_NEVER_CHAIN	0x000800 /* Do not chain the operation */
+#define SLAPI_OP_FLAG_NO_ACCESS_CHECK	0x10000 /* Do not check for access control - bypass them */
+#define SLAPI_OP_FLAG_BYPASS_REFERRALS	0x40000 /* Useful for performing internal operations on read-only replica */
 
 #define SLAPI_OC_FLAG_REQUIRED	0x0001
 #define SLAPI_OC_FLAG_ALLOWED	0x0002

+ 24 - 20
ldap/servers/slapd/slapi-private.h

@@ -405,43 +405,47 @@ char *slapi_filter_to_string_internal( const struct slapi_filter *f, char *buf,
 
 /* operation.c */
 
-#define OP_FLAG_PS                       0x00001
-#define OP_FLAG_PS_CHANGESONLY           0x00002
-#define OP_FLAG_GET_EFFECTIVE_RIGHTS     0x00004  
-#define OP_FLAG_REPLICATED               0x00008 /* A Replicated Operation */
-#define OP_FLAG_REPL_FIXUP               0x00010 /* A Fixup Operation, 
+#define OP_FLAG_PS                       0x000001
+#define OP_FLAG_PS_CHANGESONLY           0x000002
+#define OP_FLAG_GET_EFFECTIVE_RIGHTS     0x000004
+#define OP_FLAG_REPLICATED               0x000008 /* A Replicated Operation */
+#define OP_FLAG_REPL_FIXUP               0x000010 /* A Fixup Operation,
                                                   * generated as a consequence
                                                   * of a Replicated Operation.
                                                   */
-#define OP_FLAG_INTERNAL                 SLAPI_OP_FLAG_INTERNAL  /* 0x00020 */ 
-#define OP_FLAG_ACTION_LOG_ACCESS        0x00040
-#define OP_FLAG_ACTION_LOG_AUDIT         0x00080
-#define OP_FLAG_ACTION_SCHEMA_CHECK      0x00100
-#define OP_FLAG_ACTION_LOG_CHANGES       0x00200
-#define OP_FLAG_ACTION_INVOKE_FOR_REPLOP 0x00400
-#define OP_FLAG_NEVER_CHAIN              SLAPI_OP_FLAG_NEVER_CHAIN  /* 0x0800 */
-#define OP_FLAG_TOMBSTONE_ENTRY          0x01000
-#define OP_FLAG_RESURECT_ENTRY           0x02000
-#define OP_FLAG_LEGACY_REPLICATION_DN    0x04000 /* Operation done by legacy 
+#define OP_FLAG_INTERNAL                 SLAPI_OP_FLAG_INTERNAL  /* 0x000020 */
+#define OP_FLAG_ACTION_LOG_ACCESS        0x000040
+#define OP_FLAG_ACTION_LOG_AUDIT         0x000080
+#define OP_FLAG_ACTION_SCHEMA_CHECK      0x000100
+#define OP_FLAG_ACTION_LOG_CHANGES       0x000200
+#define OP_FLAG_ACTION_INVOKE_FOR_REPLOP 0x000400
+#define OP_FLAG_NEVER_CHAIN              SLAPI_OP_FLAG_NEVER_CHAIN  /* 0x000800 */
+#define OP_FLAG_TOMBSTONE_ENTRY          0x001000
+#define OP_FLAG_RESURECT_ENTRY           0x002000
+#define OP_FLAG_LEGACY_REPLICATION_DN    0x004000 /* Operation done by legacy
                                                   * replication DN
                                                   */
-#define OP_FLAG_ACTION_NOLOG             0x08000 /* Do not log the entry in 
+#define OP_FLAG_ACTION_NOLOG             0x008000 /* Do not log the entry in
                                                   * audit log or change log
                                                   */
-#define OP_FLAG_SKIP_MODIFIED_ATTRS      0x10000 /* Do not update the 
+#define OP_FLAG_SKIP_MODIFIED_ATTRS      0x010000 /* Do not update the
                                                   * modifiersname,
                                                   * modifiedtimestamp, etc.
                                                   * attributes
                                                   */
-#define OP_FLAG_REPL_RUV                 0x20000 /* Flag to tell to the backend
+#define OP_FLAG_REPL_RUV                 0x020000 /* Flag to tell to the backend
                                                   * that the entry to be added/
                                                   * modified is RUV. This info
                                                   * is used to skip VLV op.
                                                   * (see #329951)
                                                   */
-#define OP_FLAG_PAGED_RESULTS            0x40000 /* simple paged results */
-#define OP_FLAG_SERVER_SIDE_SORTING      0x80000 /* server side sorting  */
+#define OP_FLAG_PAGED_RESULTS            0x040000 /* simple paged results */
+#define OP_FLAG_SERVER_SIDE_SORTING      0x080000 /* server side sorting  */
+#define OP_FLAG_REVERSE_CANDIDATE_ORDER  0x100000 /* reverse the search candidate list */
 
+/* reverse search states */
+#define REV_STARTED 1
+#define LAST_REV_ENTRY 2
 
 CSN *operation_get_csn(Slapi_Operation *op);
 void operation_set_csn(Slapi_Operation *op,CSN *csn);