windows_tot_protocol.c 9.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333
  1. /** BEGIN COPYRIGHT BLOCK
  2. * Copyright (C) 2001 Sun Microsystems, Inc. Used by permission.
  3. * Copyright (C) 2005 Red Hat, Inc.
  4. * All rights reserved.
  5. * END COPYRIGHT BLOCK **/
  6. /* windows_tot_protocol.c */
  7. /*
  8. The tot_protocol object implements the DS 5.0 multi-master total update
  9. replication protocol, used to (re)populate a replica.
  10. */
  11. #include "repl.h"
  12. #include "repl5.h"
  13. #include "windowsrepl.h"
  14. #include "windows_prot_private.h"
  15. #include "slap.h"
  16. /* Private data structures */
/*
 * Private, per-instance state for the Windows total update protocol.
 * Windows_Tot_Protocol_new() allocates one of these and stores it in
 * Private_Repl_Protocol.private.
 */
typedef struct windows_tot_private
{
    Repl_Protocol *rp;      /* protocol object passed to Windows_Tot_Protocol_new() */
    Repl_Agmt *ra;          /* NOTE(review): never read in the code visible in this file */
    PRLock *lock;           /* NOTE(review): never read in the code visible in this file */
    PRUint32 eventbits;     /* NOTE(review): never read in the code visible in this file */
} windows_tot_private;
/*
 * State handed to the internal-search callbacks (get_result and
 * send_entry) by windows_tot_run().
 */
typedef struct callback_data
{
    Private_Repl_Protocol *prp;  /* protocol instance performing the total update */
    int rc;                      /* result code written by get_result()/send_entry() */
    unsigned long num_entries;   /* count of entries passed to windows_process_total_entry() */
    time_t sleep_on_busy;        /* initialised to 0 by windows_tot_run(); not read in this file */
    time_t last_busy;            /* initialised to current_time() by windows_tot_run(); not read in this file */
} callback_data;
/*
 * Length of the window, in seconds, to wait before we programmatically
 * decide that the replica has left the BUSY state.
 * NOTE(review): no code visible in this file references this macro.
 */
