|
|
@@ -329,6 +329,7 @@ static void repl5_inc_result_threadmain(void *param)
|
|
|
}
|
|
|
if (conres != CONN_TIMEOUT)
|
|
|
{
|
|
|
+ int return_value;
|
|
|
int should_finish = 0;
|
|
|
if (message_id)
|
|
|
{
|
|
|
@@ -347,17 +348,26 @@ static void repl5_inc_result_threadmain(void *param)
|
|
|
|
|
|
conn_get_error_ex(conn, &operation_code, &connection_error, &ldap_error_string);
|
|
|
slapi_log_error(SLAPI_LOG_REPL, NULL, "repl5_inc_result_threadmain: result %d, %d, %d, %d, %s\n", operation_code,connection_error,conres,message_id,ldap_error_string);
|
|
|
- rd->result = repl5_inc_update_from_op_result(rd->prp, conres, connection_error, csn_str, uniqueid, replica_id, &should_finish, &(rd->num_changes_sent));
|
|
|
- if (rd->result || should_finish)
|
|
|
+ return_value = repl5_inc_update_from_op_result(rd->prp, conres, connection_error, csn_str, uniqueid, replica_id, &should_finish, &(rd->num_changes_sent));
|
|
|
+ if (return_value || should_finish)
|
|
|
{
|
|
|
- slapi_log_error(SLAPI_LOG_REPL, NULL, "repl5_inc_result_threadmain: got op result %d should finish %d\n", rd->result, should_finish);
|
|
|
+ slapi_log_error(SLAPI_LOG_REPL, NULL, "repl5_inc_result_threadmain: got op result %d should finish %d\n", return_value, should_finish);
|
|
|
/* If so then we need to take steps to abort the update process */
|
|
|
PR_Lock(rd->lock);
|
|
|
+ rd->result = return_value;
|
|
|
rd->abort = 1;
|
|
|
PR_Unlock(rd->lock);
|
|
|
/* We also need to log the error, including details stored from when the operation was sent */
|
|
|
/* we cannot finish yet - we still need to waitfor the pending results, then
|
|
|
the main repl code will shut down this thread */
|
|
|
+ /* we can finish if we have disconnected - in that case, there will be nothing
|
|
|
+ to read */
|
|
|
+ if (return_value == UPDATE_CONNECTION_LOST) {
|
|
|
+ finished = 1;
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ /* old semantics had result set outside of lock */
|
|
|
+ rd->result = return_value;
|
|
|
}
|
|
|
}
|
|
|
/* Should we stop ? */
|
|
|
@@ -470,13 +480,17 @@ repl5_inc_waitfor_async_results(result_data *rd)
|
|
|
/* If so then we're done */
|
|
|
done = 1;
|
|
|
}
|
|
|
+ if (rd->abort && (rd->result == UPDATE_CONNECTION_LOST))
|
|
|
+ {
|
|
|
+ done = 1; /* no connection == no more results */
|
|
|
+ }
|
|
|
PR_Unlock(rd->lock);
|
|
|
/* If not then sleep a bit */
|
|
|
DS_Sleep(PR_SecondsToInterval(1));
|
|
|
loops++;
|
|
|
/* If we sleep forever then we can conclude that something bad happened, and bail... */
|
|
|
/* Arbitrary 30 second delay : basically we should only expect to wait as long as it takes to process a few operations, which should be on the order of a second at most */
|
|
|
- if (loops > 300)
|
|
|
+ if (!done && (loops > 300))
|
|
|
{
|
|
|
/* Log a warning */
|
|
|
slapi_log_error(SLAPI_LOG_FATAL, NULL,
|
|
|
@@ -1551,7 +1565,7 @@ repl5_inc_update_from_op_result(Private_Repl_Protocol *prp, ConnResult replay_cr
|
|
|
{
|
|
|
/* We lost the connection - enter backoff state */
|
|
|
|
|
|
- return_value = UPDATE_TRANSIENT_ERROR;
|
|
|
+ return_value = UPDATE_CONNECTION_LOST;
|
|
|
*finished = 1;
|
|
|
slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name,
|
|
|
"%s: Consumer failed to replay change (uniqueid %s, CSN %s): "
|
|
|
@@ -1794,7 +1808,7 @@ send_updates(Private_Repl_Protocol *prp, RUV *remote_update_vector, PRUint32 *nu
|
|
|
{
|
|
|
/* We lost the connection - enter backoff state */
|
|
|
|
|
|
- return_value = UPDATE_TRANSIENT_ERROR;
|
|
|
+ return_value = UPDATE_CONNECTION_LOST;
|
|
|
finished = 1;
|
|
|
slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name,
|
|
|
"%s: Consumer failed to replay change (uniqueid %s, CSN %s): "
|
|
|
@@ -1914,19 +1928,24 @@ send_updates(Private_Repl_Protocol *prp, RUV *remote_update_vector, PRUint32 *nu
|
|
|
return_value = UPDATE_YIELD;
|
|
|
finished = 1;
|
|
|
}
|
|
|
+ PR_Lock(rd->lock);
|
|
|
/* See if the result thread has hit a problem */
|
|
|
if (!finished && rd->abort)
|
|
|
{
|
|
|
return_value = rd->result;
|
|
|
finished = 1;
|
|
|
}
|
|
|
+ PR_Unlock(rd->lock);
|
|
|
} while (!finished);
|
|
|
|
|
|
/* Terminate the results reading thread */
|
|
|
if (!prp->repl50consumer)
|
|
|
{
|
|
|
/* We need to ensure that we wait until all the responses have been recived from our operations */
|
|
|
- repl5_inc_waitfor_async_results(rd);
|
|
|
+ if (return_value != UPDATE_CONNECTION_LOST) {
|
|
|
+ /* if connection was lost/closed, there will be nothing to read */
|
|
|
+ repl5_inc_waitfor_async_results(rd);
|
|
|
+ }
|
|
|
|
|
|
rc = repl5_inc_destroy_async_result_thread(rd);
|
|
|
if (rc) {
|