repl5_protocol_util.c 34 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736
  1. /** BEGIN COPYRIGHT BLOCK
  2. * Copyright (C) 2001 Sun Microsystems, Inc. Used by permission.
  3. * Copyright (C) 2005 Red Hat, Inc.
  4. * All rights reserved.
  5. *
  6. * License: GPL (version 3 or any later version).
  7. * See LICENSE for details.
  8. * END COPYRIGHT BLOCK **/
  9. #ifdef HAVE_CONFIG_H
  10. #include <config.h>
  11. #endif
  12. /* repl5_protocol_util.c */
  13. /*
  14. Code common to both incremental and total protocols.
  15. */
  16. #include "repl5.h"
  17. #include "repl5_prot_private.h"
  18. /*
  19. * Obtain a current CSN (e.g. one that would have been
  20. * generated for an operation occurring at this time)
  21. * for a given replica.
  22. */
  23. CSN *
  24. get_current_csn(Slapi_DN *replarea_sdn)
  25. {
  26. Object *replica_obj;
  27. Replica *replica;
  28. Object *gen_obj;
  29. CSNGen *gen;
  30. CSN *current_csn = NULL;
  31. if (NULL != replarea_sdn) {
  32. replica_obj = replica_get_replica_from_dn(replarea_sdn);
  33. if (NULL != replica_obj) {
  34. replica = object_get_data(replica_obj);
  35. if (NULL != replica) {
  36. gen_obj = replica_get_csngen(replica);
  37. if (NULL != gen_obj) {
  38. gen = (CSNGen *)object_get_data(gen_obj);
  39. if (NULL != gen) {
  40. if (csngen_new_csn(gen, &current_csn,
  41. PR_FALSE /* notify */) != CSN_SUCCESS) {
  42. csn_free(&current_csn);
  43. }
  44. object_release(gen_obj);
  45. }
  46. }
  47. }
  48. }
  49. }
  50. return current_csn;
  51. }
  52. /*
  53. * Acquire exclusive access to a replica. Send a start replication extended
  54. * operation to the replica. The response will contain a success code, and
  55. * optionally the replica's update vector if acquisition is successful.
  56. * This function returns one of the following:
  57. * ACQUIRE_SUCCESS - the replica was acquired, and we have exclusive update access
  58. * ACQUIRE_REPLICA_BUSY - another master was updating the replica
  59. * ACQUIRE_FATAL_ERROR - something bad happened, and it's not likely to improve
  60. * if we wait.
  61. * ACQUIRE_TRANSIENT_ERROR - something bad happened, but it's probably worth
  62. * another try after waiting a while.
  63. * If ACQUIRE_SUCCESS is returned, then ruv will point to the replica's update
  64. * vector. It's possible that the replica does something goofy and doesn't
  65. * return us an update vector, so be prepared for ruv to be NULL (but this is
  66. * an error).
  67. */
  68. int
  69. acquire_replica(Private_Repl_Protocol *prp, char *prot_oid, RUV **ruv)
  70. {
  71. int return_value;
  72. ConnResult crc;
  73. Repl_Connection *conn;
  74. struct berval *retdata = NULL;
  75. char *retoid = NULL;
  76. Slapi_DN *replarea_sdn = NULL;
  77. struct berval **ruv_bervals = NULL;
  78. CSN *current_csn = NULL;
  79. PR_ASSERT(prp && prot_oid);
  80. if (prp->replica_acquired) /* we already acquire replica */
  81. {
  82. slapi_log_err(SLAPI_LOG_WARNING, repl_plugin_name,
  83. "acquire_replica - %s: Remote replica already acquired\n",
  84. agmt_get_long_name(prp->agmt));
  85. return_value = ACQUIRE_FATAL_ERROR;
  86. return ACQUIRE_SUCCESS;
  87. }
  88. if (NULL != ruv) {
  89. ruv_destroy(ruv);
  90. }
  91. if (strcmp(prot_oid, REPL_NSDS50_INCREMENTAL_PROTOCOL_OID) == 0) {
  92. Replica *replica;
  93. Object *supl_ruv_obj, *cons_ruv_obj;
  94. PRBool is_newer = PR_FALSE;
  95. object_acquire(prp->replica_object);
  96. replica = object_get_data(prp->replica_object);
  97. supl_ruv_obj = replica_get_ruv(replica);
  98. cons_ruv_obj = agmt_get_consumer_ruv(prp->agmt);
  99. is_newer = ruv_is_newer(supl_ruv_obj, cons_ruv_obj);
  100. if (supl_ruv_obj)
  101. object_release(supl_ruv_obj);
  102. if (cons_ruv_obj)
  103. object_release(cons_ruv_obj);
  104. object_release(prp->replica_object);
  105. replica = NULL;
  106. if (is_newer == PR_FALSE) {
  107. prp->last_acquire_response_code = NSDS50_REPL_UPTODATE;
  108. return ACQUIRE_CONSUMER_WAS_UPTODATE;
  109. }
  110. }
  111. prp->last_acquire_response_code = NSDS50_REPL_REPLICA_NO_RESPONSE;
  112. /* Get the connection */
  113. conn = prp->conn;
  114. crc = conn_connect(conn);
  115. if (CONN_OPERATION_FAILED == crc) {
  116. int operation, error;
  117. conn_get_error(conn, &operation, &error);
  118. agmt_set_last_update_status(prp->agmt, error, NSDS50_REPL_CONN_ERROR,
  119. "Problem connecting to replica");
  120. return_value = ACQUIRE_TRANSIENT_ERROR;
  121. } else if (CONN_SSL_NOT_ENABLED == crc) {
  122. int operation, error;
  123. conn_get_error(conn, &operation, &error);
  124. agmt_set_last_update_status(prp->agmt, error, NSDS50_REPL_CONN_ERROR,
  125. "Problem connecting to replica (SSL not enabled)");
  126. return_value = ACQUIRE_FATAL_ERROR;
  127. } else {
  128. /* we don't want the timer to go off in the middle of an operation */
  129. conn_cancel_linger(conn);
  130. /* Does the remote replica support the 5.0 protocol? */
  131. crc = conn_replica_supports_ds5_repl(conn);
  132. if (CONN_DOES_NOT_SUPPORT_DS5_REPL == crc) {
  133. return_value = ACQUIRE_FATAL_ERROR;
  134. } else if (CONN_NOT_CONNECTED == crc || CONN_OPERATION_FAILED == crc) {
  135. /* We don't know anything about the remote replica. Try again later. */
  136. return_value = ACQUIRE_TRANSIENT_ERROR;
  137. goto error;
  138. }
  139. /* Find out what level of replication the replica supports. */
  140. crc = conn_replica_supports_ds90_repl(conn);
  141. if (CONN_DOES_NOT_SUPPORT_DS90_REPL == crc) {
  142. /* Does the remote replica support the 7.1 protocol? */
  143. crc = conn_replica_supports_ds71_repl(conn);
  144. if (CONN_DOES_NOT_SUPPORT_DS71_REPL == crc) {
  145. /* This is a pre-7.1 replica. */
  146. prp->repl50consumer = 1;
  147. } else if (CONN_NOT_CONNECTED == crc || CONN_OPERATION_FAILED == crc) {
  148. /* We don't know anything about the remote replica. Try again later. */
  149. return_value = ACQUIRE_TRANSIENT_ERROR;
  150. goto error;
  151. } else {
  152. /* This replica is later than 7.1, but pre-9.0. */
  153. prp->repl71consumer = 1;
  154. }
  155. } else if (CONN_NOT_CONNECTED == crc || CONN_OPERATION_FAILED == crc) {
  156. /* We don't know anything about the remote replica. Try again later. */
  157. return_value = ACQUIRE_TRANSIENT_ERROR;
  158. goto error;
  159. } else {
  160. /* This replica is a 9.0 or later replica. */
  161. prp->repl90consumer = 1;
  162. }
  163. /* Good to go. Start the protocol. */
  164. /* Obtain a current CSN */
  165. replarea_sdn = agmt_get_replarea(prp->agmt);
  166. current_csn = get_current_csn(replarea_sdn);
  167. if (NULL != current_csn) {
  168. struct berval *payload = NULL;
  169. int send_msgid = 0;
  170. if (prp->repl90consumer) {
  171. int is_total = 0;
  172. char *data_guid = NULL;
  173. struct berval *data = NULL;
  174. /* Check if this is a total or incremental update. */
  175. if (strcmp(REPL_NSDS50_TOTAL_PROTOCOL_OID, prot_oid) == 0) {
  176. is_total = 1;
  177. }
  178. /* Call pre-start replication session callback. This callback
  179. * may have extra data to be sent to the replica. */
  180. if (repl_session_plugin_call_pre_acquire_cb(prp->agmt, is_total,
  181. &data_guid, &data) == 0) {
  182. payload = NSDS90StartReplicationRequest_new(
  183. prot_oid, slapi_sdn_get_ndn(replarea_sdn),
  184. NULL, current_csn, data_guid, data);
  185. slapi_ch_free_string(&data_guid);
  186. ber_bvfree(data);
  187. data = NULL;
  188. } else {
  189. return_value = ACQUIRE_TRANSIENT_ERROR;
  190. slapi_ch_free_string(&data_guid);
  191. ber_bvfree(data);
  192. data = NULL;
  193. goto error;
  194. }
  195. } else {
  196. payload = NSDS50StartReplicationRequest_new(
  197. prot_oid, slapi_sdn_get_ndn(replarea_sdn),
  198. NULL /* XXXggood need to provide referral(s) */, current_csn);
  199. }
  200. /* JCMREPL - Need to extract the referrals from the RUV */
  201. crc = conn_send_extended_operation(conn,
  202. prp->repl90consumer ? REPL_START_NSDS90_REPLICATION_REQUEST_OID : REPL_START_NSDS50_REPLICATION_REQUEST_OID, payload,
  203. NULL /* update control */, &send_msgid /* Message ID */);
  204. if (CONN_OPERATION_SUCCESS != crc) {
  205. int operation, error;
  206. conn_get_error(conn, &operation, &error);
  207. /* Couldn't send the extended operation */
  208. return_value = ACQUIRE_TRANSIENT_ERROR; /* XXX right return value? */
  209. slapi_log_err(SLAPI_LOG_WARNING, repl_plugin_name, "acquire_replica - "
  210. "%s: Unable to send a startReplication "
  211. "extended operation to consumer (%s). Will retry later.\n",
  212. agmt_get_long_name(prp->agmt),
  213. error ? ldap_err2string(error) : "unknown error");
  214. }
  215. /* Since the operation request is async, we need to wait for the response here */
  216. crc = conn_read_result_ex(conn, &retoid, &retdata, NULL, send_msgid, NULL, 1);
  217. ber_bvfree(payload);
  218. payload = NULL;
  219. /* Look at the response we got. */
  220. if (CONN_OPERATION_SUCCESS == crc) {
  221. /*
  222. * Extop was processed. Look at extop response to see if we're
  223. * permitted to go ahead.
  224. */
  225. int extop_result;
  226. char *data_guid = NULL;
  227. struct berval *data = NULL;
  228. int extop_rc = decode_repl_ext_response(retdata, &extop_result,
  229. &ruv_bervals, &data_guid,
  230. &data);
  231. if (0 == extop_rc) {
  232. prp->last_acquire_response_code = extop_result;
  233. switch (extop_result) {
  234. /* XXXggood handle other error codes here */
  235. case NSDS50_REPL_INTERNAL_ERROR:
  236. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name,
  237. "acquire_replica - "
  238. "%s: Unable to acquire replica: "
  239. "an internal error occurred on the remote replica. "
  240. "Replication is aborting.\n",
  241. agmt_get_long_name(prp->agmt));
  242. agmt_set_last_update_status(prp->agmt, 0, extop_result,
  243. "Failed to acquire replica: "
  244. "Internal error occurred on the remote replica");
  245. return_value = ACQUIRE_FATAL_ERROR;
  246. break;
  247. case NSDS50_REPL_PERMISSION_DENIED:
  248. /* Not allowed to send updates */
  249. {
  250. char *repl_binddn = agmt_get_binddn(prp->agmt);
  251. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name,
  252. "acquire_replica - "
  253. "%s: Unable to acquire replica: permission denied. "
  254. "The bind dn \"%s\" does not have permission to "
  255. "supply replication updates to the replica. "
  256. "Will retry later.\n",
  257. agmt_get_long_name(prp->agmt), repl_binddn);
  258. agmt_set_last_update_status(prp->agmt, 0, extop_result,
  259. "Unable to acquire replica: permission denied. "
  260. "The bind dn does not have permission to "
  261. "supply replication updates to the replica. "
  262. "Will retry later.");
  263. slapi_ch_free((void **)&repl_binddn);
  264. return_value = ACQUIRE_TRANSIENT_ERROR;
  265. break;
  266. }
  267. case NSDS50_REPL_NO_SUCH_REPLICA:
  268. /* There is no such replica on the consumer */
  269. {
  270. Slapi_DN *repl_root = agmt_get_replarea(prp->agmt);
  271. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name,
  272. "acquire_replica - "
  273. "%s: Unable to acquire replica: there is no "
  274. "replicated area \"%s\" on the consumer server. "
  275. "Replication is aborting.\n",
  276. agmt_get_long_name(prp->agmt),
  277. slapi_sdn_get_dn(repl_root));
  278. agmt_set_last_update_status(prp->agmt, 0, extop_result,
  279. "Unable to acquire replica: there is no "
  280. "replicated area on the consumer server. "
  281. "Replication is aborting.");
  282. slapi_sdn_free(&repl_root);
  283. return_value = ACQUIRE_FATAL_ERROR;
  284. break;
  285. }
  286. case NSDS50_REPL_EXCESSIVE_CLOCK_SKEW:
  287. /* Large clock skew between the consumer and the supplier */
  288. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name,
  289. "acquire_replica - "
  290. "%s: Unable to acquire replica: "
  291. "Excessive clock skew between the supplier and "
  292. "the consumer. Replication is aborting.\n",
  293. agmt_get_long_name(prp->agmt));
  294. return_value = ACQUIRE_FATAL_ERROR;
  295. break;
  296. case NSDS50_REPL_DECODING_ERROR:
  297. /* We sent something the replica couldn't understand. */
  298. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name,
  299. "acquire_replica - "
  300. "%s: Unable to acquire replica: "
  301. "the consumer was unable to decode the "
  302. "startReplicationRequest extended operation sent by the "
  303. "supplier. Replication is aborting.\n",
  304. agmt_get_long_name(prp->agmt));
  305. agmt_set_last_update_status(prp->agmt, 0, extop_result,
  306. "Unable to acquire replica: "
  307. "the consumer was unable to decode the "
  308. "startReplicationRequest extended operation sent "
  309. "by the supplier. Replication is aborting.");
  310. return_value = ACQUIRE_FATAL_ERROR;
  311. break;
  312. case NSDS50_REPL_REPLICA_BUSY:
  313. /* Someone else is updating the replica. Try later. */
  314. /* if acquire_replica is called for replica
  315. initialization, log REPLICA_BUSY, too */
  316. if (strcmp(REPL_NSDS50_TOTAL_PROTOCOL_OID,
  317. prot_oid) == 0) {
  318. slapi_log_err(SLAPI_LOG_NOTICE, repl_plugin_name,
  319. "acquire_replica - "
  320. "%s: Unable to acquire replica: "
  321. "the replica is currently being updated"
  322. "by another supplier.\n",
  323. agmt_get_long_name(prp->agmt));
  324. } else /* REPL_NSDS50_INCREMENTAL_PROTOCOL_OID */
  325. {
  326. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name,
  327. "acquire_replica - "
  328. "%s: Unable to acquire replica: "
  329. "the replica is currently being updated"
  330. "by another supplier. Will try later\n",
  331. agmt_get_long_name(prp->agmt));
  332. }
  333. agmt_set_last_update_status(prp->agmt, 0, extop_result,
  334. "Unable to acquire replica: "
  335. "the replica is currently being updated by another "
  336. "supplier.");
  337. return_value = ACQUIRE_REPLICA_BUSY;
  338. break;
  339. case NSDS50_REPL_REPLICAID_ERROR:
  340. /* remote replica detected a duplicate ReplicaID */
  341. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name,
  342. "acquire_replica - "
  343. "%s: Unable to aquire replica: the replica "
  344. "has the same Replica ID as this one. "
  345. "Replication is aborting.\n",
  346. agmt_get_long_name(prp->agmt));
  347. agmt_set_last_update_status(prp->agmt, 0, 0,
  348. "Unable to aquire replica: the replica has the same "
  349. "Replica ID as this one. Replication is aborting.");
  350. return_value = ACQUIRE_FATAL_ERROR;
  351. break;
  352. case NSDS50_REPL_BACKOFF:
  353. /* A replication sesssion hook on the replica
  354. * wants us to go into backoff mode. */
  355. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name,
  356. "acquire_replica - "
  357. "%s: Unable to acquire replica: "
  358. "the replica instructed us to go into "
  359. "backoff mode. Will retry later.\n",
  360. agmt_get_long_name(prp->agmt));
  361. agmt_set_last_update_status(prp->agmt, 0, extop_result,
  362. "Unable to acquire replica: the replica instructed "
  363. "us to go into backoff mode. Will retry later.");
  364. return_value = ACQUIRE_TRANSIENT_ERROR;
  365. break;
  366. case NSDS50_REPL_REPLICA_READY:
  367. /* Call any registered replication session post
  368. * acquire callback if we are dealing with a 9.0
  369. * style replica. We want to bail on sending
  370. * updates if the return value is non-0. */
  371. if (prp->repl90consumer) {
  372. int is_total = 0;
  373. /* Check if this is a total or incremental update. */
  374. if (strcmp(REPL_NSDS50_TOTAL_PROTOCOL_OID, prot_oid) == 0) {
  375. is_total = 1;
  376. }
  377. if (repl_session_plugin_call_post_acquire_cb(prp->agmt, is_total, data_guid, data)) {
  378. slapi_ch_free_string(&data_guid);
  379. ber_bvfree(data);
  380. data = NULL;
  381. return_value = ACQUIRE_TRANSIENT_ERROR;
  382. break;
  383. }
  384. slapi_ch_free_string(&data_guid);
  385. ber_bvfree(data);
  386. data = NULL;
  387. }
  388. /* We've acquired the replica. */
  389. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name,
  390. "acquire_replica - "
  391. "%s: Replica was successfully acquired.\n",
  392. agmt_get_long_name(prp->agmt));
  393. /* Parse the update vector */
  394. if (NULL != ruv_bervals && NULL != ruv) {
  395. if (ruv_init_from_bervals(ruv_bervals, ruv) != RUV_SUCCESS) {
  396. /* Couldn't parse the update vector */
  397. *ruv = NULL;
  398. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name,
  399. "acquire_replica - "
  400. "%s: acquired replica, but could not parse update vector. "
  401. "The replica must be reinitialized.\n",
  402. agmt_get_long_name(prp->agmt));
  403. }
  404. }
  405. /* Save consumer's RUV in the replication agreement.
  406. It is used by the changelog trimming code */
  407. if (ruv && *ruv)
  408. agmt_set_consumer_ruv(prp->agmt, *ruv);
  409. return_value = ACQUIRE_SUCCESS;
  410. break;
  411. default:
  412. agmt_set_last_update_status(prp->agmt, 0, extop_result,
  413. "Unable to acquire replica");
  414. return_value = ACQUIRE_FATAL_ERROR;
  415. }
  416. } else {
  417. /* Couldn't parse the response */
  418. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name,
  419. "acquire_replica - "
  420. "%s: Unable to parse the response to the "
  421. "startReplication extended operation. "
  422. "Replication is aborting.\n",
  423. agmt_get_long_name(prp->agmt));
  424. agmt_set_last_update_status(prp->agmt, 0, NSDS50_REPL_DECODING_ERROR,
  425. "Unable to parse the response to the "
  426. "startReplication extended operation. "
  427. "Replication is aborting.");
  428. prp->last_acquire_response_code = NSDS50_REPL_INTERNAL_ERROR;
  429. return_value = ACQUIRE_FATAL_ERROR;
  430. }
  431. } else {
  432. int operation, error;
  433. conn_get_error(conn, &operation, &error);
  434. /* Couldn't send the extended operation */
  435. return_value = ACQUIRE_TRANSIENT_ERROR; /* XXX right return value? */
  436. slapi_log_err(SLAPI_LOG_WARNING, repl_plugin_name,
  437. "acquire_replica - "
  438. "%s: Unable to receive the response for a startReplication "
  439. "extended operation to consumer (%s). Will retry later.\n",
  440. agmt_get_long_name(prp->agmt),
  441. error ? ldap_err2string(error) : "unknown error");
  442. agmt_set_last_update_status(prp->agmt, error, NSDS50_REPL_CONN_ERROR,
  443. "Unable to receive the response for a startReplication "
  444. "extended operation to consumer. Will retry later.");
  445. }
  446. } else {
  447. /* Couldn't get a current CSN */
  448. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name,
  449. "acquire_replica - "
  450. "%s: Unable to obtain current CSN. "
  451. "Replication is aborting.\n",
  452. agmt_get_long_name(prp->agmt));
  453. agmt_set_last_update_status(prp->agmt, 0, 0,
  454. "Unable to obtain current CSN. "
  455. "Replication is aborting.");
  456. return_value = ACQUIRE_FATAL_ERROR;
  457. }
  458. }
  459. error:
  460. csn_free(&current_csn);
  461. if (NULL != ruv_bervals)
  462. ber_bvecfree(ruv_bervals);
  463. if (NULL != replarea_sdn)
  464. slapi_sdn_free(&replarea_sdn);
  465. if (NULL != retoid)
  466. ldap_memfree(retoid);
  467. if (NULL != retdata)
  468. ber_bvfree(retdata);
  469. if (ACQUIRE_SUCCESS != return_value) {
  470. /* could not acquire the replica, so reinstate the linger timer, since this
  471. means we won't call release_replica, which also reinstates the timer */
  472. conn_start_linger(conn);
  473. } else {
  474. /* replica successfully acquired */
  475. prp->replica_acquired = PR_TRUE;
  476. }
  477. return return_value;
  478. }
  479. /*
  480. * Release a replica by sending an "end replication" extended request.
  481. */
  482. void
  483. release_replica(Private_Repl_Protocol *prp)
  484. {
  485. int rc;
  486. struct berval *retdata = NULL;
  487. char *retoid = NULL;
  488. struct berval *payload = NULL;
  489. Slapi_DN *replarea_sdn = NULL;
  490. int sent_message_id = 0;
  491. int ret_message_id = 0;
  492. ConnResult conres = 0;
  493. PR_ASSERT(NULL != prp);
  494. PR_ASSERT(NULL != prp->conn);
  495. if (!prp->replica_acquired) {
  496. return;
  497. }
  498. replarea_sdn = agmt_get_replarea(prp->agmt);
  499. payload = NSDS50EndReplicationRequest_new((char *)slapi_sdn_get_dn(replarea_sdn)); /* XXXggood had to cast away const */
  500. slapi_sdn_free(&replarea_sdn);
  501. rc = conn_send_extended_operation(prp->conn,
  502. REPL_END_NSDS50_REPLICATION_REQUEST_OID, payload, NULL /* update control */, &sent_message_id /* Message ID */);
  503. ber_bvfree(payload); /* done with this - free it now */
  504. if (0 != rc) {
  505. int operation, error;
  506. conn_get_error(prp->conn, &operation, &error);
  507. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name,
  508. "release_replica - %s: Unable to send endReplication extended operation (%s)\n",
  509. agmt_get_long_name(prp->agmt),
  510. error ? ldap_err2string(error) : "unknown error");
  511. goto error;
  512. }
  513. /* Since the operation request is async, we need to wait for the response here */
  514. conres = conn_read_result_ex(prp->conn, &retoid, &retdata, NULL, sent_message_id, &ret_message_id, 1);
  515. if (CONN_OPERATION_SUCCESS != conres) {
  516. int operation, error;
  517. conn_get_error(prp->conn, &operation, &error);
  518. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name,
  519. "release_replica - %s: Attempting to release replica, but unable to receive endReplication extended "
  520. "operation response from the replica. Error %d (%s)\n",
  521. agmt_get_long_name(prp->agmt), error,
  522. error ? ldap_err2string(error) : "unknown error");
  523. } else {
  524. struct berval **ruv_bervals = NULL; /* Shouldn't actually be returned */
  525. int extop_result;
  526. int extop_rc = 0;
  527. char *data_guid = NULL;
  528. struct berval *data = NULL;
  529. /* Check the message id's match */
  530. if (sent_message_id != ret_message_id) {
  531. int operation, error;
  532. conn_get_error(prp->conn, &operation, &error);
  533. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name,
  534. "release_replica - %s: Response message id does not match the request (%s)\n",
  535. agmt_get_long_name(prp->agmt),
  536. error ? ldap_err2string(error) : "unknown error");
  537. }
  538. /* We need to pass data_guid and data in even though they
  539. * are not used here. We will free them anyway in case they
  540. * are used in the future. */
  541. extop_rc = decode_repl_ext_response(retdata, &extop_result,
  542. (struct berval ***)&ruv_bervals, &data_guid, &data);
  543. slapi_ch_free_string(&data_guid);
  544. ber_bvfree(data);
  545. data = NULL;
  546. if (0 == extop_rc) {
  547. if (NSDS50_REPL_REPLICA_RELEASE_SUCCEEDED == extop_result) {
  548. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name,
  549. "release_replica - %s: Successfully released consumer\n", agmt_get_long_name(prp->agmt));
  550. } else {
  551. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name,
  552. "release_replica - %s: Unable to release consumer: response code %d\n",
  553. agmt_get_long_name(prp->agmt), extop_result);
  554. /* disconnect from the consumer so that it does not stay locked */
  555. conn_disconnect(prp->conn);
  556. }
  557. } else {
  558. /* Couldn't parse the response */
  559. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name,
  560. "release_replica - %s: Unable to parse the response "
  561. " to the endReplication extended operation.\n",
  562. agmt_get_long_name(prp->agmt));
  563. }
  564. if (NULL != ruv_bervals)
  565. ber_bvecfree(ruv_bervals);
  566. /* XXXggood free ruv_bervals if we got them for some reason */
  567. }
  568. if (NULL != retoid)
  569. ldap_memfree(retoid);
  570. if (NULL != retdata)
  571. ber_bvfree(retdata);
  572. /* replica is released, start the linger timer on the connection, which
  573. was stopped in acquire_replica */
  574. conn_start_linger(prp->conn);
  575. error:
  576. prp->replica_acquired = PR_FALSE;
  577. }
  578. /* converts consumer's response to a string */
  579. char *
  580. protocol_response2string(int response)
  581. {
  582. switch (response) {
  583. case NSDS50_REPL_REPLICA_READY:
  584. return "replica acquired";
  585. case NSDS50_REPL_REPLICA_BUSY:
  586. return "replica busy";
  587. case NSDS50_REPL_EXCESSIVE_CLOCK_SKEW:
  588. return "excessive clock skew";
  589. case NSDS50_REPL_PERMISSION_DENIED:
  590. return "permission denied";
  591. case NSDS50_REPL_DECODING_ERROR:
  592. return "decoding error";
  593. case NSDS50_REPL_UNKNOWN_UPDATE_PROTOCOL:
  594. return "unknown update protocol";
  595. case NSDS50_REPL_NO_SUCH_REPLICA:
  596. return "no such replica";
  597. case NSDS50_REPL_BELOW_PURGEPOINT:
  598. return "csn below purge point";
  599. case NSDS50_REPL_INTERNAL_ERROR:
  600. return "internal error";
  601. case NSDS50_REPL_REPLICA_RELEASE_SUCCEEDED:
  602. return "replica released";
  603. case NSDS50_REPL_REPLICAID_ERROR:
  604. return "duplicate replica ID detected";
  605. case NSDS50_REPL_UPTODATE:
  606. return "no change to send";
  607. case NSDS50_REPL_CL_ERROR:
  608. return "changelog error";
  609. case NSDS50_REPL_CONN_ERROR:
  610. return "connection error";
  611. case NSDS50_REPL_CONN_TIMEOUT:
  612. return "connection timeout";
  613. case NSDS50_REPL_TRANSIENT_ERROR:
  614. return "transient error";
  615. case NSDS50_REPL_RUV_ERROR:
  616. return "RUV error";
  617. default:
  618. return "unknown error";
  619. }
  620. }
  621. int
  622. repl5_strip_fractional_mods(Repl_Agmt *agmt, LDAPMod **mods)
  623. {
  624. char **a;
  625. char **attrs_to_strip;
  626. int retval = 0;
  627. int strip = 1;
  628. int i, j, k;
  629. if (mods == NULL) {
  630. return retval;
  631. }
  632. a = agmt_get_fractional_attrs(agmt);
  633. if (a) {
  634. /* Iterate through the fractional attr list */
  635. for (i = 0; a[i] != NULL; i++) {
  636. for (j = 0; NULL != mods[j];) {
  637. /*
  638. * Iterate through the attrs in this mod list.
  639. * If any match the fractional attr then remove the mod.
  640. */
  641. if (0 == slapi_attr_type_cmp(mods[j]->mod_type, a[i], SLAPI_TYPE_CMP_SUBTYPE)) {
  642. /* Adjust value of j, implicit in not incrementing it */
  643. /* Free this mod */
  644. ber_bvecfree(mods[j]->mod_bvalues);
  645. slapi_ch_free((void **)&(mods[j]->mod_type));
  646. slapi_ch_free((void **)&mods[j]);
  647. /* Move down all subsequent mods */
  648. for (k = j; mods[k + 1]; k++) {
  649. mods[k] = mods[k + 1];
  650. }
  651. /* Zero the end of the array */
  652. mods[k] = NULL;
  653. } else {
  654. j++;
  655. }
  656. }
  657. }
  658. /*
  659. * Check if "all" the remaining mods are on attributes we want to strip from the update.
  660. * If all the mods are on attrs_to_strip, then free them.
  661. */
  662. if ((attrs_to_strip = agmt_get_attrs_to_strip(agmt)) != NULL) {
  663. for (j = 0; mods[j] != NULL; j++) {
  664. if (slapi_ch_array_utf8_inlist(attrs_to_strip, mods[j]->mod_type) == 0) {
  665. /* at least one of the mods is "real", so don't strip anything */
  666. strip = 0;
  667. break;
  668. }
  669. }
  670. if (strip) {
  671. /* free the remaining mods */
  672. for (j = 0; mods[j] != NULL; j++) {
  673. ber_bvecfree(mods[j]->mod_bvalues);
  674. slapi_ch_free((void **)&(mods[j]->mod_type));
  675. slapi_ch_free((void **)&mods[j]);
  676. }
  677. }
  678. }
  679. slapi_ch_array_free(a);
  680. }
  681. return retval;
  682. }