|
|
@@ -323,6 +323,10 @@ repl5_tot_run(Private_Repl_Protocol *prp)
|
|
|
int init_retry = 0;
|
|
|
Replica *replica;
|
|
|
ReplicaId rid = 0; /* Used to create the replica keep alive subentry */
|
|
|
+ Slapi_Entry *suffix = NULL;
|
|
|
+ char **instances = NULL;
|
|
|
+ Slapi_Backend *be = NULL;
|
|
|
+ int is_entryrdn = 0;
|
|
|
|
|
|
PR_ASSERT(NULL != prp);
|
|
|
|
|
|
@@ -354,21 +358,21 @@ retry:
|
|
|
*/
|
|
|
if (rc != ACQUIRE_SUCCESS)
|
|
|
{
|
|
|
- int optype, ldaprc, wait_retry;
|
|
|
- conn_get_error(prp->conn, &optype, &ldaprc);
|
|
|
- if (rc == ACQUIRE_TRANSIENT_ERROR && INIT_RETRY_MAX > init_retry++) {
|
|
|
- wait_retry = init_retry * INIT_RETRY_INT;
|
|
|
- slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "Warning: unable to "
|
|
|
- "acquire replica for total update, error: %d,"
|
|
|
+ int optype, ldaprc, wait_retry;
|
|
|
+ conn_get_error(prp->conn, &optype, &ldaprc);
|
|
|
+ if (rc == ACQUIRE_TRANSIENT_ERROR && INIT_RETRY_MAX > init_retry++) {
|
|
|
+ wait_retry = init_retry * INIT_RETRY_INT;
|
|
|
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "Warning: unable to "
|
|
|
+ "acquire replica for total update, error: %d,"
|
|
|
" retrying in %d seconds.\n",
|
|
|
- ldaprc, wait_retry);
|
|
|
- DS_Sleep(PR_SecondsToInterval(wait_retry));
|
|
|
- goto retry;
|
|
|
- } else {
|
|
|
- agmt_set_last_init_status(prp->agmt, ldaprc,
|
|
|
- prp->last_acquire_response_code, 0, NULL);
|
|
|
- goto done;
|
|
|
- }
|
|
|
+ ldaprc, wait_retry);
|
|
|
+ DS_Sleep(PR_SecondsToInterval(wait_retry));
|
|
|
+ goto retry;
|
|
|
+ } else {
|
|
|
+ agmt_set_last_init_status(prp->agmt, ldaprc,
|
|
|
+ prp->last_acquire_response_code, 0, NULL);
|
|
|
+ goto done;
|
|
|
+ }
|
|
|
}
|
|
|
else if (prp->terminate)
|
|
|
{
|
|
|
@@ -405,48 +409,121 @@ retry:
|
|
|
and that the order implies that perent entry is always ahead of the
|
|
|
child entry in the list. Otherwise, the consumer would not be
|
|
|
properly updated because bulk import at the moment skips orphand entries. */
|
|
|
- /* XXXggood above assumption may not be valid if orphaned entry moved???? */
|
|
|
+ /* XXXggood above assumption may not be valid if orphaned entry moved???? */
|
|
|
|
|
|
agmt_set_last_init_status(prp->agmt, 0, 0, 0, "Total update in progress");
|
|
|
|
|
|
slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "Beginning total update of replica "
|
|
|
- "\"%s\".\n", agmt_get_long_name(prp->agmt));
|
|
|
+ "\"%s\".\n", agmt_get_long_name(prp->agmt));
|
|
|
|
|
|
/* RMREPL - need to send schema here */
|
|
|
|
|
|
pb = slapi_pblock_new ();
|
|
|
|
|
|
- /* we need to provide managedsait control so that referral entries can
|
|
|
- be replicated */
|
|
|
- ctrls = (LDAPControl **)slapi_ch_calloc (3, sizeof (LDAPControl *));
|
|
|
- ctrls[0] = create_managedsait_control ();
|
|
|
- ctrls[1] = create_backend_control(area_sdn);
|
|
|
+ replica = (Replica*) object_get_data(prp->replica_object);
|
|
|
+ /*
|
|
|
+ * Get the info about the entryrdn vs. entrydn from the backend.
|
|
|
+ * If NOT is_entryrdn, its ancestor entries are always found prior to an entry.
|
|
|
+ */
|
|
|
+ rc = slapi_lookup_instance_name_by_suffix((char *)slapi_sdn_get_dn(area_sdn), NULL, &instances, 1);
|
|
|
+ if (rc || !instances) {
|
|
|
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "Warning: unable to "
|
|
|
+ "get the instance name for the suffix \"%s\".\n", slapi_sdn_get_dn(area_sdn));
|
|
|
+ goto done;
|
|
|
+ }
|
|
|
+ be = slapi_be_select_by_instance_name(instances[0]);
|
|
|
+ if (!be) {
|
|
|
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "Warning: unable to "
|
|
|
+ "get the instance for the suffix \"%s\".\n", slapi_sdn_get_dn(area_sdn));
|
|
|
+ goto done;
|
|
|
+ }
|
|
|
+ rc = slapi_back_get_info(be, BACK_INFO_IS_ENTRYRDN, (void **)&is_entryrdn);
|
|
|
+ if (is_entryrdn) {
|
|
|
+ /*
|
|
|
+ * Supporting entries out of order -- parent could have a larger id than its children.
|
|
|
+ * Entires are retireved sorted by parentid without the allid threshold.
|
|
|
+ */
|
|
|
+ /* Get suffix */
|
|
|
+ rc = slapi_search_internal_get_entry(area_sdn, NULL, &suffix, repl_get_plugin_identity(PLUGIN_MULTIMASTER_REPLICATION));
|
|
|
+ if (rc) {
|
|
|
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "Warning: unable to "
|
|
|
+ "get the suffix entry \"%s\".\n", slapi_sdn_get_dn(area_sdn));
|
|
|
+ goto done;
|
|
|
+ }
|
|
|
|
|
|
- /* Time to make sure it exists a keep alive subentry for that replica */
|
|
|
- replica = (Replica*) object_get_data(prp->replica_object);
|
|
|
- if (replica)
|
|
|
- {
|
|
|
- rid = replica_get_rid(replica);
|
|
|
- }
|
|
|
- replica_subentry_check(area_sdn, rid);
|
|
|
-
|
|
|
- slapi_search_internal_set_pb (pb, slapi_sdn_get_dn (area_sdn),
|
|
|
- LDAP_SCOPE_SUBTREE, "(|(objectclass=ldapsubentry)(objectclass=nstombstone)(nsuniqueid=*))", NULL, 0, ctrls, NULL,
|
|
|
- repl_get_plugin_identity (PLUGIN_MULTIMASTER_REPLICATION), 0);
|
|
|
-
|
|
|
- cb_data.prp = prp;
|
|
|
- cb_data.rc = 0;
|
|
|
- cb_data.num_entries = 0UL;
|
|
|
- cb_data.sleep_on_busy = 0UL;
|
|
|
- cb_data.last_busy = current_time ();
|
|
|
- cb_data.flowcontrol_detection = 0;
|
|
|
- cb_data.lock = PR_NewLock();
|
|
|
-
|
|
|
- /* This allows during perform_operation to check the callback data
|
|
|
- * especially to do flow contol on delta send msgid / recv msgid
|
|
|
- */
|
|
|
- conn_set_tot_update_cb(prp->conn, (void *) &cb_data);
|
|
|
+ cb_data.prp = prp;
|
|
|
+ cb_data.rc = 0;
|
|
|
+ cb_data.num_entries = 1UL;
|
|
|
+ cb_data.sleep_on_busy = 0UL;
|
|
|
+ cb_data.last_busy = current_time ();
|
|
|
+ cb_data.flowcontrol_detection = 0;
|
|
|
+ cb_data.lock = PR_NewLock();
|
|
|
+
|
|
|
+ /* This allows during perform_operation to check the callback data
|
|
|
+ * especially to do flow contol on delta send msgid / recv msgid
|
|
|
+ */
|
|
|
+ conn_set_tot_update_cb(prp->conn, (void *) &cb_data);
|
|
|
+
|
|
|
+ /* Send suffix first. */
|
|
|
+ rc = send_entry(suffix, (void *)&cb_data);
|
|
|
+ if (rc) {
|
|
|
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "Warning: unable to "
|
|
|
+ "send the suffix entry \"%s\" to the consumer.\n", slapi_sdn_get_dn(area_sdn));
|
|
|
+ goto done;
|
|
|
+ }
|
|
|
+
|
|
|
+ /* we need to provide managedsait control so that referral entries can
|
|
|
+ be replicated */
|
|
|
+ ctrls = (LDAPControl **)slapi_ch_calloc (3, sizeof (LDAPControl *));
|
|
|
+ ctrls[0] = create_managedsait_control ();
|
|
|
+ ctrls[1] = create_backend_control(area_sdn);
|
|
|
+
|
|
|
+ /* Time to make sure it exists a keep alive subentry for that replica */
|
|
|
+ if (replica)
|
|
|
+ {
|
|
|
+ rid = replica_get_rid(replica);
|
|
|
+ }
|
|
|
+ replica_subentry_check(area_sdn, rid);
|
|
|
|
|
|
+ /* Send the subtree of the suffix in the order of parentid index plus ldapsubentry and nstombstone. */
|
|
|
+ slapi_search_internal_set_pb(pb, slapi_sdn_get_dn (area_sdn),
|
|
|
+ LDAP_SCOPE_SUBTREE, "(parentid>=1)", NULL, 0, ctrls, NULL,
|
|
|
+ repl_get_plugin_identity (PLUGIN_MULTIMASTER_REPLICATION), OP_FLAG_BULK_IMPORT);
|
|
|
+ cb_data.num_entries = 0UL;
|
|
|
+ } else {
|
|
|
+ /* Original total update */
|
|
|
+ /* we need to provide managedsait control so that referral entries can
|
|
|
+ be replicated */
|
|
|
+ ctrls = (LDAPControl **)slapi_ch_calloc (3, sizeof (LDAPControl *));
|
|
|
+ ctrls[0] = create_managedsait_control ();
|
|
|
+ ctrls[1] = create_backend_control(area_sdn);
|
|
|
+
|
|
|
+ /* Time to make sure it exists a keep alive subentry for that replica */
|
|
|
+ replica = (Replica*) object_get_data(prp->replica_object);
|
|
|
+ if (replica)
|
|
|
+ {
|
|
|
+ rid = replica_get_rid(replica);
|
|
|
+ }
|
|
|
+ replica_subentry_check(area_sdn, rid);
|
|
|
+
|
|
|
+ slapi_search_internal_set_pb (pb, slapi_sdn_get_dn (area_sdn),
|
|
|
+ LDAP_SCOPE_SUBTREE, "(|(objectclass=ldapsubentry)(objectclass=nstombstone)(nsuniqueid=*))", NULL, 0, ctrls, NULL,
|
|
|
+ repl_get_plugin_identity (PLUGIN_MULTIMASTER_REPLICATION), 0);
|
|
|
+
|
|
|
+ cb_data.prp = prp;
|
|
|
+ cb_data.rc = 0;
|
|
|
+ cb_data.num_entries = 0UL;
|
|
|
+ cb_data.sleep_on_busy = 0UL;
|
|
|
+ cb_data.last_busy = current_time ();
|
|
|
+ cb_data.flowcontrol_detection = 0;
|
|
|
+ cb_data.lock = PR_NewLock();
|
|
|
+
|
|
|
+ /* This allows during perform_operation to check the callback data
|
|
|
+ * especially to do flow contol on delta send msgid / recv msgid
|
|
|
+ */
|
|
|
+ conn_set_tot_update_cb(prp->conn, (void *) &cb_data);
|
|
|
+ }
|
|
|
+
|
|
|
/* Before we get started on sending entries to the replica, we need to
|
|
|
* setup things for async propagation:
|
|
|
* 1. Create a thread that will read the LDAP results from the connection.
|
|
|
@@ -470,7 +547,7 @@ retry:
|
|
|
slapi_search_internal_callback_pb (pb, &cb_data /* callback data */,
|
|
|
get_result /* result callback */,
|
|
|
send_entry /* entry callback */,
|
|
|
- NULL /* referral callback*/);
|
|
|
+ NULL /* referral callback*/);
|
|
|
|
|
|
/*
|
|
|
* After completing the sending operation (or optionally failing), we need to clean up
|