|
|
@@ -36,6 +36,11 @@ Perhaps these events should be properties of the main protocol.
|
|
|
#include "repl5_prot_private.h"
|
|
|
#include "cl5_api.h"
|
|
|
|
|
|
+#include "repl5.h"
|
|
|
+#include "repl5_prot_private.h"
|
|
|
+#include "cl5_api.h"
|
|
|
+#include "slapi-plugin.h"
|
|
|
+
|
|
|
extern int slapi_log_urp;
|
|
|
|
|
|
/*** from proto-slap.h ***/
|
|
|
@@ -82,6 +87,7 @@ typedef struct result_data
|
|
|
int flowcontrol_detection;
|
|
|
int result; /* The UPDATE_TRANSIENT_ERROR etc */
|
|
|
int WaitForAsyncResults;
|
|
|
+ time_t abort_time;
|
|
|
} result_data;
|
|
|
|
|
|
/* Various states the incremental protocol can pass through */
|
|
|
@@ -121,6 +127,7 @@ typedef struct result_data
|
|
|
#define EXAMINE_RUV_PARAM_ERROR 405
|
|
|
|
|
|
#define MAX_CHANGES_PER_SESSION 10000
|
|
|
+
|
|
|
/*
|
|
|
* Maximum time to wait between replication sessions. If we
|
|
|
* don't see any updates for a period equal to this interval,
|
|
|
@@ -240,19 +247,21 @@ repl5_inc_result_threadmain(void *param)
|
|
|
Repl_Connection *conn = rd->prp->conn;
|
|
|
int finished = 0;
|
|
|
int message_id = 0;
|
|
|
+ int yield_session = 0;
|
|
|
|
|
|
slapi_log_error(SLAPI_LOG_REPL, NULL, "repl5_inc_result_threadmain starting\n");
|
|
|
while (!finished)
|
|
|
{
|
|
|
+ LDAPControl **returned_controls = NULL;
|
|
|
repl5_inc_operation *op = NULL;
|
|
|
- int connection_error = 0;
|
|
|
+ ReplicaId replica_id = 0;
|
|
|
char *csn_str = NULL;
|
|
|
char *uniqueid = NULL;
|
|
|
- ReplicaId replica_id = 0;
|
|
|
- int operation_code = 0;
|
|
|
char *ldap_error_string = NULL;
|
|
|
time_t time_now = 0;
|
|
|
time_t start_time = time( NULL );
|
|
|
+ int connection_error = 0;
|
|
|
+ int operation_code = 0;
|
|
|
int backoff_time = 1;
|
|
|
|
|
|
/* Read the next result */
|
|
|
@@ -264,7 +273,7 @@ repl5_inc_result_threadmain(void *param)
|
|
|
|
|
|
while (!finished)
|
|
|
{
|
|
|
- conres = conn_read_result_ex(conn, NULL, NULL, NULL, LDAP_RES_ANY, &message_id, 0);
|
|
|
+ conres = conn_read_result_ex(conn, NULL, NULL, &returned_controls, LDAP_RES_ANY, &message_id, 0);
|
|
|
slapi_log_error(SLAPI_LOG_REPL, NULL, "repl5_inc_result_threadmain: read result for message_id %d\n", message_id);
|
|
|
/* Timeout here means that we didn't block, not a real timeout */
|
|
|
if (CONN_TIMEOUT == conres)
|
|
|
@@ -292,9 +301,19 @@ repl5_inc_result_threadmain(void *param)
|
|
|
finished = 1;
|
|
|
}
|
|
|
PR_Unlock(rd->lock);
|
|
|
- } else
|
|
|
- {
|
|
|
- /* Something other than a timeout, so we exit the loop */
|
|
|
+ } else {
|
|
|
+ /*
|
|
|
+ * Something other than a timeout, so we exit the loop.
|
|
|
+ * First check if we were told to abort the session
|
|
|
+ */;
|
|
|
+ Replica *r = (Replica*)object_get_data(rd->prp->replica_object);
|
|
|
+ if (replica_get_release_timeout(r) &&
|
|
|
+ slapi_control_present(returned_controls,
|
|
|
+ REPL_ABORT_SESSION_OID,
|
|
|
+ NULL, NULL))
|
|
|
+ {
|
|
|
+ yield_session = 1;
|
|
|
+ }
|
|
|
break;
|
|
|
}
|
|
|
}
|
|
|
@@ -318,21 +337,29 @@ repl5_inc_result_threadmain(void *param)
|
|
|
}
|
|
|
|
|
|
conn_get_error_ex(conn, &operation_code, &connection_error, &ldap_error_string);
|
|
|
- slapi_log_error(SLAPI_LOG_REPL, NULL, "repl5_inc_result_threadmain: result %d, %d, %d, %d, %s\n", operation_code,connection_error,conres,message_id,ldap_error_string);
|
|
|
- return_value = repl5_inc_update_from_op_result(rd->prp, conres, connection_error, csn_str, uniqueid, replica_id, &should_finish, &(rd->num_changes_sent));
|
|
|
+ slapi_log_error(SLAPI_LOG_REPL, NULL,
|
|
|
+ "repl5_inc_result_threadmain: result %d, %d, %d, %d, %s\n",
|
|
|
+ operation_code,connection_error,conres,message_id,ldap_error_string);
|
|
|
+ return_value = repl5_inc_update_from_op_result(rd->prp, conres, connection_error,
|
|
|
+ csn_str, uniqueid, replica_id, &should_finish,
|
|
|
+ &(rd->num_changes_sent));
|
|
|
if (return_value || should_finish)
|
|
|
{
|
|
|
- slapi_log_error(SLAPI_LOG_REPL, NULL, "repl5_inc_result_threadmain: got op result %d should finish %d\n", return_value, should_finish);
|
|
|
+ slapi_log_error(SLAPI_LOG_REPL, NULL,
|
|
|
+ "repl5_inc_result_threadmain: got op result %d should finish %d\n",
|
|
|
+ return_value, should_finish);
|
|
|
/* If so then we need to take steps to abort the update process */
|
|
|
PR_Lock(rd->lock);
|
|
|
rd->result = return_value;
|
|
|
- rd->abort = 1;
|
|
|
+ rd->abort = ABORT_SESSION;
|
|
|
PR_Unlock(rd->lock);
|
|
|
- /* We also need to log the error, including details stored from when the operation was sent */
|
|
|
- /* we cannot finish yet - we still need to waitfor the pending results, then
|
|
|
- the main repl code will shut down this thread */
|
|
|
- /* we can finish if we have disconnected - in that case, there will be nothing
|
|
|
- to read */
|
|
|
+ /*
|
|
|
+ * We also need to log the error, including details stored from
|
|
|
+ * when the operation was sent. We cannot finish yet - we still
|
|
|
+ * need to wait for the pending results, then the main repl code
|
|
|
+ * will shut down this thread. We can finish if we have
|
|
|
+ * disconnected - in that case, there will be nothing to read
|
|
|
+ */
|
|
|
if (return_value == UPDATE_CONNECTION_LOST) {
|
|
|
finished = 1;
|
|
|
}
|
|
|
@@ -341,8 +368,16 @@ repl5_inc_result_threadmain(void *param)
|
|
|
rd->result = return_value;
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
/* Should we stop ? */
|
|
|
PR_Lock(rd->lock);
|
|
|
+ if (!finished && yield_session && rd->abort != SESSION_ABORTED && rd->abort_time == 0) {
|
|
|
+ rd->abort_time = time( NULL );
|
|
|
+ rd->abort = SESSION_ABORTED; /* only set the abort time once */
|
|
|
+ slapi_log_error(SLAPI_LOG_REPL, "repl5_inc_result_threadmain",
|
|
|
+ "Abort control detected, setting abort time...(%s)\n",
|
|
|
+ agmt_get_long_name(rd->prp->agmt));
|
|
|
+ }
|
|
|
if (rd->stop_result_thread)
|
|
|
{
|
|
|
finished = 1;
|
|
|
@@ -468,7 +503,8 @@ repl5_inc_waitfor_async_results(result_data *rd)
|
|
|
if (rd->last_message_id_received >= rd->last_message_id_sent) {
|
|
|
/* If so then we're done */
|
|
|
done = 1;
|
|
|
- } else if (rd->abort && (rd->result == UPDATE_CONNECTION_LOST)) {
|
|
|
+ } else if (rd->abort && (rd->result == UPDATE_CONNECTION_LOST))
|
|
|
+ {
|
|
|
done = 1; /* no connection == no more results */
|
|
|
}
|
|
|
/*
|
|
|
@@ -846,10 +882,10 @@ repl5_inc_run(Private_Repl_Protocol *prp)
|
|
|
if (!busywaittime){
|
|
|
busywaittime = repl5_get_backoff_min(prp);
|
|
|
}
|
|
|
- prp_priv->backoff = backoff_new(BACKOFF_FIXED, busywaittime, busywaittime);
|
|
|
+ prp_priv->backoff = backoff_new(BACKOFF_FIXED, busywaittime , busywaittime);
|
|
|
} else {
|
|
|
prp_priv->backoff = backoff_new(BACKOFF_EXPONENTIAL, repl5_get_backoff_min(prp),
|
|
|
- repl5_get_backoff_max(prp));
|
|
|
+ repl5_get_backoff_max(prp));
|
|
|
}
|
|
|
next_state = STATE_BACKOFF;
|
|
|
backoff_reset(prp_priv->backoff, repl5_inc_backoff_expired, (void *)prp);
|
|
|
@@ -1055,6 +1091,7 @@ repl5_inc_run(Private_Repl_Protocol *prp)
|
|
|
} else if (rc == UPDATE_YIELD){
|
|
|
dev_debug("repl5_inc_run(STATE_SENDING_UPDATES) -> send_updates = UPDATE_YIELD -> STATE_BACKOFF_START");
|
|
|
agmt_set_last_update_status(prp->agmt, 0, 0, "Incremental update succeeded and yielded");
|
|
|
+ use_busy_backoff_timer = PR_TRUE;
|
|
|
next_state = STATE_BACKOFF_START;
|
|
|
} else if (rc == UPDATE_TRANSIENT_ERROR){
|
|
|
dev_debug("repl5_inc_run(STATE_SENDING_UPDATES) -> send_updates = UPDATE_TRANSIENT_ERROR -> STATE_BACKOFF_START");
|
|
|
@@ -1099,6 +1136,7 @@ repl5_inc_run(Private_Repl_Protocol *prp)
|
|
|
ruv_destroy(&ruv); ruv = NULL;
|
|
|
}
|
|
|
agmt_update_done(prp->agmt, 0);
|
|
|
+
|
|
|
/* If timed out, close the connection after releasing the replica */
|
|
|
release_replica(prp);
|
|
|
if (rc == UPDATE_TIMEOUT) {
|
|
|
@@ -1681,12 +1719,14 @@ send_updates(Private_Repl_Protocol *prp, RUV *remote_update_vector, PRUint32 *nu
|
|
|
}
|
|
|
else
|
|
|
{
|
|
|
- int finished = 0;
|
|
|
ConnResult replay_crc;
|
|
|
- char csn_str[CSN_STRSIZE];
|
|
|
+ Replica *replica = (Replica*) object_get_data(prp->replica_object);
|
|
|
PRBool subentry_update_needed = PR_FALSE;
|
|
|
+ PRUint64 release_timeout = replica_get_release_timeout(replica);
|
|
|
+ char csn_str[CSN_STRSIZE];
|
|
|
int skipped_updates = 0;
|
|
|
int fractional_repl;
|
|
|
+ int finished = 0;
|
|
|
#define FRACTIONAL_SKIPPED_THRESHOLD 100
|
|
|
|
|
|
/* Start the results reading thread */
|
|
|
@@ -1906,7 +1946,20 @@ send_updates(Private_Repl_Protocol *prp, RUV *remote_update_vector, PRUint32 *nu
|
|
|
}
|
|
|
PR_Lock(rd->lock);
|
|
|
/* See if the result thread has hit a problem */
|
|
|
- if (!finished && rd->abort)
|
|
|
+
|
|
|
+ if(!finished && rd->abort_time){
|
|
|
+ time_t current_time = time ( NULL );
|
|
|
+ if ((current_time - rd->abort_time) >= release_timeout){
|
|
|
+ rd->result = UPDATE_YIELD;
|
|
|
+ return_value = UPDATE_YIELD;
|
|
|
+ finished = 1;
|
|
|
+ slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name,
|
|
|
+ "Aborting send_updates...(%s)\n",
|
|
|
+ agmt_get_long_name(rd->prp->agmt));
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ if (!finished && rd->abort == ABORT_SESSION)
|
|
|
{
|
|
|
return_value = rd->result;
|
|
|
finished = 1;
|
|
|
@@ -1916,10 +1969,9 @@ send_updates(Private_Repl_Protocol *prp, RUV *remote_update_vector, PRUint32 *nu
|
|
|
|
|
|
if (fractional_repl && subentry_update_needed)
|
|
|
{
|
|
|
- Replica *replica;
|
|
|
ReplicaId rid = -1; /* Used to create the replica keep alive subentry */
|
|
|
Slapi_DN *replarea_sdn = NULL;
|
|
|
- replica = (Replica*) object_get_data(prp->replica_object);
|
|
|
+
|
|
|
if (replica)
|
|
|
{
|
|
|
rid = replica_get_rid(replica);
|
|
|
@@ -1945,7 +1997,7 @@ send_updates(Private_Repl_Protocol *prp, RUV *remote_update_vector, PRUint32 *nu
|
|
|
* If we already have an error, there is no need to check the
|
|
|
* async result thread anymore.
|
|
|
*/
|
|
|
- if (return_value == UPDATE_NO_MORE_UPDATES)
|
|
|
+ if (return_value == UPDATE_NO_MORE_UPDATES || return_value == UPDATE_YIELD)
|
|
|
{
|
|
|
/*
|
|
|
* We need to double check that an error hasn't popped up from
|