repl5_protocol_util.c 25 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753
  1. /** BEGIN COPYRIGHT BLOCK
  2. * This Program is free software; you can redistribute it and/or modify it under
  3. * the terms of the GNU General Public License as published by the Free Software
  4. * Foundation; version 2 of the License.
  5. *
  6. * This Program is distributed in the hope that it will be useful, but WITHOUT
  7. * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
  8. * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
  9. *
  10. * You should have received a copy of the GNU General Public License along with
  11. * this Program; if not, write to the Free Software Foundation, Inc., 59 Temple
  12. * Place, Suite 330, Boston, MA 02111-1307 USA.
  13. *
  14. * In addition, as a special exception, Red Hat, Inc. gives You the additional
  15. * right to link the code of this Program with code not covered under the GNU
  16. * General Public License ("Non-GPL Code") and to distribute linked combinations
  17. * including the two, subject to the limitations in this paragraph. Non-GPL Code
  18. * permitted under this exception must only link to the code of this Program
  19. * through those well defined interfaces identified in the file named EXCEPTION
  20. * found in the source code files (the "Approved Interfaces"). The files of
  21. * Non-GPL Code may instantiate templates or use macros or inline functions from
  22. * the Approved Interfaces without causing the resulting work to be covered by
  23. * the GNU General Public License. Only Red Hat, Inc. may make changes or
  24. * additions to the list of Approved Interfaces. You must obey the GNU General
  25. * Public License in all respects for all of the Program code and other code used
  26. * in conjunction with the Program except the Non-GPL Code covered by this
  27. * exception. If you modify this file, you may extend this exception to your
  28. * version of the file, but you are not obligated to do so. If you do not wish to
  29. * provide this exception without modification, you must delete this exception
  30. * statement from your version and license this file solely under the GPL without
  31. * exception.
  32. *
  33. *
  34. * Copyright (C) 2001 Sun Microsystems, Inc. Used by permission.
  35. * Copyright (C) 2005 Red Hat, Inc.
  36. * All rights reserved.
  37. * END COPYRIGHT BLOCK **/
  38. #ifdef HAVE_CONFIG_H
  39. # include <config.h>
  40. #endif
  41. /* repl5_protocol_util.c */
  42. /*
  43. Code common to both incremental and total protocols.
  44. */
  45. #include "repl5.h"
  46. #include "repl5_prot_private.h"
  47. /*
  48. * Obtain a current CSN (e.g. one that would have been
  49. * generated for an operation occurring at this time)
  50. * for a given replica.
  51. */
  52. CSN *
  53. get_current_csn(Slapi_DN *replarea_sdn)
  54. {
  55. Object *replica_obj;
  56. Replica *replica;
  57. Object *gen_obj;
  58. CSNGen *gen;
  59. CSN *current_csn = NULL;
  60. if (NULL != replarea_sdn)
  61. {
  62. replica_obj = replica_get_replica_from_dn(replarea_sdn);
  63. if (NULL != replica_obj)
  64. {
  65. replica = object_get_data(replica_obj);
  66. if (NULL != replica)
  67. {
  68. gen_obj = replica_get_csngen(replica);
  69. if (NULL != gen_obj)
  70. {
  71. gen = (CSNGen *)object_get_data(gen_obj);
  72. if (NULL != gen)
  73. {
  74. if (csngen_new_csn(gen, &current_csn,
  75. PR_FALSE /* notify */) != CSN_SUCCESS)
  76. {
  77. csn_free(&current_csn);
  78. }
  79. object_release(gen_obj);
  80. }
  81. }
  82. }
  83. }
  84. }
  85. return current_csn;
  86. }
  87. /*
  88. * Acquire exclusive access to a replica. Send a start replication extended
  89. * operation to the replica. The response will contain a success code, and
  90. * optionally the replica's update vector if acquisition is successful.
  91. * This function returns one of the following:
  92. * ACQUIRE_SUCCESS - the replica was acquired, and we have exclusive update access
  93. * ACQUIRE_REPLICA_BUSY - another master was updating the replica
  94. * ACQUIRE_FATAL_ERROR - something bad happened, and it's not likely to improve
  95. * if we wait.
  96. * ACQUIRE_TRANSIENT_ERROR - something bad happened, but it's probably worth
  97. * another try after waiting a while.
  98. * If ACQUIRE_SUCCESS is returned, then ruv will point to the replica's update
  99. * vector. It's possible that the replica does something goofy and doesn't
  100. * return us an update vector, so be prepared for ruv to be NULL (but this is
  101. * an error).
  102. */
  103. int
  104. acquire_replica(Private_Repl_Protocol *prp, char *prot_oid, RUV **ruv)
  105. {
  106. int return_value;
  107. ConnResult crc;
  108. Repl_Connection *conn;
  109. struct berval *retdata = NULL;
  110. char *retoid = NULL;
  111. Slapi_DN *replarea_sdn = NULL;
  112. struct berval **ruv_bervals = NULL;
  113. CSN *current_csn = NULL;
  114. PR_ASSERT(prp && prot_oid);
  115. if (prp->replica_acquired) /* we already acquire replica */
  116. {
  117. slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name,
  118. "%s: Remote replica already acquired\n",
  119. agmt_get_long_name(prp->agmt));
  120. return_value = ACQUIRE_FATAL_ERROR;
  121. return ACQUIRE_SUCCESS;
  122. }
  123. if (NULL != ruv)
  124. {
  125. ruv_destroy ( ruv );
  126. }
  127. if (strcmp(prot_oid, REPL_NSDS50_INCREMENTAL_PROTOCOL_OID) == 0)
  128. {
  129. Replica *replica;
  130. Object *supl_ruv_obj, *cons_ruv_obj;
  131. PRBool is_newer = PR_FALSE;
  132. object_acquire(prp->replica_object);
  133. replica = object_get_data(prp->replica_object);
  134. supl_ruv_obj = replica_get_ruv ( replica );
  135. cons_ruv_obj = agmt_get_consumer_ruv ( prp->agmt );
  136. is_newer = ruv_is_newer ( supl_ruv_obj, cons_ruv_obj );
  137. if ( supl_ruv_obj ) object_release ( supl_ruv_obj );
  138. if ( cons_ruv_obj ) object_release ( cons_ruv_obj );
  139. object_release (prp->replica_object);
  140. replica = NULL;
  141. if (is_newer == PR_FALSE) {
  142. prp->last_acquire_response_code = NSDS50_REPL_UPTODATE;
  143. return ACQUIRE_CONSUMER_WAS_UPTODATE;
  144. }
  145. }
  146. prp->last_acquire_response_code = NSDS50_REPL_REPLICA_NO_RESPONSE;
  147. /* Get the connection */
  148. conn = prp->conn;
  149. crc = conn_connect(conn);
  150. if (CONN_OPERATION_FAILED == crc)
  151. {
  152. return_value = ACQUIRE_TRANSIENT_ERROR;
  153. }
  154. else if (CONN_SSL_NOT_ENABLED == crc)
  155. {
  156. return_value = ACQUIRE_FATAL_ERROR;
  157. }
  158. else
  159. {
  160. /* we don't want the timer to go off in the middle of an operation */
  161. conn_cancel_linger(conn);
  162. /* Does the remote replica support the 5.0 protocol? */
  163. crc = conn_replica_supports_ds5_repl(conn);
  164. if (CONN_DOES_NOT_SUPPORT_DS5_REPL == crc)
  165. {
  166. return_value = ACQUIRE_FATAL_ERROR;
  167. }
  168. else if (CONN_NOT_CONNECTED == crc || CONN_OPERATION_FAILED == crc)
  169. {
  170. /* We don't know anything about the remote replica. Try again later. */
  171. return_value = ACQUIRE_TRANSIENT_ERROR;
  172. goto error;
  173. }
  174. /* Find out what level of replication the replica supports. */
  175. crc = conn_replica_supports_ds90_repl(conn);
  176. if (CONN_DOES_NOT_SUPPORT_DS90_REPL == crc)
  177. {
  178. /* Does the remote replica support the 7.1 protocol? */
  179. crc = conn_replica_supports_ds71_repl(conn);
  180. if (CONN_DOES_NOT_SUPPORT_DS71_REPL == crc)
  181. {
  182. /* This is a pre-7.1 replica. */
  183. prp->repl50consumer = 1;
  184. }
  185. else if (CONN_NOT_CONNECTED == crc || CONN_OPERATION_FAILED == crc)
  186. {
  187. /* We don't know anything about the remote replica. Try again later. */
  188. return_value = ACQUIRE_TRANSIENT_ERROR;
  189. goto error;
  190. }
  191. else
  192. {
  193. /* This replica is later than 7.1, but pre-9.0. */
  194. prp->repl71consumer = 1;
  195. }
  196. }
  197. else if (CONN_NOT_CONNECTED == crc || CONN_OPERATION_FAILED == crc)
  198. {
  199. /* We don't know anything about the remote replica. Try again later. */
  200. return_value = ACQUIRE_TRANSIENT_ERROR;
  201. goto error;
  202. }
  203. else
  204. {
  205. /* This replica is a 9.0 or later replica. */
  206. prp->repl90consumer = 1;
  207. }
  208. /* Good to go. Start the protocol. */
  209. /* Obtain a current CSN */
  210. replarea_sdn = agmt_get_replarea(prp->agmt);
  211. current_csn = get_current_csn(replarea_sdn);
  212. if (NULL != current_csn)
  213. {
  214. struct berval *payload = NULL;
  215. int send_msgid = 0;
  216. if (prp->repl90consumer)
  217. {
  218. int is_total = 0;
  219. char *data_guid = NULL;
  220. struct berval *data = NULL;
  221. /* Check if this is a total or incremental update. */
  222. if (strcmp(REPL_NSDS50_TOTAL_PROTOCOL_OID, prot_oid) == 0)
  223. {
  224. is_total = 1;
  225. }
  226. /* Call pre-start replication session callback. This callback
  227. * may have extra data to be sent to the replica. */
  228. if (repl_session_plugin_call_pre_acquire_cb(prp->agmt, is_total,
  229. &data_guid, &data) == 0) {
  230. payload = NSDS90StartReplicationRequest_new(
  231. prot_oid, slapi_sdn_get_ndn(replarea_sdn),
  232. NULL, current_csn, data_guid, data);
  233. slapi_ch_free_string(&data_guid);
  234. ber_bvfree(data);
  235. data = NULL;
  236. } else {
  237. return_value = ACQUIRE_TRANSIENT_ERROR;
  238. slapi_ch_free_string(&data_guid);
  239. ber_bvfree(data);
  240. data = NULL;
  241. goto error;
  242. }
  243. }
  244. else
  245. {
  246. payload = NSDS50StartReplicationRequest_new(
  247. prot_oid, slapi_sdn_get_ndn(replarea_sdn),
  248. NULL /* XXXggood need to provide referral(s) */, current_csn);
  249. }
  250. /* JCMREPL - Need to extract the referrals from the RUV */
  251. crc = conn_send_extended_operation(conn,
  252. prp->repl90consumer ? REPL_START_NSDS90_REPLICATION_REQUEST_OID :
  253. REPL_START_NSDS50_REPLICATION_REQUEST_OID, payload,
  254. NULL /* update control */, &send_msgid /* Message ID */);
  255. if (CONN_OPERATION_SUCCESS != crc)
  256. {
  257. int operation, error;
  258. conn_get_error(conn, &operation, &error);
  259. /* Couldn't send the extended operation */
  260. return_value = ACQUIRE_TRANSIENT_ERROR; /* XXX right return value? */
  261. slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name,
  262. "%s: Unable to send a startReplication "
  263. "extended operation to consumer (%s). Will retry later.\n",
  264. agmt_get_long_name(prp->agmt),
  265. error ? ldap_err2string(error) : "unknown error");
  266. }
  267. /* Since the operation request is async, we need to wait for the response here */
  268. crc = conn_read_result_ex(conn,&retoid,&retdata,NULL,send_msgid,NULL,1);
  269. ber_bvfree(payload);
  270. payload = NULL;
  271. /* Look at the response we got. */
  272. if (CONN_OPERATION_SUCCESS == crc)
  273. {
  274. /*
  275. * Extop was processed. Look at extop response to see if we're
  276. * permitted to go ahead.
  277. */
  278. int extop_result;
  279. char *data_guid = NULL;
  280. struct berval *data = NULL;
  281. int extop_rc = decode_repl_ext_response(retdata, &extop_result,
  282. &ruv_bervals, &data_guid,
  283. &data);
  284. if (0 == extop_rc)
  285. {
  286. prp->last_acquire_response_code = extop_result;
  287. switch (extop_result)
  288. {
  289. /* XXXggood handle other error codes here */
  290. case NSDS50_REPL_INTERNAL_ERROR:
  291. slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name,
  292. "%s: Unable to acquire replica: "
  293. "an internal error occurred on the remote replica. "
  294. "Replication is aborting.\n",
  295. agmt_get_long_name(prp->agmt));
  296. return_value = ACQUIRE_FATAL_ERROR;
  297. break;
  298. case NSDS50_REPL_PERMISSION_DENIED:
  299. /* Not allowed to send updates */
  300. {
  301. char *repl_binddn = agmt_get_binddn(prp->agmt);
  302. slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name,
  303. "%s: Unable to acquire replica: permission denied. "
  304. "The bind dn \"%s\" does not have permission to "
  305. "supply replication updates to the replica. "
  306. "Will retry later.\n",
  307. agmt_get_long_name(prp->agmt), repl_binddn);
  308. slapi_ch_free((void **)&repl_binddn);
  309. return_value = ACQUIRE_TRANSIENT_ERROR;
  310. break;
  311. }
  312. case NSDS50_REPL_NO_SUCH_REPLICA:
  313. /* There is no such replica on the consumer */
  314. {
  315. Slapi_DN *repl_root = agmt_get_replarea(prp->agmt);
  316. slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name,
  317. "%s: Unable to acquire replica: there is no "
  318. "replicated area \"%s\" on the consumer server. "
  319. "Replication is aborting.\n",
  320. agmt_get_long_name(prp->agmt),
  321. slapi_sdn_get_dn(repl_root));
  322. slapi_sdn_free(&repl_root);
  323. return_value = ACQUIRE_FATAL_ERROR;
  324. break;
  325. }
  326. case NSDS50_REPL_EXCESSIVE_CLOCK_SKEW:
  327. /* Large clock skew between the consumer and the supplier */
  328. slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name,
  329. "%s: Unable to acquire replica: "
  330. "Excessive clock skew between the supplier and "
  331. "the consumer. Replication is aborting.\n",
  332. agmt_get_long_name(prp->agmt));
  333. return_value = ACQUIRE_FATAL_ERROR;
  334. break;
  335. case NSDS50_REPL_DECODING_ERROR:
  336. /* We sent something the replica couldn't understand. */
  337. slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name,
  338. "%s: Unable to acquire replica: "
  339. "the consumer was unable to decode the "
  340. "startReplicationRequest extended operation sent by the "
  341. "supplier. Replication is aborting.\n",
  342. agmt_get_long_name(prp->agmt));
  343. return_value = ACQUIRE_FATAL_ERROR;
  344. break;
  345. case NSDS50_REPL_REPLICA_BUSY:
  346. /* Someone else is updating the replica. Try later. */
  347. /* if acquire_replica is called for replica
  348. initialization, log REPLICA_BUSY, too */
  349. if (strcmp(REPL_NSDS50_TOTAL_PROTOCOL_OID,
  350. prot_oid) == 0)
  351. {
  352. slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name,
  353. "%s: Unable to acquire replica: "
  354. "the replica is currently being updated"
  355. "by another supplier.\n",
  356. agmt_get_long_name(prp->agmt));
  357. }
  358. else /* REPL_NSDS50_INCREMENTAL_PROTOCOL_OID */
  359. {
  360. slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name,
  361. "%s: Unable to acquire replica: "
  362. "the replica is currently being updated"
  363. "by another supplier. Will try later\n",
  364. agmt_get_long_name(prp->agmt));
  365. }
  366. return_value = ACQUIRE_REPLICA_BUSY;
  367. break;
  368. case NSDS50_REPL_LEGACY_CONSUMER:
  369. /* remote replica is a legacy consumer */
  370. slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name,
  371. "%s: Unable to acquire replica: the replica "
  372. "is supplied by a legacy supplier. "
  373. "Replication is aborting.\n", agmt_get_long_name(prp->agmt));
  374. return_value = ACQUIRE_FATAL_ERROR;
  375. break;
  376. case NSDS50_REPL_REPLICAID_ERROR:
  377. /* remote replica detected a duplicate ReplicaID */
  378. slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name,
  379. "%s: Unable to aquire replica: the replica "
  380. "has the same Replica ID as this one. "
  381. "Replication is aborting.\n",
  382. agmt_get_long_name(prp->agmt));
  383. return_value = ACQUIRE_FATAL_ERROR;
  384. break;
  385. case NSDS50_REPL_BACKOFF:
  386. /* A replication sesssion hook on the replica
  387. * wants us to go into backoff mode. */
  388. slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name,
  389. "%s: Unable to acquire replica: "
  390. "the replica instructed us to go into "
  391. "backoff mode. Will retry later.\n",
  392. agmt_get_long_name(prp->agmt));
  393. return_value = ACQUIRE_TRANSIENT_ERROR;
  394. break;
  395. case NSDS50_REPL_REPLICA_READY:
  396. /* Call any registered replication session post
  397. * acquire callback if we are dealing with a 9.0
  398. * style replica. We want to bail on sending
  399. * updates if the return value is non-0. */
  400. if (prp->repl90consumer)
  401. {
  402. int is_total = 0;
  403. /* Check if this is a total or incremental update. */
  404. if (strcmp(REPL_NSDS50_TOTAL_PROTOCOL_OID, prot_oid) == 0)
  405. {
  406. is_total = 1;
  407. }
  408. if (repl_session_plugin_call_post_acquire_cb(prp->agmt, is_total, data_guid, data))
  409. {
  410. slapi_ch_free_string(&data_guid);
  411. ber_bvfree(data);
  412. data = NULL;
  413. return_value = ACQUIRE_TRANSIENT_ERROR;
  414. break;
  415. }
  416. slapi_ch_free_string(&data_guid);
  417. ber_bvfree(data);
  418. data = NULL;
  419. }
  420. /* We've acquired the replica. */
  421. slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name,
  422. "%s: Replica was successfully acquired.\n",
  423. agmt_get_long_name(prp->agmt));
  424. /* Parse the update vector */
  425. if (NULL != ruv_bervals && NULL != ruv)
  426. {
  427. if (ruv_init_from_bervals(ruv_bervals, ruv) != RUV_SUCCESS)
  428. {
  429. /* Couldn't parse the update vector */
  430. *ruv = NULL;
  431. slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name,
  432. "%s: Warning: acquired replica, "
  433. "but could not parse update vector. "
  434. "The replica must be reinitialized.\n",
  435. agmt_get_long_name(prp->agmt));
  436. }
  437. }
  438. /* Save consumer's RUV in the replication agreement.
  439. It is used by the changelog trimming code */
  440. if (ruv && *ruv)
  441. agmt_set_consumer_ruv (prp->agmt, *ruv);
  442. return_value = ACQUIRE_SUCCESS;
  443. break;
  444. default:
  445. return_value = ACQUIRE_FATAL_ERROR;
  446. }
  447. }
  448. else
  449. {
  450. /* Couldn't parse the response */
  451. slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name,
  452. "%s: Unable to parse the response to the "
  453. "startReplication extended operation. "
  454. "Replication is aborting.\n",
  455. agmt_get_long_name(prp->agmt));
  456. prp->last_acquire_response_code = NSDS50_REPL_INTERNAL_ERROR;
  457. return_value = ACQUIRE_FATAL_ERROR;
  458. }
  459. }
  460. else
  461. {
  462. int operation, error;
  463. conn_get_error(conn, &operation, &error);
  464. /* Couldn't send the extended operation */
  465. return_value = ACQUIRE_TRANSIENT_ERROR; /* XXX right return value? */
  466. slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name,
  467. "%s: Unable to receive the response for a startReplication "
  468. "extended operation to consumer (%s). Will retry later.\n",
  469. agmt_get_long_name(prp->agmt),
  470. error ? ldap_err2string(error) : "unknown error");
  471. }
  472. }
  473. else
  474. {
  475. /* Couldn't get a current CSN */
  476. slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name,
  477. "%s: Unable to obtain current CSN. "
  478. "Replication is aborting.\n",
  479. agmt_get_long_name(prp->agmt));
  480. return_value = ACQUIRE_FATAL_ERROR;
  481. }
  482. }
  483. error:
  484. csn_free(&current_csn);
  485. if (NULL != ruv_bervals)
  486. ber_bvecfree(ruv_bervals);
  487. if (NULL != replarea_sdn)
  488. slapi_sdn_free(&replarea_sdn);
  489. if (NULL != retoid)
  490. ldap_memfree(retoid);
  491. if (NULL != retdata)
  492. ber_bvfree(retdata);
  493. if (ACQUIRE_SUCCESS != return_value)
  494. {
  495. /* could not acquire the replica, so reinstate the linger timer, since this
  496. means we won't call release_replica, which also reinstates the timer */
  497. conn_start_linger(conn);
  498. }
  499. else
  500. {
  501. /* replica successfully acquired */
  502. prp->replica_acquired = PR_TRUE;
  503. }
  504. return return_value;
  505. }
  506. /*
  507. * Release a replica by sending an "end replication" extended request.
  508. */
  509. void
  510. release_replica(Private_Repl_Protocol *prp)
  511. {
  512. int rc;
  513. struct berval *retdata = NULL;
  514. char *retoid = NULL;
  515. struct berval *payload = NULL;
  516. Slapi_DN *replarea_sdn = NULL;
  517. int sent_message_id = 0;
  518. int ret_message_id = 0;
  519. ConnResult conres = 0;
  520. PR_ASSERT(NULL != prp);
  521. PR_ASSERT(NULL != prp->conn);
  522. if (!prp->replica_acquired)
  523. return;
  524. replarea_sdn = agmt_get_replarea(prp->agmt);
  525. payload = NSDS50EndReplicationRequest_new((char *)slapi_sdn_get_dn(replarea_sdn)); /* XXXggood had to cast away const */
  526. slapi_sdn_free(&replarea_sdn);
  527. rc = conn_send_extended_operation(prp->conn,
  528. REPL_END_NSDS50_REPLICATION_REQUEST_OID, payload, NULL /* update control */, &sent_message_id /* Message ID */);
  529. ber_bvfree(payload); /* done with this - free it now */
  530. if (0 != rc)
  531. {
  532. int operation, error;
  533. conn_get_error(prp->conn, &operation, &error);
  534. slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name,
  535. "%s: Warning: unable to send endReplication extended operation (%s)\n",
  536. agmt_get_long_name(prp->agmt),
  537. error ? ldap_err2string(error) : "unknown error");
  538. goto error;
  539. }
  540. /* Since the operation request is async, we need to wait for the response here */
  541. conres = conn_read_result_ex(prp->conn,&retoid,&retdata,NULL,sent_message_id,&ret_message_id,1);
  542. if (CONN_OPERATION_SUCCESS != conres)
  543. {
  544. int operation, error;
  545. conn_get_error(prp->conn, &operation, &error);
  546. slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name,
  547. "%s: Warning: unable to receive endReplication extended operation response (%s)\n",
  548. agmt_get_long_name(prp->agmt),
  549. error ? ldap_err2string(error) : "unknown error");
  550. }
  551. else
  552. {
  553. struct berval **ruv_bervals = NULL; /* Shouldn't actually be returned */
  554. int extop_result;
  555. int extop_rc = 0;
  556. char *data_guid = NULL;
  557. struct berval *data = NULL;
  558. /* Check the message id's match */
  559. if (sent_message_id != ret_message_id)
  560. {
  561. int operation, error;
  562. conn_get_error(prp->conn, &operation, &error);
  563. slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name,
  564. "%s: Warning: response message id does not match the request (%s)\n",
  565. agmt_get_long_name(prp->agmt),
  566. error ? ldap_err2string(error) : "unknown error");
  567. }
  568. /* We need to pass data_guid and data in even though they
  569. * are not used here. We will free them anyway in case they
  570. * are used in the future. */
  571. extop_rc = decode_repl_ext_response(retdata, &extop_result,
  572. (struct berval ***)&ruv_bervals, &data_guid, &data);
  573. slapi_ch_free_string(&data_guid);
  574. ber_bvfree(data);
  575. data = NULL;
  576. if (0 == extop_rc)
  577. {
  578. if (NSDS50_REPL_REPLICA_RELEASE_SUCCEEDED == extop_result)
  579. {
  580. slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name,
  581. "%s: Successfully released consumer\n", agmt_get_long_name(prp->agmt));
  582. }
  583. else
  584. {
  585. slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name,
  586. "%s: Unable to release consumer: response code %d\n",
  587. agmt_get_long_name(prp->agmt), extop_result);
  588. /* disconnect from the consumer so that it does not stay locked */
  589. conn_disconnect (prp->conn);
  590. }
  591. }
  592. else
  593. {
  594. /* Couldn't parse the response */
  595. slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name,
  596. "%s: Warning: Unable to parse the response "
  597. " to the endReplication extended operation.\n",
  598. agmt_get_long_name(prp->agmt));
  599. }
  600. if (NULL != ruv_bervals)
  601. ber_bvecfree(ruv_bervals);
  602. /* XXXggood free ruv_bervals if we got them for some reason */
  603. }
  604. if (NULL != retoid)
  605. ldap_memfree(retoid);
  606. if (NULL != retdata)
  607. ber_bvfree(retdata);
  608. /* replica is released, start the linger timer on the connection, which
  609. was stopped in acquire_replica */
  610. conn_start_linger(prp->conn);
  611. error:
  612. prp->replica_acquired = PR_FALSE;
  613. }
  614. /* converts consumer's response to a string */
  615. char *
  616. protocol_response2string (int response)
  617. {
  618. switch (response)
  619. {
  620. case NSDS50_REPL_REPLICA_READY: return "replica acquired";
  621. case NSDS50_REPL_REPLICA_BUSY: return "replica busy";
  622. case NSDS50_REPL_EXCESSIVE_CLOCK_SKEW: return "excessive clock skew";
  623. case NSDS50_REPL_PERMISSION_DENIED: return "permission denied";
  624. case NSDS50_REPL_DECODING_ERROR: return "decoding error";
  625. case NSDS50_REPL_UNKNOWN_UPDATE_PROTOCOL: return "unknown update protocol";
  626. case NSDS50_REPL_NO_SUCH_REPLICA: return "no such replica";
  627. case NSDS50_REPL_BELOW_PURGEPOINT: return "csn below purge point";
  628. case NSDS50_REPL_INTERNAL_ERROR: return "internal error";
  629. case NSDS50_REPL_REPLICA_RELEASE_SUCCEEDED: return "replica released";
  630. case NSDS50_REPL_LEGACY_CONSUMER: return "replica is a legacy consumer";
  631. case NSDS50_REPL_REPLICAID_ERROR: return "duplicate replica ID detected";
  632. case NSDS50_REPL_UPTODATE: return "no change to send";
  633. default: return "unknown error";
  634. }
  635. }
  636. int
  637. repl5_strip_fractional_mods(Repl_Agmt *agmt, LDAPMod ** mods)
  638. {
  639. char **a = agmt_get_fractional_attrs(agmt);
  640. char **attrs_to_strip;
  641. int retval = 0;
  642. int strip = 1;
  643. int i, j, k;
  644. if (a) {
  645. /* Iterate through the fractional attr list */
  646. for ( i = 0; a[i] != NULL; i++ )
  647. {
  648. for ( j = 0; NULL != mods[ j ]; )
  649. {
  650. /*
  651. * Iterate through the attrs in this mod list.
  652. * If any match the fractional attr then remove the mod.
  653. */
  654. if (0 == slapi_attr_type_cmp(mods[j]->mod_type, a[i], SLAPI_TYPE_CMP_SUBTYPE))
  655. {
  656. /* Adjust value of j, implicit in not incrementing it */
  657. /* Free this mod */
  658. ber_bvecfree(mods[j]->mod_bvalues);
  659. slapi_ch_free((void **)&(mods[j]->mod_type));
  660. slapi_ch_free((void **)&mods[j]);
  661. /* Move down all subsequent mods */
  662. for (k = j; mods[k+1] ; k++)
  663. {
  664. mods[k] = mods[k+1];
  665. }
  666. /* Zero the end of the array */
  667. mods[k] = NULL;
  668. } else {
  669. j++;
  670. }
  671. }
  672. }
  673. /*
  674. * Check if "all" the remaining mods are on attributes we want to strip from the update.
  675. * If all the mods are on attrs_to_strip, then free them.
  676. */
  677. if((attrs_to_strip = agmt_get_attrs_to_strip(agmt)) != NULL){
  678. for(j = 0; mods[j] != NULL; j++)
  679. {
  680. if(slapi_ch_array_utf8_inlist(attrs_to_strip, mods[j]->mod_type) == 0){
  681. /* at least one of the mods is "real", so don't strip anything */
  682. strip = 0;
  683. break;
  684. }
  685. }
  686. if(strip){
  687. /* free the remaining mods */
  688. for(j = 0; mods[j] != NULL; j++)
  689. {
  690. ber_bvecfree(mods[j]->mod_bvalues);
  691. slapi_ch_free((void **)&(mods[j]->mod_type));
  692. slapi_ch_free((void **)&mods[j]);
  693. }
  694. }
  695. }
  696. slapi_ch_array_free(a);
  697. }
  698. return retval;
  699. }