#define SLEEP_ON_BUSY_WINDOW (10)
  37. /* Helper functions */
  38. static void get_result (int rc, void *cb_data);
  39. static int send_entry (Slapi_Entry *e, void *callback_data);
  40. static void windows_tot_delete(Private_Repl_Protocol **prp);
  41. /*
  42. * Completely refresh a replica. The basic protocol interaction goes
  43. * like this:
  44. * - Acquire Replica by sending a StartReplicationRequest extop, with the
  45. * total update protocol OID and supplier's ruv.
  46. * - Send a series of extended operations containing entries.
  47. * - send an EndReplicationRequest extended operation
  48. */
  49. static void
  50. windows_tot_run(Private_Repl_Protocol *prp)
  51. {
  52. int rc;
  53. callback_data cb_data;
  54. Slapi_PBlock *pb;
  55. const char* dn;
  56. CSN *remote_schema_csn = NULL;
  57. PRBool cookie_has_more = PR_TRUE;
  58. RUV *ruv = NULL;
  59. LDAPDebug( LDAP_DEBUG_TRACE, "=> windows_tot_run\n", 0, 0, 0 );
  60. PR_ASSERT(NULL != prp);
  61. prp->stopped = 0;
  62. if (prp->terminate)
  63. {
  64. prp->stopped = 1;
  65. goto done;
  66. }
  67. conn_set_timeout(prp->conn, agmt_get_timeout(prp->agmt));
  68. /* acquire remote replica */
  69. agmt_set_last_init_start(prp->agmt, current_time());
  70. rc = windows_acquire_replica (prp, &ruv, 0 /* don't check RUV for total protocol */);
  71. /* We never retry total protocol, even in case a transient error.
  72. This is because if somebody already updated the replica we don't
  73. want to do it again */
  74. if (rc != ACQUIRE_SUCCESS)
  75. {
  76. int optype, ldaprc;
  77. conn_get_error(prp->conn, &optype, &ldaprc);
  78. agmt_set_last_init_status(prp->agmt, ldaprc,
  79. prp->last_acquire_response_code, NULL);
  80. goto done;
  81. }
  82. else if (prp->terminate)
  83. {
  84. conn_disconnect(prp->conn);
  85. prp->stopped = 1;
  86. goto done;
  87. }
  88. agmt_set_last_init_status(prp->agmt, 0, 0, "Total schema update in progress");
  89. remote_schema_csn = agmt_get_consumer_schema_csn ( prp->agmt );
  90. agmt_set_last_init_status(prp->agmt, 0, 0, "Total update in progress");
  91. slapi_log_error(SLAPI_LOG_FATAL, windows_repl_plugin_name, "Beginning total update of replica "
  92. "\"%s\".\n", agmt_get_long_name(prp->agmt));
  93. windows_private_null_dirsync_cookie(prp->agmt);
  94. /* get everything */
  95. windows_dirsync_inc_run(prp);
  96. cookie_has_more = windows_private_dirsync_has_more(prp->agmt);
  97. windows_private_save_dirsync_cookie(prp->agmt);
  98. /* send everything */
  99. dn = slapi_sdn_get_dn( windows_private_get_directory_subtree(prp->agmt));
  100. pb = slapi_pblock_new ();
  101. slapi_search_internal_set_pb (pb, dn, /* XXX modify the searchfilter and scope? */
  102. LDAP_SCOPE_ONELEVEL, "(|(objectclass=ntuser)(objectclass=ntgroup)(nsuniqueid=*))", NULL, 0, NULL, NULL,
  103. repl_get_plugin_identity (PLUGIN_MULTIMASTER_REPLICATION), 0);
  104. cb_data.prp = prp;
  105. cb_data.rc = 0;
  106. cb_data.num_entries = 0UL;
  107. cb_data.sleep_on_busy = 0UL;
  108. cb_data.last_busy = current_time ();
  109. /* this search get all the entries from the replicated area including tombstones
  110. and referrals */
  111. slapi_search_internal_callback_pb (pb, &cb_data /* callback data */,
  112. get_result /* result callback */,
  113. send_entry /* entry callback */,
  114. NULL /* referral callback*/);
  115. slapi_pblock_destroy (pb);
  116. agmt_set_last_init_end(prp->agmt, current_time());
  117. rc = cb_data.rc;
  118. windows_release_replica(prp);
  119. if (rc != LDAP_SUCCESS)
  120. {
  121. slapi_log_error (SLAPI_LOG_REPL, windows_repl_plugin_name, "%s: windows_tot_run: "
  122. "failed to obtain data to send to the consumer; LDAP error - %d\n",
  123. agmt_get_long_name(prp->agmt), rc);
  124. agmt_set_last_init_status(prp->agmt, rc, 0, "Total update aborted");
  125. } else {
  126. slapi_log_error(SLAPI_LOG_FATAL, windows_repl_plugin_name, "Finished total update of replica "
  127. "\"%s\". Sent %d entries.\n", agmt_get_long_name(prp->agmt), cb_data.num_entries);
  128. agmt_set_last_init_status(prp->agmt, 0, 0, "Total update succeeded");
  129. }
  130. done:
  131. prp->stopped = 1;
  132. LDAPDebug( LDAP_DEBUG_TRACE, "<= windows_tot_run\n", 0, 0, 0 );
  133. }
  134. static int
  135. windows_tot_stop(Private_Repl_Protocol *prp)
  136. {
  137. int return_value;
  138. int seconds = 600;
  139. PRIntervalTime start, maxwait, now;
  140. LDAPDebug( LDAP_DEBUG_TRACE, "=> windows_tot_stop\n", 0, 0, 0 );
  141. prp->terminate = 1;
  142. maxwait = PR_SecondsToInterval(seconds);
  143. start = PR_IntervalNow();
  144. now = start;
  145. while (!prp->stopped && ((now - start) < maxwait))
  146. {
  147. DS_Sleep(PR_SecondsToInterval(1));
  148. now = PR_IntervalNow();
  149. }
  150. if (!prp->stopped)
  151. {
  152. /* Isn't listening. Disconnect from the replica. */
  153. slapi_log_error (SLAPI_LOG_REPL, windows_repl_plugin_name, "windows_tot_run: "
  154. "protocol not stopped after waiting for %d seconds "
  155. "for agreement %s\n", PR_IntervalToSeconds(now-start),
  156. agmt_get_long_name(prp->agmt));
  157. conn_disconnect(prp->conn);
  158. return_value = -1;
  159. }
  160. else
  161. {
  162. return_value = 0;
  163. }
  164. LDAPDebug( LDAP_DEBUG_TRACE, "<= windows_tot_stop\n", 0, 0, 0 );
  165. return return_value;
  166. }
  167. static int
  168. windows_tot_status(Private_Repl_Protocol *prp)
  169. {
  170. int return_value = 0;
  171. LDAPDebug( LDAP_DEBUG_TRACE, "=> windows_tot_status\n", 0, 0, 0 );
  172. LDAPDebug( LDAP_DEBUG_TRACE, "<= windows_tot_status\n", 0, 0, 0 );
  173. return return_value;
  174. }
/*
 * Shared do-nothing handler.  Windows_Tot_Protocol_new() installs it for
 * the notify_update, notify_agmt_changed, notify_window_opened,
 * notify_window_closed and update_now hooks.  Apart from the trace
 * messages it has no effect.
 */
