1
0

windows_inc_protocol.c 61 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
717781779178017811782178317841785
  1. /** BEGIN COPYRIGHT BLOCK
  2. * Copyright (C) 2001 Sun Microsystems, Inc. Used by permission.
  3. * Copyright (C) 2005 Red Hat, Inc.
  4. * All rights reserved.
  5. *
  6. * License: GPL (version 3 or any later version).
  7. * See LICENSE for details.
  8. * END COPYRIGHT BLOCK **/
  9. #ifdef HAVE_CONFIG_H
  10. # include <config.h>
  11. #endif
  12. /* repl5_inc_protocol.c */
  13. /*
  14. The Prot_Incremental object implements the DS 5.0 multi-master incremental
  15. replication protocol.
  16. Stuff to do:
  17. - Need to figure out how asynchronous events end up in here. They are:
  18. - entry updated in replicated area.
  19. - backoff timeout
  20. - enter/leave.
  21. Perhaps these events should be properties of the main protocol.
  22. */
  23. #include "repl.h"
  24. #include "repl5.h"
  25. #include "windowsrepl.h"
  26. #include "windows_prot_private.h"
  27. #include "slap.h" /* PSEUDO_ATTR_UNHASHED */
  28. #include "repl5_ruv.h"
  29. #include "cl5_api.h"
  30. #include "slapi-plugin.h"
  31. extern int slapi_log_urp;
  32. /*** from proto-slap.h ***/
  33. void ava_done(struct ava *ava);
  34. typedef struct windows_inc_private
  35. {
  36. char *ruv; /* RUV on remote replica (use diff type for this? - ggood */
  37. Backoff_Timer *backoff;
  38. Repl_Protocol *rp;
  39. PRLock *lock;
  40. PRUint32 eventbits;
  41. } windows_inc_private;
  42. /* Various states the incremental protocol can pass through */
  43. #define STATE_START 0 /* ONREPL - should we rename this - we don't use it just to start up? */
  44. #define STATE_WAIT_WINDOW_OPEN 1
  45. #define STATE_WAIT_CHANGES 2
  46. #define STATE_READY_TO_ACQUIRE 3
  47. #define STATE_BACKOFF_START 4 /* ONREPL - can we combine BACKOFF_START and BACKOFF states? */
  48. #define STATE_BACKOFF 5
  49. #define STATE_SENDING_UPDATES 6
  50. #define STATE_STOP_FATAL_ERROR 7
  51. #define STATE_STOP_FATAL_ERROR_PART2 8
  52. #define STATE_STOP_NORMAL_TERMINATION 9
  53. /* Events (synchronous and asynchronous; these are bits) */
  54. #define EVENT_WINDOW_OPENED 1
  55. #define EVENT_WINDOW_CLOSED 2
  56. #define EVENT_TRIGGERING_CRITERIA_MET 4 /* ONREPL - should we rename this to EVENT_CHANGE_AVAILABLE */
  57. #define EVENT_BACKOFF_EXPIRED 8
  58. #define EVENT_REPLICATE_NOW 16
  59. #define EVENT_PROTOCOL_SHUTDOWN 32
  60. #define EVENT_AGMT_CHANGED 64
  61. #define EVENT_RUN_DIRSYNC 128
  62. #define UPDATE_NO_MORE_UPDATES 201
  63. #define UPDATE_TRANSIENT_ERROR 202
  64. #define UPDATE_FATAL_ERROR 203
  65. #define UPDATE_SCHEDULE_WINDOW_CLOSED 204
  66. #define UPDATE_CONNECTION_LOST 205
  67. #define UPDATE_TIMEOUT 206
  68. #define UPDATE_YIELD 207
  69. /* Return codes from examine_update_vector */
  70. #define EXAMINE_RUV_PRISTINE_REPLICA 401
  71. #define EXAMINE_RUV_GENERATION_MISMATCH 402
  72. #define EXAMINE_RUV_REPLICA_TOO_OLD 403
  73. #define EXAMINE_RUV_OK 404
  74. #define EXAMINE_RUV_PARAM_ERROR 405
  75. #define MAX_CHANGES_PER_SESSION 10000
  76. /*
  77. * Maximum time to wait between replication sessions. If we
  78. * don't see any updates for a period equal to this interval,
  79. * we go ahead and start a replication session, just to be safe
  80. */
  81. #define MAX_WAIT_BETWEEN_SESSIONS PR_SecondsToInterval(60 * 5) /* 5 minutes */
  /*
   * Tests whether the protocol has been shut down and we need to quit.
   *
   * event_occurred() resets the bits in the event flag, so whoever tests for
   * EVENT_PROTOCOL_SHUTDOWN first consumes it and the next tester would not
   * see it; therefore the terminate flag is checked as well.
   *
   * The argument is parenthesized so that any pointer expression (not just a
   * plain identifier) expands correctly.  It may be evaluated twice, so it
   * must not have side effects.
   */
  #define PROTOCOL_IS_SHUTDOWN(prp) (event_occurred((prp), EVENT_PROTOCOL_SHUTDOWN) || (prp)->terminate)
  89. /* Forward declarations */
  90. static PRUint32 event_occurred(Private_Repl_Protocol *prp, PRUint32 event);
  91. static void reset_events (Private_Repl_Protocol *prp);
  92. static void protocol_sleep(Private_Repl_Protocol *prp, PRIntervalTime duration);
  93. static int send_updates(Private_Repl_Protocol *prp, RUV *ruv, PRUint32 *num_changes_sent, int do_send);
  94. static void windows_inc_backoff_expired(time_t timer_fire_time, void *arg);
  95. static int windows_examine_update_vector(Private_Repl_Protocol *prp, RUV *ruv);
  96. static const char* state2name (int state);
  97. static const char* event2name (int event);
  98. static const char* acquire2name (int code);
  99. static void periodic_dirsync(time_t when, void *arg);
  100. static Slapi_Eq_Context dirsync = NULL;
  101. /*
  102. * It's specifically ok to delete a protocol instance that
  103. * is currently running. The instance will be shut down, and
  104. * then resources will be freed. Since a graceful shutdown is
  105. * attempted, this function may take some time to complete.
  106. */
  107. static void
  108. windows_inc_delete(Private_Repl_Protocol **prpp)
  109. {
  110. int rc;
  111. windows_inc_private *prp_priv = (windows_inc_private *)(*prpp)->private;
  112. LDAPDebug0Args( LDAP_DEBUG_TRACE, "=> windows_inc_delete\n" );
  113. /* First, stop the protocol if it isn't already stopped */
  114. /* Then, delete all resources used by the protocol */
  115. rc = slapi_eq_cancel(dirsync);
  116. slapi_log_error(SLAPI_LOG_REPL, windows_repl_plugin_name,
  117. "windows_inc_delete: dirsync: %p, rval: %d\n", dirsync, rc);
  118. /* if backoff is set, delete it (from EQ, as well) */
  119. if (prp_priv->backoff) {
  120. backoff_delete(&prp_priv->backoff);
  121. }
  122. if (!(*prpp)->stopped) {
  123. (*prpp)->stopped = 1;
  124. (*prpp)->stop(*prpp);
  125. }
  126. if ((*prpp)->lock) {
  127. PR_DestroyLock((*prpp)->lock);
  128. (*prpp)->lock = NULL;
  129. }
  130. if ((*prpp)->cvar) {
  131. PR_DestroyCondVar((*prpp)->cvar);
  132. (*prpp)->cvar = NULL;
  133. }
  134. slapi_ch_free((void **)&(*prpp)->private);
  135. slapi_ch_free((void **)prpp);
  136. LDAPDebug0Args( LDAP_DEBUG_TRACE, "<= windows_inc_delete\n" );
  137. }
  138. /* helper function */
  139. void
  140. w_set_pause_and_busy_time(Private_Repl_Protocol *prp, long *pausetime, long *busywaittime)
  141. {
  142. LDAPDebug0Args( LDAP_DEBUG_TRACE, "=> w_set_pause_and_busy_time\n" );
  143. /* If neither are set, set busy time to its default */
  144. if (!*pausetime && !*busywaittime)
  145. {
  146. *busywaittime = repl5_get_backoff_min(prp);
  147. }
  148. /* pause time must be at least 1 more than the busy backoff time */
  149. if (*pausetime && !*busywaittime)
  150. {
  151. /*
  152. * user specified a pause time but no busy wait time - must
  153. * set busy wait time to 1 less than pause time - if pause
  154. * time is 1, we must set it to 2
  155. */
  156. if (*pausetime < 2)
  157. {
  158. *pausetime = 2;
  159. }
  160. *busywaittime = *pausetime - 1;
  161. }
  162. else if (!*pausetime && *busywaittime)
  163. {
  164. /*
  165. * user specified a busy wait time but no pause time - must
  166. * set pause time to 1 more than busy wait time
  167. */
  168. *pausetime = *busywaittime + 1;
  169. }
  170. else if (*pausetime && *busywaittime && *pausetime <= *busywaittime)
  171. {
  172. /*
  173. * user specified both pause and busy wait times, but the pause
  174. * time was <= busy wait time - pause time must be at least
  175. * 1 more than the busy wait time
  176. */
  177. *pausetime = *busywaittime + 1;
  178. }
  179. LDAPDebug0Args( LDAP_DEBUG_TRACE, "<= w_set_pause_and_busy_time\n" );
  180. }
  181. /*
  182. * Do the incremental protocol.
  183. *
  184. * What's going on here? This thing is a state machine. It has the
  185. * following states:
  186. *
  187. * State transition table:
  188. *
  189. * Curr State Condition/Event Next State
  190. * ---------- ------------ -----------
  191. * START schedule window is open ACQUIRE_REPLICA
  192. * schedule window is closed WAIT_WINDOW_OPEN
  193. * WAIT_WINDOW_OPEN schedule change START
  194. * replicate now ACQUIRE_REPLICA
  195. * schedule window opens ACQUIRE_REPLICA
  196. * ACQUIRE_REPLICA acquired replica SEND_CHANGES
  197. * failed to acquire - transient error START_BACKOFF
  198. * failed to acquire - fatal error STOP_FATAL_ERROR
  199. * SEND_CHANGES can't update CONSUMER_NEEDS_REINIT
  200. * no changes to send WAIT_CHANGES
  201. * can't send - transient error START_BACKOFF
  202. * can't send - window closed WAIT_WINDOW_OPEN
  203. * can't send - fatal error STOP_FATAL_ERROR
  204. * START_BACKOFF replicate now ACQUIRE_REPLICA
  205. * schedule changes START
  206. * schedule window closes WAIT_WINDOW_OPEN
  207. * backoff expires & can acquire SEND_CHANGES
  208. * backoff expires & can't acquire-trans BACKOFF
  209. * backoff expires & can't acquire-fatal STOP_FATAL_ERROR
  210. * BACKOFF replicate now ACQUIRE_REPLICA
  211. * schedule changes START
  212. * schedule window closes WAIT_WINDOW_OPEN
  213. * backoff expires & can acquire SEND_CHANGES
  214. * backoff expires & can't acquire-trans BACKOFF
  215. * backoff expires & can't acquire-fatal STOP_FATAL_ERROR
  216. * WAIT_CHANGES schedule window closes WAIT_WINDOW_OPEN
  217. * replicate_now ACQUIRE_REPLICA
  218. * change available ACQUIRE_REPLICA
  219. * schedule_change START
  220. */
  221. /*
  222. * DBDB: what follows is quite possibly the worst code I have ever seen.
  223. * Unfortunately we chose not to re-write it when we did the windows sync version.
  224. */
  225. /*
  226. * Main state machine for the incremental protocol. This routine will,
  227. * under normal circumstances, not return until the protocol is shut
  228. * down.
  229. */
  230. static void
  231. windows_inc_run(Private_Repl_Protocol *prp)
  232. {
  233. int current_state = STATE_START;
  234. int next_state = STATE_START;
  235. windows_inc_private *prp_priv = (windows_inc_private *)prp->private;
  236. int done = 0;
  237. int e1 = 0;
  238. RUV *ruv = NULL;
  239. Replica *replica = NULL;
  240. int wait_change_timer_set = 0;
  241. time_t last_start_time = 0;
  242. PRUint32 num_changes_sent = 0;
  243. /* use a different backoff timer strategy for ACQUIRE_REPLICA_BUSY errors */
  244. PRBool use_busy_backoff_timer = PR_FALSE;
  245. long pausetime = 0;
  246. long busywaittime = 0;
  247. unsigned long current_interval = 0;
  248. unsigned long interval = 0;
  249. int one_way;
  250. PRBool run_dirsync = PR_FALSE;
  251. LDAPDebug0Args( LDAP_DEBUG_TRACE, "=> windows_inc_run\n" );
  252. prp->stopped = 0;
  253. prp->terminate = 0;
  254. windows_private_load_dirsync_cookie(prp->agmt);
  255. do {
  256. int rc = 0;
  257. one_way = windows_private_get_one_way(prp->agmt);
  258. /* Take action, based on current state, and compute new state. */
  259. switch (current_state)
  260. {
  261. case STATE_START:
  262. dev_debug("windows_inc_run(STATE_START)");
  263. if (PROTOCOL_IS_SHUTDOWN(prp))
  264. {
  265. done = 1;
  266. break;
  267. }
  268. /*
  269. * Our initial state. See if we're in a schedule window. If
  270. * so, then we're ready to acquire the replica and see if it
  271. * needs any updates from us. If not, then wait for the window
  272. * to open.
  273. */
  274. if (agmt_schedule_in_window_now(prp->agmt))
  275. {
  276. next_state = STATE_READY_TO_ACQUIRE;
  277. } else
  278. {
  279. next_state = STATE_WAIT_WINDOW_OPEN;
  280. }
  281. /* we can get here from other states because some events happened and were
  282. not cleared. For instance when we wake up in STATE_WAIT_CHANGES state.
  283. Since this is a fresh start state, we should clear all events */
  284. /* ONREPL - this does not feel right - we should take another look
  285. at this state machine */
  286. reset_events (prp);
  287. /* Cancel any linger timer that might be in effect... */
  288. windows_conn_cancel_linger(prp->conn);
  289. /* ... and disconnect, if currently connected */
  290. windows_conn_disconnect(prp->conn);
  291. /* get the new pause time, if any */
  292. pausetime = agmt_get_pausetime(prp->agmt);
  293. /* get the new busy wait time, if any */
  294. busywaittime = agmt_get_busywaittime(prp->agmt);
  295. if (pausetime || busywaittime)
  296. {
  297. /* helper function to make sure they are set correctly */
  298. w_set_pause_and_busy_time(prp, &pausetime, &busywaittime);
  299. }
  300. /* Check if the interval changed */
  301. interval = windows_private_get_sync_interval(prp->agmt) * 1000;
  302. if(interval != current_interval){
  303. current_interval = interval;
  304. if(dirsync){
  305. int rc = slapi_eq_cancel(dirsync);
  306. slapi_log_error(SLAPI_LOG_REPL, windows_repl_plugin_name,
  307. "windows_inc_runs: cancelled dirsync: %p, rval: %d\n",
  308. dirsync, rc);
  309. }
  310. dirsync = slapi_eq_repeat(periodic_dirsync, (void*) prp, (time_t)0 , interval);
  311. slapi_log_error(SLAPI_LOG_REPL, windows_repl_plugin_name,
  312. "windows_inc_runs: new dirsync: %p\n", dirsync);
  313. }
  314. break;
  315. case STATE_WAIT_WINDOW_OPEN:
  316. /*
  317. * We're waiting for a schedule window to open. If one did,
  318. * or we receive a "replicate now" event, then start a protocol
  319. * session immediately. If the replication schedule changed, go
  320. * back to start. Otherwise, go back to sleep.
  321. */
  322. dev_debug("windows_inc_run(STATE_WAIT_WINDOW_OPEN)");
  323. if (PROTOCOL_IS_SHUTDOWN(prp))
  324. {
  325. done = 1;
  326. break;
  327. }
  328. else if (event_occurred(prp, EVENT_WINDOW_OPENED))
  329. {
  330. next_state = STATE_READY_TO_ACQUIRE;
  331. }
  332. else if (event_occurred(prp, EVENT_REPLICATE_NOW))
  333. {
  334. next_state = STATE_READY_TO_ACQUIRE;
  335. }
  336. else if (event_occurred(prp, EVENT_AGMT_CHANGED))
  337. {
  338. next_state = STATE_START;
  339. run_dirsync = PR_TRUE;
  340. windows_conn_set_agmt_changed(prp->conn);
  341. }
  342. else if (event_occurred(prp, EVENT_TRIGGERING_CRITERIA_MET)) /* change available */
  343. {
  344. /* just ignore it and go to sleep */
  345. protocol_sleep(prp, PR_INTERVAL_NO_TIMEOUT);
  346. }
  347. else if ((e1 = event_occurred(prp, EVENT_WINDOW_CLOSED)) ||
  348. event_occurred(prp, EVENT_BACKOFF_EXPIRED))
  349. {
  350. /* this events - should not occur - log a warning and go to sleep */
  351. slapi_log_error(SLAPI_LOG_FATAL, windows_repl_plugin_name,
  352. "%s: Incremental protocol: "
  353. "event %s should not occur in state %s; going to sleep\n",
  354. agmt_get_long_name(prp->agmt),
  355. e1 ? event2name(EVENT_WINDOW_CLOSED) : event2name(EVENT_BACKOFF_EXPIRED),
  356. state2name(current_state));
  357. protocol_sleep(prp, PR_INTERVAL_NO_TIMEOUT);
  358. }
  359. else if (event_occurred(prp, EVENT_RUN_DIRSYNC)) /* periodic_dirsync */
  360. {
  361. /* just ignore it and go to sleep */
  362. protocol_sleep(prp, PR_INTERVAL_NO_TIMEOUT);
  363. }
  364. else
  365. {
  366. /* wait until window opens or an event occurs */
  367. slapi_log_error(SLAPI_LOG_REPL, windows_repl_plugin_name,
  368. "%s: Incremental protocol: "
  369. "waiting for update window to open\n", agmt_get_long_name(prp->agmt));
  370. protocol_sleep(prp, PR_INTERVAL_NO_TIMEOUT);
  371. }
  372. break;
  373. case STATE_WAIT_CHANGES:
  374. /*
  375. * We're in a replication window, but we're waiting for more
  376. * changes to accumulate before we actually hook up and send
  377. * them.
  378. */
  379. dev_debug("windows_inc_run(STATE_WAIT_CHANGES)");
  380. if (PROTOCOL_IS_SHUTDOWN(prp))
  381. {
  382. dev_debug("windows_inc_run(STATE_WAIT_CHANGES): PROTOCOL_IS_SHUTING_DOWN -> end windows_inc_run\n");
  383. done = 1;
  384. break;
  385. }
  386. else if (event_occurred(prp, EVENT_REPLICATE_NOW))
  387. {
  388. dev_debug("windows_inc_run(STATE_WAIT_CHANGES): EVENT_REPLICATE_NOW received -> STATE_READY_TO_ACQUIRE\n");
  389. next_state = STATE_READY_TO_ACQUIRE;
  390. wait_change_timer_set = 0;
  391. /* We also want to run dirsync on a 'replicate now' event */
  392. run_dirsync = PR_TRUE;
  393. }
  394. else if ( event_occurred(prp, EVENT_RUN_DIRSYNC))
  395. {
  396. dev_debug("windows_inc_run(STATE_WAIT_CHANGES): EVENT_REPLICATE_NOW received -> STATE_READY_TO_ACQUIRE\n");
  397. next_state = STATE_READY_TO_ACQUIRE;
  398. wait_change_timer_set = 0;
  399. run_dirsync = PR_TRUE;
  400. }
  401. else if (event_occurred(prp, EVENT_AGMT_CHANGED))
  402. {
  403. dev_debug("windows_inc_run(STATE_WAIT_CHANGES): EVENT_AGMT_CHANGED received -> STATE_START\n");
  404. next_state = STATE_START;
  405. windows_conn_set_agmt_changed(prp->conn);
  406. wait_change_timer_set = 0;
  407. /* We also want to run dirsync on a 'agreement changed' event, because that's how we receive 'send updates now' */
  408. run_dirsync = PR_TRUE;
  409. }
  410. else if (event_occurred(prp, EVENT_WINDOW_CLOSED))
  411. {
  412. dev_debug("windows_inc_run(STATE_WAIT_CHANGES): EVENT_WINDOW_CLOSED received -> STATE_WAIT_WINDOW_OPEN\n");
  413. next_state = STATE_WAIT_WINDOW_OPEN;
  414. wait_change_timer_set = 0;
  415. }
  416. else if (event_occurred(prp, EVENT_TRIGGERING_CRITERIA_MET) )
  417. {
  418. dev_debug("windows_inc_run(STATE_WAIT_CHANGES): EVENT_TRIGGERING_CRITERIA_MET received -> STATE_READY_TO_ACQUIRE\n");
  419. next_state = STATE_READY_TO_ACQUIRE;
  420. wait_change_timer_set = 0;
  421. /* We want to run dirsync to catch values generated by AD after
  422. * it process operations that we send to it (such as GUID values). */
  423. run_dirsync = PR_TRUE;
  424. }
  425. else if ((e1 = event_occurred(prp, EVENT_WINDOW_OPENED)) ||
  426. event_occurred(prp, EVENT_BACKOFF_EXPIRED))
  427. {
  428. /* this events - should not occur - log a warning and clear the event */
  429. slapi_log_error(SLAPI_LOG_FATAL, windows_repl_plugin_name, "%s: Incremental protocol: "
  430. "event %s should not occur in state %s\n",
  431. agmt_get_long_name(prp->agmt),
  432. e1 ? event2name(EVENT_WINDOW_OPENED) : event2name(EVENT_BACKOFF_EXPIRED),
  433. state2name(current_state));
  434. wait_change_timer_set = 0;
  435. }
  436. else
  437. {
  438. if (wait_change_timer_set)
  439. {
  440. /* We are here because our timer expired */
  441. dev_debug("windows_inc_run(STATE_WAIT_CHANGES): wait_change_timer_set expired -> STATE_START\n");
  442. next_state = STATE_START;
  443. run_dirsync = PR_TRUE;
  444. wait_change_timer_set = 0;
  445. }
  446. else
  447. {
  448. /* We are here because the last replication session
  449. * finished or aborted.
  450. */
  451. wait_change_timer_set = 1;
  452. protocol_sleep(prp, MAX_WAIT_BETWEEN_SESSIONS);
  453. }
  454. }
  455. break;
  456. case STATE_READY_TO_ACQUIRE:
  457. dev_debug("windows_inc_run(STATE_READY_TO_ACQUIRE)");
  458. if (PROTOCOL_IS_SHUTDOWN(prp))
  459. {
  460. done = 1;
  461. break;
  462. }
  463. /* ONREPL - at this state we unconditionally acquire the replica
  464. ignoring all events. Not sure if this is good */
  465. object_acquire(prp->replica_object);
  466. rc = windows_acquire_replica(prp, &ruv , (run_dirsync == 0) /* yes, check the consumer RUV for incremental, but not if we're going to dirsync afterwards */);
  467. slapi_log_error(SLAPI_LOG_REPL, windows_repl_plugin_name,
  468. "windows_acquire_replica returned %s (%d)\n",
  469. acquire2name(rc),
  470. rc);
  471. use_busy_backoff_timer = PR_FALSE; /* default */
  472. if (rc == ACQUIRE_SUCCESS)
  473. {
  474. next_state = STATE_SENDING_UPDATES;
  475. }
  476. else if (rc == ACQUIRE_REPLICA_BUSY)
  477. {
  478. next_state = STATE_BACKOFF_START;
  479. use_busy_backoff_timer = PR_TRUE;
  480. }
  481. else if (rc == ACQUIRE_CONSUMER_WAS_UPTODATE)
  482. {
  483. next_state = STATE_WAIT_CHANGES;
  484. }
  485. else if (rc == ACQUIRE_TRANSIENT_ERROR)
  486. {
  487. next_state = STATE_BACKOFF_START;
  488. }
  489. else if (rc == ACQUIRE_FATAL_ERROR)
  490. {
  491. next_state = STATE_STOP_FATAL_ERROR;
  492. }
  493. if (rc != ACQUIRE_SUCCESS)
  494. {
  495. int optype, ldaprc;
  496. windows_conn_get_error(prp->conn, &optype, &ldaprc);
  497. agmt_set_last_update_status(prp->agmt, ldaprc,
  498. prp->last_acquire_response_code, NULL);
  499. }
  500. object_release(prp->replica_object);
  501. break;
  502. case STATE_BACKOFF_START:
  503. dev_debug("windows_inc_run(STATE_BACKOFF_START)");
  504. if (PROTOCOL_IS_SHUTDOWN(prp))
  505. {
  506. done = 1;
  507. break;
  508. }
  509. if (event_occurred(prp, EVENT_REPLICATE_NOW) || event_occurred(prp, EVENT_RUN_DIRSYNC))
  510. {
  511. next_state = STATE_READY_TO_ACQUIRE;
  512. }
  513. else if (event_occurred(prp, EVENT_AGMT_CHANGED))
  514. {
  515. next_state = STATE_START;
  516. run_dirsync = PR_TRUE; /* Also trigger dirsync for the 'send updates now' feature */
  517. windows_conn_set_agmt_changed(prp->conn);
  518. }
  519. else if (event_occurred (prp, EVENT_WINDOW_CLOSED))
  520. {
  521. next_state = STATE_WAIT_WINDOW_OPEN;
  522. }
  523. else if (event_occurred (prp, EVENT_TRIGGERING_CRITERIA_MET))
  524. {
  525. /* consume and ignore */
  526. }
  527. else if ((e1 = event_occurred (prp, EVENT_WINDOW_OPENED)) ||
  528. event_occurred (prp, EVENT_BACKOFF_EXPIRED))
  529. {
  530. /* This should never happen */
  531. /* this events - should not occur - log a warning and go to sleep */
  532. slapi_log_error(SLAPI_LOG_FATAL, windows_repl_plugin_name,
  533. "%s: Incremental protocol: event %s should not occur in state %s\n",
  534. agmt_get_long_name(prp->agmt),
  535. e1 ? event2name(EVENT_WINDOW_OPENED) : event2name(EVENT_BACKOFF_EXPIRED),
  536. state2name(current_state));
  537. }
  538. else
  539. {
  540. /* Set up the backoff timer to wake us up at the appropriate time */
  541. /* if previous backoff set up, delete it. */
  542. if (prp_priv->backoff) {
  543. backoff_delete(&prp_priv->backoff);
  544. }
  545. if (use_busy_backoff_timer)
  546. {
  547. /* we received a busy signal from the consumer, wait for a while */
  548. if (!busywaittime){
  549. busywaittime = repl5_get_backoff_min(prp);
  550. }
  551. prp_priv->backoff = backoff_new(BACKOFF_FIXED, busywaittime, busywaittime);
  552. }
  553. else
  554. {
  555. prp_priv->backoff = backoff_new(BACKOFF_EXPONENTIAL, repl5_get_backoff_min(prp),
  556. repl5_get_backoff_max(prp) );
  557. }
  558. next_state = STATE_BACKOFF;
  559. backoff_reset(prp_priv->backoff, windows_inc_backoff_expired, (void *)prp);
  560. protocol_sleep(prp, PR_INTERVAL_NO_TIMEOUT);
  561. use_busy_backoff_timer = PR_FALSE;
  562. }
  563. break;
  564. case STATE_BACKOFF:
  565. /*
  566. * We're in a backoff state.
  567. */
  568. dev_debug("windows_inc_run(STATE_BACKOFF)");
  569. if (PROTOCOL_IS_SHUTDOWN(prp))
  570. {
  571. if (prp_priv->backoff)
  572. backoff_delete(&prp_priv->backoff);
  573. done = 1;
  574. break;
  575. }
  576. else if (event_occurred(prp, EVENT_REPLICATE_NOW) || event_occurred(prp, EVENT_RUN_DIRSYNC))
  577. {
  578. next_state = STATE_READY_TO_ACQUIRE;
  579. }
  580. else if (event_occurred(prp, EVENT_AGMT_CHANGED))
  581. {
  582. next_state = STATE_START;
  583. run_dirsync = PR_TRUE;
  584. windows_conn_set_agmt_changed(prp->conn);
  585. /* Destroy the backoff timer, since we won't need it anymore */
  586. if (prp_priv->backoff)
  587. backoff_delete(&prp_priv->backoff);
  588. }
  589. else if (event_occurred(prp, EVENT_WINDOW_CLOSED))
  590. {
  591. next_state = STATE_WAIT_WINDOW_OPEN;
  592. /* Destroy the backoff timer, since we won't need it anymore */
  593. if (prp_priv->backoff)
  594. backoff_delete(&prp_priv->backoff);
  595. }
  596. else if (event_occurred(prp, EVENT_BACKOFF_EXPIRED))
  597. {
  598. rc = windows_acquire_replica(prp, &ruv, 1 /* check RUV for incremental */);
  599. use_busy_backoff_timer = PR_FALSE;
  600. if (rc == ACQUIRE_SUCCESS)
  601. {
  602. next_state = STATE_SENDING_UPDATES;
  603. }
  604. else if (rc == ACQUIRE_REPLICA_BUSY)
  605. {
  606. next_state = STATE_BACKOFF;
  607. use_busy_backoff_timer = PR_TRUE;
  608. }
  609. else if (rc == ACQUIRE_CONSUMER_WAS_UPTODATE)
  610. {
  611. next_state = STATE_WAIT_CHANGES;
  612. }
  613. else if (rc == ACQUIRE_TRANSIENT_ERROR)
  614. {
  615. next_state = STATE_BACKOFF;
  616. }
  617. else if (rc == ACQUIRE_FATAL_ERROR)
  618. {
  619. next_state = STATE_STOP_FATAL_ERROR;
  620. }
  621. if (rc != ACQUIRE_SUCCESS)
  622. {
  623. int optype, ldaprc;
  624. windows_conn_get_error(prp->conn, &optype, &ldaprc);
  625. agmt_set_last_update_status(prp->agmt, ldaprc,
  626. prp->last_acquire_response_code, NULL);
  627. }
  628. /*
  629. * We either need to step the backoff timer, or
  630. * destroy it if we don't need it anymore.
  631. */
  632. if (STATE_BACKOFF == next_state)
  633. {
  634. time_t next_fire_time;
  635. time_t now;
  636. /* Step the backoff timer */
  637. time(&now);
  638. next_fire_time = backoff_step(prp_priv->backoff);
  639. /* And go back to sleep */
  640. slapi_log_error(SLAPI_LOG_REPL, windows_repl_plugin_name,
  641. "%s: Replication session backing off for %ld seconds\n",
  642. agmt_get_long_name(prp->agmt),
  643. next_fire_time - now);
  644. protocol_sleep(prp, PR_INTERVAL_NO_TIMEOUT);
  645. }
  646. else
  647. {
  648. /* Destroy the backoff timer, since we won't need it anymore */
  649. backoff_delete(&prp_priv->backoff);
  650. }
  651. }
  652. else if (event_occurred(prp, EVENT_TRIGGERING_CRITERIA_MET))
  653. {
  654. /* changes are available */
  655. if ( prp_priv->backoff == NULL || backoff_expired (prp_priv->backoff, 60) )
  656. {
  657. /*
  658. * Have seen cases that the agmt stuck here forever since
  659. * somehow the backoff timer was not in event queue anymore.
  660. * If the backoff timer has expired more than 60 seconds,
  661. * destroy it.
  662. */
  663. if ( prp_priv->backoff )
  664. backoff_delete(&prp_priv->backoff);
  665. next_state = STATE_READY_TO_ACQUIRE;
  666. }
  667. else
  668. {
  669. /* ignore changes and go to sleep */
  670. protocol_sleep(prp, PR_INTERVAL_NO_TIMEOUT);
  671. }
  672. }
  673. else if (event_occurred(prp, EVENT_WINDOW_OPENED))
  674. {
  675. /* this should never happen - log an error and go to sleep */
  676. slapi_log_error(SLAPI_LOG_FATAL, windows_repl_plugin_name, "%s: Incremental protocol: "
  677. "event %s should not occur in state %s; going to sleep\n",
  678. agmt_get_long_name(prp->agmt),
  679. event2name(EVENT_WINDOW_OPENED), state2name(current_state));
  680. protocol_sleep(prp, PR_INTERVAL_NO_TIMEOUT);
  681. }
  682. break;
  683. case STATE_SENDING_UPDATES:
  684. dev_debug("windows_inc_run(STATE_SENDING_UPDATES)");
  685. agmt_set_update_in_progress(prp->agmt, PR_TRUE);
  686. num_changes_sent = 0;
  687. last_start_time = current_time();
  688. agmt_set_last_update_start(prp->agmt, last_start_time);
  689. /*
  690. * We've acquired the replica, and are ready to send any
  691. * needed updates.
  692. */
  693. if (PROTOCOL_IS_SHUTDOWN(prp))
  694. {
  695. windows_release_replica (prp);
  696. done = 1;
  697. agmt_set_update_in_progress(prp->agmt, PR_FALSE);
  698. agmt_set_last_update_end(prp->agmt, current_time());
  699. /* MAB: I don't find the following status correct. How do we know it has
  700. been stopped by an admin and not by a total update request, for instance?
  701. In any case, how is this protocol shutdown situation different from all the
  702. other ones that are present in this state machine? */
  703. /* richm: We at least need to let monitors know that the protocol has been
  704. shutdown - maybe they can figure out why */
  705. agmt_set_last_update_status(prp->agmt, 0, 0, "Protocol stopped");
  706. agmt_update_done(prp->agmt, 0);
  707. break;
  708. }
  709. agmt_set_last_update_status(prp->agmt, 0, 0, "Incremental update started");
  710. dev_debug("windows_inc_run(STATE_SENDING_UPDATES) -> windows_examine_update_vector");
  711. rc = windows_examine_update_vector(prp, ruv);
  712. /*
  713. * Decide what to do next - proceed with incremental,
  714. * backoff, or total update
  715. */
  716. switch (rc)
  717. {
  718. case EXAMINE_RUV_PARAM_ERROR:
  719. /* this is really bad - we have NULL prp! */
  720. next_state = STATE_STOP_FATAL_ERROR;
  721. break;
  722. case EXAMINE_RUV_PRISTINE_REPLICA:
  723. slapi_log_error(SLAPI_LOG_FATAL, windows_repl_plugin_name,
  724. "%s: Replica has no update vector. It has never been initialized.\n",
  725. agmt_get_long_name(prp->agmt));
  726. next_state = STATE_BACKOFF_START;
  727. break;
  728. case EXAMINE_RUV_GENERATION_MISMATCH:
  729. slapi_log_error(SLAPI_LOG_FATAL, windows_repl_plugin_name,
  730. "%s: The remote replica has a different database generation ID than "
  731. "the local database. You may have to reinitialize the remote replica, "
  732. "or the local replica.\n", agmt_get_long_name(prp->agmt));
  733. next_state = STATE_BACKOFF_START;
  734. break;
  735. case EXAMINE_RUV_REPLICA_TOO_OLD:
  736. slapi_log_error(SLAPI_LOG_FATAL, windows_repl_plugin_name,
  737. "%s: Replica update vector is too out of date to bring "
  738. "into sync using the incremental protocol. The replica "
  739. "must be reinitialized.\n", agmt_get_long_name(prp->agmt));
  740. next_state = STATE_BACKOFF_START;
  741. break;
  742. case EXAMINE_RUV_OK:
  743. /* update our csn generator state with the consumer's ruv data */
  744. dev_debug("windows_inc_run(STATE_SENDING_UPDATES) -> windows_examine_update_vector OK");
  745. object_acquire(prp->replica_object);
  746. replica = object_get_data(prp->replica_object);
  747. rc = replica_update_csngen_state (replica, ruv);
  748. object_release (prp->replica_object);
  749. replica = NULL;
  750. if (rc == CSN_LIMIT_EXCEEDED) /* too much skew */
  751. {
  752. slapi_log_error(SLAPI_LOG_FATAL, windows_repl_plugin_name,
  753. "%s: Incremental protocol: fatal error - too much time skew between replicas!\n",
  754. agmt_get_long_name(prp->agmt));
  755. next_state = STATE_STOP_FATAL_ERROR;
  756. }
  757. else if (rc != 0) /* internal error */
  758. {
  759. slapi_log_error(SLAPI_LOG_FATAL, windows_repl_plugin_name,
  760. "%s: Incremental protocol: fatal internal error updating the CSN generator!\n",
  761. agmt_get_long_name(prp->agmt));
  762. next_state = STATE_STOP_FATAL_ERROR;
  763. }
  764. else
  765. {
  766. /* call begin incremental update callback */
  767. winsync_plugin_call_begin_update_cb(prp->agmt,
  768. windows_private_get_directory_subtree(prp->agmt),
  769. windows_private_get_windows_subtree(prp->agmt),
  770. 0 /* is_total == FALSE */);
  771. if ((one_way == ONE_WAY_SYNC_DISABLED) || (one_way == ONE_WAY_SYNC_TO_AD)) {
  772. rc = send_updates(prp, ruv, &num_changes_sent, 1 /* actually send updates */);
  773. } else {
  774. /* We call send_updates to fast-forward the RUV
  775. * without actually sending any updates. */
  776. rc = send_updates(prp, ruv, &num_changes_sent, 0 /* don't send updates */);
  777. }
  778. if (rc == UPDATE_NO_MORE_UPDATES)
  779. {
  780. dev_debug("windows_inc_run(STATE_SENDING_UPDATES) -> send_updates = UPDATE_NO_MORE_UPDATES -> STATE_WAIT_CHANGES");
  781. agmt_set_last_update_status(prp->agmt, 0, 0, "Incremental update succeeded");
  782. next_state = STATE_WAIT_CHANGES;
  783. }
  784. else if (rc == UPDATE_YIELD)
  785. {
  786. dev_debug("windows_inc_run(STATE_SENDING_UPDATES) -> send_updates = UPDATE_YIELD -> STATE_BACKOFF_START");
  787. agmt_set_last_update_status(prp->agmt, 0, 0, "Incremental update succeeded and yielded");
  788. next_state = STATE_BACKOFF_START;
  789. }
  790. else if (rc == UPDATE_TRANSIENT_ERROR)
  791. {
  792. dev_debug("windows_inc_run(STATE_SENDING_UPDATES) -> send_updates = UPDATE_TRANSIENT_ERROR -> STATE_BACKOFF_START");
  793. next_state = STATE_BACKOFF_START;
  794. }
  795. else if (rc == UPDATE_FATAL_ERROR)
  796. {
  797. dev_debug("windows_inc_run(STATE_SENDING_UPDATES) -> send_updates = UPDATE_FATAL_ERROR -> STATE_STOP_FATAL_ERROR");
  798. next_state = STATE_STOP_FATAL_ERROR;
  799. }
  800. else if (rc == UPDATE_SCHEDULE_WINDOW_CLOSED)
  801. {
  802. dev_debug("windows_inc_run(STATE_SENDING_UPDATES) -> send_updates = UPDATE_SCHEDULE_WINDOW_CLOSED -> STATE_WAIT_WINDOW_OPEN");
  803. /* ONREPL - I don't think we should check this. We might be
  804. here because of replicate_now event - so we don't care
  805. about the schedule */
  806. next_state = STATE_WAIT_WINDOW_OPEN;
  807. /* ONREPL - do we need to release the replica here ? */
  808. windows_conn_disconnect (prp->conn);
  809. }
  810. else if (rc == UPDATE_CONNECTION_LOST)
  811. {
  812. dev_debug("windows_inc_run(STATE_SENDING_UPDATES) -> send_updates = UPDATE_CONNECTION_LOST -> STATE_BACKOFF_START");
  813. next_state = STATE_BACKOFF_START;
  814. }
  815. else if (rc == UPDATE_TIMEOUT)
  816. {
  817. dev_debug("windows_inc_run(STATE_SENDING_UPDATES) -> send_updates = UPDATE_TIMEOUT -> STATE_BACKOFF_START");
  818. next_state = STATE_BACKOFF_START;
  819. }
  820. }
  821. last_start_time = 0UL;
  822. break;
  823. }
  824. if ( run_dirsync && ((one_way == ONE_WAY_SYNC_DISABLED) || (one_way == ONE_WAY_SYNC_FROM_AD)))
  825. {
  826. /* Don't run dirsync if we encountered
  827. * an error when sending updates to AD. */
  828. if (rc == UPDATE_NO_MORE_UPDATES) {
  829. windows_dirsync_inc_run(prp);
  830. windows_private_save_dirsync_cookie(prp->agmt);
  831. }
  832. run_dirsync = PR_FALSE;
  833. }
  834. agmt_set_last_update_end(prp->agmt, current_time());
  835. agmt_set_update_in_progress(prp->agmt, PR_FALSE);
  836. agmt_update_done(prp->agmt, 0);
  837. /* If timed out, close the connection after released the replica */
  838. windows_release_replica(prp);
  839. if (rc == UPDATE_TIMEOUT) {
  840. windows_conn_disconnect(prp->conn);
  841. }
  842. /* call end incremental update callback */
  843. winsync_plugin_call_end_update_cb(prp->agmt,
  844. windows_private_get_directory_subtree(prp->agmt),
  845. windows_private_get_windows_subtree(prp->agmt),
  846. 0 /* is_total == FALSE */);
  847. if (rc == UPDATE_NO_MORE_UPDATES && num_changes_sent > 0)
  848. {
  849. if (pausetime > 0)
  850. {
  851. /* richm - 20020219 - If we have acquired the consumer, and another master has gone
  852. into backoff waiting for us to release it, we may acquire the replica sooner
  853. than the other master has a chance to, and the other master may not be able
  854. to acquire the consumer for a long time (hours, days?) if this server is
  855. under a heavy load (see reliab06 et. al. system tests)
  856. So, this sleep gives the other master(s) a chance to acquire the consumer
  857. replica */
  858. long loops = pausetime;
  859. /* the while loop is so that we don't just sleep and sleep if an
  860. event comes in that we should handle immediately (like shutdown) */
  861. slapi_log_error(SLAPI_LOG_REPL, windows_repl_plugin_name,
  862. "%s: Pausing updates for %ld seconds to allow other suppliers to update consumer\n",
  863. agmt_get_long_name(prp->agmt), pausetime);
  864. while (loops-- && !(PROTOCOL_IS_SHUTDOWN(prp)))
  865. {
  866. DS_Sleep(PR_SecondsToInterval(1));
  867. }
  868. }
  869. else if (num_changes_sent > 10)
  870. {
  871. /* wait for consumer to write its ruv if the replication was busy */
  872. /* When asked, consumer sends its ruv in cache to the supplier. */
  873. /* DS_Sleep ( PR_SecondsToInterval(1) ); */
  874. }
  875. }
  876. break;
  877. case STATE_STOP_FATAL_ERROR:
  878. /*
  879. * We encountered some sort of a fatal error. Suspend.
  880. */
  881. /* XXXggood update state in replica */
  882. agmt_set_last_update_status(prp->agmt, -1, 0, "Incremental update has failed and requires administrator action");
  883. dev_debug("windows_inc_run(STATE_STOP_FATAL_ERROR)");
  884. slapi_log_error(SLAPI_LOG_FATAL, windows_repl_plugin_name,
  885. "%s: Incremental update failed and requires administrator action\n",
  886. agmt_get_long_name(prp->agmt));
  887. next_state = STATE_STOP_FATAL_ERROR_PART2;
  888. break;
  889. case STATE_STOP_FATAL_ERROR_PART2:
  890. if (PROTOCOL_IS_SHUTDOWN(prp))
  891. {
  892. done = 1;
  893. break;
  894. }
  895. /* MAB: This state is the FATAL state where we are supposed to get
  896. as a result of a FATAL error on send_updates. But, as bug
  897. states, send_updates was always returning TRANSIENT errors and never
  898. FATAL... In other words, this code has never been tested before...
  899. As of 01/16/01, this piece of code was in a very dangerous state. In particular,
  900. 1) it does not catch any events
  901. 2) it is a terminal state (once reached it never transitions to a different state)
  902. Both things combined make this state to become a consuming infinite loop
  903. that is useless after all (we are in a fatal place requiring manual admin jobs */
  904. /* MAB: The following lines fix problem number 1 above... When the code gets
  905. into this state, it should only get a chance to get out of it by an
  906. EVENT_AGMT_CHANGED event... All other events should be ignored */
  907. else if (event_occurred(prp, EVENT_AGMT_CHANGED))
  908. {
  909. dev_debug("windows_inc_run(STATE_STOP_FATAL_ERROR): EVENT_AGMT_CHANGED received\n");
  910. /* Chance to recover for the EVENT_AGMT_CHANGED event.
  911. This is not mandatory, but fixes problem 2 above */
  912. next_state = STATE_STOP_NORMAL_TERMINATION;
  913. }
  914. else
  915. {
  916. dev_debug("windows_inc_run(STATE_STOP_FATAL_ERROR): Event received. Clearing it\n");
  917. reset_events (prp);
  918. }
  919. protocol_sleep (prp, PR_INTERVAL_NO_TIMEOUT);
  920. break;
  921. case STATE_STOP_NORMAL_TERMINATION:
  922. /*
  923. * We encountered some sort of a fatal error. Return.
  924. */
  925. /* XXXggood update state in replica */
  926. dev_debug("windows_inc_run(STATE_STOP_NORMAL_TERMINATION)");
  927. done = 1;
  928. break;
  929. }
  930. slapi_log_error(SLAPI_LOG_REPL, windows_repl_plugin_name,
  931. "%s: State: %s -> %s\n",
  932. agmt_get_long_name(prp->agmt),
  933. state2name(current_state), state2name(next_state));
  934. current_state = next_state;
  935. } while (!done);
  936. /* remove_protocol_callbacks(prp); */
  937. prp->stopped = 1;
  938. /* Cancel any linger timer that might be in effect... */
  939. windows_conn_cancel_linger(prp->conn);
  940. /* ... and disconnect, if currently connected */
  941. windows_conn_disconnect(prp->conn);
  942. ruv_destroy ( &ruv );
  943. LDAPDebug0Args( LDAP_DEBUG_TRACE, "<= windows_inc_run\n" );
  944. }
  945. /*
  946. * Go to sleep until awakened.
  947. */
  948. static void
  949. protocol_sleep(Private_Repl_Protocol *prp, PRIntervalTime duration)
  950. {
  951. LDAPDebug0Args( LDAP_DEBUG_TRACE, "=> protocol_sleep\n" );
  952. PR_ASSERT(NULL != prp);
  953. PR_Lock(prp->lock);
  954. /* we should not go to sleep if there are events available to be processed.
  955. Otherwise, we can miss the event that suppose to wake us up */
  956. if (prp->eventbits == 0)
  957. PR_WaitCondVar(prp->cvar, duration);
  958. else
  959. {
  960. slapi_log_error(SLAPI_LOG_REPL, windows_repl_plugin_name,
  961. "%s: Incremental protocol: can't go to sleep: event bits - %x\n",
  962. agmt_get_long_name(prp->agmt), prp->eventbits);
  963. }
  964. PR_Unlock(prp->lock);
  965. LDAPDebug0Args( LDAP_DEBUG_TRACE, "<= protocol_sleep\n" );
  966. }
  967. /*
  968. * Notify the protocol about some event. Signal the condition
  969. * variable in case the protocol is sleeping. Multiple occurences
  970. * of a single event type are not remembered (e.g. no stack
  971. * of events is maintained).
  972. */
  973. static void
  974. event_notify(Private_Repl_Protocol *prp, PRUint32 event)
  975. {
  976. LDAPDebug0Args( LDAP_DEBUG_TRACE, "=> event_notify\n" );
  977. PR_ASSERT(NULL != prp);
  978. PR_Lock(prp->lock);
  979. prp->eventbits |= event;
  980. PR_NotifyCondVar(prp->cvar);
  981. PR_Unlock(prp->lock);
  982. LDAPDebug0Args( LDAP_DEBUG_TRACE, "<= event_notify\n" );
  983. }
  984. /*
  985. * Test to see if an event occurred. The event is cleared when
  986. * read.
  987. */
  988. static PRUint32
  989. event_occurred(Private_Repl_Protocol *prp, PRUint32 event)
  990. {
  991. PRUint32 return_value;
  992. LDAPDebug0Args( LDAP_DEBUG_TRACE, "=> event_occurred\n" );
  993. PR_ASSERT(NULL != prp);
  994. PR_Lock(prp->lock);
  995. return_value = (prp->eventbits & event);
  996. prp->eventbits &= ~event; /* Clear event */
  997. PR_Unlock(prp->lock);
  998. LDAPDebug0Args( LDAP_DEBUG_TRACE, "<= event_occurred\n" );
  999. return return_value;
  1000. }
  1001. static void
  1002. reset_events (Private_Repl_Protocol *prp)
  1003. {
  1004. LDAPDebug0Args( LDAP_DEBUG_TRACE, "=> reset_events\n" );
  1005. PR_ASSERT(NULL != prp);
  1006. PR_Lock(prp->lock);
  1007. prp->eventbits = 0;
  1008. PR_Unlock(prp->lock);
  1009. LDAPDebug0Args( LDAP_DEBUG_TRACE, "<= reset_events\n" );
  1010. }
  1011. static PRBool
  1012. is_dummy_operation (const slapi_operation_parameters *op)
  1013. {
  1014. LDAPDebug0Args( LDAP_DEBUG_TRACE, "=> is_dummy_operation\n" );
  1015. LDAPDebug0Args( LDAP_DEBUG_TRACE, "<= is_dummy_operation\n" );
  1016. return (strcmp (op->target_address.uniqueid, START_ITERATION_ENTRY_UNIQUEID) == 0);
  1017. }
  1018. void
  1019. w_cl5_operation_parameters_done (struct slapi_operation_parameters *sop)
  1020. {
  1021. LDAPDebug0Args( LDAP_DEBUG_TRACE, "=> w_cl5_operation_parameters_done\n" );
  1022. if(sop!=NULL) {
  1023. switch(sop->operation_type)
  1024. {
  1025. case SLAPI_OPERATION_BIND:
  1026. slapi_ch_free((void **)&(sop->p.p_bind.bind_saslmechanism));
  1027. if (sop->p.p_bind.bind_creds)
  1028. ber_bvecfree((struct berval**)&(sop->p.p_bind.bind_creds));
  1029. if (sop->p.p_bind.bind_ret_saslcreds)
  1030. ber_bvecfree((struct berval**)&(sop->p.p_bind.bind_ret_saslcreds));
  1031. sop->p.p_bind.bind_creds = NULL;
  1032. sop->p.p_bind.bind_ret_saslcreds = NULL;
  1033. break;
  1034. case SLAPI_OPERATION_COMPARE:
  1035. ava_done((struct ava *)&(sop->p.p_compare.compare_ava));
  1036. break;
  1037. case SLAPI_OPERATION_SEARCH:
  1038. slapi_ch_free((void **)&(sop->p.p_search.search_strfilter));
  1039. charray_free(sop->p.p_search.search_attrs);
  1040. slapi_filter_free(sop->p.p_search.search_filter,1);
  1041. break;
  1042. case SLAPI_OPERATION_MODRDN:
  1043. sop->p.p_modrdn.modrdn_deloldrdn = 0;
  1044. break;
  1045. case SLAPI_OPERATION_EXTENDED:
  1046. slapi_ch_free((void **)&(sop->p.p_extended.exop_oid));
  1047. if (sop->p.p_extended.exop_value)
  1048. ber_bvecfree((struct berval**)&(sop->p.p_extended.exop_value));
  1049. sop->p.p_extended.exop_value = NULL;
  1050. break;
  1051. default:
  1052. break;
  1053. }
  1054. }
  1055. operation_parameters_done(sop);
  1056. LDAPDebug0Args( LDAP_DEBUG_TRACE, "<= w_cl5_operation_parameters_done\n" );
  1057. }
  1058. /*
  1059. * Send a set of updates to the replica. Assumes that (1) the replica
  1060. * has already been acquired, (2) that the consumer's update vector has
  1061. * been checked and (3) that it's ok to send incremental updates.
  1062. * Returns:
  1063. * UPDATE_NO_MORE_UPDATES - all updates were sent succussfully
  1064. * UPDATE_TRANSIENT_ERROR - some non-permanent error occurred. Try again later.
  1065. * UPDATE_FATAL_ERROR - some bad, permanent error occurred.
  1066. * UPDATE_SCHEDULE_WINDOW_CLOSED - the schedule window closed on us.
  1067. */
  1068. static int
  1069. send_updates(Private_Repl_Protocol *prp, RUV *remote_update_vector, PRUint32 *num_changes_sent, int do_send)
  1070. {
  1071. CL5Entry entry;
  1072. slapi_operation_parameters op;
  1073. int return_value = UPDATE_NO_MORE_UPDATES;
  1074. int rc;
  1075. int set_mincsn = 0;
  1076. CL5ReplayIterator *changelog_iterator = NULL;
  1077. RUV *current_ruv = ruv_dup(remote_update_vector);
  1078. CSN *mincsn = NULL;
  1079. LDAPDebug0Args( LDAP_DEBUG_TRACE, "=> send_updates\n" );
  1080. *num_changes_sent = 0;
  1081. /* Check if the min csn is set in our RUV to see if we need to set it below. */
  1082. ruv_get_min_csn(current_ruv, &mincsn);
  1083. if (!mincsn) {
  1084. set_mincsn = 1;
  1085. } else {
  1086. csn_free(&mincsn);
  1087. }
  1088. /*
  1089. * Iterate over the changelog. Retrieve each update,
  1090. * construct an appropriate LDAP operation,
  1091. * attaching the CSN, and send the change.
  1092. */
  1093. rc = cl5CreateReplayIteratorEx( prp, remote_update_vector, &changelog_iterator, agmt_get_consumerRID(prp->agmt));
  1094. if (CL5_SUCCESS != rc)
  1095. {
  1096. switch (rc)
  1097. {
  1098. case CL5_BAD_DATA: /* invalid parameter passed to the function */
  1099. slapi_log_error(SLAPI_LOG_FATAL, windows_repl_plugin_name,
  1100. "%s: Invalid parameter passed to cl5CreateReplayIterator\n",
  1101. agmt_get_long_name(prp->agmt));
  1102. return_value = UPDATE_FATAL_ERROR;
  1103. break;
  1104. case CL5_BAD_FORMAT: /* db data has unexpected format */
  1105. slapi_log_error(SLAPI_LOG_FATAL, windows_repl_plugin_name,
  1106. "%s: Unexpected format encountered in changelog database\n",
  1107. agmt_get_long_name(prp->agmt));
  1108. return_value = UPDATE_FATAL_ERROR;
  1109. break;
  1110. case CL5_BAD_STATE: /* changelog is in an incorrect state for attempted operation */
  1111. slapi_log_error(SLAPI_LOG_FATAL, windows_repl_plugin_name,
  1112. "%s: Changelog database was in an incorrect state\n",
  1113. agmt_get_long_name(prp->agmt));
  1114. return_value = UPDATE_FATAL_ERROR;
  1115. break;
  1116. case CL5_BAD_DBVERSION: /* changelog has invalid dbversion */
  1117. slapi_log_error(SLAPI_LOG_FATAL, windows_repl_plugin_name,
  1118. "%s: Incorrect dbversion found in changelog database\n",
  1119. agmt_get_long_name(prp->agmt));
  1120. return_value = UPDATE_FATAL_ERROR;
  1121. break;
  1122. case CL5_DB_ERROR: /* database error */
  1123. slapi_log_error(SLAPI_LOG_FATAL, windows_repl_plugin_name,
  1124. "%s: A changelog database error was encountered\n",
  1125. agmt_get_long_name(prp->agmt));
  1126. return_value = UPDATE_FATAL_ERROR;
  1127. break;
  1128. case CL5_NOTFOUND: /* we have no changes to send */
  1129. slapi_log_error(SLAPI_LOG_REPL, windows_repl_plugin_name,
  1130. "%s: No changes to send\n",
  1131. agmt_get_long_name(prp->agmt));
  1132. return_value = UPDATE_NO_MORE_UPDATES;
  1133. break;
  1134. case CL5_MEMORY_ERROR: /* memory allocation failed */
  1135. slapi_log_error(SLAPI_LOG_FATAL, windows_repl_plugin_name,
  1136. "%s: Memory allocation error occurred\n",
  1137. agmt_get_long_name(prp->agmt));
  1138. return_value = UPDATE_FATAL_ERROR;
  1139. break;
  1140. case CL5_SYSTEM_ERROR: /* NSPR error occurred: use PR_GetError for furhter info */
  1141. slapi_log_error(SLAPI_LOG_FATAL, windows_repl_plugin_name,
  1142. "%s: An NSPR error (%d) occurred\n",
  1143. agmt_get_long_name(prp->agmt), PR_GetError());
  1144. return_value = UPDATE_TRANSIENT_ERROR;
  1145. break;
  1146. case CL5_CSN_ERROR: /* CSN API failed */
  1147. slapi_log_error(SLAPI_LOG_FATAL, windows_repl_plugin_name,
  1148. "%s: A CSN API failure was encountered\n",
  1149. agmt_get_long_name(prp->agmt));
  1150. return_value = UPDATE_TRANSIENT_ERROR;
  1151. break;
  1152. case CL5_RUV_ERROR: /* RUV API failed */
  1153. slapi_log_error(SLAPI_LOG_FATAL, windows_repl_plugin_name,
  1154. "%s: An RUV API failure occurred\n",
  1155. agmt_get_long_name(prp->agmt));
  1156. return_value = UPDATE_TRANSIENT_ERROR;
  1157. break;
  1158. case CL5_OBJSET_ERROR: /* namedobjset api failed */
  1159. slapi_log_error(SLAPI_LOG_FATAL, windows_repl_plugin_name,
  1160. "%s: A namedobject API failure occurred\n",
  1161. agmt_get_long_name(prp->agmt));
  1162. return_value = UPDATE_TRANSIENT_ERROR;
  1163. break;
  1164. case CL5_PURGED_DATA: /* requested data has been purged */
  1165. slapi_log_error(SLAPI_LOG_FATAL, windows_repl_plugin_name,
  1166. "%s: Data required to update replica has been purged. "
  1167. "The replica must be reinitialized.\n",
  1168. agmt_get_long_name(prp->agmt));
  1169. return_value = UPDATE_FATAL_ERROR;
  1170. break;
  1171. case CL5_MISSING_DATA: /* data should be in the changelog, but is missing */
  1172. slapi_log_error(SLAPI_LOG_FATAL, windows_repl_plugin_name,
  1173. "%s: Missing data encountered\n",
  1174. agmt_get_long_name(prp->agmt));
  1175. return_value = UPDATE_FATAL_ERROR;
  1176. break;
  1177. case CL5_UNKNOWN_ERROR: /* unclassified error */
  1178. slapi_log_error(SLAPI_LOG_FATAL, windows_repl_plugin_name,
  1179. "%s: An unknown error was ecountered\n",
  1180. agmt_get_long_name(prp->agmt));
  1181. return_value = UPDATE_TRANSIENT_ERROR;
  1182. break;
  1183. default:
  1184. slapi_log_error(SLAPI_LOG_FATAL, windows_repl_plugin_name,
  1185. "%s: An unknown error (%d) occurred "
  1186. "(cl5CreateReplayIterator)\n",
  1187. agmt_get_long_name(prp->agmt), rc);
  1188. return_value = UPDATE_TRANSIENT_ERROR;
  1189. }
  1190. }
  1191. else
  1192. {
  1193. int finished = 0;
  1194. ConnResult replay_crc;
  1195. char csn_str[CSN_STRSIZE];
  1196. memset ( (void*)&op, 0, sizeof (op) );
  1197. entry.op = &op;
  1198. do {
  1199. int mark_record_done = 0;
  1200. w_cl5_operation_parameters_done ( entry.op );
  1201. memset ( (void*)entry.op, 0, sizeof (op) );
  1202. rc = cl5GetNextOperationToReplay(changelog_iterator, &entry);
  1203. switch (rc)
  1204. {
  1205. case CL5_SUCCESS:
  1206. /* check that we don't return dummy entries */
  1207. if (is_dummy_operation (entry.op))
  1208. {
  1209. slapi_log_error(SLAPI_LOG_FATAL, windows_repl_plugin_name,
  1210. "%s: changelog iteration code returned a dummy entry with csn %s, "
  1211. "skipping ...\n",
  1212. agmt_get_long_name(prp->agmt), csn_as_string(entry.op->csn, PR_FALSE, csn_str));
  1213. continue;
  1214. }
  1215. /* This is where the work actually happens: */
  1216. if (do_send) {
  1217. replay_crc = windows_replay_update(prp, entry.op);
  1218. } else {
  1219. /* Skip sending the updates and just set a success code. */
  1220. replay_crc = CONN_OPERATION_SUCCESS;
  1221. }
  1222. if (CONN_OPERATION_SUCCESS != replay_crc)
  1223. {
  1224. int operation, error;
  1225. windows_conn_get_error(prp->conn, &operation, &error);
  1226. csn_as_string(entry.op->csn, PR_FALSE, csn_str);
  1227. /* Figure out what to do next */
  1228. if (CONN_OPERATION_FAILED == replay_crc)
  1229. {
  1230. /* Map ldap error code to return value */
  1231. if (!windows_ignore_error_and_keep_going(error))
  1232. {
  1233. return_value = UPDATE_TRANSIENT_ERROR;
  1234. finished = 1;
  1235. }
  1236. else
  1237. {
  1238. agmt_inc_last_update_changecount (prp->agmt, csn_get_replicaid(entry.op->csn), 1 /*skipped*/);
  1239. mark_record_done = 1;
  1240. }
  1241. slapi_log_error(finished ? SLAPI_LOG_FATAL : slapi_log_urp, windows_repl_plugin_name,
  1242. "%s: Consumer failed to replay change (uniqueid %s, CSN %s): %s. %s.\n",
  1243. agmt_get_long_name(prp->agmt),
  1244. entry.op->target_address.uniqueid, csn_str,
  1245. ldap_err2string(error),
  1246. finished ? "Will retry later" : "Skipping");
  1247. }
  1248. else if (CONN_NOT_CONNECTED == replay_crc)
  1249. {
  1250. /* We lost the connection - enter backoff state */
  1251. return_value = UPDATE_TRANSIENT_ERROR;
  1252. finished = 1;
  1253. slapi_log_error(SLAPI_LOG_FATAL, windows_repl_plugin_name,
  1254. "%s: Consumer failed to replay change (uniqueid %s, CSN %s): "
  1255. "%s. Will retry later.\n",
  1256. agmt_get_long_name(prp->agmt),
  1257. entry.op->target_address.uniqueid, csn_str,
  1258. error ? ldap_err2string(error) : "Connection lost");
  1259. }
  1260. else if (CONN_TIMEOUT == replay_crc)
  1261. {
  1262. return_value = UPDATE_TIMEOUT;
  1263. finished = 1;
  1264. slapi_log_error(SLAPI_LOG_FATAL, windows_repl_plugin_name,
  1265. "%s: Consumer timed out to replay change (uniqueid %s, CSN %s): "
  1266. "%s.\n",
  1267. agmt_get_long_name(prp->agmt),
  1268. entry.op->target_address.uniqueid, csn_str,
  1269. error ? ldap_err2string(error) : "Timeout");
  1270. }
  1271. else if (CONN_LOCAL_ERROR == replay_crc)
  1272. {
  1273. /*
  1274. * Something bad happened on the local server - enter
  1275. * backoff state.
  1276. */
  1277. return_value = UPDATE_TRANSIENT_ERROR;
  1278. finished = 1;
  1279. slapi_log_error(SLAPI_LOG_FATAL, windows_repl_plugin_name,
  1280. "%s: Failed to replay change (uniqueid %s, CSN %s): "
  1281. "Local error. Will retry later.\n",
  1282. agmt_get_long_name(prp->agmt),
  1283. entry.op->target_address.uniqueid, csn_str);
  1284. }
  1285. }
  1286. else
  1287. {
  1288. /* Positive response received */
  1289. (*num_changes_sent)++;
  1290. agmt_inc_last_update_changecount (prp->agmt, csn_get_replicaid(entry.op->csn), 0 /*replayed*/);
  1291. mark_record_done = 1;
  1292. }
  1293. if (mark_record_done)
  1294. {
  1295. /* If this is the very first change being sent,
  1296. * it's possible that we haven't set a min csn
  1297. * in the RUV yet. This is possible because we
  1298. * simply copy the supplier RUV during the total
  1299. * update process. The supplier RUV will not have
  1300. * a min or max csn set if no changes have ever
  1301. * been written to it's changelog. We need to set
  1302. * the min csn for the consumer here to prevent
  1303. * problems with further sync operations. */
  1304. if (set_mincsn) {
  1305. ruv_set_min_csn(current_ruv, entry.op->csn, NULL);
  1306. set_mincsn = 0;
  1307. }
  1308. /* Bring the consumers (AD) RUV up to date.
  1309. * This sets the max csn. */
  1310. ruv_force_csn_update(current_ruv, entry.op->csn);
  1311. }
  1312. break;
  1313. case CL5_BAD_DATA:
  1314. slapi_log_error(SLAPI_LOG_FATAL, windows_repl_plugin_name,
  1315. "%s: Invalid parameter passed to cl5GetNextOperationToReplay\n",
  1316. agmt_get_long_name(prp->agmt));
  1317. return_value = UPDATE_FATAL_ERROR;
  1318. finished = 1;
  1319. break;
  1320. case CL5_NOTFOUND:
  1321. slapi_log_error(SLAPI_LOG_REPL, windows_repl_plugin_name,
  1322. "%s: No more updates to send (cl5GetNextOperationToReplay)\n",
  1323. agmt_get_long_name(prp->agmt));
  1324. return_value = UPDATE_NO_MORE_UPDATES;
  1325. finished = 1;
  1326. break;
  1327. case CL5_DB_ERROR:
  1328. slapi_log_error(SLAPI_LOG_FATAL, windows_repl_plugin_name,
  1329. "%s: A database error occurred (cl5GetNextOperationToReplay)\n",
  1330. agmt_get_long_name(prp->agmt));
  1331. return_value = UPDATE_FATAL_ERROR;
  1332. finished = 1;
  1333. break;
  1334. case CL5_BAD_FORMAT:
  1335. slapi_log_error(SLAPI_LOG_FATAL, windows_repl_plugin_name,
  1336. "%s: A malformed changelog entry was encountered (cl5GetNextOperationToReplay)\n",
  1337. agmt_get_long_name(prp->agmt));
  1338. break;
  1339. case CL5_MEMORY_ERROR:
  1340. slapi_log_error(SLAPI_LOG_FATAL, windows_repl_plugin_name,
  1341. "%s: A memory allocation error occurred (cl5GetNextOperationToRepla)\n",
  1342. agmt_get_long_name(prp->agmt));
  1343. return_value = UPDATE_FATAL_ERROR;
  1344. break;
  1345. default:
  1346. slapi_log_error(SLAPI_LOG_FATAL, windows_repl_plugin_name,
  1347. "%s: Unknown error code (%d) returned from cl5GetNextOperationToReplay\n",
  1348. agmt_get_long_name(prp->agmt), rc);
  1349. return_value = UPDATE_TRANSIENT_ERROR;
  1350. break;
  1351. }
  1352. /* Check for protocol shutdown */
  1353. if (prp->terminate)
  1354. {
  1355. return_value = UPDATE_NO_MORE_UPDATES;
  1356. finished = 1;
  1357. }
  1358. if (*num_changes_sent >= MAX_CHANGES_PER_SESSION)
  1359. {
  1360. return_value = UPDATE_YIELD;
  1361. finished = 1;
  1362. }
  1363. } while (!finished);
  1364. w_cl5_operation_parameters_done ( entry.op );
  1365. cl5DestroyReplayIterator(&changelog_iterator);
  1366. }
  1367. /* Save the RUV that we successfully replayed, this ensures that next time we start off at the next changelog record */
  1368. if (current_ruv)
  1369. {
  1370. agmt_set_consumer_ruv(prp->agmt,current_ruv);
  1371. ruv_destroy(&current_ruv);
  1372. }
  1373. LDAPDebug0Args( LDAP_DEBUG_TRACE, "<= send_updates\n" );
  1374. return return_value;
  1375. }
  1376. /*
  1377. * XXXggood this should probably be in the superclass, since the full update
  1378. * protocol is going to need it too.
  1379. */
  1380. static int
  1381. windows_inc_stop(Private_Repl_Protocol *prp)
  1382. {
  1383. int return_value;
  1384. PRIntervalTime start, maxwait, now;
  1385. int seconds = 1200;
  1386. LDAPDebug0Args( LDAP_DEBUG_TRACE, "=> windows_inc_stop\n" );
  1387. maxwait = PR_SecondsToInterval(seconds);
  1388. prp->terminate = 1;
  1389. event_notify(prp, EVENT_PROTOCOL_SHUTDOWN);
  1390. start = PR_IntervalNow();
  1391. now = start;
  1392. while (!prp->stopped && ((now - start) < maxwait))
  1393. {
  1394. DS_Sleep(PR_SecondsToInterval(1));
  1395. now = PR_IntervalNow();
  1396. }
  1397. if (!prp->stopped)
  1398. {
  1399. /* Isn't listening. Do something drastic. */
  1400. return_value = -1;
  1401. slapi_log_error(SLAPI_LOG_REPL, windows_repl_plugin_name,
  1402. "%s: windows_inc_stop: protocol does not stop after %d seconds\n",
  1403. agmt_get_long_name(prp->agmt), seconds);
  1404. }
  1405. else
  1406. {
  1407. return_value = 0;
  1408. slapi_log_error(SLAPI_LOG_REPL, windows_repl_plugin_name,
  1409. "%s: windows_inc_stop: protocol stopped after %d seconds\n",
  1410. agmt_get_long_name(prp->agmt),
  1411. PR_IntervalToSeconds(now-start));
  1412. }
  1413. LDAPDebug0Args( LDAP_DEBUG_TRACE, "<= windows_inc_stop\n" );
  1414. return return_value;
  1415. }
  1416. static int
  1417. windows_inc_status(Private_Repl_Protocol *prp)
  1418. {
  1419. int return_value = 0;
  1420. LDAPDebug0Args( LDAP_DEBUG_TRACE, "=> windows_inc_status\n" );
  1421. LDAPDebug0Args( LDAP_DEBUG_TRACE, "<= windows_inc_status\n" );
  1422. return return_value;
  1423. }
  1424. static void
  1425. windows_inc_notify_update(Private_Repl_Protocol *prp)
  1426. {
  1427. LDAPDebug0Args( LDAP_DEBUG_TRACE, "=> windows_inc_notify_update\n" );
  1428. event_notify(prp, EVENT_TRIGGERING_CRITERIA_MET);
  1429. LDAPDebug0Args( LDAP_DEBUG_TRACE, "<= windows_inc_notify_update\n" );
  1430. }
  1431. static void
  1432. windows_inc_update_now(Private_Repl_Protocol *prp)
  1433. {
  1434. LDAPDebug0Args( LDAP_DEBUG_TRACE, "=> windows_inc_update_now\n" );
  1435. event_notify(prp, EVENT_REPLICATE_NOW);
  1436. LDAPDebug0Args( LDAP_DEBUG_TRACE, "<= windows_inc_update_now\n" );
  1437. }
  1438. static void
  1439. windows_inc_notify_agmt_changed(Private_Repl_Protocol *prp)
  1440. {
  1441. LDAPDebug0Args( LDAP_DEBUG_TRACE, "=> windows_inc_notify_agmt_changed\n" );
  1442. event_notify(prp, EVENT_AGMT_CHANGED);
  1443. LDAPDebug0Args( LDAP_DEBUG_TRACE, "<= windows_inc_notify_agmt_changed\n" );
  1444. }
  1445. static void
  1446. windows_inc_notify_window_opened (Private_Repl_Protocol *prp)
  1447. {
  1448. LDAPDebug0Args( LDAP_DEBUG_TRACE, "=> windows_inc_notify_window_opened\n" );
  1449. event_notify(prp, EVENT_WINDOW_OPENED);
  1450. LDAPDebug0Args( LDAP_DEBUG_TRACE, "<= windows_inc_notify_window_opened\n" );
  1451. }
  1452. static void
  1453. windows_inc_notify_window_closed (Private_Repl_Protocol *prp)
  1454. {
  1455. LDAPDebug0Args( LDAP_DEBUG_TRACE, "=> windows_inc_notify_window_closed\n" );
  1456. event_notify(prp, EVENT_WINDOW_CLOSED);
  1457. LDAPDebug0Args( LDAP_DEBUG_TRACE, "<= windows_inc_notify_window_closed\n" );
  1458. }
  1459. Private_Repl_Protocol *
  1460. Windows_Inc_Protocol_new(Repl_Protocol *rp)
  1461. {
  1462. windows_inc_private *rip = NULL;
  1463. Private_Repl_Protocol *prp = (Private_Repl_Protocol *)slapi_ch_calloc(1, sizeof(Private_Repl_Protocol));
  1464. LDAPDebug0Args( LDAP_DEBUG_TRACE, "=> Windows_Inc_Protocol_new\n" );
  1465. prp->delete = windows_inc_delete;
  1466. prp->run = windows_inc_run;
  1467. prp->stop = windows_inc_stop;
  1468. prp->status = windows_inc_status;
  1469. prp->notify_update = windows_inc_notify_update;
  1470. prp->notify_agmt_changed = windows_inc_notify_agmt_changed;
  1471. prp->notify_window_opened = windows_inc_notify_window_opened;
  1472. prp->notify_window_closed = windows_inc_notify_window_closed;
  1473. prp->update_now = windows_inc_update_now;
  1474. prp->replica_object = prot_get_replica_object(rp);
  1475. if ((prp->lock = PR_NewLock()) == NULL)
  1476. {
  1477. goto loser;
  1478. }
  1479. if ((prp->cvar = PR_NewCondVar(prp->lock)) == NULL)
  1480. {
  1481. goto loser;
  1482. }
  1483. prp->stopped = 0;
  1484. prp->terminate = 0;
  1485. prp->eventbits = 0;
  1486. prp->conn = prot_get_connection(rp);
  1487. prp->agmt = prot_get_agreement(rp);
  1488. prp->last_acquire_response_code = NSDS50_REPL_REPLICA_READY;
  1489. rip = (void *)slapi_ch_malloc(sizeof(windows_inc_private));
  1490. rip->ruv = NULL;
  1491. rip->backoff = NULL;
  1492. rip->rp = rp;
  1493. prp->private = (void *)rip;
  1494. prp->replica_acquired = PR_FALSE;
  1495. LDAPDebug0Args( LDAP_DEBUG_TRACE, "<= Windows_Inc_Protocol_new\n" );
  1496. return prp;
  1497. loser:
  1498. windows_inc_delete(&prp);
  1499. LDAPDebug0Args( LDAP_DEBUG_TRACE, "<= Windows_Inc_Protocol_new (loser)\n" );
  1500. return NULL;
  1501. }
  1502. static void
  1503. windows_inc_backoff_expired(time_t timer_fire_time, void *arg)
  1504. {
  1505. Private_Repl_Protocol *prp = (Private_Repl_Protocol *)arg;
  1506. LDAPDebug0Args( LDAP_DEBUG_TRACE, "=> windows_inc_backoff_expired\n" );
  1507. PR_ASSERT(NULL != prp);
  1508. event_notify(prp, EVENT_BACKOFF_EXPIRED);
  1509. LDAPDebug0Args( LDAP_DEBUG_TRACE, "<= windows_inc_backoff_expired\n" );
  1510. }
  1511. /*
  1512. * Examine the update vector and determine our course of action.
  1513. * There are 3 different possibilities, plus a catch-all error:
  1514. * 1 - no update vector (ruv is NULL). The consumer's replica is
  1515. * pristine, so it needs to be initialized. Return
  1516. * EXAMINE_RUV_PRISTINE_REPLICA.
  1517. * 2 - ruv is present, but its database generation ID doesn't
  1518. * match the local generation ID. This means that either
  1519. * the local replica must be reinitialized from the remote
  1520. * replica or vice-versa. Return
  1521. * EXAMINE_RUV_GENERATION_MISMATCH.
  1522. * 3 - ruv is present, and we have all updates needed to bring
  1523. * the replica up to date using the incremental protocol.
  1524. * return EXAMINE_RUV_OK.
  1525. * 4 - parameter error. Return EXAMINE_RUV_PARAM_ERROR
  1526. */
  1527. static int
  1528. windows_examine_update_vector(Private_Repl_Protocol *prp, RUV *remote_ruv)
  1529. {
  1530. int return_value;
  1531. LDAPDebug0Args( LDAP_DEBUG_TRACE, "=> windows_examine_update_vector\n" );
  1532. PR_ASSERT(NULL != prp);
  1533. if (NULL == prp)
  1534. {
  1535. return_value = EXAMINE_RUV_PARAM_ERROR;
  1536. }
  1537. else if (NULL == remote_ruv)
  1538. {
  1539. return_value = EXAMINE_RUV_PRISTINE_REPLICA;
  1540. }
  1541. else
  1542. {
  1543. char *local_gen = NULL;
  1544. char *remote_gen = ruv_get_replica_generation(remote_ruv);
  1545. Object *local_ruv_obj;
  1546. RUV *local_ruv;
  1547. Replica *replica;
  1548. PR_ASSERT(NULL != prp->replica_object);
  1549. replica = object_get_data(prp->replica_object);
  1550. PR_ASSERT(NULL != replica);
  1551. local_ruv_obj = replica_get_ruv (replica);
  1552. if (NULL != local_ruv_obj)
  1553. {
  1554. local_ruv = (RUV*) object_get_data (local_ruv_obj);
  1555. PR_ASSERT (local_ruv);
  1556. local_gen = ruv_get_replica_generation(local_ruv);
  1557. object_release (local_ruv_obj);
  1558. }
  1559. if (NULL == remote_gen || NULL == local_gen || strcmp(remote_gen, local_gen) != 0)
  1560. {
  1561. return_value = EXAMINE_RUV_GENERATION_MISMATCH;
  1562. }
  1563. else
  1564. {
  1565. return_value = EXAMINE_RUV_OK;
  1566. }
  1567. slapi_ch_free((void**)&remote_gen);
  1568. slapi_ch_free((void**)&local_gen);
  1569. }
  1570. LDAPDebug0Args( LDAP_DEBUG_TRACE, "<= windows_examine_update_vector\n" );
  1571. return return_value;
  1572. }
  1573. /* this function converts an aquisition code to a string - for debug output */
  1574. static const char*
  1575. acquire2name (int code)
  1576. {
  1577. switch (code)
  1578. {
  1579. case ACQUIRE_SUCCESS: return "success";
  1580. case ACQUIRE_REPLICA_BUSY: return "replica_busy";
  1581. case ACQUIRE_FATAL_ERROR: return "fatal_error";
  1582. case ACQUIRE_CONSUMER_WAS_UPTODATE: return "consumer_was_uptodate";
  1583. case ACQUIRE_TRANSIENT_ERROR: return "transient_error";
  1584. default: return "invalid_code";
  1585. }
  1586. }
  1587. /* this function converts a state to its name - for debug output */
  1588. static const char*
  1589. state2name (int state)
  1590. {
  1591. LDAPDebug0Args( LDAP_DEBUG_TRACE, "=> state2name\n" );
  1592. LDAPDebug0Args( LDAP_DEBUG_TRACE, "<= state2name\n" );
  1593. switch (state)
  1594. {
  1595. case STATE_START: return "start";
  1596. case STATE_WAIT_WINDOW_OPEN: return "wait_for_window_to_open";
  1597. case STATE_WAIT_CHANGES: return "wait_for_changes";
  1598. case STATE_READY_TO_ACQUIRE: return "ready_to_acquire_replica";
  1599. case STATE_BACKOFF_START: return "start_backoff";
  1600. case STATE_BACKOFF: return "backoff";
  1601. case STATE_SENDING_UPDATES: return "sending_updates";
  1602. case STATE_STOP_FATAL_ERROR: return "stop_fatal_error";
  1603. case STATE_STOP_FATAL_ERROR_PART2: return "stop_fatal_error";
  1604. case STATE_STOP_NORMAL_TERMINATION: return "stop_normal_termination";
  1605. default: return "invalid_state";
  1606. }
  1607. }
  1608. /* this function convert s an event to its name - for debug output */
  1609. static const char*
  1610. event2name (int event)
  1611. {
  1612. LDAPDebug0Args( LDAP_DEBUG_TRACE, "=> event2name\n" );
  1613. LDAPDebug0Args( LDAP_DEBUG_TRACE, "<= event2name\n" );
  1614. switch (event)
  1615. {
  1616. case EVENT_WINDOW_OPENED: return "update_window_opened";
  1617. case EVENT_WINDOW_CLOSED: return "update_window_closed";
  1618. case EVENT_TRIGGERING_CRITERIA_MET: return "data_modified";
  1619. case EVENT_BACKOFF_EXPIRED: return "backoff_timer_expired";
  1620. case EVENT_REPLICATE_NOW: return "replicate_now";
  1621. case EVENT_PROTOCOL_SHUTDOWN: return "protocol_shutdown";
  1622. case EVENT_AGMT_CHANGED: return "agreement_changed";
  1623. case EVENT_RUN_DIRSYNC: return "run_dirsync";
  1624. default: return "invalid_event";
  1625. }
  1626. }
  1627. static void
  1628. periodic_dirsync(time_t when, void *arg)
  1629. {
  1630. LDAPDebug0Args( LDAP_DEBUG_TRACE, "=> periodic_dirsync\n" );
  1631. slapi_log_error(SLAPI_LOG_REPL, windows_repl_plugin_name,
  1632. "Running Dirsync \n");
  1633. event_notify( (Private_Repl_Protocol*) arg, EVENT_RUN_DIRSYNC);
  1634. LDAPDebug0Args( LDAP_DEBUG_TRACE, "<= periodic_dirsync\n" );
  1635. }