windows_tot_protocol.c 8.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327
  1. /** BEGIN COPYRIGHT BLOCK
  2. * Copyright (C) 2001 Sun Microsystems, Inc. Used by permission.
  3. * Copyright (C) 2005 Red Hat, Inc.
  4. * All rights reserved.
  5. * END COPYRIGHT BLOCK **/
  6. /* windows_tot_protocol.c */
  7. /*
  8. The tot_protocol object implements the DS 5.0 multi-master total update
  9. replication protocol, used to (re)populate a replica.
  10. */
  11. #include "windowsrepl.h"
  12. #include "windows_prot_private.h"
  13. #include "repl.h"
  14. #include "repl5.h"
  15. #include "repl5_prot_private.h"
  16. /* Private data structures */
/*
 * Per-instance private state for the Windows total-update protocol.
 * An instance is allocated in Windows_Tot_Protocol_new() and hung off
 * Private_Repl_Protocol.private.
 */
typedef struct windows_tot_private
{
    Repl_Protocol *rp;      /* owning protocol object, set by Windows_Tot_Protocol_new() */
    Repl_Agmt *ra;          /* NOTE(review): not referenced elsewhere in this file */
    PRLock *lock;           /* NOTE(review): not referenced elsewhere in this file */
    PRUint32 eventbits;     /* NOTE(review): not referenced elsewhere in this file */
} windows_tot_private;
/*
 * State shared between windows_tot_run() and the internal-search
 * callbacks get_result() and send_entry().
 */
typedef struct callback_data
{
    Private_Repl_Protocol *prp;     /* protocol instance driving the update */
    int rc;                         /* result code; written by the callbacks, read by windows_tot_run() */
    unsigned long num_entries;      /* entry counter maintained by send_entry() */
    time_t sleep_on_busy;           /* current back-off, in seconds, used when the remote side is busy */
    time_t last_busy;               /* time at which the remote side last reported busy */
} callback_data;
/*
 * Length of the window, in seconds, after which we programmatically
 * decide that the replica has left the BUSY state.
 *
 * NOTE(review): nothing visible in this file references this macro;
 * send_entry() hard-codes the value 10 in its busy back-off test and
 * presumably should use this macro instead - confirm.
 */
