csngen.c 25 KB


  1. /** BEGIN COPYRIGHT BLOCK
  2. * Copyright (C) 2001 Sun Microsystems, Inc. Used by permission.
  3. * Copyright (C) 2005 Red Hat, Inc.
  4. * All rights reserved.
  5. *
  6. * License: GPL (version 3 or any later version).
  7. * See LICENSE for details.
  8. * END COPYRIGHT BLOCK **/
  9. #ifdef HAVE_CONFIG_H
  10. # include <config.h>
  11. #endif
  12. /*
  13. * csngen.c - CSN Generator
  14. */
  15. #include <string.h>
  16. #include "prcountr.h"
  17. #include "slap.h"
  18. #define CSN_MAX_SEQNUM 0xffff /* largest sequence number */
  19. #define CSN_MAX_TIME_ADJUST 24*60*60 /* maximum allowed time adjustment (in seconds) = 1 day */
  20. #define ATTR_CSN_GENERATOR_STATE "nsState" /* attribute that stores csn state information */
  21. #define STATE_FORMAT "%8x%8x%8x%4hx%4hx"
  22. #define STATE_LENGTH 32
  23. #define MAX_VAL(x,y) ((x)>(y)?(x):(y))
  24. #define CSN_CALC_TSTAMP(gen) ((gen)->state.sampled_time + \
  25. (gen)->state.local_offset + \
  26. (gen)->state.remote_offset)
  27. /*
  28. * **************************************************************************
  29. * data structures
  30. * **************************************************************************
  31. */
  32. /* callback node */
  33. typedef struct callback_node
  34. {
  35. GenCSNFn gen_fn; /* function to be called when new csn is generated */
  36. void *gen_arg; /* argument to pass to gen_fn function */
  37. AbortCSNFn abort_fn; /* function to be called when csn is aborted */
  38. void *abort_arg; /* argument to pass to abort_fn function */
  39. } callback_node;
  40. typedef struct callback_list
  41. {
  42. Slapi_RWLock *lock;
  43. DataList *list; /* list of callback_node structures */
  44. } callback_list;
  45. /* persistently stored generator's state */
  46. typedef struct csngen_state
  47. {
  48. ReplicaId rid; /* replica id of the replicated area to which it is attached */
  49. time_t sampled_time; /* time last obtained from time() */
  50. time_t local_offset; /* offset due to the local clock being set back */
  51. time_t remote_offset; /* offset due to clock difference with remote systems */
  52. PRUint16 seq_num; /* used to allow to generate multiple csns within a second */
  53. }csngen_state;
  54. /* data maintained for each generator */
  55. struct csngen
  56. {
  57. csngen_state state; /* persistent state of the generator */
  58. callback_list callbacks; /* list of callbacks registered with the generator */
  59. Slapi_RWLock *lock; /* concurrency control */
  60. };
  61. /*
  62. * **************************************************************************
  63. * global data
  64. * **************************************************************************
  65. */
  66. static time_t g_sampled_time; /* time obtained from time() call */
  67. /*
  68. * **************************************************************************
  69. * forward declarations of helper functions
  70. * **************************************************************************
  71. */
  72. static int _csngen_parse_state (CSNGen *gen, Slapi_Attr *state);
  73. static int _csngen_init_callbacks (CSNGen *gen);
  74. static void _csngen_call_callbacks (const CSNGen *gen, const CSN *csn, PRBool abort);
  75. static int _csngen_cmp_callbacks (const void *el1, const void *el2);
  76. static void _csngen_free_callbacks (CSNGen *gen);
  77. static int _csngen_adjust_local_time (CSNGen *gen, time_t cur_time);
  78. /*
  79. * **************************************************************************
  80. * forward declarations of tester functions
  81. * **************************************************************************
  82. */
  83. static int _csngen_start_test_threads (CSNGen *gen);
  84. static void _csngen_stop_test_threads ();
  85. static void _csngen_gen_tester_main (void *data);
  86. static void _csngen_local_tester_main (void *data);
  87. static void _csngen_remote_tester_main (void *data);
  88. /*
  89. * **************************************************************************
  90. * API
  91. * **************************************************************************
  92. */
  93. CSNGen*
  94. csngen_new (ReplicaId rid, Slapi_Attr *state)
  95. {
  96. int rc = CSN_SUCCESS;
  97. CSNGen *gen = NULL;
  98. gen = (CSNGen*)slapi_ch_calloc (1, sizeof (CSNGen));
  99. if (gen == NULL)
  100. {
  101. slapi_log_error (SLAPI_LOG_FATAL, NULL, "csngen_new: memory allocation failed\n");
  102. return NULL;
  103. }
  104. /* create lock to control the access to the state information */
  105. gen->lock = slapi_new_rwlock();
  106. if (gen->lock == NULL)
  107. {
  108. slapi_log_error (SLAPI_LOG_FATAL, NULL, "csngen_new: failed to create lock\n");
  109. rc = CSN_NSPR_ERROR;
  110. goto done;
  111. }
  112. /* initialize callback list */
  113. _csngen_init_callbacks (gen);
  114. gen->state.rid = rid;
  115. if (state)
  116. {
  117. rc = _csngen_parse_state (gen, state);
  118. if (rc != CSN_SUCCESS)
  119. {
  120. goto done;
  121. }
  122. }
  123. else
  124. {
  125. /* new generator */
  126. gen->state.sampled_time = current_time ();
  127. gen->state.local_offset = 0;
  128. gen->state.remote_offset = 0;
  129. gen->state.seq_num = 0;
  130. }
  131. done:
  132. if (rc != CSN_SUCCESS)
  133. {
  134. if (gen)
  135. {
  136. csngen_free (&gen);
  137. }
  138. return NULL;
  139. }
  140. return gen;
  141. }
  142. void
  143. csngen_free (CSNGen **gen)
  144. {
  145. if (gen == NULL || *gen == NULL)
  146. return;
  147. _csngen_free_callbacks (*gen);
  148. if ((*gen)->lock)
  149. slapi_destroy_rwlock ((*gen)->lock);
  150. slapi_ch_free((void**)gen);
  151. }
  152. int
  153. csngen_new_csn (CSNGen *gen, CSN **csn, PRBool notify)
  154. {
  155. int rc = CSN_SUCCESS;
  156. time_t cur_time;
  157. int delta;
  158. if (gen == NULL || csn == NULL)
  159. {
  160. slapi_log_error (SLAPI_LOG_FATAL, NULL, "csngen_new_csn: invalid argument\n");
  161. return CSN_INVALID_PARAMETER;
  162. }
  163. *csn = csn_new ();
  164. if (*csn == NULL)
  165. {
  166. slapi_log_error (SLAPI_LOG_FATAL, NULL, "csngen_new_csn: memory allocation failed\n");
  167. return CSN_MEMORY_ERROR;
  168. }
  169. slapi_rwlock_wrlock (gen->lock);
  170. if (g_sampled_time == 0)
  171. csngen_update_time ();
  172. cur_time = g_sampled_time;
  173. /* check if the time should be adjusted */
  174. delta = cur_time - gen->state.sampled_time;
  175. if (delta > 0)
  176. {
  177. rc = _csngen_adjust_local_time (gen, cur_time);
  178. if (rc != CSN_SUCCESS)
  179. {
  180. slapi_rwlock_unlock (gen->lock);
  181. return rc;
  182. }
  183. }
  184. else if (delta < -300) {
  185. /*
  186. * The maxseqnum could support up to 65535 CSNs per second.
  187. * That means that we could avoid duplicated CSN's for
  188. * delta up to 300 secs if update rate is 200/sec (usually
  189. * the max rate is below 20/sec).
  190. * Beyond 300 secs, we advance gen->state.sampled_time by
  191. * one sec to recycle seqnum.
  192. */
  193. slapi_log_error (SLAPI_LOG_FATAL, "csngen_new_csn", "Warning: too much time skew (%d secs). Current seqnum=%0x\n", delta, gen->state.seq_num );
  194. rc = _csngen_adjust_local_time (gen, gen->state.sampled_time+1);
  195. if (rc != CSN_SUCCESS)
  196. {
  197. slapi_rwlock_unlock (gen->lock);
  198. return rc;
  199. }
  200. }
  201. if (gen->state.seq_num == CSN_MAX_SEQNUM)
  202. {
  203. slapi_log_error (SLAPI_LOG_FATAL, NULL, "csngen_new_csn: sequence rollover; "
  204. "local offset updated.\n");
  205. gen->state.local_offset ++;
  206. gen->state.seq_num = 0;
  207. }
  208. (*csn)->tstamp = CSN_CALC_TSTAMP(gen);
  209. (*csn)->seqnum = gen->state.seq_num ++;
  210. (*csn)->rid = gen->state.rid;
  211. (*csn)->subseqnum = 0;
  212. /* The lock is intentionally unlocked before callbacks are called.
  213. This is to prevent deadlocks. The callback management code has
  214. its own lock */
  215. slapi_rwlock_unlock (gen->lock);
  216. /* notify modules that registered interest in csn generation */
  217. if (notify)
  218. {
  219. _csngen_call_callbacks (gen, *csn, 0);
  220. }
  221. return rc;
  222. }
  223. /* this function should be called for csns generated with non-zero notify
  224. that were unused because the corresponding operation was aborted.
  225. The function calls "abort" functions registered through
  226. csngen_register_callbacks call */
  227. void csngen_abort_csn (CSNGen *gen, const CSN *csn)
  228. {
  229. _csngen_call_callbacks (gen, csn, 1);
  230. }
  231. void csngen_rewrite_rid(CSNGen *gen, ReplicaId rid)
  232. {
  233. if (gen == NULL){
  234. return;
  235. }
  236. slapi_rwlock_wrlock (gen->lock);
  237. gen->state.rid = rid;
  238. slapi_rwlock_unlock (gen->lock);
  239. }
  240. /* this function should be called when a remote CSN for the same part of
  241. the dit becomes known to the server (for instance, as part of RUV during
  242. replication session. In response, the generator would adjust its notion
  243. of time so that it does not generate smaller csns */
  244. int csngen_adjust_time(CSNGen *gen, const CSN* csn)
  245. {
  246. time_t remote_time, remote_offset, cur_time;
  247. PRUint16 remote_seqnum;
  248. int rc;
  249. extern int config_get_ignore_time_skew();
  250. int ignore_time_skew = config_get_ignore_time_skew();
  251. if (gen == NULL || csn == NULL)
  252. return CSN_INVALID_PARAMETER;
  253. remote_time = csn_get_time (csn);
  254. remote_seqnum = csn_get_seqnum (csn);
  255. slapi_rwlock_wrlock (gen->lock);
  256. if (slapi_is_loglevel_set(SLAPI_LOG_REPL)) {
  257. cur_time = CSN_CALC_TSTAMP(gen);
  258. slapi_log_error (SLAPI_LOG_REPL, NULL, "csngen_adjust_time: "
  259. "gen state before %08lx%04x:%ld:%ld:%ld\n",
  260. cur_time, gen->state.seq_num,
  261. gen->state.sampled_time,
  262. gen->state.local_offset,
  263. gen->state.remote_offset);
  264. }
  265. /* make sure we have the current time */
  266. csngen_update_time();
  267. cur_time = g_sampled_time;
  268. /* make sure sampled_time is current */
  269. /* must only call adjust_local_time if the current time is greater than
  270. the generator state time */
  271. if ((cur_time > gen->state.sampled_time) &&
  272. (CSN_SUCCESS != (rc = _csngen_adjust_local_time(gen, cur_time))))
  273. {
  274. /* _csngen_adjust_local_time will log error */
  275. slapi_rwlock_unlock (gen->lock);
  276. csngen_dump_state(gen);
  277. return rc;
  278. }
  279. cur_time = CSN_CALC_TSTAMP(gen);
  280. if (remote_time >= cur_time)
  281. {
  282. time_t new_time = 0;
  283. if (remote_seqnum > gen->state.seq_num )
  284. {
  285. if (remote_seqnum < CSN_MAX_SEQNUM)
  286. {
  287. gen->state.seq_num = remote_seqnum + 1;
  288. }
  289. else
  290. {
  291. remote_time++;
  292. }
  293. }
  294. remote_offset = remote_time - cur_time;
  295. if (remote_offset > gen->state.remote_offset)
  296. {
  297. if (ignore_time_skew || (remote_offset <= CSN_MAX_TIME_ADJUST))
  298. {
  299. gen->state.remote_offset = remote_offset;
  300. }
  301. else /* remote_offset > CSN_MAX_TIME_ADJUST */
  302. {
  303. slapi_log_error (SLAPI_LOG_FATAL, NULL, "csngen_adjust_time: "
  304. "adjustment limit exceeded; value - %ld, limit - %ld\n",
  305. remote_offset, (long)CSN_MAX_TIME_ADJUST);
  306. slapi_rwlock_unlock (gen->lock);
  307. csngen_dump_state(gen);
  308. return CSN_LIMIT_EXCEEDED;
  309. }
  310. }
  311. else if (remote_offset > 0) { /* still need to account for this */
  312. gen->state.local_offset += remote_offset;
  313. }
  314. new_time = CSN_CALC_TSTAMP(gen);
  315. /* let's revisit the seq num - if the new time is > the old
  316. tiem, we should reset the seq number to remote + 1 if
  317. this won't cause a wrap around */
  318. if (new_time > cur_time) {
  319. /* just set seq_num regardless of whether the current one
  320. is < or > than the remote one - the goal of this function
  321. is to make sure we generate CSNs > the remote CSN - if
  322. we have increased the time, we can decrease the seqnum
  323. and still guarantee that any new CSNs generated will be
  324. > any current CSNs we have generated */
  325. gen->state.seq_num = remote_seqnum + 1;
  326. }
  327. if (slapi_is_loglevel_set(SLAPI_LOG_REPL)) {
  328. slapi_log_error (SLAPI_LOG_REPL, NULL, "csngen_adjust_time: "
  329. "gen state after %08lx%04x:%ld:%ld:%ld\n",
  330. new_time, gen->state.seq_num,
  331. gen->state.sampled_time,
  332. gen->state.local_offset,
  333. gen->state.remote_offset);
  334. }
  335. }
  336. else if (gen->state.remote_offset > 0)
  337. {
  338. /* decrease remote offset? */
  339. /* how to decrease remote offset but ensure that we don't
  340. generate a duplicate CSN, or a CSN smaller than one we've already
  341. generated? */
  342. }
  343. slapi_rwlock_unlock (gen->lock);
  344. return CSN_SUCCESS;
  345. }
  346. /* returns PR_TRUE if the csn was generated by this generator and
  347. PR_FALSE otherwise. */
  348. PRBool csngen_is_local_csn(const CSNGen *gen, const CSN *csn)
  349. {
  350. return (gen && csn && gen->state.rid == csn_get_replicaid(csn));
  351. }
  352. /* returns current state of the generator so that it can be saved in the DIT */
  353. int csngen_get_state (const CSNGen *gen, Slapi_Mod *state)
  354. {
  355. struct berval bval;
  356. if (gen == NULL || state == NULL)
  357. return CSN_INVALID_PARAMETER;
  358. slapi_rwlock_rdlock (gen->lock);
  359. slapi_mod_init (state, 1);
  360. slapi_mod_set_type (state, ATTR_CSN_GENERATOR_STATE);
  361. slapi_mod_set_operation (state, LDAP_MOD_REPLACE | LDAP_MOD_BVALUES);
  362. bval.bv_val = (char*)&gen->state;
  363. bval.bv_len = sizeof (gen->state);
  364. slapi_mod_add_value(state, &bval);
  365. slapi_rwlock_unlock (gen->lock);
  366. return CSN_SUCCESS;
  367. }
  368. /* registers callbacks to be called when csn is created or aborted */
  369. void* csngen_register_callbacks(CSNGen *gen, GenCSNFn genFn, void *genArg,
  370. AbortCSNFn abortFn, void *abortArg)
  371. {
  372. callback_node *node;
  373. if (gen == NULL || (genFn == NULL && abortFn == NULL))
  374. return NULL;
  375. node = (callback_node *)slapi_ch_malloc (sizeof (callback_node));
  376. node->gen_fn = genFn;
  377. node->gen_arg = genArg;
  378. node->abort_fn = abortFn;
  379. node->abort_arg = abortArg;
  380. slapi_rwlock_wrlock (gen->callbacks.lock);
  381. dl_add (gen->callbacks.list, node);
  382. slapi_rwlock_unlock (gen->callbacks.lock);
  383. return node;
  384. }
  385. /* unregisters callbacks registered via call to csngenRegisterCallbacks */
  386. void csngen_unregister_callbacks(CSNGen *gen, void *cookie)
  387. {
  388. if (gen && cookie)
  389. {
  390. slapi_rwlock_wrlock (gen->callbacks.lock);
  391. dl_delete (gen->callbacks.list, cookie, _csngen_cmp_callbacks, slapi_ch_free);
  392. slapi_rwlock_unlock (gen->callbacks.lock);
  393. }
  394. }
  395. /* this functions is periodically called from daemon.c to
  396. update time used by all generators */
  397. void csngen_update_time ()
  398. {
  399. g_sampled_time = current_time ();
  400. }
  401. /* debugging function */
  402. void csngen_dump_state (const CSNGen *gen)
  403. {
  404. if (gen)
  405. {
  406. slapi_rwlock_rdlock (gen->lock);
  407. slapi_log_error(SLAPI_LOG_FATAL, NULL, "CSN generator's state:\n");
  408. slapi_log_error(SLAPI_LOG_FATAL, NULL, "\treplica id: %d\n", gen->state.rid);
  409. slapi_log_error(SLAPI_LOG_FATAL, NULL, "\tsampled time: %ld\n", gen->state.sampled_time);
  410. slapi_log_error(SLAPI_LOG_FATAL, NULL, "\tlocal offset: %ld\n", gen->state.local_offset);
  411. slapi_log_error(SLAPI_LOG_FATAL, NULL, "\tremote offset: %ld\n", gen->state.remote_offset);
  412. slapi_log_error(SLAPI_LOG_FATAL, NULL, "\tsequence number: %d\n", gen->state.seq_num);
  413. slapi_rwlock_unlock (gen->lock);
  414. }
  415. }
  416. #define TEST_TIME 600 /* 10 minutes */
  417. /* This function tests csn generator. It verifies that csn's are generated in
  418. monotnically increasing order in the face of local and remote time skews */
  419. void csngen_test ()
  420. {
  421. int rc;
  422. CSNGen *gen = csngen_new (255, NULL);
  423. slapi_log_error(SLAPI_LOG_FATAL, NULL, "staring csn generator test ...");
  424. csngen_dump_state (gen);
  425. rc = _csngen_start_test_threads(gen);
  426. if (rc == 0)
  427. {
  428. DS_Sleep(PR_SecondsToInterval(TEST_TIME));
  429. }
  430. _csngen_stop_test_threads(gen);
  431. csngen_dump_state (gen);
  432. slapi_log_error(SLAPI_LOG_FATAL, NULL, "csn generator test is complete...");
  433. }
  434. /*
  435. * **************************************************************************
  436. * Helper functions
  437. * **************************************************************************
  438. */
  439. static int
  440. _csngen_parse_state (CSNGen *gen, Slapi_Attr *state)
  441. {
  442. int rc;
  443. Slapi_Value *val;
  444. const struct berval *bval;
  445. ReplicaId rid = gen->state.rid;
  446. PR_ASSERT (gen && state);
  447. rc = slapi_attr_first_value(state, &val);
  448. if (rc != 0)
  449. {
  450. slapi_log_error (SLAPI_LOG_FATAL, NULL, "_csngen_parse_state: invalid state format\n");
  451. return CSN_INVALID_FORMAT;
  452. }
  453. bval = slapi_value_get_berval(val);
  454. memcpy (&gen->state, bval->bv_val, bval->bv_len);
  455. /* replicaid does not match */
  456. if (rid != gen->state.rid)
  457. {
  458. slapi_log_error (SLAPI_LOG_FATAL, NULL, "_csngen_parse_state: replica id"
  459. " mismatch; current id - %d, replica id in the state - %d\n",
  460. rid, gen->state.rid);
  461. return CSN_INVALID_FORMAT;
  462. }
  463. return CSN_SUCCESS;
  464. }
  465. static int
  466. _csngen_init_callbacks (CSNGen *gen)
  467. {
  468. /* create a lock to control access to the callback list */
  469. gen->callbacks.lock = slapi_new_rwlock();
  470. if (gen->callbacks.lock == NULL)
  471. {
  472. return CSN_NSPR_ERROR;
  473. }
  474. gen->callbacks.list = dl_new ();
  475. dl_init (gen->callbacks.list, 0);
  476. return CSN_SUCCESS;
  477. }
  478. static void
  479. _csngen_free_callbacks (CSNGen *gen)
  480. {
  481. PR_ASSERT (gen);
  482. if (gen->callbacks.list)
  483. {
  484. dl_cleanup (gen->callbacks.list, slapi_ch_free);
  485. dl_free (&(gen->callbacks.list));
  486. }
  487. if (gen->callbacks.lock)
  488. slapi_destroy_rwlock (gen->callbacks.lock);
  489. }
  490. static void
  491. _csngen_call_callbacks (const CSNGen *gen, const CSN *csn, PRBool abort)
  492. {
  493. int cookie;
  494. callback_node* node;
  495. PR_ASSERT (gen && csn);
  496. slapi_rwlock_rdlock (gen->callbacks.lock);
  497. node = (callback_node*)dl_get_first (gen->callbacks.list, &cookie);
  498. while (node)
  499. {
  500. if (abort)
  501. {
  502. if (node->abort_fn)
  503. node->abort_fn (csn, node->abort_arg);
  504. }
  505. else
  506. {
  507. if (node->gen_fn)
  508. node->gen_fn (csn, node->gen_arg);
  509. }
  510. node = (callback_node*)dl_get_next (gen->callbacks.list, &cookie);
  511. }
  512. slapi_rwlock_unlock (gen->callbacks.lock);
  513. }
  514. /* el1 is just a pointer to the callback_node */
  515. static int
  516. _csngen_cmp_callbacks (const void *el1, const void *el2)
  517. {
  518. if (el1 == el2)
  519. return 0;
  520. if (el1 < el2)
  521. return -1;
  522. else
  523. return 1;
  524. }
  525. static int
  526. _csngen_adjust_local_time (CSNGen *gen, time_t cur_time)
  527. {
  528. extern int config_get_ignore_time_skew();
  529. int ignore_time_skew = config_get_ignore_time_skew();
  530. time_t time_diff = cur_time - gen->state.sampled_time;
  531. if (time_diff == 0) {
  532. /* This is a no op - _csngen_adjust_local_time should never be called
  533. in this case, because there is nothing to adjust - but just return
  534. here to protect ourselves
  535. */
  536. return CSN_SUCCESS;
  537. }
  538. else if (time_diff > 0)
  539. {
  540. time_t ts_before = CSN_CALC_TSTAMP(gen);
  541. time_t ts_after = 0;
  542. if (slapi_is_loglevel_set(SLAPI_LOG_REPL)) {
  543. time_t new_time = CSN_CALC_TSTAMP(gen);
  544. slapi_log_error (SLAPI_LOG_REPL, NULL, "_csngen_adjust_local_time: "
  545. "gen state before %08lx%04x:%ld:%ld:%ld\n",
  546. new_time, gen->state.seq_num,
  547. gen->state.sampled_time,
  548. gen->state.local_offset,
  549. gen->state.remote_offset);
  550. }
  551. gen->state.sampled_time = cur_time;
  552. if (time_diff > gen->state.local_offset)
  553. gen->state.local_offset = 0;
  554. else
  555. gen->state.local_offset = gen->state.local_offset - time_diff;
  556. /* only reset the seq_num if the new timestamp part of the CSN
  557. is going to be greater than the old one - if they are the
  558. same after the above adjustment (which can happen if
  559. csngen_adjust_time has to store the offset in the
  560. local_offset field) we must not allow the CSN to regress or
  561. generate duplicate numbers */
  562. ts_after = CSN_CALC_TSTAMP(gen);
  563. if (ts_after > ts_before) {
  564. gen->state.seq_num = 0; /* only reset if new time > old time */
  565. }
  566. if (slapi_is_loglevel_set(SLAPI_LOG_REPL)) {
  567. time_t new_time = CSN_CALC_TSTAMP(gen);
  568. slapi_log_error (SLAPI_LOG_REPL, NULL, "_csngen_adjust_local_time: "
  569. "gen state after %08lx%04x:%ld:%ld:%ld\n",
  570. new_time, gen->state.seq_num,
  571. gen->state.sampled_time,
  572. gen->state.local_offset,
  573. gen->state.remote_offset);
  574. }
  575. return CSN_SUCCESS;
  576. }
  577. else /* time was turned back */
  578. {
  579. if (slapi_is_loglevel_set(SLAPI_LOG_REPL)) {
  580. time_t new_time = CSN_CALC_TSTAMP(gen);
  581. slapi_log_error (SLAPI_LOG_REPL, NULL, "_csngen_adjust_local_time: "
  582. "gen state back before %08lx%04x:%ld:%ld:%ld\n",
  583. new_time, gen->state.seq_num,
  584. gen->state.sampled_time,
  585. gen->state.local_offset,
  586. gen->state.remote_offset);
  587. }
  588. if (!ignore_time_skew && (abs (time_diff) > CSN_MAX_TIME_ADJUST))
  589. {
  590. slapi_log_error (SLAPI_LOG_FATAL, NULL, "_csngen_adjust_local_time: "
  591. "adjustment limit exceeded; value - %d, limit - %d\n",
  592. abs (time_diff), CSN_MAX_TIME_ADJUST);
  593. return CSN_LIMIT_EXCEEDED;
  594. }
  595. gen->state.sampled_time = cur_time;
  596. gen->state.local_offset = MAX_VAL (gen->state.local_offset, abs (time_diff));
  597. gen->state.seq_num = 0;
  598. if (slapi_is_loglevel_set(SLAPI_LOG_REPL)) {
  599. time_t new_time = CSN_CALC_TSTAMP(gen);
  600. slapi_log_error (SLAPI_LOG_REPL, NULL, "_csngen_adjust_local_time: "
  601. "gen state back after %08lx%04x:%ld:%ld:%ld\n",
  602. new_time, gen->state.seq_num,
  603. gen->state.sampled_time,
  604. gen->state.local_offset,
  605. gen->state.remote_offset);
  606. }
  607. return CSN_SUCCESS;
  608. }
  609. }
  610. /*
  611. * **************************************************************************
  612. * test code
  613. * **************************************************************************
  614. */
  615. #define DEFAULT_THREAD_STACKSIZE 0
  616. #define GEN_TREAD_COUNT 20
  617. int s_thread_count;
  618. int s_must_exit;
  619. static int
  620. _csngen_start_test_threads(CSNGen *gen)
  621. {
  622. int i;
  623. PR_ASSERT (gen);
  624. s_thread_count = 0;
  625. s_must_exit = 0;
  626. /* create threads that generate csns */
  627. for(i=0; i< GEN_TREAD_COUNT; i++)
  628. {
  629. if (PR_CreateThread(PR_USER_THREAD, _csngen_gen_tester_main, gen,
  630. PR_PRIORITY_NORMAL, PR_GLOBAL_THREAD, PR_UNJOINABLE_THREAD,
  631. DEFAULT_THREAD_STACKSIZE) == NULL)
  632. {
  633. PRErrorCode prerr = PR_GetError();
  634. slapi_log_error(SLAPI_LOG_FATAL, NULL,
  635. "failed to create a CSN generator thread number %d; " SLAPI_COMPONENT_NAME_NSPR " error %d (%s)\n",
  636. i, prerr, slapd_pr_strerror(prerr));
  637. return -1;
  638. }
  639. s_thread_count ++;
  640. }
  641. /* create a thread that modifies remote time */
  642. if (PR_CreateThread(PR_USER_THREAD, _csngen_remote_tester_main, (void *)gen,
  643. PR_PRIORITY_NORMAL, PR_GLOBAL_THREAD, PR_UNJOINABLE_THREAD,
  644. DEFAULT_THREAD_STACKSIZE) == NULL)
  645. {
  646. PRErrorCode prerr = PR_GetError();
  647. slapi_log_error(SLAPI_LOG_FATAL, NULL,
  648. "failed to create the remote CSN tester thread; " SLAPI_COMPONENT_NAME_NSPR " error %d (%s)\n",
  649. prerr, slapd_pr_strerror(prerr));
  650. return -1;
  651. }
  652. s_thread_count ++;
  653. /* create a thread that modifies local time */
  654. if (PR_CreateThread(PR_USER_THREAD, _csngen_local_tester_main, (void *)gen,
  655. PR_PRIORITY_NORMAL, PR_GLOBAL_THREAD, PR_UNJOINABLE_THREAD,
  656. DEFAULT_THREAD_STACKSIZE) == NULL)
  657. {
  658. PRErrorCode prerr = PR_GetError();
  659. slapi_log_error(SLAPI_LOG_FATAL, NULL,
  660. "failed to create the local CSN tester thread; " SLAPI_COMPONENT_NAME_NSPR " error %d (%s)\n",
  661. prerr, slapd_pr_strerror(prerr));
  662. return -1;
  663. }
  664. s_thread_count ++;
  665. return 0;
  666. }
  667. static void _csngen_stop_test_threads ()
  668. {
  669. s_must_exit = 1;
  670. while (s_thread_count > 0)
  671. {
  672. /* sleep for 30 seconds */
  673. DS_Sleep (PR_SecondsToInterval(20));
  674. }
  675. }
  676. /* periodically generate a csn and dump it to the error log */
  677. static void
  678. _csngen_gen_tester_main (void *data)
  679. {
  680. CSNGen *gen = (CSNGen*)data;
  681. CSN *csn = NULL;
  682. char buff [CSN_STRSIZE];
  683. int rc;
  684. PR_ASSERT (gen);
  685. while (!s_must_exit)
  686. {
  687. rc = csngen_new_csn (gen, &csn, PR_FALSE);
  688. if (rc != CSN_SUCCESS)
  689. {
  690. slapi_log_error (SLAPI_LOG_FATAL, NULL,
  691. "failed to generate csn; csn error - %d\n", rc);
  692. }
  693. else
  694. {
  695. slapi_log_error (SLAPI_LOG_FATAL, NULL, "generate csn %s\n",
  696. csn_as_string(csn, PR_FALSE, buff));
  697. }
  698. csn_free(&csn);
  699. /* sleep for 30 seconds */
  700. DS_Sleep (PR_SecondsToInterval(10));
  701. }
  702. PR_AtomicDecrement (&s_thread_count);
  703. }
  704. /* simulate clock skew with remote servers that causes
  705. generator to advance its remote offset */
  706. static void
  707. _csngen_remote_tester_main (void *data)
  708. {
  709. CSNGen *gen = (CSNGen*)data;
  710. CSN *csn;
  711. time_t csn_time;
  712. int rc;
  713. PR_ASSERT (gen);
  714. while (!s_must_exit)
  715. {
  716. rc = csngen_new_csn (gen, &csn, PR_FALSE);
  717. if (rc != CSN_SUCCESS)
  718. {
  719. slapi_log_error (SLAPI_LOG_FATAL, NULL,
  720. "failed to generate csn; csn error - %d\n", rc);
  721. }
  722. else
  723. {
  724. csn_time = csn_get_time(csn);
  725. csn_set_time (csn, csn_time + slapi_rand () % 100);
  726. rc = csngen_adjust_time (gen, csn);
  727. if (rc != CSN_SUCCESS)
  728. {
  729. slapi_log_error (SLAPI_LOG_FATAL, NULL,
  730. "failed to adjust generator's time; csn error - %d\n", rc);
  731. }
  732. csngen_dump_state (gen);
  733. }
  734. csn_free(&csn);
  735. /* sleep for 30 seconds */
  736. DS_Sleep (PR_SecondsToInterval(60));
  737. }
  738. PR_AtomicDecrement (&s_thread_count);
  739. }
  740. /* simulate local clock being set back */
  741. static void
  742. _csngen_local_tester_main (void *data)
  743. {
  744. CSNGen *gen = (CSNGen*)data;
  745. PR_ASSERT (gen);
  746. while (!s_must_exit)
  747. {
  748. /* sleep for 30 seconds */
  749. DS_Sleep (PR_SecondsToInterval(60));
  750. g_sampled_time -= slapi_rand () % 100;
  751. csngen_dump_state (gen);
  752. }
  753. PR_AtomicDecrement (&s_thread_count);
  754. }