csngen.c 27 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920
  1. /** BEGIN COPYRIGHT BLOCK
  2. * This Program is free software; you can redistribute it and/or modify it under
  3. * the terms of the GNU General Public License as published by the Free Software
  4. * Foundation; version 2 of the License.
  5. *
  6. * This Program is distributed in the hope that it will be useful, but WITHOUT
  7. * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
  8. * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
  9. *
  10. * You should have received a copy of the GNU General Public License along with
  11. * this Program; if not, write to the Free Software Foundation, Inc., 59 Temple
  12. * Place, Suite 330, Boston, MA 02111-1307 USA.
  13. *
  14. * In addition, as a special exception, Red Hat, Inc. gives You the additional
  15. * right to link the code of this Program with code not covered under the GNU
  16. * General Public License ("Non-GPL Code") and to distribute linked combinations
  17. * including the two, subject to the limitations in this paragraph. Non-GPL Code
  18. * permitted under this exception must only link to the code of this Program
  19. * through those well defined interfaces identified in the file named EXCEPTION
  20. * found in the source code files (the "Approved Interfaces"). The files of
  21. * Non-GPL Code may instantiate templates or use macros or inline functions from
  22. * the Approved Interfaces without causing the resulting work to be covered by
  23. * the GNU General Public License. Only Red Hat, Inc. may make changes or
  24. * additions to the list of Approved Interfaces. You must obey the GNU General
  25. * Public License in all respects for all of the Program code and other code used
  26. * in conjunction with the Program except the Non-GPL Code covered by this
  27. * exception. If you modify this file, you may extend this exception to your
  28. * version of the file, but you are not obligated to do so. If you do not wish to
  29. * provide this exception without modification, you must delete this exception
  30. * statement from your version and license this file solely under the GPL without
  31. * exception.
  32. *
  33. *
  34. * Copyright (C) 2001 Sun Microsystems, Inc. Used by permission.
  35. * Copyright (C) 2005 Red Hat, Inc.
  36. * All rights reserved.
  37. * END COPYRIGHT BLOCK **/
  38. #ifdef HAVE_CONFIG_H
  39. # include <config.h>
  40. #endif
  41. /*
  42. * csngen.c - CSN Generator
  43. */
  44. #ifdef _WIN32
  45. #define _WIN32_WINNT 0x0400
  46. #include <windows.h>
  47. #endif
  48. #include <string.h>
  49. #include "prcountr.h"
  50. #include "slap.h"
  51. #define CSN_MAX_SEQNUM 0xffff /* largest sequence number */
  52. #define CSN_MAX_TIME_ADJUST 24*60*60 /* maximum allowed time adjustment (in seconds) = 1 day */
  53. #define ATTR_CSN_GENERATOR_STATE "nsState" /* attribute that stores csn state information */
  54. #define STATE_FORMAT "%8x%8x%8x%4hx%4hx"
  55. #define STATE_LENGTH 32
  56. #define MAX_VAL(x,y) ((x)>(y)?(x):(y))
  57. #define CSN_CALC_TSTAMP(gen) ((gen)->state.sampled_time + \
  58. (gen)->state.local_offset + \
  59. (gen)->state.remote_offset)
  60. /*
  61. * **************************************************************************
  62. * data structures
  63. * **************************************************************************
  64. */
  65. /* callback node */
  66. typedef struct callback_node
  67. {
  68. GenCSNFn gen_fn; /* function to be called when new csn is generated */
  69. void *gen_arg; /* argument to pass to gen_fn function */
  70. AbortCSNFn abort_fn; /* function to be called when csn is aborted */
  71. void *abort_arg; /* argument to pass to abort_fn function */
  72. } callback_node;
  73. typedef struct callback_list
  74. {
  75. PRRWLock *lock;
  76. DataList *list; /* list of callback_node structures */
  77. } callback_list;
  78. /* persistently stored generator's state */
  79. typedef struct csngen_state
  80. {
  81. ReplicaId rid; /* replica id of the replicated area to which it is attached */
  82. time_t sampled_time; /* time last obtained from time() */
  83. time_t local_offset; /* offset due to the local clock being set back */
  84. time_t remote_offset; /* offset due to clock difference with remote systems */
  85. PRUint16 seq_num; /* used to allow to generate multiple csns within a second */
  86. }csngen_state;
  87. /* data maintained for each generator */
  88. struct csngen
  89. {
  90. csngen_state state; /* persistent state of the generator */
  91. callback_list callbacks; /* list of callbacks registered with the generator */
  92. PRRWLock *lock; /* concurrency control */
  93. };
  94. /*
  95. * **************************************************************************
  96. * global data
  97. * **************************************************************************
  98. */
  99. static time_t g_sampled_time; /* time obtained from time() call */
  100. /*
  101. * **************************************************************************
  102. * forward declarations of helper functions
  103. * **************************************************************************
  104. */
  105. static int _csngen_parse_state (CSNGen *gen, Slapi_Attr *state);
  106. static int _csngen_init_callbacks (CSNGen *gen);
  107. static void _csngen_call_callbacks (const CSNGen *gen, const CSN *csn, PRBool abort);
  108. static int _csngen_cmp_callbacks (const void *el1, const void *el2);
  109. static void _csngen_free_callbacks (CSNGen *gen);
  110. static int _csngen_adjust_local_time (CSNGen *gen, time_t cur_time);
  111. /*
  112. * **************************************************************************
  113. * forward declarations of tester functions
  114. * **************************************************************************
  115. */
  116. static int _csngen_start_test_threads (CSNGen *gen);
  117. static void _csngen_stop_test_threads ();
  118. static void _csngen_gen_tester_main (void *data);
  119. static void _csngen_local_tester_main (void *data);
  120. static void _csngen_remote_tester_main (void *data);
  121. /*
  122. * **************************************************************************
  123. * API
  124. * **************************************************************************
  125. */
  126. CSNGen*
  127. csngen_new (ReplicaId rid, Slapi_Attr *state)
  128. {
  129. int rc = CSN_SUCCESS;
  130. CSNGen *gen = NULL;
  131. gen = (CSNGen*)slapi_ch_calloc (1, sizeof (CSNGen));
  132. if (gen == NULL)
  133. {
  134. slapi_log_error (SLAPI_LOG_FATAL, NULL, "csngen_new: memory allocation failed\n");
  135. return NULL;
  136. }
  137. /* create lock to control the access to the state information */
  138. gen->lock = PR_NewRWLock(PR_RWLOCK_RANK_NONE, "state_lock");
  139. if (gen->lock == NULL)
  140. {
  141. slapi_log_error (SLAPI_LOG_FATAL, NULL, "csngen_new: failed to create lock\n");
  142. rc = CSN_NSPR_ERROR;
  143. goto done;
  144. }
  145. /* initialize callback list */
  146. _csngen_init_callbacks (gen);
  147. gen->state.rid = rid;
  148. if (state)
  149. {
  150. rc = _csngen_parse_state (gen, state);
  151. if (rc != CSN_SUCCESS)
  152. {
  153. goto done;
  154. }
  155. }
  156. else
  157. {
  158. /* new generator */
  159. gen->state.sampled_time = current_time ();
  160. gen->state.local_offset = 0;
  161. gen->state.remote_offset = 0;
  162. gen->state.seq_num = 0;
  163. }
  164. done:
  165. if (rc != CSN_SUCCESS)
  166. {
  167. if (gen)
  168. {
  169. csngen_free (&gen);
  170. }
  171. return NULL;
  172. }
  173. return gen;
  174. }
  175. void
  176. csngen_free (CSNGen **gen)
  177. {
  178. if (gen == NULL || *gen == NULL)
  179. return;
  180. _csngen_free_callbacks (*gen);
  181. if ((*gen)->lock)
  182. PR_DestroyRWLock ((*gen)->lock);
  183. }
  184. int
  185. csngen_new_csn (CSNGen *gen, CSN **csn, PRBool notify)
  186. {
  187. int rc = CSN_SUCCESS;
  188. time_t cur_time;
  189. int delta;
  190. if (gen == NULL || csn == NULL)
  191. {
  192. slapi_log_error (SLAPI_LOG_FATAL, NULL, "csngen_new_csn: invalid argument\n");
  193. return CSN_INVALID_PARAMETER;
  194. }
  195. *csn = csn_new ();
  196. if (*csn == NULL)
  197. {
  198. slapi_log_error (SLAPI_LOG_FATAL, NULL, "csngen_new_csn: memory allocation failed\n");
  199. return CSN_MEMORY_ERROR;
  200. }
  201. PR_RWLock_Wlock (gen->lock);
  202. if (g_sampled_time == 0)
  203. csngen_update_time ();
  204. cur_time = g_sampled_time;
  205. /* check if the time should be adjusted */
  206. delta = cur_time - gen->state.sampled_time;
  207. if (delta > 0)
  208. {
  209. rc = _csngen_adjust_local_time (gen, cur_time);
  210. if (rc != CSN_SUCCESS)
  211. {
  212. PR_RWLock_Unlock (gen->lock);
  213. return rc;
  214. }
  215. }
  216. else if (delta < -300) {
  217. /*
  218. * The maxseqnum could support up to 65535 CSNs per second.
  219. * That means that we could avoid duplicated CSN's for
  220. * delta up to 300 secs if update rate is 200/sec (usually
  221. * the max rate is below 20/sec).
  222. * Beyond 300 secs, we advance gen->state.sampled_time by
  223. * one sec to recycle seqnum.
  224. */
  225. slapi_log_error (SLAPI_LOG_FATAL, "csngen_new_csn", "Warning: too much time skew (%d secs). Current seqnum=%0x\n", delta, gen->state.seq_num );
  226. rc = _csngen_adjust_local_time (gen, gen->state.sampled_time+1);
  227. if (rc != CSN_SUCCESS)
  228. {
  229. PR_RWLock_Unlock (gen->lock);
  230. return rc;
  231. }
  232. }
  233. if (gen->state.seq_num == CSN_MAX_SEQNUM)
  234. {
  235. slapi_log_error (SLAPI_LOG_FATAL, NULL, "csngen_new_csn: sequence rollover; "
  236. "local offset updated.\n");
  237. gen->state.local_offset ++;
  238. gen->state.seq_num = 0;
  239. }
  240. (*csn)->tstamp = CSN_CALC_TSTAMP(gen);
  241. (*csn)->seqnum = gen->state.seq_num ++;
  242. (*csn)->rid = gen->state.rid;
  243. (*csn)->subseqnum = 0;
  244. /* The lock is intentionally unlocked before callbacks are called.
  245. This is to prevent deadlocks. The callback management code has
  246. its own lock */
  247. PR_RWLock_Unlock (gen->lock);
  248. /* notify modules that registered interest in csn generation */
  249. if (notify)
  250. {
  251. _csngen_call_callbacks (gen, *csn, 0);
  252. }
  253. return rc;
  254. }
  255. /* this function should be called for csns generated with non-zero notify
  256. that were unused because the corresponding operation was aborted.
  257. The function calls "abort" functions registered through
  258. csngen_register_callbacks call */
  259. void csngen_abort_csn (CSNGen *gen, const CSN *csn)
  260. {
  261. _csngen_call_callbacks (gen, csn, 1);
  262. }
  263. /* this function should be called when a remote CSN for the same part of
  264. the dit becomes known to the server (for instance, as part of RUV during
  265. replication session. In response, the generator would adjust its notion
  266. of time so that it does not generate smaller csns */
  267. int csngen_adjust_time (CSNGen *gen, const CSN* csn)
  268. {
  269. time_t remote_time, remote_offset, cur_time;
  270. PRUint16 remote_seqnum;
  271. int rc;
  272. if (gen == NULL || csn == NULL)
  273. return CSN_INVALID_PARAMETER;
  274. remote_time = csn_get_time (csn);
  275. remote_seqnum = csn_get_seqnum (csn);
  276. PR_RWLock_Wlock (gen->lock);
  277. if (slapi_is_loglevel_set(SLAPI_LOG_REPL)) {
  278. cur_time = CSN_CALC_TSTAMP(gen);
  279. slapi_log_error (SLAPI_LOG_REPL, NULL, "csngen_adjust_time: "
  280. "gen state before %08lx%04x:%ld:%ld:%ld\n",
  281. cur_time, gen->state.seq_num,
  282. gen->state.sampled_time,
  283. gen->state.local_offset,
  284. gen->state.remote_offset);
  285. }
  286. /* make sure we have the current time */
  287. csngen_update_time();
  288. cur_time = g_sampled_time;
  289. /* make sure sampled_time is current */
  290. /* must only call adjust_local_time if the current time is greater than
  291. the generator state time */
  292. if ((cur_time > gen->state.sampled_time) &&
  293. (CSN_SUCCESS != (rc = _csngen_adjust_local_time(gen, cur_time))))
  294. {
  295. /* _csngen_adjust_local_time will log error */
  296. PR_RWLock_Unlock (gen->lock);
  297. csngen_dump_state(gen);
  298. return rc;
  299. }
  300. cur_time = CSN_CALC_TSTAMP(gen);
  301. if (remote_time >= cur_time)
  302. {
  303. time_t new_time = 0;
  304. if (remote_seqnum > gen->state.seq_num )
  305. {
  306. if (remote_seqnum < CSN_MAX_SEQNUM)
  307. {
  308. gen->state.seq_num = remote_seqnum + 1;
  309. }
  310. else
  311. {
  312. remote_time++;
  313. }
  314. }
  315. remote_offset = remote_time - cur_time;
  316. if (remote_offset > gen->state.remote_offset)
  317. {
  318. if (remote_offset <= CSN_MAX_TIME_ADJUST)
  319. {
  320. gen->state.remote_offset = remote_offset;
  321. }
  322. else /* remote_offset > CSN_MAX_TIME_ADJUST */
  323. {
  324. slapi_log_error (SLAPI_LOG_FATAL, NULL, "csngen_adjust_time: "
  325. "adjustment limit exceeded; value - %ld, limit - %ld\n",
  326. remote_offset, (long)CSN_MAX_TIME_ADJUST);
  327. PR_RWLock_Unlock (gen->lock);
  328. csngen_dump_state(gen);
  329. return CSN_LIMIT_EXCEEDED;
  330. }
  331. }
  332. else if (remote_offset > 0) { /* still need to account for this */
  333. gen->state.local_offset += remote_offset;
  334. }
  335. new_time = CSN_CALC_TSTAMP(gen);
  336. /* let's revisit the seq num - if the new time is > the old
  337. tiem, we should reset the seq number to remote + 1 if
  338. this won't cause a wrap around */
  339. if (new_time > cur_time) {
  340. /* just set seq_num regardless of whether the current one
  341. is < or > than the remote one - the goal of this function
  342. is to make sure we generate CSNs > the remote CSN - if
  343. we have increased the time, we can decrease the seqnum
  344. and still guarantee that any new CSNs generated will be
  345. > any current CSNs we have generated */
  346. gen->state.seq_num = remote_seqnum + 1;
  347. }
  348. if (slapi_is_loglevel_set(SLAPI_LOG_REPL)) {
  349. slapi_log_error (SLAPI_LOG_REPL, NULL, "csngen_adjust_time: "
  350. "gen state after %08lx%04x:%ld:%ld:%ld\n",
  351. new_time, gen->state.seq_num,
  352. gen->state.sampled_time,
  353. gen->state.local_offset,
  354. gen->state.remote_offset);
  355. }
  356. }
  357. else if (gen->state.remote_offset > 0)
  358. {
  359. /* decrease remote offset? */
  360. /* how to decrease remote offset but ensure that we don't
  361. generate a duplicate CSN, or a CSN smaller than one we've already
  362. generated? */
  363. }
  364. PR_RWLock_Unlock (gen->lock);
  365. return CSN_SUCCESS;
  366. }
  367. /* returns PR_TRUE if the csn was generated by this generator and
  368. PR_FALSE otherwise. */
  369. PRBool csngen_is_local_csn(const CSNGen *gen, const CSN *csn)
  370. {
  371. return (gen && csn && gen->state.rid == csn_get_replicaid(csn));
  372. }
  373. /* returns current state of the generator so that it can be saved in the DIT */
  374. int csngen_get_state (const CSNGen *gen, Slapi_Mod *state)
  375. {
  376. struct berval bval;
  377. if (gen == NULL || state == NULL)
  378. return CSN_INVALID_PARAMETER;
  379. PR_RWLock_Rlock (gen->lock);
  380. slapi_mod_init (state, 1);
  381. slapi_mod_set_type (state, ATTR_CSN_GENERATOR_STATE);
  382. slapi_mod_set_operation (state, LDAP_MOD_REPLACE | LDAP_MOD_BVALUES);
  383. bval.bv_val = (char*)&gen->state;
  384. bval.bv_len = sizeof (gen->state);
  385. slapi_mod_add_value(state, &bval);
  386. PR_RWLock_Unlock (gen->lock);
  387. return CSN_SUCCESS;
  388. }
  389. /* registers callbacks to be called when csn is created or aborted */
  390. void* csngen_register_callbacks(CSNGen *gen, GenCSNFn genFn, void *genArg,
  391. AbortCSNFn abortFn, void *abortArg)
  392. {
  393. callback_node *node;
  394. if (gen == NULL || (genFn == NULL && abortFn == NULL))
  395. return NULL;
  396. node = (callback_node *)slapi_ch_malloc (sizeof (callback_node));
  397. node->gen_fn = genFn;
  398. node->gen_arg = genArg;
  399. node->abort_fn = abortFn;
  400. node->abort_arg = abortArg;
  401. PR_RWLock_Wlock (gen->callbacks.lock);
  402. dl_add (gen->callbacks.list, node);
  403. PR_RWLock_Unlock (gen->callbacks.lock);
  404. return node;
  405. }
  406. /* unregisters callbacks registered via call to csngenRegisterCallbacks */
  407. void csngen_unregister_callbacks(CSNGen *gen, void *cookie)
  408. {
  409. if (gen && cookie)
  410. {
  411. PR_RWLock_Wlock (gen->callbacks.lock);
  412. dl_delete (gen->callbacks.list, cookie, _csngen_cmp_callbacks, slapi_ch_free);
  413. PR_RWLock_Unlock (gen->callbacks.lock);
  414. }
  415. }
  416. /* this functions is periodically called from daemon.c to
  417. update time used by all generators */
  418. void csngen_update_time ()
  419. {
  420. g_sampled_time = current_time ();
  421. }
  422. /* debugging function */
  423. void csngen_dump_state (const CSNGen *gen)
  424. {
  425. if (gen)
  426. {
  427. PR_RWLock_Rlock (gen->lock);
  428. slapi_log_error(SLAPI_LOG_FATAL, NULL, "CSN generator's state:\n");
  429. slapi_log_error(SLAPI_LOG_FATAL, NULL, "\treplica id: %d\n", gen->state.rid);
  430. slapi_log_error(SLAPI_LOG_FATAL, NULL, "\tsampled time: %ld\n", gen->state.sampled_time);
  431. slapi_log_error(SLAPI_LOG_FATAL, NULL, "\tlocal offset: %ld\n", gen->state.local_offset);
  432. slapi_log_error(SLAPI_LOG_FATAL, NULL, "\tremote offset: %ld\n", gen->state.remote_offset);
  433. slapi_log_error(SLAPI_LOG_FATAL, NULL, "\tsequence number: %d\n", gen->state.seq_num);
  434. PR_RWLock_Unlock (gen->lock);
  435. }
  436. }
  437. #define TEST_TIME 600 /* 10 minutes */
  438. /* This function tests csn generator. It verifies that csn's are generated in
  439. monotnically increasing order in the face of local and remote time skews */
  440. void csngen_test ()
  441. {
  442. int rc;
  443. CSNGen *gen = csngen_new (255, NULL);
  444. slapi_log_error(SLAPI_LOG_FATAL, NULL, "staring csn generator test ...");
  445. csngen_dump_state (gen);
  446. rc = _csngen_start_test_threads(gen);
  447. if (rc == 0)
  448. {
  449. DS_Sleep(PR_SecondsToInterval(TEST_TIME));
  450. }
  451. _csngen_stop_test_threads(gen);
  452. csngen_dump_state (gen);
  453. slapi_log_error(SLAPI_LOG_FATAL, NULL, "csn generator test is complete...");
  454. }
  455. /*
  456. * **************************************************************************
  457. * Helper functions
  458. * **************************************************************************
  459. */
  460. static int
  461. _csngen_parse_state (CSNGen *gen, Slapi_Attr *state)
  462. {
  463. int rc;
  464. Slapi_Value *val;
  465. const struct berval *bval;
  466. ReplicaId rid = gen->state.rid;
  467. PR_ASSERT (gen && state);
  468. rc = slapi_attr_first_value(state, &val);
  469. if (rc != 0)
  470. {
  471. slapi_log_error (SLAPI_LOG_FATAL, NULL, "_csngen_parse_state: invalid state format\n");
  472. return CSN_INVALID_FORMAT;
  473. }
  474. bval = slapi_value_get_berval(val);
  475. memcpy (&gen->state, bval->bv_val, bval->bv_len);
  476. /* replicaid does not match */
  477. if (rid != gen->state.rid)
  478. {
  479. slapi_log_error (SLAPI_LOG_FATAL, NULL, "_csngen_parse_state: replica id"
  480. " mismatch; current id - %d, replica id in the state - %d\n",
  481. rid, gen->state.rid);
  482. return CSN_INVALID_FORMAT;
  483. }
  484. return CSN_SUCCESS;
  485. }
  486. static int
  487. _csngen_init_callbacks (CSNGen *gen)
  488. {
  489. /* create a lock to control access to the callback list */
  490. gen->callbacks.lock = PR_NewRWLock(PR_RWLOCK_RANK_NONE, "callback_lock");
  491. if (gen->callbacks.lock == NULL)
  492. {
  493. return CSN_NSPR_ERROR;
  494. }
  495. gen->callbacks.list = dl_new ();
  496. dl_init (gen->callbacks.list, 0);
  497. return CSN_SUCCESS;
  498. }
  499. static void
  500. _csngen_free_callbacks (CSNGen *gen)
  501. {
  502. PR_ASSERT (gen);
  503. if (gen->callbacks.list)
  504. {
  505. dl_cleanup (gen->callbacks.list, slapi_ch_free);
  506. dl_free (&(gen->callbacks.list));
  507. }
  508. if (gen->callbacks.lock)
  509. PR_DestroyRWLock (gen->callbacks.lock);
  510. }
  511. static void
  512. _csngen_call_callbacks (const CSNGen *gen, const CSN *csn, PRBool abort)
  513. {
  514. int cookie;
  515. callback_node* node;
  516. PR_ASSERT (gen && csn);
  517. PR_RWLock_Rlock (gen->callbacks.lock);
  518. node = (callback_node*)dl_get_first (gen->callbacks.list, &cookie);
  519. while (node)
  520. {
  521. if (abort)
  522. {
  523. if (node->abort_fn)
  524. node->abort_fn (csn, node->abort_arg);
  525. }
  526. else
  527. {
  528. if (node->gen_fn)
  529. node->gen_fn (csn, node->gen_arg);
  530. }
  531. node = (callback_node*)dl_get_next (gen->callbacks.list, &cookie);
  532. }
  533. PR_RWLock_Unlock (gen->callbacks.lock);
  534. }
  535. /* el1 is just a pointer to the callback_node */
  536. static int
  537. _csngen_cmp_callbacks (const void *el1, const void *el2)
  538. {
  539. if (el1 == el2)
  540. return 0;
  541. if (el1 < el2)
  542. return -1;
  543. else
  544. return 1;
  545. }
  546. static int
  547. _csngen_adjust_local_time (CSNGen *gen, time_t cur_time)
  548. {
  549. time_t time_diff = cur_time - gen->state.sampled_time;
  550. if (time_diff == 0) {
  551. /* This is a no op - _csngen_adjust_local_time should never be called
  552. in this case, because there is nothing to adjust - but just return
  553. here to protect ourselves
  554. */
  555. return CSN_SUCCESS;
  556. }
  557. else if (time_diff > 0)
  558. {
  559. time_t ts_before = CSN_CALC_TSTAMP(gen);
  560. time_t ts_after = 0;
  561. if (slapi_is_loglevel_set(SLAPI_LOG_REPL)) {
  562. time_t new_time = CSN_CALC_TSTAMP(gen);
  563. slapi_log_error (SLAPI_LOG_REPL, NULL, "_csngen_adjust_local_time: "
  564. "gen state before %08lx%04x:%ld:%ld:%ld\n",
  565. new_time, gen->state.seq_num,
  566. gen->state.sampled_time,
  567. gen->state.local_offset,
  568. gen->state.remote_offset);
  569. }
  570. gen->state.sampled_time = cur_time;
  571. if (time_diff > gen->state.local_offset)
  572. gen->state.local_offset = 0;
  573. else
  574. gen->state.local_offset = gen->state.local_offset - time_diff;
  575. /* only reset the seq_num if the new timestamp part of the CSN
  576. is going to be greater than the old one - if they are the
  577. same after the above adjustment (which can happen if
  578. csngen_adjust_time has to store the offset in the
  579. local_offset field) we must not allow the CSN to regress or
  580. generate duplicate numbers */
  581. ts_after = CSN_CALC_TSTAMP(gen);
  582. if (ts_after > ts_before) {
  583. gen->state.seq_num = 0; /* only reset if new time > old time */
  584. }
  585. if (slapi_is_loglevel_set(SLAPI_LOG_REPL)) {
  586. time_t new_time = CSN_CALC_TSTAMP(gen);
  587. slapi_log_error (SLAPI_LOG_REPL, NULL, "_csngen_adjust_local_time: "
  588. "gen state after %08lx%04x:%ld:%ld:%ld\n",
  589. new_time, gen->state.seq_num,
  590. gen->state.sampled_time,
  591. gen->state.local_offset,
  592. gen->state.remote_offset);
  593. }
  594. return CSN_SUCCESS;
  595. }
  596. else /* time was turned back */
  597. {
  598. if (slapi_is_loglevel_set(SLAPI_LOG_REPL)) {
  599. time_t new_time = CSN_CALC_TSTAMP(gen);
  600. slapi_log_error (SLAPI_LOG_REPL, NULL, "_csngen_adjust_local_time: "
  601. "gen state back before %08lx%04x:%ld:%ld:%ld\n",
  602. new_time, gen->state.seq_num,
  603. gen->state.sampled_time,
  604. gen->state.local_offset,
  605. gen->state.remote_offset);
  606. }
  607. if (abs (time_diff) > CSN_MAX_TIME_ADJUST)
  608. {
  609. slapi_log_error (SLAPI_LOG_FATAL, NULL, "_csngen_adjust_local_time: "
  610. "adjustment limit exceeded; value - %d, limit - %d\n",
  611. abs (time_diff), CSN_MAX_TIME_ADJUST);
  612. return CSN_LIMIT_EXCEEDED;
  613. }
  614. gen->state.sampled_time = cur_time;
  615. gen->state.local_offset = MAX_VAL (gen->state.local_offset, abs (time_diff));
  616. gen->state.seq_num = 0;
  617. if (slapi_is_loglevel_set(SLAPI_LOG_REPL)) {
  618. time_t new_time = CSN_CALC_TSTAMP(gen);
  619. slapi_log_error (SLAPI_LOG_REPL, NULL, "_csngen_adjust_local_time: "
  620. "gen state back after %08lx%04x:%ld:%ld:%ld\n",
  621. new_time, gen->state.seq_num,
  622. gen->state.sampled_time,
  623. gen->state.local_offset,
  624. gen->state.remote_offset);
  625. }
  626. return CSN_SUCCESS;
  627. }
  628. }
  629. /*
  630. * **************************************************************************
  631. * test code
  632. * **************************************************************************
  633. */
  634. /*
  635. * The defult thread stacksize for nspr21 is 64k. For OSF, we require
  636. * a larger stacksize as actual storage allocation is higher i.e
  637. * pointers are allocated 8 bytes but lower 4 bytes are used.
  638. * The value 0 means use the default stacksize.
  639. */
  640. #if defined (OSF1) || defined(__LP64__) || defined (_LP64) /* 64-bit architectures need large stacks */
  641. #define DEFAULT_THREAD_STACKSIZE 131072L
  642. #else
  643. #define DEFAULT_THREAD_STACKSIZE 0
  644. #endif
  645. #define GEN_TREAD_COUNT 20
  646. int s_thread_count;
  647. int s_must_exit;
  648. static int
  649. _csngen_start_test_threads(CSNGen *gen)
  650. {
  651. int i;
  652. PR_ASSERT (gen);
  653. s_thread_count = 0;
  654. s_must_exit = 0;
  655. /* create threads that generate csns */
  656. for(i=0; i< GEN_TREAD_COUNT; i++)
  657. {
  658. if (PR_CreateThread(PR_USER_THREAD, _csngen_gen_tester_main, gen,
  659. PR_PRIORITY_NORMAL, PR_GLOBAL_THREAD, PR_UNJOINABLE_THREAD,
  660. DEFAULT_THREAD_STACKSIZE) == NULL)
  661. {
  662. PRErrorCode prerr = PR_GetError();
  663. slapi_log_error(SLAPI_LOG_FATAL, NULL,
  664. "failed to create a CSN generator thread number %d; " SLAPI_COMPONENT_NAME_NSPR " error %d (%s)\n",
  665. i, prerr, slapd_pr_strerror(prerr));
  666. return -1;
  667. }
  668. s_thread_count ++;
  669. }
  670. /* create a thread that modifies remote time */
  671. if (PR_CreateThread(PR_USER_THREAD, _csngen_remote_tester_main, (void *)gen,
  672. PR_PRIORITY_NORMAL, PR_GLOBAL_THREAD, PR_UNJOINABLE_THREAD,
  673. DEFAULT_THREAD_STACKSIZE) == NULL)
  674. {
  675. PRErrorCode prerr = PR_GetError();
  676. slapi_log_error(SLAPI_LOG_FATAL, NULL,
  677. "failed to create the remote CSN tester thread; " SLAPI_COMPONENT_NAME_NSPR " error %d (%s)\n",
  678. prerr, slapd_pr_strerror(prerr));
  679. return -1;
  680. }
  681. s_thread_count ++;
  682. /* create a thread that modifies local time */
  683. if (PR_CreateThread(PR_USER_THREAD, _csngen_local_tester_main, (void *)gen,
  684. PR_PRIORITY_NORMAL, PR_GLOBAL_THREAD, PR_UNJOINABLE_THREAD,
  685. DEFAULT_THREAD_STACKSIZE) == NULL)
  686. {
  687. PRErrorCode prerr = PR_GetError();
  688. slapi_log_error(SLAPI_LOG_FATAL, NULL,
  689. "failed to create the local CSN tester thread; " SLAPI_COMPONENT_NAME_NSPR " error %d (%s)\n",
  690. prerr, slapd_pr_strerror(prerr));
  691. return -1;
  692. }
  693. s_thread_count ++;
  694. return 0;
  695. }
  696. static void _csngen_stop_test_threads ()
  697. {
  698. s_must_exit = 1;
  699. while (s_thread_count > 0)
  700. {
  701. /* sleep for 30 seconds */
  702. DS_Sleep (PR_SecondsToInterval(20));
  703. }
  704. }
  705. /* periodically generate a csn and dump it to the error log */
  706. static void
  707. _csngen_gen_tester_main (void *data)
  708. {
  709. CSNGen *gen = (CSNGen*)data;
  710. CSN *csn;
  711. char buff [CSN_STRSIZE];
  712. int rc;
  713. PR_ASSERT (gen);
  714. while (!s_must_exit)
  715. {
  716. rc = csngen_new_csn (gen, &csn, PR_FALSE);
  717. if (rc != CSN_SUCCESS)
  718. {
  719. slapi_log_error (SLAPI_LOG_FATAL, NULL,
  720. "failed to generate csn; csn error - %d\n", rc);
  721. }
  722. else
  723. {
  724. slapi_log_error (SLAPI_LOG_FATAL, NULL, "generate csn %s\n",
  725. csn_as_string(csn, PR_FALSE, buff));
  726. }
  727. /* sleep for 30 seconds */
  728. DS_Sleep (PR_SecondsToInterval(10));
  729. }
  730. PR_AtomicDecrement (&s_thread_count);
  731. }
  732. /* simulate clock skew with remote servers that causes
  733. generator to advance its remote offset */
  734. static void
  735. _csngen_remote_tester_main (void *data)
  736. {
  737. CSNGen *gen = (CSNGen*)data;
  738. CSN *csn;
  739. time_t csn_time;
  740. int rc;
  741. PR_ASSERT (gen);
  742. while (!s_must_exit)
  743. {
  744. rc = csngen_new_csn (gen, &csn, PR_FALSE);
  745. if (rc != CSN_SUCCESS)
  746. {
  747. slapi_log_error (SLAPI_LOG_FATAL, NULL,
  748. "failed to generate csn; csn error - %d\n", rc);
  749. }
  750. else
  751. {
  752. csn_time = csn_get_time(csn);
  753. csn_set_time (csn, csn_time + slapi_rand () % 100);
  754. rc = csngen_adjust_time (gen, csn);
  755. if (rc != CSN_SUCCESS)
  756. {
  757. slapi_log_error (SLAPI_LOG_FATAL, NULL,
  758. "failed to adjust generator's time; csn error - %d\n", rc);
  759. }
  760. csngen_dump_state (gen);
  761. }
  762. /* sleep for 30 seconds */
  763. DS_Sleep (PR_SecondsToInterval(60));
  764. }
  765. PR_AtomicDecrement (&s_thread_count);
  766. }
  767. /* simulate local clock being set back */
  768. static void
  769. _csngen_local_tester_main (void *data)
  770. {
  771. CSNGen *gen = (CSNGen*)data;
  772. PR_ASSERT (gen);
  773. while (!s_must_exit)
  774. {
  775. /* sleep for 30 seconds */
  776. DS_Sleep (PR_SecondsToInterval(60));
  777. g_sampled_time -= slapi_rand () % 100;
  778. csngen_dump_state (gen);
  779. }
  780. PR_AtomicDecrement (&s_thread_count);
  781. }