repl5_protocol_util.c 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468
  1. /** BEGIN COPYRIGHT BLOCK
  2. * Copyright 2001 Sun Microsystems, Inc.
  3. * Portions copyright 1999, 2001-2003 Netscape Communications Corporation.
  4. * All rights reserved.
  5. * END COPYRIGHT BLOCK **/
  6. /* repl5_protocol_util.c */
  7. /*
  8. Code common to both incremental and total protocols.
  9. */
  10. #include "repl5.h"
  11. #include "repl5_prot_private.h"
  12. /*
  13. * Obtain a current CSN (e.g. one that would have been
  14. * generated for an operation occurring at this time)
  15. * for a given replica.
  16. */
  17. CSN *
  18. get_current_csn(Slapi_DN *replarea_sdn)
  19. {
  20. Object *replica_obj;
  21. Replica *replica;
  22. Object *gen_obj;
  23. CSNGen *gen;
  24. CSN *current_csn = NULL;
  25. if (NULL != replarea_sdn)
  26. {
  27. replica_obj = replica_get_replica_from_dn(replarea_sdn);
  28. if (NULL != replica_obj)
  29. {
  30. replica = object_get_data(replica_obj);
  31. if (NULL != replica)
  32. {
  33. gen_obj = replica_get_csngen(replica);
  34. if (NULL != gen_obj)
  35. {
  36. gen = (CSNGen *)object_get_data(gen_obj);
  37. if (NULL != gen)
  38. {
  39. if (csngen_new_csn(gen, &current_csn,
  40. PR_FALSE /* notify */) != CSN_SUCCESS)
  41. {
  42. current_csn = NULL;
  43. }
  44. object_release(gen_obj);
  45. }
  46. }
  47. }
  48. }
  49. }
  50. return current_csn;
  51. }
  52. /*
  53. * Acquire exclusive access to a replica. Send a start replication extended
  54. * operation to the replica. The response will contain a success code, and
  55. * optionally the replica's update vector if acquisition is successful.
  56. * This function returns one of the following:
  57. * ACQUIRE_SUCCESS - the replica was acquired, and we have exclusive update access
  58. * ACQUIRE_REPLICA_BUSY - another master was updating the replica
  59. * ACQUIRE_FATAL_ERROR - something bad happened, and it's not likely to improve
  60. * if we wait.
  61. * ACQUIRE_TRANSIENT_ERROR - something bad happened, but it's probably worth
  62. * another try after waiting a while.
  63. * If ACQUIRE_SUCCESS is returned, then ruv will point to the replica's update
  64. * vector. It's possible that the replica does something goofy and doesn't
  65. * return us an update vector, so be prepared for ruv to be NULL (but this is
  66. * an error).
  67. */
  68. int
  69. acquire_replica(Private_Repl_Protocol *prp, char *prot_oid, RUV **ruv)
  70. {
  71. int return_value;
  72. ConnResult crc;
  73. Repl_Connection *conn;
  74. PR_ASSERT(prp && prot_oid);
  75. if (prp->replica_acquired) /* we already acquire replica */
  76. {
  77. slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name,
  78. "%s: Remote replica already acquired\n",
  79. agmt_get_long_name(prp->agmt));
  80. return_value = ACQUIRE_FATAL_ERROR;
  81. return ACQUIRE_SUCCESS;
  82. }
  83. if (NULL != ruv)
  84. {
  85. ruv_destroy ( ruv );
  86. }
  87. if (strcmp(prot_oid, REPL_NSDS50_INCREMENTAL_PROTOCOL_OID) == 0)
  88. {
  89. Replica *replica;
  90. Object *supl_ruv_obj, *cons_ruv_obj;
  91. PRBool is_newer = PR_FALSE;
  92. object_acquire(prp->replica_object);
  93. replica = object_get_data(prp->replica_object);
  94. supl_ruv_obj = replica_get_ruv ( replica );
  95. cons_ruv_obj = agmt_get_consumer_ruv ( prp->agmt );
  96. is_newer = ruv_is_newer ( supl_ruv_obj, cons_ruv_obj );
  97. if ( supl_ruv_obj ) object_release ( supl_ruv_obj );
  98. if ( cons_ruv_obj ) object_release ( cons_ruv_obj );
  99. object_release (prp->replica_object);
  100. replica = NULL;
  101. if (is_newer == PR_FALSE) {
  102. prp->last_acquire_response_code = NSDS50_REPL_UPTODATE;
  103. return ACQUIRE_CONSUMER_WAS_UPTODATE;
  104. }
  105. }
  106. prp->last_acquire_response_code = NSDS50_REPL_REPLICA_NO_RESPONSE;
  107. /* Get the connection */
  108. conn = prp->conn;
  109. crc = conn_connect(conn);
  110. if (CONN_OPERATION_FAILED == crc)
  111. {
  112. return_value = ACQUIRE_TRANSIENT_ERROR;
  113. }
  114. else if (CONN_SSL_NOT_ENABLED == crc)
  115. {
  116. return_value = ACQUIRE_FATAL_ERROR;
  117. }
  118. else
  119. {
  120. /* we don't want the timer to go off in the middle of an operation */
  121. conn_cancel_linger(conn);
  122. /* Does the remote replica support the 5.0 protocol? */
  123. crc = conn_replica_supports_ds5_repl(conn);
  124. if (CONN_DOES_NOT_SUPPORT_DS5_REPL == crc)
  125. {
  126. return_value = ACQUIRE_FATAL_ERROR;
  127. }
  128. else if (CONN_NOT_CONNECTED == crc || CONN_OPERATION_FAILED == crc)
  129. {
  130. /* We don't know anything about the remote replica. Try again later. */
  131. return_value = ACQUIRE_TRANSIENT_ERROR;
  132. }
  133. else
  134. {
  135. /* Good to go. Start the protocol. */
  136. CSN *current_csn = NULL;
  137. struct berval *retdata = NULL;
  138. char *retoid = NULL;
  139. Slapi_DN *replarea_sdn;
  140. /* Obtain a current CSN */
  141. replarea_sdn = agmt_get_replarea(prp->agmt);
  142. current_csn = get_current_csn(replarea_sdn);
  143. if (NULL != current_csn)
  144. {
  145. struct berval *payload = NSDS50StartReplicationRequest_new(
  146. prot_oid, slapi_sdn_get_ndn(replarea_sdn),
  147. NULL /* XXXggood need to provide referral(s) */, current_csn);
  148. /* JCMREPL - Need to extract the referrals from the RUV */
  149. csn_free(&current_csn);
  150. current_csn = NULL;
  151. crc = conn_send_extended_operation(conn,
  152. REPL_START_NSDS50_REPLICATION_REQUEST_OID, payload, &retoid,
  153. &retdata, NULL /* update control */, NULL /* returned controls */);
  154. ber_bvfree(payload);
  155. payload = NULL;
  156. /* Look at the response we got. */
  157. if (CONN_OPERATION_SUCCESS == crc)
  158. {
  159. /*
  160. * Extop was processed. Look at extop response to see if we're
  161. * permitted to go ahead.
  162. */
  163. struct berval **ruv_bervals = NULL;
  164. int extop_result;
  165. int extop_rc = decode_repl_ext_response(retdata, &extop_result,
  166. &ruv_bervals);
  167. if (0 == extop_rc)
  168. {
  169. prp->last_acquire_response_code = extop_result;
  170. switch (extop_result)
  171. {
  172. /* XXXggood handle other error codes here */
  173. case NSDS50_REPL_INTERNAL_ERROR:
  174. slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name,
  175. "%s: Unable to acquire replica: "
  176. "an internal error occurred on the remote replica. "
  177. "Replication is aborting.\n",
  178. agmt_get_long_name(prp->agmt));
  179. return_value = ACQUIRE_FATAL_ERROR;
  180. break;
  181. case NSDS50_REPL_PERMISSION_DENIED:
  182. /* Not allowed to send updates */
  183. {
  184. char *repl_binddn = agmt_get_binddn(prp->agmt);
  185. slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name,
  186. "%s: Unable to acquire replica: permission denied. "
  187. "The bind dn \"%s\" does not have permission to "
  188. "supply replication updates to the replica. "
  189. "Will retry later.\n",
  190. agmt_get_long_name(prp->agmt), repl_binddn);
  191. slapi_ch_free((void **)&repl_binddn);
  192. return_value = ACQUIRE_TRANSIENT_ERROR;
  193. break;
  194. }
  195. case NSDS50_REPL_NO_SUCH_REPLICA:
  196. /* There is no such replica on the consumer */
  197. {
  198. Slapi_DN *repl_root = agmt_get_replarea(prp->agmt);
  199. slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name,
  200. "%s: Unable to acquire replica: there is no "
  201. "replicated area \"%s\" on the consumer server. "
  202. "Replication is aborting.\n",
  203. agmt_get_long_name(prp->agmt),
  204. slapi_sdn_get_dn(repl_root));
  205. slapi_sdn_free(&repl_root);
  206. return_value = ACQUIRE_FATAL_ERROR;
  207. break;
  208. }
  209. case NSDS50_REPL_EXCESSIVE_CLOCK_SKEW:
  210. /* Large clock skew between the consumer and the supplier */
  211. slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name,
  212. "%s: Unable to acquire replica: "
  213. "Excessive clock skew between the supplier and "
  214. "the consumer. Replication is aborting.\n",
  215. agmt_get_long_name(prp->agmt));
  216. return_value = ACQUIRE_FATAL_ERROR;
  217. break;
  218. case NSDS50_REPL_DECODING_ERROR:
  219. /* We sent something the replica couldn't understand. */
  220. slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name,
  221. "%s: Unable to acquire replica: "
  222. "the consumer was unable to decode the "
  223. "startReplicationRequest extended operation sent by the "
  224. "supplier. Replication is aborting.\n",
  225. agmt_get_long_name(prp->agmt));
  226. return_value = ACQUIRE_FATAL_ERROR;
  227. break;
  228. case NSDS50_REPL_REPLICA_BUSY:
  229. /* Someone else is updating the replica. Try later. */
  230. slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name,
  231. "%s: Unable to acquire replica: "
  232. "the replica is currently being updated"
  233. "by another supplier. Will try later\n",
  234. agmt_get_long_name(prp->agmt));
  235. return_value = ACQUIRE_REPLICA_BUSY;
  236. break;
  237. case NSDS50_REPL_LEGACY_CONSUMER:
  238. /* remote replica is a legacy consumer */
  239. slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name,
  240. "%s: Unable to acquire replica: the replica "
  241. "is supplied by a legacy supplier. "
  242. "Replication is aborting.\n", agmt_get_long_name(prp->agmt));
  243. return_value = ACQUIRE_FATAL_ERROR;
  244. break;
  245. case NSDS50_REPL_REPLICAID_ERROR:
  246. /* remote replica detected a duplicate ReplicaID */
  247. slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name,
  248. "%s: Unable to aquire replica: the replica "
  249. "has the same Replica ID as this one. "
  250. "Replication is aborting.\n",
  251. agmt_get_long_name(prp->agmt));
  252. return_value = ACQUIRE_FATAL_ERROR;
  253. break;
  254. case NSDS50_REPL_REPLICA_READY:
  255. /* We've acquired the replica. */
  256. slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name,
  257. "%s: Replica was successfully acquired.\n",
  258. agmt_get_long_name(prp->agmt));
  259. /* Parse the update vector */
  260. if (NULL != ruv_bervals && NULL != ruv)
  261. {
  262. if (ruv_init_from_bervals(ruv_bervals, ruv) != RUV_SUCCESS)
  263. {
  264. /* Couldn't parse the update vector */
  265. *ruv = NULL;
  266. slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name,
  267. "%s: Warning: acquired replica, "
  268. "but could not parse update vector. "
  269. "The replica must be reinitialized.\n",
  270. agmt_get_long_name(prp->agmt));
  271. }
  272. }
  273. /* Save consumer's RUV in the replication agreement.
  274. It is used by the changelog trimming code */
  275. if (ruv && *ruv)
  276. agmt_set_consumer_ruv (prp->agmt, *ruv);
  277. return_value = ACQUIRE_SUCCESS;
  278. break;
  279. default:
  280. return_value = ACQUIRE_FATAL_ERROR;
  281. }
  282. }
  283. else
  284. {
  285. /* Couldn't parse the response */
  286. slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name,
  287. "%s: Unable to parse the response to the "
  288. "startReplication extended operation. "
  289. "Replication is aborting.\n",
  290. agmt_get_long_name(prp->agmt));
  291. prp->last_acquire_response_code = NSDS50_REPL_INTERNAL_ERROR;
  292. return_value = ACQUIRE_FATAL_ERROR;
  293. }
  294. if (NULL != ruv_bervals)
  295. ber_bvecfree(ruv_bervals);
  296. }
  297. else
  298. {
  299. int operation, error;
  300. conn_get_error(conn, &operation, &error);
  301. /* Couldn't send the extended operation */
  302. return_value = ACQUIRE_TRANSIENT_ERROR; /* XXX right return value? */
  303. slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name,
  304. "%s: Unable to send a startReplication "
  305. "extended operation to consumer (%s). Will retry later.\n",
  306. agmt_get_long_name(prp->agmt),
  307. error ? ldap_err2string(error) : "unknown error");
  308. }
  309. }
  310. else
  311. {
  312. /* Couldn't get a current CSN */
  313. slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name,
  314. "%s: Unable to obtain current CSN. "
  315. "Replication is aborting.\n",
  316. agmt_get_long_name(prp->agmt));
  317. return_value = ACQUIRE_FATAL_ERROR;
  318. }
  319. slapi_sdn_free(&replarea_sdn);
  320. if (NULL != retoid)
  321. ldap_memfree(retoid);
  322. if (NULL != retdata)
  323. ber_bvfree(retdata);
  324. }
  325. }
  326. if (ACQUIRE_SUCCESS != return_value)
  327. {
  328. /* could not acquire the replica, so reinstate the linger timer, since this
  329. means we won't call release_replica, which also reinstates the timer */
  330. conn_start_linger(conn);
  331. }
  332. else
  333. {
  334. /* replica successfully acquired */
  335. prp->replica_acquired = PR_TRUE;
  336. }
  337. return return_value;
  338. }
  339. /*
  340. * Release a replica by sending an "end replication" extended request.
  341. */
  342. void
  343. release_replica(Private_Repl_Protocol *prp)
  344. {
  345. int rc;
  346. struct berval *retdata = NULL;
  347. char *retoid = NULL;
  348. struct berval *payload = NULL;
  349. Slapi_DN *replarea_sdn = NULL;
  350. PR_ASSERT(NULL != prp);
  351. PR_ASSERT(NULL != prp->conn);
  352. if (!prp->replica_acquired)
  353. return;
  354. replarea_sdn = agmt_get_replarea(prp->agmt);
  355. payload = NSDS50EndReplicationRequest_new((char *)slapi_sdn_get_dn(replarea_sdn)); /* XXXggood had to cast away const */
  356. slapi_sdn_free(&replarea_sdn);
  357. rc = conn_send_extended_operation(prp->conn,
  358. REPL_END_NSDS50_REPLICATION_REQUEST_OID, payload, &retoid,
  359. &retdata, NULL /* update control */, NULL /* returned controls */);
  360. if (0 != rc)
  361. {
  362. int operation, error;
  363. conn_get_error(prp->conn, &operation, &error);
  364. slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name,
  365. "%s: Warning: unable to send endReplication extended operation (%s)\n",
  366. agmt_get_long_name(prp->agmt),
  367. error ? ldap_err2string(error) : "unknown error");
  368. }
  369. else
  370. {
  371. struct berval **ruv_bervals = NULL; /* Shouldn't actually be returned */
  372. int extop_result;
  373. int extop_rc = decode_repl_ext_response(retdata, &extop_result,
  374. (struct berval ***)&ruv_bervals);
  375. if (0 == extop_rc)
  376. {
  377. if (NSDS50_REPL_REPLICA_RELEASE_SUCCEEDED == extop_result)
  378. {
  379. slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name,
  380. "%s: Successfully released consumer\n", agmt_get_long_name(prp->agmt));
  381. }
  382. else
  383. {
  384. slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name,
  385. "%s: Unable to release consumer: response code %d\n",
  386. agmt_get_long_name(prp->agmt), extop_result);
  387. /* disconnect from the consumer so that it does not stay locked */
  388. conn_disconnect (prp->conn);
  389. }
  390. }
  391. else
  392. {
  393. /* Couldn't parse the response */
  394. slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name,
  395. "%s: Warning: Unable to parse the response "
  396. " to the endReplication extended operation.\n",
  397. agmt_get_long_name(prp->agmt));
  398. }
  399. if (NULL != ruv_bervals)
  400. ber_bvecfree(ruv_bervals);
  401. /* XXXggood free ruv_bervals if we got them for some reason */
  402. }
  403. if (NULL != payload)
  404. ber_bvfree(payload);
  405. if (NULL != retoid)
  406. ldap_memfree(retoid);
  407. if (NULL != retdata)
  408. ber_bvfree(retdata);
  409. /* replica is released, start the linger timer on the connection, which
  410. was stopped in acquire_replica */
  411. conn_start_linger(prp->conn);
  412. prp->replica_acquired = PR_FALSE;
  413. }
  414. /* converts consumer's response to a string */
  415. char *
  416. protocol_response2string (int response)
  417. {
  418. switch (response)
  419. {
  420. case NSDS50_REPL_REPLICA_READY: return "replica acquired";
  421. case NSDS50_REPL_REPLICA_BUSY: return "replica busy";
  422. case NSDS50_REPL_EXCESSIVE_CLOCK_SKEW: return "excessive clock skew";
  423. case NSDS50_REPL_PERMISSION_DENIED: return "permission denied";
  424. case NSDS50_REPL_DECODING_ERROR: return "decoding error";
  425. case NSDS50_REPL_UNKNOWN_UPDATE_PROTOCOL: return "unknown update protocol";
  426. case NSDS50_REPL_NO_SUCH_REPLICA: return "no such replica";
  427. case NSDS50_REPL_BELOW_PURGEPOINT: return "csn below purge point";
  428. case NSDS50_REPL_INTERNAL_ERROR: return "internal error";
  429. case NSDS50_REPL_REPLICA_RELEASE_SUCCEEDED: return "replica released";
  430. case NSDS50_REPL_LEGACY_CONSUMER: return "replica is a legacy consumer";
  431. case NSDS50_REPL_REPLICAID_ERROR: return "duplicate replica ID detected";
  432. case NSDS50_REPL_UPTODATE: return "no change to send";
  433. default: return "unknown error";
  434. }
  435. }