#define SLEEP_ON_BUSY_WINDOW (10)
/* Helper functions */
/* Result callback for the internal search: records the search result code. */
static void get_result (int rc, void *cb_data);
/* Entry callback for the internal search: pushes one entry to the remote side. */
static int send_entry (Slapi_Entry *e, void *callback_data);
/* Destructor hook installed as prp->delete by Windows_Tot_Protocol_new(). */
static void windows_tot_delete(Private_Repl_Protocol **prp);
  41. /*
  42. * Completely refresh a replica. The basic protocol interaction goes
  43. * like this:
  44. * - Acquire Replica by sending a StartReplicationRequest extop, with the
  45. * total update protocol OID and supplier's ruv.
  46. * - Send a series of extended operations containing entries.
  47. * - send an EndReplicationRequest extended operation
  48. */
  49. static void
  50. windows_tot_run(Private_Repl_Protocol *prp)
  51. {
  52. int rc;
  53. callback_data cb_data;
  54. Slapi_PBlock *pb;
  55. const char* dn;
  56. CSN *remote_schema_csn = NULL;
  57. PRBool cookie_has_more = PR_TRUE;
  58. RUV *ruv = NULL;
  59. PR_ASSERT(NULL != prp);
  60. prp->stopped = 0;
  61. if (prp->terminate)
  62. {
  63. prp->stopped = 1;
  64. goto done;
  65. }
  66. conn_set_timeout(prp->conn, agmt_get_timeout(prp->agmt));
  67. /* acquire remote replica */
  68. agmt_set_last_init_start(prp->agmt, current_time());
  69. rc = windows_acquire_replica (prp, &ruv);
  70. /* We never retry total protocol, even in case a transient error.
  71. This is because if somebody already updated the replica we don't
  72. want to do it again */
  73. if (rc != ACQUIRE_SUCCESS)
  74. {
  75. int optype, ldaprc;
  76. conn_get_error(prp->conn, &optype, &ldaprc);
  77. agmt_set_last_init_status(prp->agmt, ldaprc,
  78. prp->last_acquire_response_code, NULL);
  79. goto done;
  80. }
  81. else if (prp->terminate)
  82. {
  83. conn_disconnect(prp->conn);
  84. prp->stopped = 1;
  85. goto done;
  86. }
  87. agmt_set_last_init_status(prp->agmt, 0, 0, "Total schema update in progress");
  88. remote_schema_csn = agmt_get_consumer_schema_csn ( prp->agmt );
  89. agmt_set_last_init_status(prp->agmt, 0, 0, "Total update in progress");
  90. slapi_log_error(SLAPI_LOG_FATAL, windows_repl_plugin_name, "Beginning total update of replica "
  91. "\"%s\".\n", agmt_get_long_name(prp->agmt));
  92. windows_private_null_dirsync_control(prp->agmt);
  93. /* get everything */
  94. windows_dirsync_inc_run(prp);
  95. cookie_has_more = windows_private_dirsync_has_more(prp->agmt);
  96. /* send everything */
  97. dn = slapi_sdn_get_dn( windows_private_get_directory_replarea(prp->agmt));
  98. pb = slapi_pblock_new ();
  99. slapi_search_internal_set_pb (pb, dn, /* XXX modify the searchfilter and scope? */
  100. LDAP_SCOPE_ONELEVEL, "(|(objectclass=ntuser)(objectclass=ntgroup)(nsuniqueid=*))", NULL, 0, NULL, NULL,
  101. repl_get_plugin_identity (PLUGIN_MULTIMASTER_REPLICATION), 0);
  102. cb_data.prp = prp;
  103. cb_data.rc = 0;
  104. cb_data.num_entries = 0UL;
  105. cb_data.sleep_on_busy = 0UL;
  106. cb_data.last_busy = current_time ();
  107. /* this search get all the entries from the replicated area including tombstones
  108. and referrals */
  109. slapi_search_internal_callback_pb (pb, &cb_data /* callback data */,
  110. get_result /* result callback */,
  111. send_entry /* entry callback */,
  112. NULL /* referral callback*/);
  113. slapi_pblock_destroy (pb);
  114. agmt_set_last_init_end(prp->agmt, current_time());
  115. rc = cb_data.rc;
  116. windows_release_replica(prp);
  117. if (rc != LDAP_SUCCESS)
  118. {
  119. slapi_log_error (SLAPI_LOG_REPL, windows_repl_plugin_name, "%s: windows_tot_run: "
  120. "failed to obtain data to send to the consumer; LDAP error - %d\n",
  121. agmt_get_long_name(prp->agmt), rc);
  122. agmt_set_last_init_status(prp->agmt, rc, 0, "Total update aborted");
  123. } else {
  124. slapi_log_error(SLAPI_LOG_FATAL, windows_repl_plugin_name, "Finished total update of replica "
  125. "\"%s\". Sent %d entries.\n", agmt_get_long_name(prp->agmt), cb_data.num_entries);
  126. agmt_set_last_init_status(prp->agmt, 0, 0, "Total update succeeded");
  127. }
  128. done:
  129. prp->stopped = 1;
  130. }
  131. static int
  132. windows_tot_stop(Private_Repl_Protocol *prp)
  133. {
  134. int return_value;
  135. int seconds = 600;
  136. PRIntervalTime start, maxwait, now;
  137. prp->terminate = 1;
  138. maxwait = PR_SecondsToInterval(seconds);
  139. start = PR_IntervalNow();
  140. now = start;
  141. while (!prp->stopped && ((now - start) < maxwait))
  142. {
  143. DS_Sleep(PR_SecondsToInterval(1));
  144. now = PR_IntervalNow();
  145. }
  146. if (!prp->stopped)
  147. {
  148. /* Isn't listening. Disconnect from the replica. */
  149. slapi_log_error (SLAPI_LOG_REPL, windows_repl_plugin_name, "windows_tot_run: "
  150. "protocol not stopped after waiting for %d seconds "
  151. "for agreement %s\n", PR_IntervalToSeconds(now-start),
  152. agmt_get_long_name(prp->agmt));
  153. conn_disconnect(prp->conn);
  154. return_value = -1;
  155. }
  156. else
  157. {
  158. return_value = 0;
  159. }
  160. return return_value;
  161. }
  162. static int
  163. windows_tot_status(Private_Repl_Protocol *prp)
  164. {
  165. int return_value = 0;
  166. return return_value;
  167. }
  168. static void
  169. windows_tot_noop(Private_Repl_Protocol *prp)
  170. {
  171. /* noop */
  172. }
  173. Private_Repl_Protocol *
  174. Windows_Tot_Protocol_new(Repl_Protocol *rp)
  175. {
  176. windows_tot_private *rip = NULL;
  177. Private_Repl_Protocol *prp = (Private_Repl_Protocol *)slapi_ch_malloc(sizeof(Private_Repl_Protocol));
  178. prp->delete = windows_tot_delete;
  179. prp->run = windows_tot_run;
  180. prp->stop = windows_tot_stop;
  181. prp->status = windows_tot_status;
  182. prp->notify_update = windows_tot_noop;
  183. prp->notify_agmt_changed = windows_tot_noop;
  184. prp->notify_window_opened = windows_tot_noop;
  185. prp->notify_window_closed = windows_tot_noop;
  186. prp->replica_object = prot_get_replica_object(rp);
  187. prp->update_now = windows_tot_noop;
  188. if ((prp->lock = PR_NewLock()) == NULL)
  189. {
  190. goto loser;
  191. }
  192. if ((prp->cvar = PR_NewCondVar(prp->lock)) == NULL)
  193. {
  194. goto loser;
  195. }
  196. prp->stopped = 1;
  197. prp->terminate = 0;
  198. prp->eventbits = 0;
  199. prp->conn = prot_get_connection(rp);
  200. prp->agmt = prot_get_agreement(rp);
  201. rip = (void *)slapi_ch_malloc(sizeof(windows_tot_private));
  202. rip->rp = rp;
  203. prp->private = (void *)rip;
  204. prp->replica_acquired = PR_FALSE;
  205. return prp;
  206. loser:
  207. windows_tot_delete(&prp);
  208. return NULL;
  209. }
  210. static void
  211. windows_tot_delete(Private_Repl_Protocol **prp)
  212. {
  213. }
  214. static
  215. void get_result (int rc, void *cb_data)
  216. {
  217. PR_ASSERT (cb_data);
  218. ((callback_data*)cb_data)->rc = rc;
  219. }
  220. static
  221. int send_entry (Slapi_Entry *e, void *cb_data)
  222. {
  223. int rc;
  224. Private_Repl_Protocol *prp;
  225. // struct berval *bv;
  226. unsigned long *num_entriesp;
  227. time_t *sleep_on_busyp;
  228. time_t *last_busyp;
  229. PR_ASSERT (cb_data);
  230. prp = ((callback_data*)cb_data)->prp;
  231. num_entriesp = &((callback_data *)cb_data)->num_entries;
  232. sleep_on_busyp = &((callback_data *)cb_data)->sleep_on_busy;
  233. last_busyp = &((callback_data *)cb_data)->last_busy;
  234. PR_ASSERT (prp);
  235. if (prp->terminate)
  236. {
  237. conn_disconnect(prp->conn);
  238. prp->stopped = 1;
  239. ((callback_data*)cb_data)->rc = -1;
  240. return -1;
  241. }
  242. /* skip ruv tombstone - need to do this because it might be
  243. more up to date then the data we are sending to the client.
  244. RUV is sent separately via the protocol */
  245. if (is_ruv_tombstone_entry (e))
  246. return 0;
  247. do {
  248. /* push the entry to the consumer */
  249. rc = add_or_modify_user(e);
  250. if (rc == CONN_BUSY) {
  251. time_t now = current_time ();
  252. if ((now - *last_busyp) < (*sleep_on_busyp + 10)) {
  253. *sleep_on_busyp +=5;
  254. }
  255. else {
  256. *sleep_on_busyp = 5;
  257. }
  258. *last_busyp = now;
  259. slapi_log_error(SLAPI_LOG_FATAL, windows_repl_plugin_name,
  260. "Replica \"%s\" is busy. Waiting %ds while"
  261. " it finishes processing its current import queue\n",
  262. agmt_get_long_name(prp->agmt), *sleep_on_busyp);
  263. DS_Sleep(PR_SecondsToInterval(*sleep_on_busyp));
  264. }
  265. }
  266. while (rc == CONN_BUSY);
  267. (*num_entriesp)++;
  268. if (CONN_OPERATION_SUCCESS == rc) {
  269. return 0;
  270. } else {
  271. ((callback_data*)cb_data)->rc = rc;
  272. return -1;
  273. }
  274. }