static void
windows_tot_noop(Private_Repl_Protocol *prp)
{
    LDAPDebug( LDAP_DEBUG_TRACE, "=> windows_tot_noop\n", 0, 0, 0 );
    LDAPDebug( LDAP_DEBUG_TRACE, "<= windows_tot_noop\n", 0, 0, 0 );
    /* noop */
}
  182. Private_Repl_Protocol *
  183. Windows_Tot_Protocol_new(Repl_Protocol *rp)
  184. {
  185. windows_tot_private *rip = NULL;
  186. Private_Repl_Protocol *prp = (Private_Repl_Protocol *)slapi_ch_malloc(sizeof(Private_Repl_Protocol));
  187. LDAPDebug( LDAP_DEBUG_TRACE, "=> Windows_Tot_Protocol_new\n", 0, 0, 0 );
  188. prp->delete = windows_tot_delete;
  189. prp->run = windows_tot_run;
  190. prp->stop = windows_tot_stop;
  191. prp->status = windows_tot_status;
  192. prp->notify_update = windows_tot_noop;
  193. prp->notify_agmt_changed = windows_tot_noop;
  194. prp->notify_window_opened = windows_tot_noop;
  195. prp->notify_window_closed = windows_tot_noop;
  196. prp->replica_object = prot_get_replica_object(rp);
  197. prp->update_now = windows_tot_noop;
  198. if ((prp->lock = PR_NewLock()) == NULL)
  199. {
  200. goto loser;
  201. }
  202. if ((prp->cvar = PR_NewCondVar(prp->lock)) == NULL)
  203. {
  204. goto loser;
  205. }
  206. prp->stopped = 1;
  207. prp->terminate = 0;
  208. prp->eventbits = 0;
  209. prp->conn = prot_get_connection(rp);
  210. prp->agmt = prot_get_agreement(rp);
  211. rip = (void *)slapi_ch_malloc(sizeof(windows_tot_private));
  212. rip->rp = rp;
  213. prp->private = (void *)rip;
  214. prp->replica_acquired = PR_FALSE;
  215. LDAPDebug( LDAP_DEBUG_TRACE, "<= Windows_Tot_Protocol_new\n", 0, 0, 0 );
  216. return prp;
  217. loser:
  218. windows_tot_delete(&prp);
  219. LDAPDebug( LDAP_DEBUG_TRACE, "<= Windows_Tot_Protocol_new - loser\n", 0, 0, 0 );
  220. return NULL;
  221. }
/*
 * "delete" hook of the total update protocol.
 *
 * NOTE(review): this currently only emits trace messages.  It does not
 * release the instance, its lock, its condition variable or its private
 * record, and it leaves *prp untouched.  Before making it free anything,
 * confirm that no thread can still be inside windows_tot_run() when the
 * hook is invoked.
 */
static void
windows_tot_delete(Private_Repl_Protocol **prp)
{
    LDAPDebug( LDAP_DEBUG_TRACE, "=> windows_tot_delete\n", 0, 0, 0 );
    LDAPDebug( LDAP_DEBUG_TRACE, "<= windows_tot_delete\n", 0, 0, 0 );
}
  228. static
  229. void get_result (int rc, void *cb_data)
  230. {
  231. LDAPDebug( LDAP_DEBUG_TRACE, "=> get_result\n", 0, 0, 0 );
  232. PR_ASSERT (cb_data);
  233. ((callback_data*)cb_data)->rc = rc;
  234. LDAPDebug( LDAP_DEBUG_TRACE, "<= get_result\n", 0, 0, 0 );
  235. }
  236. static
  237. int send_entry (Slapi_Entry *e, void *cb_data)
  238. {
  239. int rc;
  240. Private_Repl_Protocol *prp;
  241. unsigned long *num_entriesp;
  242. time_t *sleep_on_busyp;
  243. time_t *last_busyp;
  244. LDAPDebug( LDAP_DEBUG_TRACE, "=> send_entry\n", 0, 0, 0 );
  245. PR_ASSERT (cb_data);
  246. prp = ((callback_data*)cb_data)->prp;
  247. num_entriesp = &((callback_data *)cb_data)->num_entries;
  248. sleep_on_busyp = &((callback_data *)cb_data)->sleep_on_busy;
  249. last_busyp = &((callback_data *)cb_data)->last_busy;
  250. PR_ASSERT (prp);
  251. if (prp->terminate)
  252. {
  253. conn_disconnect(prp->conn);
  254. prp->stopped = 1;
  255. ((callback_data*)cb_data)->rc = -1;
  256. LDAPDebug( LDAP_DEBUG_TRACE, "<= send_entry\n", 0, 0, 0 );
  257. return -1;
  258. }
  259. /* skip ruv tombstone - not relvant to Active Directory */
  260. if (is_ruv_tombstone_entry (e)) {
  261. LDAPDebug( LDAP_DEBUG_TRACE, "<= send_entry\n", 0, 0, 0 );
  262. return 0;
  263. }
  264. /* push the entry to the consumer */
  265. rc = windows_process_total_entry(prp,e);
  266. (*num_entriesp)++;
  267. LDAPDebug( LDAP_DEBUG_TRACE, "<= send_entry\n", 0, 0, 0 );
  268. if (CONN_OPERATION_SUCCESS == rc) {
  269. return 0;
  270. } else {
  271. ((callback_data*)cb_data)->rc = rc;
  272. return -1;
  273. }
  274. }