csngen.c 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794
  1. /** BEGIN COPYRIGHT BLOCK
  2. * This Program is free software; you can redistribute it and/or modify it under
  3. * the terms of the GNU General Public License as published by the Free Software
  4. * Foundation; version 2 of the License.
  5. *
  6. * This Program is distributed in the hope that it will be useful, but WITHOUT
  7. * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
  8. * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
  9. *
  10. * You should have received a copy of the GNU General Public License along with
  11. * this Program; if not, write to the Free Software Foundation, Inc., 59 Temple
  12. * Place, Suite 330, Boston, MA 02111-1307 USA.
  13. *
  14. * In addition, as a special exception, Red Hat, Inc. gives You the additional
  15. * right to link the code of this Program with code not covered under the GNU
  16. * General Public License ("Non-GPL Code") and to distribute linked combinations
  17. * including the two, subject to the limitations in this paragraph. Non-GPL Code
  18. * permitted under this exception must only link to the code of this Program
  19. * through those well defined interfaces identified in the file named EXCEPTION
  20. * found in the source code files (the "Approved Interfaces"). The files of
  21. * Non-GPL Code may instantiate templates or use macros or inline functions from
  22. * the Approved Interfaces without causing the resulting work to be covered by
  23. * the GNU General Public License. Only Red Hat, Inc. may make changes or
  24. * additions to the list of Approved Interfaces. You must obey the GNU General
  25. * Public License in all respects for all of the Program code and other code used
  26. * in conjunction with the Program except the Non-GPL Code covered by this
  27. * exception. If you modify this file, you may extend this exception to your
  28. * version of the file, but you are not obligated to do so. If you do not wish to
  29. * provide this exception without modification, you must delete this exception
  30. * statement from your version and license this file solely under the GPL without
  31. * exception.
  32. *
  33. *
  34. * Copyright (C) 2001 Sun Microsystems, Inc. Used by permission.
  35. * Copyright (C) 2005 Red Hat, Inc.
  36. * All rights reserved.
  37. * END COPYRIGHT BLOCK **/
  38. /*
  39. * csngen.c - CSN Generator
  40. */
  41. #ifdef _WIN32
  42. #define _WIN32_WINNT 0x0400
  43. #include <windows.h>
  44. #endif
  45. #include <string.h>
  46. #include "prcountr.h"
  47. #include "slap.h"
  48. #define CSN_MAX_SEQNUM 0xffff /* largest sequence number */
  49. #define CSN_MAX_TIME_ADJUST 24*60*60 /* maximum allowed time adjustment (in seconds) = 1 day */
  50. #define ATTR_CSN_GENERATOR_STATE "nsState" /* attribute that stores csn state information */
  51. #define STATE_FORMAT "%8x%8x%8x%4hx%4hx"
  52. #define STATE_LENGTH 32
  53. #define MAX_VAL(x,y) ((x)>(y)?(x):(y))
  54. /*
  55. * **************************************************************************
  56. * data structures
  57. * **************************************************************************
  58. */
  59. /* callback node */
  60. typedef struct callback_node
  61. {
  62. GenCSNFn gen_fn; /* function to be called when new csn is generated */
  63. void *gen_arg; /* argument to pass to gen_fn function */
  64. AbortCSNFn abort_fn; /* function to be called when csn is aborted */
  65. void *abort_arg; /* argument to pass to abort_fn function */
  66. } callback_node;
  67. typedef struct callback_list
  68. {
  69. PRRWLock *lock;
  70. DataList *list; /* list of callback_node structures */
  71. } callback_list;
  72. /* persistently stored generator's state */
  73. typedef struct csngen_state
  74. {
  75. ReplicaId rid; /* replica id of the replicated area to which it is attached */
  76. time_t sampled_time; /* time last obtained from time() */
  77. time_t local_offset; /* offset due to the local clock being set back */
  78. time_t remote_offset; /* offset due to clock difference with remote systems */
  79. PRUint16 seq_num; /* used to allow to generate multiple csns within a second */
  80. }csngen_state;
  81. /* data maintained for each generator */
  82. struct csngen
  83. {
  84. csngen_state state; /* persistent state of the generator */
  85. callback_list callbacks; /* list of callbacks registered with the generator */
  86. PRRWLock *lock; /* concurrency control */
  87. };
  88. /*
  89. * **************************************************************************
  90. * global data
  91. * **************************************************************************
  92. */
  93. static time_t g_sampled_time; /* time obtained from time() call */
  94. /*
  95. * **************************************************************************
  96. * forward declarations of helper functions
  97. * **************************************************************************
  98. */
  99. static int _csngen_parse_state (CSNGen *gen, Slapi_Attr *state);
  100. static int _csngen_init_callbacks (CSNGen *gen);
  101. static void _csngen_call_callbacks (const CSNGen *gen, const CSN *csn, PRBool abort);
  102. static int _csngen_cmp_callbacks (const void *el1, const void *el2);
  103. static void _csngen_free_callbacks (CSNGen *gen);
  104. static int _csngen_adjust_local_time (CSNGen *gen, time_t cur_time);
  105. /*
  106. * **************************************************************************
  107. * forward declarations of tester functions
  108. * **************************************************************************
  109. */
  110. static int _csngen_start_test_threads (CSNGen *gen);
  111. static void _csngen_stop_test_threads ();
  112. static void _csngen_gen_tester_main (void *data);
  113. static void _csngen_local_tester_main (void *data);
  114. static void _csngen_remote_tester_main (void *data);
  115. /*
  116. * **************************************************************************
  117. * API
  118. * **************************************************************************
  119. */
  120. CSNGen*
  121. csngen_new (ReplicaId rid, Slapi_Attr *state)
  122. {
  123. int rc = CSN_SUCCESS;
  124. CSNGen *gen = NULL;
  125. gen = (CSNGen*)slapi_ch_calloc (1, sizeof (CSNGen));
  126. if (gen == NULL)
  127. {
  128. slapi_log_error (SLAPI_LOG_FATAL, NULL, "csngen_new: memory allocation failed\n");
  129. return NULL;
  130. }
  131. /* create lock to control the access to the state information */
  132. gen->lock = PR_NewRWLock(PR_RWLOCK_RANK_NONE, "state_lock");
  133. if (gen->lock == NULL)
  134. {
  135. slapi_log_error (SLAPI_LOG_FATAL, NULL, "csngen_new: failed to create lock\n");
  136. rc = CSN_NSPR_ERROR;
  137. goto done;
  138. }
  139. /* initialize callback list */
  140. _csngen_init_callbacks (gen);
  141. gen->state.rid = rid;
  142. if (state)
  143. {
  144. rc = _csngen_parse_state (gen, state);
  145. if (rc != CSN_SUCCESS)
  146. {
  147. goto done;
  148. }
  149. }
  150. else
  151. {
  152. /* new generator */
  153. gen->state.sampled_time = current_time ();
  154. gen->state.local_offset = 0;
  155. gen->state.remote_offset = 0;
  156. gen->state.seq_num = 0;
  157. }
  158. done:
  159. if (rc != CSN_SUCCESS)
  160. {
  161. if (gen)
  162. {
  163. csngen_free (&gen);
  164. }
  165. return NULL;
  166. }
  167. return gen;
  168. }
  169. void
  170. csngen_free (CSNGen **gen)
  171. {
  172. if (gen == NULL || *gen == NULL)
  173. return;
  174. _csngen_free_callbacks (*gen);
  175. if ((*gen)->lock)
  176. PR_DestroyRWLock ((*gen)->lock);
  177. }
  178. int
  179. csngen_new_csn (CSNGen *gen, CSN **csn, PRBool notify)
  180. {
  181. int rc = CSN_SUCCESS;
  182. time_t cur_time;
  183. int delta;
  184. if (gen == NULL || csn == NULL)
  185. {
  186. slapi_log_error (SLAPI_LOG_FATAL, NULL, "csngen_new_csn: invalid argument\n");
  187. return CSN_INVALID_PARAMETER;
  188. }
  189. *csn = csn_new ();
  190. if (*csn == NULL)
  191. {
  192. slapi_log_error (SLAPI_LOG_FATAL, NULL, "csngen_new_csn: memory allocation failed\n");
  193. return CSN_MEMORY_ERROR;
  194. }
  195. PR_RWLock_Wlock (gen->lock);
  196. if (g_sampled_time == 0)
  197. csngen_update_time ();
  198. cur_time = g_sampled_time;
  199. /* check if the time should be adjusted */
  200. delta = cur_time - gen->state.sampled_time;
  201. if (delta > 0)
  202. {
  203. rc = _csngen_adjust_local_time (gen, cur_time);
  204. if (rc != CSN_SUCCESS)
  205. {
  206. PR_RWLock_Unlock (gen->lock);
  207. return rc;
  208. }
  209. }
  210. else if (delta < -300) {
  211. /*
  212. * The maxseqnum could support up to 65535 CSNs per second.
  213. * That means that we could avoid duplicated CSN's for
  214. * delta up to 300 secs if update rate is 200/sec (usually
  215. * the max rate is below 20/sec).
  216. * Beyond 300 secs, we advance gen->state.sampled_time by
  217. * one sec to recycle seqnum.
  218. */
  219. slapi_log_error (SLAPI_LOG_FATAL, "csngen_new_csn", "Warning: too much time skew (%d secs). Current seqnum=%0x\n", delta, gen->state.seq_num );
  220. rc = _csngen_adjust_local_time (gen, gen->state.sampled_time+1);
  221. if (rc != CSN_SUCCESS)
  222. {
  223. PR_RWLock_Unlock (gen->lock);
  224. return rc;
  225. }
  226. }
  227. if (gen->state.seq_num == CSN_MAX_SEQNUM)
  228. {
  229. slapi_log_error (SLAPI_LOG_FATAL, NULL, "csngen_new_csn: sequence rollover; "
  230. "local offset updated.\n");
  231. gen->state.local_offset ++;
  232. gen->state.seq_num = 0;
  233. }
  234. (*csn)->tstamp = gen->state.sampled_time + gen->state.local_offset +
  235. gen->state.remote_offset;
  236. (*csn)->seqnum = gen->state.seq_num ++;
  237. (*csn)->rid = gen->state.rid;
  238. (*csn)->subseqnum = 0;
  239. /* The lock is intentionally unlocked before callbacks are called.
  240. This is to prevent deadlocks. The callback management code has
  241. its own lock */
  242. PR_RWLock_Unlock (gen->lock);
  243. /* notify modules that registered interest in csn generation */
  244. if (notify)
  245. {
  246. _csngen_call_callbacks (gen, *csn, 0);
  247. }
  248. return rc;
  249. }
  250. /* this function should be called for csns generated with non-zero notify
  251. that were unused because the corresponding operation was aborted.
  252. The function calls "abort" functions registered through
  253. csngen_register_callbacks call */
  254. void csngen_abort_csn (CSNGen *gen, const CSN *csn)
  255. {
  256. _csngen_call_callbacks (gen, csn, 1);
  257. }
  258. /* this function should be called when a remote CSN for the same part of
  259. the dit becomes known to the server (for instance, as part of RUV during
  260. replication session. In response, the generator would adjust its notion
  261. of time so that it does not generate smaller csns */
  262. int csngen_adjust_time (CSNGen *gen, const CSN* csn)
  263. {
  264. time_t remote_time, remote_offset;
  265. PRUint16 remote_seqnum;
  266. if (gen == NULL || csn == NULL)
  267. return CSN_INVALID_PARAMETER;
  268. remote_time = csn_get_time (csn);
  269. remote_seqnum = csn_get_seqnum (csn);
  270. PR_RWLock_Wlock (gen->lock);
  271. if (remote_seqnum > gen->state.seq_num )
  272. {
  273. if (remote_seqnum < CSN_MAX_SEQNUM)
  274. {
  275. gen->state.seq_num = remote_seqnum + 1;
  276. }
  277. else
  278. {
  279. remote_time++;
  280. }
  281. }
  282. if (remote_time >= gen->state.sampled_time)
  283. {
  284. remote_offset = remote_time - gen->state.sampled_time;
  285. if (remote_offset > gen->state.remote_offset)
  286. {
  287. if (remote_offset <= CSN_MAX_TIME_ADJUST)
  288. {
  289. gen->state.remote_offset = remote_offset;
  290. }
  291. else /* remote_offset > CSN_MAX_TIME_ADJUST */
  292. {
  293. slapi_log_error (SLAPI_LOG_FATAL, NULL, "csngen_adjust_time: "
  294. "adjustment limit exceeded; value - %d, limit - %d\n",
  295. remote_offset, CSN_MAX_TIME_ADJUST);
  296. PR_RWLock_Unlock (gen->lock);
  297. return CSN_LIMIT_EXCEEDED;
  298. }
  299. }
  300. }
  301. PR_RWLock_Unlock (gen->lock);
  302. return CSN_SUCCESS;
  303. }
  304. /* returns PR_TRUE if the csn was generated by this generator and
  305. PR_FALSE otherwise. */
  306. PRBool csngen_is_local_csn(const CSNGen *gen, const CSN *csn)
  307. {
  308. return (gen && csn && gen->state.rid == csn_get_replicaid(csn));
  309. }
  310. /* returns current state of the generator so that it can be saved in the DIT */
  311. int csngen_get_state (const CSNGen *gen, Slapi_Mod *state)
  312. {
  313. struct berval bval;
  314. if (gen == NULL || state == NULL)
  315. return CSN_INVALID_PARAMETER;
  316. PR_RWLock_Rlock (gen->lock);
  317. slapi_mod_init (state, 1);
  318. slapi_mod_set_type (state, ATTR_CSN_GENERATOR_STATE);
  319. slapi_mod_set_operation (state, LDAP_MOD_REPLACE | LDAP_MOD_BVALUES);
  320. bval.bv_val = (char*)&gen->state;
  321. bval.bv_len = sizeof (gen->state);
  322. slapi_mod_add_value(state, &bval);
  323. PR_RWLock_Unlock (gen->lock);
  324. return CSN_SUCCESS;
  325. }
  326. /* registers callbacks to be called when csn is created or aborted */
  327. void* csngen_register_callbacks(CSNGen *gen, GenCSNFn genFn, void *genArg,
  328. AbortCSNFn abortFn, void *abortArg)
  329. {
  330. callback_node *node;
  331. if (gen == NULL || (genFn == NULL && abortFn == NULL))
  332. return NULL;
  333. node = (callback_node *)slapi_ch_malloc (sizeof (callback_node));
  334. node->gen_fn = genFn;
  335. node->gen_arg = genArg;
  336. node->abort_fn = abortFn;
  337. node->abort_arg = abortArg;
  338. PR_RWLock_Wlock (gen->callbacks.lock);
  339. dl_add (gen->callbacks.list, node);
  340. PR_RWLock_Unlock (gen->callbacks.lock);
  341. return node;
  342. }
  343. /* unregisters callbacks registered via call to csngenRegisterCallbacks */
  344. void csngen_unregister_callbacks(CSNGen *gen, void *cookie)
  345. {
  346. if (gen && cookie)
  347. {
  348. PR_RWLock_Wlock (gen->callbacks.lock);
  349. dl_delete (gen->callbacks.list, cookie, _csngen_cmp_callbacks, slapi_ch_free);
  350. PR_RWLock_Unlock (gen->callbacks.lock);
  351. }
  352. }
  353. /* this functions is periodically called from daemon.c to
  354. update time used by all generators */
  355. void csngen_update_time ()
  356. {
  357. g_sampled_time = current_time ();
  358. }
  359. /* debugging function */
  360. void csngen_dump_state (const CSNGen *gen)
  361. {
  362. if (gen)
  363. {
  364. PR_RWLock_Rlock (gen->lock);
  365. slapi_log_error(SLAPI_LOG_FATAL, NULL, "CSN generator's state:\n");
  366. slapi_log_error(SLAPI_LOG_FATAL, NULL, "\treplica id: %d\n", gen->state.rid);
  367. slapi_log_error(SLAPI_LOG_FATAL, NULL, "\tsampled time: %d\n", gen->state.sampled_time);
  368. slapi_log_error(SLAPI_LOG_FATAL, NULL, "\tlocal offset: %d\n", gen->state.local_offset);
  369. slapi_log_error(SLAPI_LOG_FATAL, NULL, "\tremote offset: %d\n", gen->state.remote_offset);
  370. slapi_log_error(SLAPI_LOG_FATAL, NULL, "\tsequence number: %d\n", gen->state.seq_num);
  371. PR_RWLock_Unlock (gen->lock);
  372. }
  373. }
  374. #define TEST_TIME 600 /* 10 minutes */
  375. /* This function tests csn generator. It verifies that csn's are generated in
  376. monotnically increasing order in the face of local and remote time skews */
  377. void csngen_test ()
  378. {
  379. int rc;
  380. CSNGen *gen = csngen_new (255, NULL);
  381. slapi_log_error(SLAPI_LOG_FATAL, NULL, "staring csn generator test ...");
  382. csngen_dump_state (gen);
  383. rc = _csngen_start_test_threads(gen);
  384. if (rc == 0)
  385. {
  386. DS_Sleep(PR_SecondsToInterval(TEST_TIME));
  387. }
  388. _csngen_stop_test_threads(gen);
  389. csngen_dump_state (gen);
  390. slapi_log_error(SLAPI_LOG_FATAL, NULL, "csn generator test is complete...");
  391. }
  392. /*
  393. * **************************************************************************
  394. * Helper functions
  395. * **************************************************************************
  396. */
  397. static int
  398. _csngen_parse_state (CSNGen *gen, Slapi_Attr *state)
  399. {
  400. int rc;
  401. Slapi_Value *val;
  402. const struct berval *bval;
  403. ReplicaId rid = gen->state.rid;
  404. PR_ASSERT (gen && state);
  405. rc = slapi_attr_first_value(state, &val);
  406. if (rc != 0)
  407. {
  408. slapi_log_error (SLAPI_LOG_FATAL, NULL, "_csngen_parse_state: invalid state format\n");
  409. return CSN_INVALID_FORMAT;
  410. }
  411. bval = slapi_value_get_berval(val);
  412. memcpy (&gen->state, bval->bv_val, bval->bv_len);
  413. /* replicaid does not match */
  414. if (rid != gen->state.rid)
  415. {
  416. slapi_log_error (SLAPI_LOG_FATAL, NULL, "_csngen_parse_state: replica id"
  417. " mismatch; current id - %d, replica id in the state - %d\n",
  418. rid, gen->state.rid);
  419. return CSN_INVALID_FORMAT;
  420. }
  421. return CSN_SUCCESS;
  422. }
  423. static int
  424. _csngen_init_callbacks (CSNGen *gen)
  425. {
  426. /* create a lock to control access to the callback list */
  427. gen->callbacks.lock = PR_NewRWLock(PR_RWLOCK_RANK_NONE, "callback_lock");
  428. if (gen->callbacks.lock == NULL)
  429. {
  430. return CSN_NSPR_ERROR;
  431. }
  432. gen->callbacks.list = dl_new ();
  433. dl_init (gen->callbacks.list, 0);
  434. return CSN_SUCCESS;
  435. }
  436. static void
  437. _csngen_free_callbacks (CSNGen *gen)
  438. {
  439. PR_ASSERT (gen);
  440. if (gen->callbacks.list)
  441. {
  442. dl_cleanup (gen->callbacks.list, slapi_ch_free);
  443. dl_free (&(gen->callbacks.list));
  444. }
  445. if (gen->callbacks.lock)
  446. PR_DestroyRWLock (gen->callbacks.lock);
  447. }
  448. static void
  449. _csngen_call_callbacks (const CSNGen *gen, const CSN *csn, PRBool abort)
  450. {
  451. int cookie;
  452. callback_node* node;
  453. PR_ASSERT (gen && csn);
  454. PR_RWLock_Rlock (gen->callbacks.lock);
  455. node = (callback_node*)dl_get_first (gen->callbacks.list, &cookie);
  456. while (node)
  457. {
  458. if (abort)
  459. {
  460. if (node->abort_fn)
  461. node->abort_fn (csn, node->abort_arg);
  462. }
  463. else
  464. {
  465. if (node->gen_fn)
  466. node->gen_fn (csn, node->gen_arg);
  467. }
  468. node = (callback_node*)dl_get_next (gen->callbacks.list, &cookie);
  469. }
  470. PR_RWLock_Unlock (gen->callbacks.lock);
  471. }
  472. /* el1 is just a pointer to the callback_node */
  473. static int
  474. _csngen_cmp_callbacks (const void *el1, const void *el2)
  475. {
  476. if (el1 == el2)
  477. return 0;
  478. if (el1 < el2)
  479. return -1;
  480. else
  481. return 1;
  482. }
  483. static int
  484. _csngen_adjust_local_time (CSNGen *gen, time_t cur_time)
  485. {
  486. time_t time_diff = cur_time - gen->state.sampled_time;
  487. if (time_diff > 0)
  488. {
  489. gen->state.sampled_time = cur_time;
  490. if (time_diff > gen->state.local_offset)
  491. gen->state.local_offset = 0;
  492. else
  493. gen->state.local_offset = gen->state.local_offset - time_diff;
  494. gen->state.seq_num = 0;
  495. return CSN_SUCCESS;
  496. }
  497. else /* time was turend back */
  498. {
  499. if (abs (time_diff) > CSN_MAX_TIME_ADJUST)
  500. {
  501. slapi_log_error (SLAPI_LOG_FATAL, NULL, "_csngen_adjust_local_time: "
  502. "adjustment limit exceeded; value - %d, limit - %d\n",
  503. abs (time_diff), CSN_MAX_TIME_ADJUST);
  504. return CSN_LIMIT_EXCEEDED;
  505. }
  506. gen->state.sampled_time = cur_time;
  507. gen->state.local_offset = MAX_VAL (gen->state.local_offset, abs (time_diff));
  508. gen->state.seq_num = 0;
  509. return CSN_SUCCESS;
  510. }
  511. }
  512. /*
  513. * **************************************************************************
  514. * test code
  515. * **************************************************************************
  516. */
  517. /*
  518. * The defult thread stacksize for nspr21 is 64k. For OSF, we require
  519. * a larger stacksize as actual storage allocation is higher i.e
  520. * pointers are allocated 8 bytes but lower 4 bytes are used.
  521. * The value 0 means use the default stacksize.
  522. */
  523. #if defined (OSF1) || defined(__LP64__) || defined (_LP64) /* 64-bit architectures need large stacks */
  524. #define DEFAULT_THREAD_STACKSIZE 131072L
  525. #else
  526. #define DEFAULT_THREAD_STACKSIZE 0
  527. #endif
  528. #define GEN_TREAD_COUNT 20
  529. int s_thread_count;
  530. int s_must_exit;
  531. static int
  532. _csngen_start_test_threads(CSNGen *gen)
  533. {
  534. int i;
  535. PR_ASSERT (gen);
  536. s_thread_count = 0;
  537. s_must_exit = 0;
  538. /* create threads that generate csns */
  539. for(i=0; i< GEN_TREAD_COUNT; i++)
  540. {
  541. if (PR_CreateThread(PR_USER_THREAD, _csngen_gen_tester_main, gen,
  542. PR_PRIORITY_NORMAL, PR_GLOBAL_THREAD, PR_UNJOINABLE_THREAD,
  543. DEFAULT_THREAD_STACKSIZE) == NULL)
  544. {
  545. PRErrorCode prerr = PR_GetError();
  546. slapi_log_error(SLAPI_LOG_FATAL, NULL,
  547. "failed to create a CSN generator thread number %d; " SLAPI_COMPONENT_NAME_NSPR " error %d (%s)\n",
  548. i, prerr, slapd_pr_strerror(prerr));
  549. return -1;
  550. }
  551. s_thread_count ++;
  552. }
  553. /* create a thread that modifies remote time */
  554. if (PR_CreateThread(PR_USER_THREAD, _csngen_remote_tester_main, (void *)gen,
  555. PR_PRIORITY_NORMAL, PR_GLOBAL_THREAD, PR_UNJOINABLE_THREAD,
  556. DEFAULT_THREAD_STACKSIZE) == NULL)
  557. {
  558. PRErrorCode prerr = PR_GetError();
  559. slapi_log_error(SLAPI_LOG_FATAL, NULL,
  560. "failed to create the remote CSN tester thread; " SLAPI_COMPONENT_NAME_NSPR " error %d (%s)\n",
  561. prerr, slapd_pr_strerror(prerr));
  562. return -1;
  563. }
  564. s_thread_count ++;
  565. /* create a thread that modifies local time */
  566. if (PR_CreateThread(PR_USER_THREAD, _csngen_local_tester_main, (void *)gen,
  567. PR_PRIORITY_NORMAL, PR_GLOBAL_THREAD, PR_UNJOINABLE_THREAD,
  568. DEFAULT_THREAD_STACKSIZE) == NULL)
  569. {
  570. PRErrorCode prerr = PR_GetError();
  571. slapi_log_error(SLAPI_LOG_FATAL, NULL,
  572. "failed to create the local CSN tester thread; " SLAPI_COMPONENT_NAME_NSPR " error %d (%s)\n",
  573. prerr, slapd_pr_strerror(prerr));
  574. return -1;
  575. }
  576. s_thread_count ++;
  577. return 0;
  578. }
  579. static void _csngen_stop_test_threads ()
  580. {
  581. s_must_exit = 1;
  582. while (s_thread_count > 0)
  583. {
  584. /* sleep for 30 seconds */
  585. DS_Sleep (PR_SecondsToInterval(20));
  586. }
  587. }
  588. /* periodically generate a csn and dump it to the error log */
  589. static void
  590. _csngen_gen_tester_main (void *data)
  591. {
  592. CSNGen *gen = (CSNGen*)data;
  593. CSN *csn;
  594. char buff [CSN_STRSIZE];
  595. int rc;
  596. PR_ASSERT (gen);
  597. while (!s_must_exit)
  598. {
  599. rc = csngen_new_csn (gen, &csn, PR_FALSE);
  600. if (rc != CSN_SUCCESS)
  601. {
  602. slapi_log_error (SLAPI_LOG_FATAL, NULL,
  603. "failed to generate csn; csn error - %d\n", rc);
  604. }
  605. else
  606. {
  607. slapi_log_error (SLAPI_LOG_FATAL, NULL, "generate csn %s\n",
  608. csn_as_string(csn, PR_FALSE, buff));
  609. }
  610. /* sleep for 30 seconds */
  611. DS_Sleep (PR_SecondsToInterval(10));
  612. }
  613. PR_AtomicDecrement (&s_thread_count);
  614. }
  615. /* simulate clock skew with remote servers that causes
  616. generator to advance its remote offset */
  617. static void
  618. _csngen_remote_tester_main (void *data)
  619. {
  620. CSNGen *gen = (CSNGen*)data;
  621. CSN *csn;
  622. time_t csn_time;
  623. int rc;
  624. PR_ASSERT (gen);
  625. while (!s_must_exit)
  626. {
  627. rc = csngen_new_csn (gen, &csn, PR_FALSE);
  628. if (rc != CSN_SUCCESS)
  629. {
  630. slapi_log_error (SLAPI_LOG_FATAL, NULL,
  631. "failed to generate csn; csn error - %d\n", rc);
  632. }
  633. else
  634. {
  635. csn_time = csn_get_time(csn);
  636. csn_set_time (csn, csn_time + slapi_rand () % 100);
  637. rc = csngen_adjust_time (gen, csn);
  638. if (rc != CSN_SUCCESS)
  639. {
  640. slapi_log_error (SLAPI_LOG_FATAL, NULL,
  641. "failed to adjust generator's time; csn error - %d\n", rc);
  642. }
  643. csngen_dump_state (gen);
  644. }
  645. /* sleep for 30 seconds */
  646. DS_Sleep (PR_SecondsToInterval(60));
  647. }
  648. PR_AtomicDecrement (&s_thread_count);
  649. }
  650. /* simulate local clock being set back */
  651. static void
  652. _csngen_local_tester_main (void *data)
  653. {
  654. CSNGen *gen = (CSNGen*)data;
  655. PR_ASSERT (gen);
  656. while (!s_must_exit)
  657. {
  658. /* sleep for 30 seconds */
  659. DS_Sleep (PR_SecondsToInterval(60));
  660. g_sampled_time -= slapi_rand () % 100;
  661. csngen_dump_state (gen);
  662. }
  663. PR_AtomicDecrement (&s_thread_count);
  664. }