repl5_tot_protocol.c 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372
  1. /** BEGIN COPYRIGHT BLOCK
  2. * Copyright 2001 Sun Microsystems, Inc.
  3. * Portions copyright 1999, 2001-2003 Netscape Communications Corporation.
  4. * All rights reserved.
  5. * END COPYRIGHT BLOCK **/
  6. /* repl5_tot_protocol.c */
  7. /*
  8. The tot_protocol object implements the DS 5.0 multi-master total update
  9. replication protocol, used to (re)populate a replica.
  10. */
  11. #include "repl.h"
  12. #include "repl5.h"
  13. #include "repl5_prot_private.h"
  14. /* Private data structures */
  15. typedef struct repl5_tot_private
  16. {
  17. Repl_Protocol *rp;
  18. Repl_Agmt *ra;
  19. PRLock *lock;
  20. PRUint32 eventbits;
  21. } repl5_tot_private;
  22. typedef struct callback_data
  23. {
  24. Private_Repl_Protocol *prp;
  25. int rc;
  26. unsigned long num_entries;
  27. time_t sleep_on_busy;
  28. time_t last_busy;
  29. } callback_data;
  /*
   * Length, in seconds, of the window used to decide programmatically
   * whether the replica has left the BUSY state.
   */
  34. #define SLEEP_ON_BUSY_WINDOW (10)
  35. /* Helper functions */
  36. static void get_result (int rc, void *cb_data);
  37. static int send_entry (Slapi_Entry *e, void *callback_data);
  38. static void repl5_tot_delete(Private_Repl_Protocol **prp);
  39. /*
  40. * Completely refresh a replica. The basic protocol interaction goes
  41. * like this:
  42. * - Acquire Replica by sending a StartReplicationRequest extop, with the
  43. * total update protocol OID and supplier's ruv.
  44. * - Send a series of extended operations containing entries.
  45. * - send an EndReplicationRequest extended operation
  46. */
  47. static void
  48. repl5_tot_run(Private_Repl_Protocol *prp)
  49. {
  50. int rc;
  51. callback_data cb_data;
  52. Slapi_PBlock *pb;
  53. LDAPControl **ctrls;
  54. PRBool replica_acquired = PR_FALSE;
  55. char *hostname = NULL;
  56. int portnum = 0;
  57. Slapi_DN *area_sdn = NULL;
  58. CSN *remote_schema_csn = NULL;
  59. PR_ASSERT(NULL != prp);
  60. prp->stopped = 0;
  61. if (prp->terminate)
  62. {
  63. prp->stopped = 1;
  64. goto done;
  65. }
  66. conn_set_timeout(prp->conn, agmt_get_timeout(prp->agmt));
  67. /* acquire remote replica */
  68. agmt_set_last_init_start(prp->agmt, current_time());
  69. rc = acquire_replica (prp, REPL_NSDS50_TOTAL_PROTOCOL_OID, NULL /* ruv */);
  70. /* We never retry total protocol, even in case a transient error.
  71. This is because if somebody already updated the replica we don't
  72. want to do it again */
  73. if (rc != ACQUIRE_SUCCESS)
  74. {
  75. int optype, ldaprc;
  76. conn_get_error(prp->conn, &optype, &ldaprc);
  77. agmt_set_last_init_status(prp->agmt, ldaprc,
  78. prp->last_acquire_response_code, NULL);
  79. goto done;
  80. }
  81. else if (prp->terminate)
  82. {
  83. conn_disconnect(prp->conn);
  84. prp->stopped = 1;
  85. goto done;
  86. }
  87. hostname = agmt_get_hostname(prp->agmt);
  88. portnum = agmt_get_port(prp->agmt);
  89. agmt_set_last_init_status(prp->agmt, 0, 0, "Total schema update in progress");
  90. remote_schema_csn = agmt_get_consumer_schema_csn ( prp->agmt );
  91. rc = conn_push_schema(prp->conn, &remote_schema_csn);
  92. if (CONN_SCHEMA_UPDATED != rc && CONN_SCHEMA_NO_UPDATE_NEEDED != rc)
  93. {
  94. slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "Warning: unable to "
  95. "replicate schema to host %s, port %d. Continuing with "
  96. "total update session.\n",
  97. hostname, portnum);
  98. /* But keep going */
  99. agmt_set_last_init_status(prp->agmt, 0, rc, "Total schema update failed");
  100. }
  101. else
  102. {
  103. agmt_set_last_init_status(prp->agmt, 0, 0, "Total schema update succeeded");
  104. }
  105. /* ONREPL - big assumption here is that entries a returned in the id order
  106. and that the order implies that perent entry is always ahead of the
  107. child entry in the list. Otherwise, the consumer would not be
  108. properly updated because bulk import at the moment skips orphand entries. */
  109. /* XXXggood above assumption may not be valid if orphaned entry moved???? */
  110. agmt_set_last_init_status(prp->agmt, 0, 0, "Total update in progress");
  111. slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "Beginning total update of replica "
  112. "\"%s\".\n", agmt_get_long_name(prp->agmt));
  113. pb = slapi_pblock_new ();
  114. /* RMREPL - need to send schema here */
  115. area_sdn = agmt_get_replarea(prp->agmt);
  116. /* we need to provide managedsait control so that referral entries can
  117. be replicated */
  118. ctrls = (LDAPControl **)slapi_ch_calloc (3, sizeof (LDAPControl *));
  119. ctrls[0] = create_managedsait_control ();
  120. ctrls[1] = create_backend_control(area_sdn);
  121. slapi_search_internal_set_pb (pb, slapi_sdn_get_dn (area_sdn),
  122. LDAP_SCOPE_SUBTREE, "(|(objectclass=ldapsubentry)(objectclass=nstombstone)(nsuniqueid=*))", NULL, 0, ctrls, NULL,
  123. repl_get_plugin_identity (PLUGIN_MULTIMASTER_REPLICATION), 0);
  124. cb_data.prp = prp;
  125. cb_data.rc = 0;
  126. cb_data.num_entries = 0UL;
  127. cb_data.sleep_on_busy = 0UL;
  128. cb_data.last_busy = current_time ();
  129. /* this search get all the entries from the replicated area including tombstones
  130. and referrals */
  131. slapi_search_internal_callback_pb (pb, &cb_data /* callback data */,
  132. get_result /* result callback */,
  133. send_entry /* entry callback */,
  134. NULL /* referral callback*/);
  135. slapi_pblock_destroy (pb);
  136. agmt_set_last_init_end(prp->agmt, current_time());
  137. rc = cb_data.rc;
  138. release_replica(prp);
  139. slapi_sdn_free(&area_sdn);
  140. if (rc != LDAP_SUCCESS)
  141. {
  142. slapi_log_error (SLAPI_LOG_REPL, repl_plugin_name, "%s: repl5_tot_run: "
  143. "failed to obtain data to send to the consumer; LDAP error - %d\n",
  144. agmt_get_long_name(prp->agmt), rc);
  145. agmt_set_last_init_status(prp->agmt, rc, 0, "Total update aborted");
  146. } else {
  147. slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "Finished total update of replica "
  148. "\"%s\". Sent %d entries.\n", agmt_get_long_name(prp->agmt), cb_data.num_entries);
  149. agmt_set_last_init_status(prp->agmt, 0, 0, "Total update succeeded");
  150. }
  151. done:
  152. slapi_ch_free_string(&hostname);
  153. prp->stopped = 1;
  154. }
  155. static int
  156. repl5_tot_stop(Private_Repl_Protocol *prp)
  157. {
  158. int return_value;
  159. int seconds = 600;
  160. PRIntervalTime start, maxwait, now;
  161. prp->terminate = 1;
  162. maxwait = PR_SecondsToInterval(seconds);
  163. start = PR_IntervalNow();
  164. now = start;
  165. while (!prp->stopped && ((now - start) < maxwait))
  166. {
  167. DS_Sleep(PR_SecondsToInterval(1));
  168. now = PR_IntervalNow();
  169. }
  170. if (!prp->stopped)
  171. {
  172. /* Isn't listening. Disconnect from the replica. */
  173. slapi_log_error (SLAPI_LOG_REPL, repl_plugin_name, "repl5_tot_run: "
  174. "protocol not stopped after waiting for %d seconds "
  175. "for agreement %s\n", PR_IntervalToSeconds(now-start),
  176. agmt_get_long_name(prp->agmt));
  177. conn_disconnect(prp->conn);
  178. return_value = -1;
  179. }
  180. else
  181. {
  182. return_value = 0;
  183. }
  184. return return_value;
  185. }
  186. static int
  187. repl5_tot_status(Private_Repl_Protocol *prp)
  188. {
  189. int return_value = 0;
  190. return return_value;
  191. }
  192. static void
  193. repl5_tot_noop(Private_Repl_Protocol *prp)
  194. {
  195. /* noop */
  196. }
  197. Private_Repl_Protocol *
  198. Repl_5_Tot_Protocol_new(Repl_Protocol *rp)
  199. {
  200. repl5_tot_private *rip = NULL;
  201. Private_Repl_Protocol *prp = (Private_Repl_Protocol *)slapi_ch_malloc(sizeof(Private_Repl_Protocol));
  202. prp->delete = repl5_tot_delete;
  203. prp->run = repl5_tot_run;
  204. prp->stop = repl5_tot_stop;
  205. prp->status = repl5_tot_status;
  206. prp->notify_update = repl5_tot_noop;
  207. prp->notify_agmt_changed = repl5_tot_noop;
  208. prp->notify_window_opened = repl5_tot_noop;
  209. prp->notify_window_closed = repl5_tot_noop;
  210. prp->update_now = repl5_tot_noop;
  211. if ((prp->lock = PR_NewLock()) == NULL)
  212. {
  213. goto loser;
  214. }
  215. if ((prp->cvar = PR_NewCondVar(prp->lock)) == NULL)
  216. {
  217. goto loser;
  218. }
  219. prp->stopped = 1;
  220. prp->terminate = 0;
  221. prp->eventbits = 0;
  222. prp->conn = prot_get_connection(rp);
  223. prp->agmt = prot_get_agreement(rp);
  224. rip = (void *)slapi_ch_malloc(sizeof(repl5_tot_private));
  225. rip->rp = rp;
  226. prp->private = (void *)rip;
  227. prp->replica_acquired = PR_FALSE;
  228. return prp;
  229. loser:
  230. repl5_tot_delete(&prp);
  231. return NULL;
  232. }
  233. static void
  234. repl5_tot_delete(Private_Repl_Protocol **prp)
  235. {
  236. }
  237. static
  238. void get_result (int rc, void *cb_data)
  239. {
  240. PR_ASSERT (cb_data);
  241. ((callback_data*)cb_data)->rc = rc;
  242. }
  243. static
  244. int send_entry (Slapi_Entry *e, void *cb_data)
  245. {
  246. int rc;
  247. Private_Repl_Protocol *prp;
  248. BerElement *bere;
  249. struct berval *bv;
  250. unsigned long *num_entriesp;
  251. time_t *sleep_on_busyp;
  252. time_t *last_busyp;
  253. PR_ASSERT (cb_data);
  254. prp = ((callback_data*)cb_data)->prp;
  255. num_entriesp = &((callback_data *)cb_data)->num_entries;
  256. sleep_on_busyp = &((callback_data *)cb_data)->sleep_on_busy;
  257. last_busyp = &((callback_data *)cb_data)->last_busy;
  258. PR_ASSERT (prp);
  259. if (prp->terminate)
  260. {
  261. conn_disconnect(prp->conn);
  262. prp->stopped = 1;
  263. ((callback_data*)cb_data)->rc = -1;
  264. return -1;
  265. }
  266. /* skip ruv tombstone - need to do this because it might be
  267. more up to date then the data we are sending to the client.
  268. RUV is sent separately via the protocol */
  269. if (is_ruv_tombstone_entry (e))
  270. return 0;
  271. /* ONREPL we would purge copiedFrom and copyingFrom here but I decided against it.
  272. Instead, it will get removed when this replica stops being 4.0 consumer and
  273. then propagated to all its consumer */
  274. /* convert the entry to the on the wire format */
  275. bere = entry2bere(e);
  276. if (bere == NULL)
  277. {
  278. slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, "%s: send_entry: Encoding Error\n",
  279. agmt_get_long_name(prp->agmt));
  280. ((callback_data*)cb_data)->rc = -1;
  281. return -1;
  282. }
  283. rc = ber_flatten(bere, &bv);
  284. ber_free (bere, 1);
  285. if (rc != 0)
  286. {
  287. ((callback_data*)cb_data)->rc = -1;
  288. return -1;
  289. }
  290. do {
  291. /* push the entry to the consumer */
  292. rc = conn_send_extended_operation(prp->conn, REPL_NSDS50_REPLICATION_ENTRY_REQUEST_OID,
  293. bv /* payload */, NULL /* retoidp */,
  294. NULL /* retdatap */, NULL /* update_control */,
  295. NULL /* returned_controls */);
  296. if (rc == CONN_BUSY) {
  297. time_t now = current_time ();
  298. if ((now - *last_busyp) < (*sleep_on_busyp + 10)) {
  299. *sleep_on_busyp +=5;
  300. }
  301. else {
  302. *sleep_on_busyp = 5;
  303. }
  304. *last_busyp = now;
  305. slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name,
  306. "Replica \"%s\" is busy. Waiting %ds while"
  307. " it finishes processing its current import queue\n",
  308. agmt_get_long_name(prp->agmt), *sleep_on_busyp);
  309. DS_Sleep(PR_SecondsToInterval(*sleep_on_busyp));
  310. }
  311. }
  312. while (rc == CONN_BUSY);
  313. ber_bvfree(bv);
  314. (*num_entriesp)++;
  315. if (CONN_OPERATION_SUCCESS == rc) {
  316. return 0;
  317. } else {
  318. ((callback_data*)cb_data)->rc = rc;
  319. return -1;
  320. }
  321. }