|
|
@@ -146,7 +146,6 @@ static void protocol_sleep(Private_Repl_Protocol *prp, PRIntervalTime duration);
|
|
|
static int send_updates(Private_Repl_Protocol *prp, RUV *ruv, PRUint32 *num_changes_sent);
|
|
|
static void repl5_inc_backoff_expired(time_t timer_fire_time, void *arg);
|
|
|
static int examine_update_vector(Private_Repl_Protocol *prp, RUV *ruv);
|
|
|
-static PRBool ignore_error_and_keep_going(int error);
|
|
|
static const char* state2name (int state);
|
|
|
static const char* event2name (int event);
|
|
|
static const char* op2string (int op);
|
|
|
@@ -450,11 +449,13 @@ repl5_inc_flow_control_results(Repl_Agmt *agmt, result_data *rd)
|
|
|
PR_Unlock(rd->lock);
|
|
|
}
|
|
|
|
|
|
-static void
|
|
|
+static int
|
|
|
repl5_inc_waitfor_async_results(result_data *rd)
|
|
|
{
|
|
|
int done = 0;
|
|
|
int loops = 0;
|
|
|
+ int rc = UPDATE_NO_MORE_UPDATES;
|
|
|
+
|
|
|
/* Keep pulling results off the LDAP connection until we catch up to the last message id stored in the rd */
|
|
|
while (!done && !slapi_is_shutting_down())
|
|
|
{
|
|
|
@@ -470,6 +471,10 @@ repl5_inc_waitfor_async_results(result_data *rd)
|
|
|
} else if (rd->abort && (rd->result == UPDATE_CONNECTION_LOST)) {
|
|
|
done = 1; /* no connection == no more results */
|
|
|
}
|
|
|
+ /*
|
|
|
+ * Return the last operation result
|
|
|
+ */
|
|
|
+ rc = rd->result;
|
|
|
PR_Unlock(rd->lock);
|
|
|
if (!done) {
|
|
|
/* If not then sleep a bit */
|
|
|
@@ -487,6 +492,7 @@ repl5_inc_waitfor_async_results(result_data *rd)
|
|
|
done = 1;
|
|
|
}
|
|
|
}
|
|
|
+ return rc;
|
|
|
}
|
|
|
|
|
|
/*
|
|
|
@@ -1467,78 +1473,84 @@ static int
|
|
|
repl5_inc_update_from_op_result(Private_Repl_Protocol *prp, ConnResult replay_crc, int connection_error, char *csn_str, char *uniqueid, ReplicaId replica_id, int* finished, PRUint32 *num_changes_sent)
|
|
|
{
|
|
|
int return_value = 0;
|
|
|
-
|
|
|
- /* Indentation is wrong here so we can get a sensible cvs diff */
|
|
|
- if (CONN_OPERATION_SUCCESS != replay_crc)
|
|
|
- {
|
|
|
- /* Figure out what to do next */
|
|
|
- if (CONN_OPERATION_FAILED == replay_crc)
|
|
|
- {
|
|
|
- /* Map ldap error code to return value */
|
|
|
- if (!ignore_error_and_keep_going(connection_error))
|
|
|
- {
|
|
|
- return_value = UPDATE_TRANSIENT_ERROR;
|
|
|
- *finished = 1;
|
|
|
- }
|
|
|
- else
|
|
|
- {
|
|
|
- agmt_inc_last_update_changecount (prp->agmt, replica_id, 1 /*skipped*/);
|
|
|
- }
|
|
|
- slapi_log_error(*finished ? SLAPI_LOG_FATAL : slapi_log_urp, repl_plugin_name,
|
|
|
- "%s: Consumer failed to replay change (uniqueid %s, CSN %s): %s (%d). %s.\n",
|
|
|
- agmt_get_long_name(prp->agmt),
|
|
|
- uniqueid, csn_str,
|
|
|
- ldap_err2string(connection_error), connection_error,
|
|
|
- *finished ? "Will retry later" : "Skipping");
|
|
|
- }
|
|
|
- else if (CONN_NOT_CONNECTED == replay_crc)
|
|
|
- {
|
|
|
- /* We lost the connection - enter backoff state */
|
|
|
|
|
|
- return_value = UPDATE_CONNECTION_LOST;
|
|
|
- *finished = 1;
|
|
|
- slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name,
|
|
|
- "%s: Consumer failed to replay change (uniqueid %s, CSN %s): "
|
|
|
- "%s(%d). Will retry later.\n",
|
|
|
- agmt_get_long_name(prp->agmt),
|
|
|
- uniqueid, csn_str,
|
|
|
- connection_error ? ldap_err2string(connection_error) : "Connection lost",
|
|
|
- connection_error);
|
|
|
- }
|
|
|
- else if (CONN_TIMEOUT == replay_crc)
|
|
|
- {
|
|
|
- return_value = UPDATE_TIMEOUT;
|
|
|
- *finished = 1;
|
|
|
- slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name,
|
|
|
- "%s: Consumer timed out to replay change (uniqueid %s, CSN %s): "
|
|
|
- "%s.\n",
|
|
|
- agmt_get_long_name(prp->agmt),
|
|
|
- uniqueid, csn_str,
|
|
|
- connection_error ? ldap_err2string(connection_error) : "Timeout");
|
|
|
- }
|
|
|
- else if (CONN_LOCAL_ERROR == replay_crc)
|
|
|
- {
|
|
|
- /*
|
|
|
- * Something bad happened on the local server - enter
|
|
|
- * backoff state.
|
|
|
- */
|
|
|
- return_value = UPDATE_TRANSIENT_ERROR;
|
|
|
- *finished = 1;
|
|
|
- slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name,
|
|
|
- "%s: Failed to replay change (uniqueid %s, CSN %s): "
|
|
|
- "Local error. Will retry later.\n",
|
|
|
- agmt_get_long_name(prp->agmt),
|
|
|
- uniqueid, csn_str);
|
|
|
- }
|
|
|
-
|
|
|
- }
|
|
|
- else
|
|
|
- {
|
|
|
- /* Positive response received */
|
|
|
- (*num_changes_sent)++;
|
|
|
- agmt_inc_last_update_changecount (prp->agmt, replica_id, 0 /*replayed*/);
|
|
|
- }
|
|
|
- return return_value;
|
|
|
+ if (CONN_OPERATION_SUCCESS != replay_crc)
|
|
|
+ {
|
|
|
+ /* Figure out what to do next */
|
|
|
+ if (CONN_OPERATION_FAILED == replay_crc)
|
|
|
+ {
|
|
|
+ /* Map ldap error code to return value */
|
|
|
+ if (!ignore_error_and_keep_going(connection_error))
|
|
|
+ {
|
|
|
+ return_value = UPDATE_TRANSIENT_ERROR;
|
|
|
+ *finished = 1;
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ agmt_inc_last_update_changecount (prp->agmt, replica_id, 1 /*skipped*/);
|
|
|
+ }
|
|
|
+ slapi_log_error(*finished ? SLAPI_LOG_FATAL : slapi_log_urp, repl_plugin_name,
|
|
|
+ "%s: Consumer failed to replay change (uniqueid %s, CSN %s): %s (%d). %s.\n",
|
|
|
+ agmt_get_long_name(prp->agmt),
|
|
|
+ uniqueid, csn_str,
|
|
|
+ ldap_err2string(connection_error), connection_error,
|
|
|
+ *finished ? "Will retry later" : "Skipping");
|
|
|
+ }
|
|
|
+ else if (CONN_NOT_CONNECTED == replay_crc)
|
|
|
+ {
|
|
|
+ /* We lost the connection - enter backoff state */
|
|
|
+
|
|
|
+ return_value = UPDATE_CONNECTION_LOST;
|
|
|
+ *finished = 1;
|
|
|
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name,
|
|
|
+ "%s: Consumer failed to replay change (uniqueid %s, CSN %s): "
|
|
|
+ "%s(%d). Will retry later.\n",
|
|
|
+ agmt_get_long_name(prp->agmt),
|
|
|
+ uniqueid, csn_str,
|
|
|
+ connection_error ? ldap_err2string(connection_error) : "Connection lost",
|
|
|
+ connection_error);
|
|
|
+ }
|
|
|
+ else if (CONN_TIMEOUT == replay_crc)
|
|
|
+ {
|
|
|
+ return_value = UPDATE_TIMEOUT;
|
|
|
+ *finished = 1;
|
|
|
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name,
|
|
|
+ "%s: Consumer timed out to replay change (uniqueid %s, CSN %s): "
|
|
|
+ "%s.\n",
|
|
|
+ agmt_get_long_name(prp->agmt),
|
|
|
+ uniqueid, csn_str,
|
|
|
+ connection_error ? ldap_err2string(connection_error) : "Timeout");
|
|
|
+ }
|
|
|
+ else if (CONN_LOCAL_ERROR == replay_crc)
|
|
|
+ {
|
|
|
+ /*
|
|
|
+ * Something bad happened on the local server - enter
|
|
|
+ * backoff state.
|
|
|
+ */
|
|
|
+ return_value = UPDATE_TRANSIENT_ERROR;
|
|
|
+ *finished = 1;
|
|
|
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name,
|
|
|
+ "%s: Failed to replay change (uniqueid %s, CSN %s): "
|
|
|
+ "Local error. Will retry later.\n",
|
|
|
+ agmt_get_long_name(prp->agmt),
|
|
|
+ uniqueid, csn_str);
|
|
|
+ }
|
|
|
+ if (*finished){
|
|
|
+ /*
|
|
|
+ * A serious error has occurred, the consumer might have closed
|
|
|
+ * the connection already, but we need to close the conn on the
|
|
|
+ * supplier side to properly set the conn structure as closed.
|
|
|
+ */
|
|
|
+ conn_disconnect(prp->conn);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ /* Positive response received */
|
|
|
+ (*num_changes_sent)++;
|
|
|
+ agmt_inc_last_update_changecount (prp->agmt, replica_id, 0 /*replayed*/);
|
|
|
+ }
|
|
|
+ return return_value;
|
|
|
}
|
|
|
|
|
|
/*
|
|
|
@@ -1556,7 +1568,7 @@ send_updates(Private_Repl_Protocol *prp, RUV *remote_update_vector, PRUint32 *nu
|
|
|
{
|
|
|
CL5Entry entry;
|
|
|
slapi_operation_parameters op;
|
|
|
- int return_value;
|
|
|
+ int return_value = 0;
|
|
|
int rc;
|
|
|
CL5ReplayIterator *changelog_iterator;
|
|
|
int message_id = 0;
|
|
|
@@ -1929,9 +1941,23 @@ send_updates(Private_Repl_Protocol *prp, RUV *remote_update_vector, PRUint32 *nu
|
|
|
{
|
|
|
/* We need to ensure that we wait until all the responses have been received from our operations */
|
|
|
if (return_value != UPDATE_CONNECTION_LOST) {
|
|
|
- rd->WaitForAsyncResults = agmt_get_WaitForAsyncResults(prp->agmt);
|
|
|
- /* if connection was lost/closed, there will be nothing to read */
|
|
|
- repl5_inc_waitfor_async_results(rd);
|
|
|
+ /*
|
|
|
+ * If we already have an error, there is no need to check the
|
|
|
+ * async result thread anymore.
|
|
|
+ */
|
|
|
+ if (return_value == UPDATE_NO_MORE_UPDATES)
|
|
|
+ {
|
|
|
+ /*
|
|
|
+ * We need to double check that an error hasn't popped up from
|
|
|
+ * the async result thread since our last check.
|
|
|
+ */
|
|
|
+ int final_result;
|
|
|
+
|
|
|
+ rd->WaitForAsyncResults = agmt_get_WaitForAsyncResults(prp->agmt);
|
|
|
+ if((final_result = repl5_inc_waitfor_async_results(rd))){
|
|
|
+ return_value = final_result;
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
rc = repl5_inc_destroy_async_result_thread(rd);
|
|
|
@@ -2220,7 +2246,7 @@ examine_update_vector(Private_Repl_Protocol *prp, RUV *remote_ruv)
|
|
|
* We stop if there's some indication that the server just completely
|
|
|
* failed to process the operation, e.g. LDAP_OPERATIONS_ERROR.
|
|
|
*/
|
|
|
-static PRBool
|
|
|
+PRBool
|
|
|
ignore_error_and_keep_going(int error)
|
|
|
{
|
|
|
int return_value = PR_FALSE;
|