repl5_protocol_util.c 34 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728
  1. /** BEGIN COPYRIGHT BLOCK
  2. * Copyright (C) 2001 Sun Microsystems, Inc. Used by permission.
  3. * Copyright (C) 2005 Red Hat, Inc.
  4. * All rights reserved.
  5. *
  6. * License: GPL (version 3 or any later version).
  7. * See LICENSE for details.
  8. * END COPYRIGHT BLOCK **/
  9. #ifdef HAVE_CONFIG_H
  10. #include <config.h>
  11. #endif
  12. /* repl5_protocol_util.c */
  13. /*
  14. Code common to both incremental and total protocols.
  15. */
  16. #include "repl5.h"
  17. #include "repl5_prot_private.h"
  18. /*
  19. * Obtain a current CSN (e.g. one that would have been
  20. * generated for an operation occurring at this time)
  21. * for a given replica.
  22. */
  23. CSN *
  24. get_current_csn(Slapi_DN *replarea_sdn)
  25. {
  26. Replica *replica;
  27. Object *gen_obj;
  28. CSNGen *gen;
  29. CSN *current_csn = NULL;
  30. if (NULL != replarea_sdn) {
  31. replica = replica_get_replica_from_dn(replarea_sdn);
  32. if (NULL != replica) {
  33. gen_obj = replica_get_csngen(replica);
  34. if (NULL != gen_obj) {
  35. gen = (CSNGen *)object_get_data(gen_obj);
  36. if (NULL != gen) {
  37. if (csngen_new_csn(gen, &current_csn,
  38. PR_FALSE /* notify */) != CSN_SUCCESS) {
  39. csn_free(&current_csn);
  40. }
  41. object_release(gen_obj);
  42. }
  43. }
  44. }
  45. }
  46. return current_csn;
  47. }
  48. /*
  49. * Acquire exclusive access to a replica. Send a start replication extended
  50. * operation to the replica. The response will contain a success code, and
  51. * optionally the replica's update vector if acquisition is successful.
  52. * This function returns one of the following:
  53. * ACQUIRE_SUCCESS - the replica was acquired, and we have exclusive update access
  54. * ACQUIRE_REPLICA_BUSY - another supplier was updating the replica
  55. * ACQUIRE_FATAL_ERROR - something bad happened, and it's not likely to improve
  56. * if we wait.
  57. * ACQUIRE_TRANSIENT_ERROR - something bad happened, but it's probably worth
  58. * another try after waiting a while.
  59. * If ACQUIRE_SUCCESS is returned, then ruv will point to the replica's update
  60. * vector. It's possible that the replica does something goofy and doesn't
  61. * return us an update vector, so be prepared for ruv to be NULL (but this is
  62. * an error).
  63. */
  64. int
  65. acquire_replica(Private_Repl_Protocol *prp, char *prot_oid, RUV **ruv)
  66. {
  67. int return_value;
  68. ConnResult crc;
  69. Repl_Connection *conn;
  70. struct berval *retdata = NULL;
  71. char *retoid = NULL;
  72. Slapi_DN *replarea_sdn = NULL;
  73. struct berval **ruv_bervals = NULL;
  74. CSN *current_csn = NULL;
  75. PR_ASSERT(prp && prot_oid);
  76. if (prp->replica_acquired) /* we already acquire replica */
  77. {
  78. slapi_log_err(SLAPI_LOG_WARNING, repl_plugin_name,
  79. "acquire_replica - %s: Remote replica already acquired\n",
  80. agmt_get_long_name(prp->agmt));
  81. return_value = ACQUIRE_FATAL_ERROR;
  82. return ACQUIRE_SUCCESS;
  83. }
  84. if (NULL != ruv) {
  85. ruv_destroy(ruv);
  86. }
  87. if (strcmp(prot_oid, REPL_NSDS50_INCREMENTAL_PROTOCOL_OID) == 0) {
  88. Object *supl_ruv_obj, *cons_ruv_obj;
  89. PRBool is_newer = PR_FALSE;
  90. supl_ruv_obj = replica_get_ruv(prp->replica);
  91. cons_ruv_obj = agmt_get_consumer_ruv(prp->agmt);
  92. is_newer = ruv_is_newer(supl_ruv_obj, cons_ruv_obj);
  93. if (supl_ruv_obj)
  94. object_release(supl_ruv_obj);
  95. if (cons_ruv_obj)
  96. object_release(cons_ruv_obj);
  97. if (is_newer == PR_FALSE) {
  98. prp->last_acquire_response_code = NSDS50_REPL_UPTODATE;
  99. return ACQUIRE_CONSUMER_WAS_UPTODATE;
  100. }
  101. }
  102. prp->last_acquire_response_code = NSDS50_REPL_REPLICA_NO_RESPONSE;
  103. /* Get the connection */
  104. conn = prp->conn;
  105. crc = conn_connect(conn);
  106. if (CONN_OPERATION_FAILED == crc) {
  107. int operation, error;
  108. conn_get_error(conn, &operation, &error);
  109. agmt_set_last_update_status(prp->agmt, error, NSDS50_REPL_CONN_ERROR,
  110. "Problem connecting to replica");
  111. return_value = ACQUIRE_TRANSIENT_ERROR;
  112. } else if (CONN_SSL_NOT_ENABLED == crc) {
  113. int operation, error;
  114. conn_get_error(conn, &operation, &error);
  115. agmt_set_last_update_status(prp->agmt, error, NSDS50_REPL_CONN_ERROR,
  116. "Problem connecting to replica (SSL not enabled)");
  117. return_value = ACQUIRE_FATAL_ERROR;
  118. } else {
  119. /* we don't want the timer to go off in the middle of an operation */
  120. conn_cancel_linger(conn);
  121. /* Does the remote replica support the 5.0 protocol? */
  122. crc = conn_replica_supports_ds5_repl(conn);
  123. if (CONN_DOES_NOT_SUPPORT_DS5_REPL == crc) {
  124. return_value = ACQUIRE_FATAL_ERROR;
  125. } else if (CONN_NOT_CONNECTED == crc || CONN_OPERATION_FAILED == crc) {
  126. /* We don't know anything about the remote replica. Try again later. */
  127. return_value = ACQUIRE_TRANSIENT_ERROR;
  128. goto error;
  129. }
  130. /* Find out what level of replication the replica supports. */
  131. crc = conn_replica_supports_ds90_repl(conn);
  132. if (CONN_DOES_NOT_SUPPORT_DS90_REPL == crc) {
  133. /* Does the remote replica support the 7.1 protocol? */
  134. crc = conn_replica_supports_ds71_repl(conn);
  135. if (CONN_DOES_NOT_SUPPORT_DS71_REPL == crc) {
  136. /* This is a pre-7.1 replica. */
  137. prp->repl50consumer = 1;
  138. } else if (CONN_NOT_CONNECTED == crc || CONN_OPERATION_FAILED == crc) {
  139. /* We don't know anything about the remote replica. Try again later. */
  140. return_value = ACQUIRE_TRANSIENT_ERROR;
  141. goto error;
  142. } else {
  143. /* This replica is later than 7.1, but pre-9.0. */
  144. prp->repl71consumer = 1;
  145. }
  146. } else if (CONN_NOT_CONNECTED == crc || CONN_OPERATION_FAILED == crc) {
  147. /* We don't know anything about the remote replica. Try again later. */
  148. return_value = ACQUIRE_TRANSIENT_ERROR;
  149. goto error;
  150. } else {
  151. /* This replica is a 9.0 or later replica. */
  152. prp->repl90consumer = 1;
  153. }
  154. /* Good to go. Start the protocol. */
  155. /* Obtain a current CSN */
  156. replarea_sdn = agmt_get_replarea(prp->agmt);
  157. current_csn = get_current_csn(replarea_sdn);
  158. if (NULL != current_csn) {
  159. struct berval *payload = NULL;
  160. int send_msgid = 0;
  161. if (prp->repl90consumer) {
  162. int is_total = 0;
  163. char *data_guid = NULL;
  164. struct berval *data = NULL;
  165. /* Check if this is a total or incremental update. */
  166. if (strcmp(REPL_NSDS50_TOTAL_PROTOCOL_OID, prot_oid) == 0) {
  167. is_total = 1;
  168. }
  169. /* Call pre-start replication session callback. This callback
  170. * may have extra data to be sent to the replica. */
  171. if (repl_session_plugin_call_pre_acquire_cb(prp->agmt, is_total,
  172. &data_guid, &data) == 0) {
  173. payload = NSDS90StartReplicationRequest_new(
  174. prot_oid, slapi_sdn_get_ndn(replarea_sdn),
  175. NULL, current_csn, data_guid, data);
  176. slapi_ch_free_string(&data_guid);
  177. ber_bvfree(data);
  178. data = NULL;
  179. } else {
  180. return_value = ACQUIRE_TRANSIENT_ERROR;
  181. slapi_ch_free_string(&data_guid);
  182. ber_bvfree(data);
  183. data = NULL;
  184. goto error;
  185. }
  186. } else {
  187. payload = NSDS50StartReplicationRequest_new(
  188. prot_oid, slapi_sdn_get_ndn(replarea_sdn),
  189. NULL /* XXXggood need to provide referral(s) */, current_csn);
  190. }
  191. /* JCMREPL - Need to extract the referrals from the RUV */
  192. crc = conn_send_extended_operation(conn,
  193. prp->repl90consumer ? REPL_START_NSDS90_REPLICATION_REQUEST_OID : REPL_START_NSDS50_REPLICATION_REQUEST_OID, payload,
  194. NULL /* update control */, &send_msgid /* Message ID */);
  195. if (CONN_OPERATION_SUCCESS != crc) {
  196. int operation, error;
  197. conn_get_error(conn, &operation, &error);
  198. /* Couldn't send the extended operation */
  199. return_value = ACQUIRE_TRANSIENT_ERROR; /* XXX right return value? */
  200. slapi_log_err(SLAPI_LOG_WARNING, repl_plugin_name, "acquire_replica - "
  201. "%s: Unable to send a startReplication "
  202. "extended operation to consumer (%s). Will retry later.\n",
  203. agmt_get_long_name(prp->agmt),
  204. error ? ldap_err2string(error) : "unknown error");
  205. }
  206. /* Since the operation request is async, we need to wait for the response here */
  207. crc = conn_read_result_ex(conn, &retoid, &retdata, NULL, send_msgid, NULL, 1);
  208. ber_bvfree(payload);
  209. payload = NULL;
  210. /* Look at the response we got. */
  211. if (CONN_OPERATION_SUCCESS == crc) {
  212. /*
  213. * Extop was processed. Look at extop response to see if we're
  214. * permitted to go ahead.
  215. */
  216. int extop_result;
  217. char *data_guid = NULL;
  218. struct berval *data = NULL;
  219. int extop_rc = decode_repl_ext_response(retdata, &extop_result,
  220. &ruv_bervals, &data_guid,
  221. &data);
  222. if (0 == extop_rc) {
  223. prp->last_acquire_response_code = extop_result;
  224. switch (extop_result) {
  225. /* XXXggood handle other error codes here */
  226. case NSDS50_REPL_INTERNAL_ERROR:
  227. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name,
  228. "acquire_replica - "
  229. "%s: Unable to acquire replica: "
  230. "an internal error occurred on the remote replica. "
  231. "Replication is aborting.\n",
  232. agmt_get_long_name(prp->agmt));
  233. agmt_set_last_update_status(prp->agmt, 0, extop_result,
  234. "Failed to acquire replica: "
  235. "Internal error occurred on the remote replica");
  236. return_value = ACQUIRE_FATAL_ERROR;
  237. break;
  238. case NSDS50_REPL_PERMISSION_DENIED:
  239. /* Not allowed to send updates */
  240. {
  241. char *repl_binddn = agmt_get_binddn(prp->agmt);
  242. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name,
  243. "acquire_replica - "
  244. "%s: Unable to acquire replica: permission denied. "
  245. "The bind dn \"%s\" does not have permission to "
  246. "supply replication updates to the replica. "
  247. "Will retry later.\n",
  248. agmt_get_long_name(prp->agmt), repl_binddn);
  249. agmt_set_last_update_status(prp->agmt, 0, extop_result,
  250. "Unable to acquire replica: permission denied. "
  251. "The bind dn does not have permission to "
  252. "supply replication updates to the replica. "
  253. "Will retry later.");
  254. slapi_ch_free((void **)&repl_binddn);
  255. return_value = ACQUIRE_TRANSIENT_ERROR;
  256. break;
  257. }
  258. case NSDS50_REPL_NO_SUCH_REPLICA:
  259. /* There is no such replica on the consumer */
  260. {
  261. Slapi_DN *repl_root = agmt_get_replarea(prp->agmt);
  262. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name,
  263. "acquire_replica - "
  264. "%s: Unable to acquire replica: there is no "
  265. "replicated area \"%s\" on the consumer server. "
  266. "Replication is aborting.\n",
  267. agmt_get_long_name(prp->agmt),
  268. slapi_sdn_get_dn(repl_root));
  269. agmt_set_last_update_status(prp->agmt, 0, extop_result,
  270. "Unable to acquire replica: there is no "
  271. "replicated area on the consumer server. "
  272. "Replication is aborting.");
  273. slapi_sdn_free(&repl_root);
  274. return_value = ACQUIRE_FATAL_ERROR;
  275. break;
  276. }
  277. case NSDS50_REPL_EXCESSIVE_CLOCK_SKEW:
  278. /* Large clock skew between the consumer and the supplier */
  279. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name,
  280. "acquire_replica - "
  281. "%s: Unable to acquire replica: "
  282. "Excessive clock skew between the supplier and "
  283. "the consumer. Replication is aborting.\n",
  284. agmt_get_long_name(prp->agmt));
  285. return_value = ACQUIRE_FATAL_ERROR;
  286. break;
  287. case NSDS50_REPL_DECODING_ERROR:
  288. /* We sent something the replica couldn't understand. */
  289. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name,
  290. "acquire_replica - "
  291. "%s: Unable to acquire replica: "
  292. "the consumer was unable to decode the "
  293. "startReplicationRequest extended operation sent by the "
  294. "supplier. Replication is aborting.\n",
  295. agmt_get_long_name(prp->agmt));
  296. agmt_set_last_update_status(prp->agmt, 0, extop_result,
  297. "Unable to acquire replica: "
  298. "the consumer was unable to decode the "
  299. "startReplicationRequest extended operation sent "
  300. "by the supplier. Replication is aborting.");
  301. return_value = ACQUIRE_FATAL_ERROR;
  302. break;
  303. case NSDS50_REPL_REPLICA_BUSY:
  304. /* Someone else is updating the replica. Try later. */
  305. /* if acquire_replica is called for replica
  306. initialization, log REPLICA_BUSY, too */
  307. if (strcmp(REPL_NSDS50_TOTAL_PROTOCOL_OID,
  308. prot_oid) == 0) {
  309. slapi_log_err(SLAPI_LOG_NOTICE, repl_plugin_name,
  310. "acquire_replica - "
  311. "%s: Unable to acquire replica: "
  312. "the replica is currently being updated"
  313. "by another supplier.\n",
  314. agmt_get_long_name(prp->agmt));
  315. } else /* REPL_NSDS50_INCREMENTAL_PROTOCOL_OID */
  316. {
  317. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name,
  318. "acquire_replica - "
  319. "%s: Unable to acquire replica: "
  320. "the replica is currently being updated"
  321. "by another supplier. Will try later\n",
  322. agmt_get_long_name(prp->agmt));
  323. }
  324. agmt_set_last_update_status(prp->agmt, 0, extop_result,
  325. "Unable to acquire replica: "
  326. "the replica is currently being updated by another "
  327. "supplier.");
  328. return_value = ACQUIRE_REPLICA_BUSY;
  329. break;
  330. case NSDS50_REPL_REPLICAID_ERROR:
  331. /* remote replica detected a duplicate ReplicaID */
  332. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name,
  333. "acquire_replica - "
  334. "%s: Unable to acquire replica: the replica "
  335. "has the same Replica ID as this one. "
  336. "Replication is aborting.\n",
  337. agmt_get_long_name(prp->agmt));
  338. agmt_set_last_update_status(prp->agmt, 0, NSDS50_REPL_REPLICAID_ERROR,
  339. "Unable to acquire replica: the replica has the same "
  340. "Replica ID as this one. Replication is aborting.");
  341. return_value = ACQUIRE_FATAL_ERROR;
  342. break;
  343. case NSDS50_REPL_BACKOFF:
  344. /* A replication session hook on the replica
  345. * wants us to go into backoff mode. */
  346. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name,
  347. "acquire_replica - "
  348. "%s: Unable to acquire replica: "
  349. "the replica instructed us to go into "
  350. "backoff mode. Will retry later.\n",
  351. agmt_get_long_name(prp->agmt));
  352. agmt_set_last_update_status(prp->agmt, 0, extop_result,
  353. "Unable to acquire replica: the replica instructed "
  354. "us to go into backoff mode. Will retry later.");
  355. return_value = ACQUIRE_TRANSIENT_ERROR;
  356. break;
  357. case NSDS50_REPL_REPLICA_READY:
  358. /* Call any registered replication session post
  359. * acquire callback if we are dealing with a 9.0
  360. * style replica. We want to bail on sending
  361. * updates if the return value is non-0. */
  362. if (prp->repl90consumer) {
  363. int is_total = 0;
  364. /* Check if this is a total or incremental update. */
  365. if (strcmp(REPL_NSDS50_TOTAL_PROTOCOL_OID, prot_oid) == 0) {
  366. is_total = 1;
  367. }
  368. if (repl_session_plugin_call_post_acquire_cb(prp->agmt, is_total, data_guid, data)) {
  369. slapi_ch_free_string(&data_guid);
  370. ber_bvfree(data);
  371. data = NULL;
  372. return_value = ACQUIRE_TRANSIENT_ERROR;
  373. break;
  374. }
  375. slapi_ch_free_string(&data_guid);
  376. ber_bvfree(data);
  377. data = NULL;
  378. }
  379. /* We've acquired the replica. */
  380. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name,
  381. "acquire_replica - "
  382. "%s: Replica was successfully acquired.\n",
  383. agmt_get_long_name(prp->agmt));
  384. /* Parse the update vector */
  385. if (NULL != ruv_bervals && NULL != ruv) {
  386. if (ruv_init_from_bervals(ruv_bervals, ruv) != RUV_SUCCESS) {
  387. /* Couldn't parse the update vector */
  388. *ruv = NULL;
  389. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name,
  390. "acquire_replica - "
  391. "%s: acquired replica, but could not parse update vector. "
  392. "The replica must be reinitialized.\n",
  393. agmt_get_long_name(prp->agmt));
  394. }
  395. }
  396. /* Save consumer's RUV in the replication agreement.
  397. It is used by the changelog trimming code */
  398. if (ruv && *ruv)
  399. agmt_set_consumer_ruv(prp->agmt, *ruv);
  400. return_value = ACQUIRE_SUCCESS;
  401. break;
  402. default:
  403. agmt_set_last_update_status(prp->agmt, 0, extop_result,
  404. "Unable to acquire replica");
  405. return_value = ACQUIRE_FATAL_ERROR;
  406. }
  407. } else {
  408. /* Couldn't parse the response */
  409. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name,
  410. "acquire_replica - "
  411. "%s: Unable to parse the response to the "
  412. "startReplication extended operation. "
  413. "Replication is aborting.\n",
  414. agmt_get_long_name(prp->agmt));
  415. agmt_set_last_update_status(prp->agmt, 0, NSDS50_REPL_DECODING_ERROR,
  416. "Unable to parse the response to the "
  417. "startReplication extended operation. "
  418. "Replication is aborting.");
  419. prp->last_acquire_response_code = NSDS50_REPL_INTERNAL_ERROR;
  420. return_value = ACQUIRE_FATAL_ERROR;
  421. }
  422. } else {
  423. int operation, error;
  424. conn_get_error(conn, &operation, &error);
  425. /* Couldn't send the extended operation */
  426. return_value = ACQUIRE_TRANSIENT_ERROR; /* XXX right return value? */
  427. slapi_log_err(SLAPI_LOG_WARNING, repl_plugin_name,
  428. "acquire_replica - "
  429. "%s: Unable to receive the response for a startReplication "
  430. "extended operation to consumer (%s). Will retry later.\n",
  431. agmt_get_long_name(prp->agmt),
  432. error ? ldap_err2string(error) : "unknown error");
  433. agmt_set_last_update_status(prp->agmt, error, NSDS50_REPL_CONN_ERROR,
  434. "Unable to receive the response for a startReplication "
  435. "extended operation to consumer. Will retry later.");
  436. }
  437. } else {
  438. /* Couldn't get a current CSN */
  439. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name,
  440. "acquire_replica - "
  441. "%s: Unable to obtain current CSN. "
  442. "Replication is aborting.\n",
  443. agmt_get_long_name(prp->agmt));
  444. agmt_set_last_update_status(prp->agmt, 0, NSDS50_REPL_INTERNAL_ERROR,
  445. "Unable to obtain current CSN. Replication is aborting.");
  446. return_value = ACQUIRE_FATAL_ERROR;
  447. }
  448. }
  449. error:
  450. csn_free(&current_csn);
  451. if (NULL != ruv_bervals)
  452. ber_bvecfree(ruv_bervals);
  453. if (NULL != replarea_sdn)
  454. slapi_sdn_free(&replarea_sdn);
  455. if (NULL != retoid)
  456. ldap_memfree(retoid);
  457. if (NULL != retdata)
  458. ber_bvfree(retdata);
  459. if (ACQUIRE_SUCCESS != return_value) {
  460. /* could not acquire the replica, so reinstate the linger timer, since this
  461. means we won't call release_replica, which also reinstates the timer */
  462. conn_start_linger(conn);
  463. } else {
  464. /* replica successfully acquired */
  465. prp->replica_acquired = PR_TRUE;
  466. }
  467. return return_value;
  468. }
  469. /*
  470. * Release a replica by sending an "end replication" extended request.
  471. */
  472. void
  473. release_replica(Private_Repl_Protocol *prp)
  474. {
  475. int rc;
  476. struct berval *retdata = NULL;
  477. char *retoid = NULL;
  478. struct berval *payload = NULL;
  479. Slapi_DN *replarea_sdn = NULL;
  480. int sent_message_id = 0;
  481. int ret_message_id = 0;
  482. ConnResult conres = 0;
  483. PR_ASSERT(NULL != prp);
  484. PR_ASSERT(NULL != prp->conn);
  485. if (!prp->replica_acquired) {
  486. return;
  487. }
  488. replarea_sdn = agmt_get_replarea(prp->agmt);
  489. payload = NSDS50EndReplicationRequest_new((char *)slapi_sdn_get_dn(replarea_sdn)); /* XXXggood had to cast away const */
  490. slapi_sdn_free(&replarea_sdn);
  491. rc = conn_send_extended_operation(prp->conn,
  492. REPL_END_NSDS50_REPLICATION_REQUEST_OID, payload, NULL /* update control */, &sent_message_id /* Message ID */);
  493. ber_bvfree(payload); /* done with this - free it now */
  494. if (0 != rc) {
  495. int operation, error;
  496. conn_get_error(prp->conn, &operation, &error);
  497. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name,
  498. "release_replica - %s: Unable to send endReplication extended operation (%s)\n",
  499. agmt_get_long_name(prp->agmt),
  500. error ? ldap_err2string(error) : "unknown error");
  501. goto error;
  502. }
  503. /* Since the operation request is async, we need to wait for the response here */
  504. conres = conn_read_result_ex(prp->conn, &retoid, &retdata, NULL, sent_message_id, &ret_message_id, 1);
  505. if (CONN_OPERATION_SUCCESS != conres) {
  506. int operation, error;
  507. conn_get_error(prp->conn, &operation, &error);
  508. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name,
  509. "release_replica - %s: Attempting to release replica, but unable to receive endReplication extended "
  510. "operation response from the replica. Error %d (%s)\n",
  511. agmt_get_long_name(prp->agmt), error,
  512. error ? ldap_err2string(error) : "unknown error");
  513. } else {
  514. struct berval **ruv_bervals = NULL; /* Shouldn't actually be returned */
  515. int extop_result;
  516. int extop_rc = 0;
  517. char *data_guid = NULL;
  518. struct berval *data = NULL;
  519. /* Check the message id's match */
  520. if (sent_message_id != ret_message_id) {
  521. int operation, error;
  522. conn_get_error(prp->conn, &operation, &error);
  523. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name,
  524. "release_replica - %s: Response message id does not match the request (%s)\n",
  525. agmt_get_long_name(prp->agmt),
  526. error ? ldap_err2string(error) : "unknown error");
  527. }
  528. /* We need to pass data_guid and data in even though they
  529. * are not used here. We will free them anyway in case they
  530. * are used in the future. */
  531. extop_rc = decode_repl_ext_response(retdata, &extop_result,
  532. (struct berval ***)&ruv_bervals, &data_guid, &data);
  533. slapi_ch_free_string(&data_guid);
  534. ber_bvfree(data);
  535. data = NULL;
  536. if (0 == extop_rc) {
  537. if (NSDS50_REPL_REPLICA_RELEASE_SUCCEEDED == extop_result) {
  538. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name,
  539. "release_replica - %s: Successfully released consumer\n", agmt_get_long_name(prp->agmt));
  540. } else {
  541. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name,
  542. "release_replica - %s: Unable to release consumer: response code %d\n",
  543. agmt_get_long_name(prp->agmt), extop_result);
  544. /* disconnect from the consumer so that it does not stay locked */
  545. conn_disconnect(prp->conn);
  546. }
  547. } else {
  548. /* Couldn't parse the response */
  549. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name,
  550. "release_replica - %s: Unable to parse the response "
  551. " to the endReplication extended operation.\n",
  552. agmt_get_long_name(prp->agmt));
  553. }
  554. if (NULL != ruv_bervals)
  555. ber_bvecfree(ruv_bervals);
  556. /* XXXggood free ruv_bervals if we got them for some reason */
  557. }
  558. if (NULL != retoid)
  559. ldap_memfree(retoid);
  560. if (NULL != retdata)
  561. ber_bvfree(retdata);
  562. /* replica is released, start the linger timer on the connection, which
  563. was stopped in acquire_replica */
  564. conn_start_linger(prp->conn);
  565. error:
  566. prp->replica_acquired = PR_FALSE;
  567. }
  568. /* converts consumer's response to a string */
  569. char *
  570. protocol_response2string(int response)
  571. {
  572. switch (response) {
  573. case NSDS50_REPL_REPLICA_READY:
  574. return "replica acquired";
  575. case NSDS50_REPL_REPLICA_BUSY:
  576. return "replica busy";
  577. case NSDS50_REPL_EXCESSIVE_CLOCK_SKEW:
  578. return "excessive clock skew";
  579. case NSDS50_REPL_PERMISSION_DENIED:
  580. return "permission denied";
  581. case NSDS50_REPL_DECODING_ERROR:
  582. return "decoding error";
  583. case NSDS50_REPL_UNKNOWN_UPDATE_PROTOCOL:
  584. return "unknown update protocol";
  585. case NSDS50_REPL_NO_SUCH_REPLICA:
  586. return "no such replica";
  587. case NSDS50_REPL_BELOW_PURGEPOINT:
  588. return "csn below purge point";
  589. case NSDS50_REPL_INTERNAL_ERROR:
  590. return "internal error";
  591. case NSDS50_REPL_REPLICA_RELEASE_SUCCEEDED:
  592. return "replica released";
  593. case NSDS50_REPL_REPLICAID_ERROR:
  594. return "duplicate replica ID detected";
  595. case NSDS50_REPL_UPTODATE:
  596. return "no change to send";
  597. case NSDS50_REPL_CL_ERROR:
  598. return "changelog error";
  599. case NSDS50_REPL_CONN_ERROR:
  600. return "connection error";
  601. case NSDS50_REPL_CONN_TIMEOUT:
  602. return "connection timeout";
  603. case NSDS50_REPL_TRANSIENT_ERROR:
  604. return "transient warning";
  605. case NSDS50_REPL_RUV_ERROR:
  606. return "RUV error";
  607. case NSDS50_REPL_REPLICA_NO_RESPONSE:
  608. return "no response received";
  609. default:
  610. return "unknown error";
  611. }
  612. }
  613. int
  614. repl5_strip_fractional_mods(Repl_Agmt *agmt, LDAPMod **mods)
  615. {
  616. char **a;
  617. char **attrs_to_strip;
  618. int retval = 0;
  619. int strip = 1;
  620. int i, j, k;
  621. if (mods == NULL) {
  622. return retval;
  623. }
  624. a = agmt_get_fractional_attrs(agmt);
  625. if (a) {
  626. /* Iterate through the fractional attr list */
  627. for (i = 0; a[i] != NULL; i++) {
  628. for (j = 0; NULL != mods[j];) {
  629. /*
  630. * Iterate through the attrs in this mod list.
  631. * If any match the fractional attr then remove the mod.
  632. */
  633. if (0 == slapi_attr_type_cmp(mods[j]->mod_type, a[i], SLAPI_TYPE_CMP_SUBTYPE)) {
  634. /* Adjust value of j, implicit in not incrementing it */
  635. /* Free this mod */
  636. ber_bvecfree(mods[j]->mod_bvalues);
  637. slapi_ch_free((void **)&(mods[j]->mod_type));
  638. slapi_ch_free((void **)&mods[j]);
  639. /* Move down all subsequent mods */
  640. for (k = j; mods[k + 1]; k++) {
  641. mods[k] = mods[k + 1];
  642. }
  643. /* Zero the end of the array */
  644. mods[k] = NULL;
  645. } else {
  646. j++;
  647. }
  648. }
  649. }
  650. /*
  651. * Check if "all" the remaining mods are on attributes we want to strip from the update.
  652. * If all the mods are on attrs_to_strip, then free them.
  653. */
  654. if ((attrs_to_strip = agmt_get_attrs_to_strip(agmt)) != NULL) {
  655. for (j = 0; mods[j] != NULL; j++) {
  656. if (slapi_ch_array_utf8_inlist(attrs_to_strip, mods[j]->mod_type) == 0) {
  657. /* at least one of the mods is "real", so don't strip anything */
  658. strip = 0;
  659. break;
  660. }
  661. }
  662. if (strip) {
  663. /* free the remaining mods */
  664. for (j = 0; mods[j] != NULL; j++) {
  665. ber_bvecfree(mods[j]->mod_bvalues);
  666. slapi_ch_free((void **)&(mods[j]->mod_type));
  667. slapi_ch_free((void **)&mods[j]);
  668. }
  669. }
  670. }
  671. slapi_ch_array_free(a);
  672. }
  673. return retval;
  674. }