cl5_api.c 215 KB


  1. /** BEGIN COPYRIGHT BLOCK
  2. * Copyright (C) 2001 Sun Microsystems, Inc. Used by permission.
  3. * Copyright (C) 2005 Red Hat, Inc.
  4. * Copyright (C) 2010 Hewlett-Packard Development Company, L.P.
  5. * All rights reserved.
  6. *
  7. * License: GPL (version 3 or any later version).
  8. * See LICENSE for details.
  9. * END COPYRIGHT BLOCK **/
  10. #ifdef HAVE_CONFIG_H
  11. #include <config.h>
  12. #endif
  13. /* cl5_api.c - implementation of 5.0 style changelog API */
  14. #include <unistd.h>
  15. #include <errno.h>
  16. #include <sys/stat.h>
  17. #if defined(OS_solaris) || defined(hpux)
  18. #include <sys/types.h>
  19. #include <sys/statvfs.h>
  20. #endif
  21. #if defined(linux)
  22. #include <sys/vfs.h>
  23. #endif
  24. #include "cl5.h"
  25. #include "cl_crypt.h"
  26. #include "plhash.h"
  27. #include "plstr.h"
  28. #include "db.h"
  29. #include "cl5_clcache.h" /* To use the Changelog Cache */
  30. #include "repl5.h" /* for agmt_get_consumer_rid() */
  31. #define GUARDIAN_FILE "guardian" /* name of the guardian file */
  32. #define VERSION_FILE "DBVERSION" /* name of the version file */
  33. #define V_5 5 /* changelog entry version */
  34. #define CHUNK_SIZE 64 * 1024
  35. #define DBID_SIZE 64
  36. #define FILE_SEP "_" /* separates parts of the db file name */
  37. #define T_CSNSTR "csn"
  38. #define T_UNIQUEIDSTR "nsuniqueid"
  39. #define T_PARENTIDSTR "parentuniqueid"
  40. #define T_NEWSUPERIORDNSTR "newsuperiordn"
  41. #define T_NEWSUPERIORIDSTR "newsuperioruniqueid"
  42. #define T_REPLGEN "replgen"
  43. #define ENTRY_COUNT_TIME 111 /* this time is used to construct csn \
  44. used to store/retrieve entry count */
  45. #define PURGE_RUV_TIME 222 /* this time is used to construct csn \
  46. used to store purge RUV vector */
  47. #define MAX_RUV_TIME 333 /* this time is used to construct csn \
  48. used to store upper boundary RUV vector */
  49. #define DB_EXTENSION_DB3 "db3"
  50. #define DB_EXTENSION_DB4 "db4"
  51. #if 1000 * DB_VERSION_MAJOR + 100 * DB_VERSION_MINOR >= 5000
  52. #define DB_EXTENSION "db"
  53. #else
  54. #define DB_EXTENSION "db4"
  55. #endif
  56. #define HASH_BACKETS_COUNT 16 /* number of buckets in a hash table */
  57. #define DEFAULT_DB_ENV_OP_FLAGS DB_AUTO_COMMIT
  58. #define DB_OPEN(oflags, db, txnid, file, database, type, flags, mode, rval) \
  59. { \
  60. if (((oflags)&DB_INIT_TXN) && ((oflags)&DB_INIT_LOG)) { \
  61. (rval) = (db)->open((db), (txnid), (file), (database), (type), (flags) | DB_AUTO_COMMIT, (mode)); \
  62. } else { \
  63. (rval) = (db)->open((db), (txnid), (file), (database), (type), (flags), (mode)); \
  64. } \
  65. }
  66. #define TXN_BEGIN(env, parent_txn, tid, flags) \
  67. (env)->txn_begin((env), (parent_txn), (tid), (flags))
  68. #define TXN_COMMIT(txn) (txn)->commit((txn), 0)
  69. #define TXN_ABORT(txn) (txn)->abort(txn)
  70. /*
  71. * The defult thread stacksize for nspr21 is 64k. For OSF, we require
  72. * a larger stacksize as actual storage allocation is higher i.e
  73. * pointers are allocated 8 bytes but lower 4 bytes are used.
  74. * The value 0 means use the default stacksize.
  75. */
  76. #if defined(__LP64__) || defined(_LP64) /* 64-bit architectures need bigger stacks */
  77. #if defined(__hpux) && defined(__ia64)
  78. #define DEFAULT_THREAD_STACKSIZE 524288L
  79. #else
  80. #define DEFAULT_THREAD_STACKSIZE 131072L
  81. #endif
  82. #else
  83. #define DEFAULT_THREAD_STACKSIZE 0
  84. #endif
  85. #define FILE_CREATE_MODE S_IRUSR | S_IWUSR
  86. #define DIR_CREATE_MODE 0755
  87. #define NO_DISK_SPACE 1024
  88. #define MIN_DISK_SPACE 10485760 /* 10 MB */
  89. /***** Data Definitions *****/
  90. /* possible changelog open modes */
  91. typedef enum {
  92. CL5_OPEN_NONE, /* nothing specified */
  93. CL5_OPEN_NORMAL, /* open for normal read/write use */
  94. CL5_OPEN_RESTORE_RECOVER, /* restore from archive and recover */
  95. CL5_OPEN_RESTORE, /* restore, but no recovery */
  96. CL5_OPEN_LDIF2CL, /* open as part of ldif2cl: no locking,
  97. recovery, checkpointing */
  98. CL5_OPEN_CLEAN_RECOVER /* remove env after recover open (upgrade) */
  99. } CL5OpenMode;
  100. #define DB_FILE_DELETED 0x1
  101. #define DB_FILE_INIT 0x2
  102. /* this structure represents one changelog file, Each changelog file contains
  103. changes applied to a single backend. Files are named by the database id */
  104. typedef struct cl5dbfile
  105. {
  106. char *name; /* file name (with the extension) */
  107. char *replGen; /* replica generation of the data */
  108. char *replName; /* replica name */
  109. DB *db; /* db handle to the changelog file*/
  110. int entryCount; /* number of entries in the file */
  111. int flags; /* currently used to mark the file as deleted
  112. * or as initialized */
  113. RUV *purgeRUV; /* ruv to which the file has been purged */
  114. RUV *maxRUV; /* ruv that marks the upper boundary of the data */
  115. } CL5DBFile;
  116. /* structure that allows to iterate through entries to be sent to a consumer
  117. that originated on a particular supplier. */
  118. struct cl5replayiterator
  119. {
  120. Object *fileObj;
  121. CLC_Buffer *clcache; /* changelog cache */
  122. ReplicaId consumerRID; /* consumer's RID */
  123. const RUV *consumerRuv; /* consumer's update vector */
  124. Object *supplierRuvObj; /* supplier's update vector object */
  125. };
  126. typedef struct cl5iterator
  127. {
  128. DBC *cursor; /* current position in the db file */
  129. Object *file; /* handle to release db file object */
  130. } CL5Iterator;
  131. /* changelog trimming configuration */
  132. typedef struct cl5trim
  133. {
  134. time_t maxAge; /* maximum entry age in seconds */
  135. int maxEntries; /* maximum number of entries across all changelog files */
  136. int compactInterval; /* interval to compact changelog db */
  137. int trimInterval; /* trimming interval */
  138. PRLock *lock; /* controls access to trimming configuration */
  139. } CL5Trim;
  140. /* this structure defines 5.0 changelog internals */
  141. typedef struct cl5desc
  142. {
  143. char *dbDir; /* absolute path to changelog directory */
  144. DB_ENV *dbEnv; /* db environment shared by all db files */
  145. int dbEnvOpenFlags; /* openflag used for env->open */
  146. Objset *dbFiles; /* ref counted set of changelog files (CL5DBFile) */
  147. PRLock *fileLock; /* ensures that changelog file is not added twice */
  148. CL5OpenMode dbOpenMode; /* how we open db */
  149. CL5DBConfig dbConfig; /* database configuration params */
  150. CL5Trim dbTrim; /* trimming parameters */
  151. CL5State dbState; /* changelog current state */
  152. Slapi_RWLock *stLock; /* lock that controls access to the changelog state */
  153. PRBool dbRmOnClose; /* indicates whether changelog should be removed when
  154. it is closed */
  155. PRBool fatalError; /* bad stuff happened like out of disk space; don't
  156. write guardian file on close - UnUsed so far */
  157. int threadCount; /* threads that globally access changelog like
  158. deadlock detection, etc. */
  159. PRLock *clLock; /* Lock associated to clVar, used to notify threads on close */
  160. PRCondVar *clCvar; /* Condition Variable used to notify threads on close */
  161. void *clcrypt_handle; /* for cl encryption */
  162. } CL5Desc;
  163. typedef void (*VFP)(void *);
  164. /***** Global Variables *****/
  165. static CL5Desc s_cl5Desc = {0};
  166. /***** Forward Declarations *****/
  167. /* changelog initialization and cleanup */
  168. static int _cl5Open(const char *dir, const CL5DBConfig *config, CL5OpenMode openMode);
  169. static int _cl5AppInit(void);
  170. static int _cl5DBOpen(void);
  171. static void _cl5SetDefaultDBConfig(void);
  172. static void _cl5SetDBConfig(const CL5DBConfig *config);
  173. static int _cl5CheckDBVersion(void);
  174. static int _cl5ReadDBVersion(const char *dir, char *clVersion, int buflen);
  175. static int _cl5WriteDBVersion(void);
  176. static void _cl5Close(void);
  177. static int _cl5Delete(const char *dir, PRBool rmDir);
  178. static void _cl5DBClose(void);
  179. /* thread management */
  180. static int _cl5DispatchDBThreads(void);
  181. static int _cl5AddThread(void);
  182. static void _cl5RemoveThread(void);
  183. /* functions that work with individual changelog files */
  184. static int _cl5NewDBFile(const char *replName, const char *replGen, CL5DBFile **dbFile);
  185. static int _cl5DBOpenFile(Object *replica, Object **obj, PRBool checkDups);
  186. static int _cl5DBOpenFileByReplicaName(const char *replName, const char *replGen, Object **obj, PRBool checkDups);
  187. static void _cl5DBCloseFile(void **data);
  188. static void _cl5DBDeleteFile(Object *obj);
  189. static void _cl5DBFileInitialized(Object *obj);
  190. static int _cl5GetDBFile(Object *replica, Object **obj);
  191. static int _cl5GetDBFileByReplicaName(const char *replName, const char *replGen, Object **obj);
  192. static int _cl5AddDBFile(CL5DBFile *file, Object **obj);
  193. static int _cl5CompareDBFile(Object *el1, const void *el2);
  194. static char *_cl5Replica2FileName(Object *replica);
  195. static char *_cl5MakeFileName(const char *replName, const char *replGen);
  196. static PRBool _cl5FileName2Replica(const char *fileName, Object **replica);
  197. static int _cl5ExportFile(PRFileDesc *prFile, Object *obj);
  198. static PRBool _cl5ReplicaInList(Object *replica, Object **replicas);
  199. /* data storage and retrieval */
  200. static int _cl5Entry2DBData(const CL5Entry *entry, char **data, PRUint32 *len);
  201. static int _cl5WriteOperation(const char *replName, const char *replGen, const slapi_operation_parameters *op, PRBool local);
  202. static int _cl5WriteOperationTxn(const char *replName, const char *replGen, const slapi_operation_parameters *op, PRBool local, void *txn);
  203. static int _cl5GetFirstEntry(Object *obj, CL5Entry *entry, void **iterator, DB_TXN *txnid);
  204. static int _cl5GetNextEntry(CL5Entry *entry, void *iterator);
  205. static int _cl5CurrentDeleteEntry(void *iterator);
  206. static PRBool _cl5IsValidIterator(const CL5Iterator *iterator);
  207. static int _cl5GetOperation(Object *replica, slapi_operation_parameters *op);
  208. static const char *_cl5OperationType2Str(int type);
  209. static int _cl5Str2OperationType(const char *str);
  210. static void _cl5WriteString(const char *str, char **buff);
  211. static void _cl5ReadString(char **str, char **buff);
  212. static void _cl5WriteMods(LDAPMod **mods, char **buff);
  213. static int _cl5WriteMod(LDAPMod *mod, char **buff);
  214. static int _cl5ReadMods(LDAPMod ***mods, char **buff);
  215. static int _cl5ReadMod(Slapi_Mod *mod, char **buff);
  216. static int _cl5GetModsSize(LDAPMod **mods);
  217. static int _cl5GetModSize(LDAPMod *mod);
  218. static void _cl5ReadBerval(struct berval *bv, char **buff);
  219. static void _cl5WriteBerval(struct berval *bv, char **buff);
  220. static int _cl5ReadBervals(struct berval ***bv, char **buff, unsigned int size);
  221. static int _cl5WriteBervals(struct berval **bv, char **buff, u_int32_t *size);
  222. /* replay iteration */
  223. #ifdef FOR_DEBUGGING
  224. static PRBool _cl5ValidReplayIterator(const CL5ReplayIterator *iterator);
  225. #endif
  226. static int _cl5PositionCursorForReplay(ReplicaId consumerRID, const RUV *consumerRuv, Object *replica, Object *fileObject, CL5ReplayIterator **iterator, int *continue_on_missing);
  227. static int _cl5CheckMissingCSN(const CSN *minCsn, const RUV *supplierRUV, CL5DBFile *file);
  228. /* changelog trimming */
  229. static int _cl5TrimInit(void);
  230. static void _cl5TrimCleanup(void);
  231. static int _cl5TrimMain(void *param);
  232. static void _cl5DoTrimming(void);
  233. static void _cl5CompactDBs(void);
  234. static void _cl5PurgeRID(Object *obj, ReplicaId cleaned_rid);
  235. static int _cl5PurgeGetFirstEntry(Object *obj, CL5Entry *entry, void **iterator, DB_TXN *txnid, int rid, DBT *key);
  236. static int _cl5PurgeGetNextEntry(CL5Entry *entry, void *iterator, DBT *key);
  237. static void _cl5TrimFile(Object *obj, long *numToTrim);
  238. static PRBool _cl5CanTrim(time_t time, long *numToTrim);
  239. static int _cl5ReadRUV(const char *replGen, Object *obj, PRBool purge);
  240. static int _cl5WriteRUV(CL5DBFile *file, PRBool purge);
  241. static int _cl5ConstructRUV(const char *replGen, Object *obj, PRBool purge);
  242. static int _cl5UpdateRUV(Object *obj, CSN *csn, PRBool newReplica, PRBool purge);
  243. static int _cl5GetRUV2Purge2(Object *fileObj, RUV **ruv);
  244. void trigger_cl_purging_thread(void *rid);
  245. /* bakup/recovery, import/export */
  246. static int _cl5LDIF2Operation(char *ldifEntry, slapi_operation_parameters *op, char **replGen);
  247. static int _cl5Operation2LDIF(const slapi_operation_parameters *op, const char *replGen, char **ldifEntry, PRInt32 *lenLDIF);
  248. /* entry count */
  249. static int _cl5GetEntryCount(CL5DBFile *file);
  250. static int _cl5WriteEntryCount(CL5DBFile *file);
  251. /* misc */
  252. static char *_cl5GetHelperEntryKey(int type, char *csnStr);
  253. static Object *_cl5GetReplica(const slapi_operation_parameters *op, const char *replGen);
  254. static int _cl5FileEndsWith(const char *filename, const char *ext);
  255. static PRLock *cl5_diskfull_lock = NULL;
  256. static int cl5_diskfull_flag = 0;
  257. static void cl5_set_diskfull(void);
  258. static void cl5_set_no_diskfull(void);
  259. /***** Module APIs *****/
  260. /* Name: cl5Init
  261. Description: initializes changelog module; must be called by a single thread
  262. before any other changelog function.
  263. Parameters: none
  264. Return: CL5_SUCCESS if function is successful;
  265. CL5_SYSTEM_ERROR error if NSPR call fails.
  266. */
  267. int
  268. cl5Init(void)
  269. {
  270. s_cl5Desc.stLock = slapi_new_rwlock();
  271. if (s_cl5Desc.stLock == NULL) {
  272. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
  273. "cl5Init - Failed to create state lock; NSPR error - %d\n",
  274. PR_GetError());
  275. return CL5_SYSTEM_ERROR;
  276. }
  277. if ((s_cl5Desc.clLock = PR_NewLock()) == NULL) {
  278. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
  279. "cl5Init - Failed to create on close lock; NSPR error - %d\n",
  280. PR_GetError());
  281. return CL5_SYSTEM_ERROR;
  282. }
  283. if ((s_cl5Desc.clCvar = PR_NewCondVar(s_cl5Desc.clLock)) == NULL) {
  284. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
  285. "cl5Init - Failed to create on close cvar; NSPR error - %d\n",
  286. PR_GetError());
  287. return CL5_SYSTEM_ERROR;
  288. }
  289. if ((clcache_init(&s_cl5Desc.dbEnv) != 0)) {
  290. return CL5_SYSTEM_ERROR;
  291. }
  292. s_cl5Desc.dbState = CL5_STATE_CLOSED;
  293. s_cl5Desc.fatalError = PR_FALSE;
  294. s_cl5Desc.dbRmOnClose = PR_FALSE;
  295. s_cl5Desc.threadCount = 0;
  296. if (NULL == cl5_diskfull_lock) {
  297. cl5_diskfull_lock = PR_NewLock();
  298. }
  299. return CL5_SUCCESS;
  300. }
  301. /* Name: cl5Cleanup
  302. Description: performs cleanup of the changelog module; must be called by a single
  303. thread; it closes changelog if it is still open.
  304. Parameters: none
  305. Return: none
  306. */
  307. void
  308. cl5Cleanup()
  309. {
  310. /* close db if it is still open */
  311. if (s_cl5Desc.dbState == CL5_STATE_OPEN) {
  312. cl5Close();
  313. }
  314. if (s_cl5Desc.stLock)
  315. slapi_destroy_rwlock(s_cl5Desc.stLock);
  316. s_cl5Desc.stLock = NULL;
  317. if (cl5_diskfull_lock) {
  318. PR_DestroyLock(cl5_diskfull_lock);
  319. cl5_diskfull_lock = NULL;
  320. }
  321. if (s_cl5Desc.clLock != NULL) {
  322. PR_DestroyLock(s_cl5Desc.clLock);
  323. s_cl5Desc.clLock = NULL;
  324. }
  325. if (s_cl5Desc.clCvar != NULL) {
  326. PR_DestroyCondVar(s_cl5Desc.clCvar);
  327. s_cl5Desc.clCvar = NULL;
  328. }
  329. memset(&s_cl5Desc, 0, sizeof(s_cl5Desc));
  330. }
  331. /* Name: cl5Open
  332. Description: opens changelog; must be called after changelog is
  333. initialized using cl5Init. It is thread safe and the second
  334. call is ignored.
  335. Parameters: dir - changelog dir
  336. config - db configuration parameters; currently not used
  337. Return: CL5_SUCCESS if successfull;
  338. CL5_BAD_DATA if invalid directory is passed;
  339. CL5_BAD_STATE if changelog is not initialized;
  340. CL5_BAD_DBVERSION if dbversion file is missing or has unexpected data
  341. CL5_SYSTEM_ERROR if NSPR error occured (during db directory creation);
  342. CL5_MEMORY_ERROR if memory allocation fails;
  343. CL5_DB_ERROR if db initialization fails.
  344. */
  345. int
  346. cl5Open(const char *dir, const CL5DBConfig *config)
  347. {
  348. int rc;
  349. if (dir == NULL) {
  350. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl, "cl5Open: null directory\n");
  351. return CL5_BAD_DATA;
  352. }
  353. if (s_cl5Desc.dbState == CL5_STATE_NONE) {
  354. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
  355. "cl5Open - Changelog is not initialized\n");
  356. return CL5_BAD_STATE;
  357. }
  358. /* prevent state from changing */
  359. slapi_rwlock_wrlock(s_cl5Desc.stLock);
  360. /* already open - ignore */
  361. if (s_cl5Desc.dbState == CL5_STATE_OPEN) {
  362. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
  363. "cl5Open - changelog already opened; request ignored\n");
  364. rc = CL5_SUCCESS;
  365. goto done;
  366. } else if (s_cl5Desc.dbState != CL5_STATE_CLOSED) {
  367. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
  368. "cl5Open - Invalid state - %d\n", s_cl5Desc.dbState);
  369. rc = CL5_BAD_STATE;
  370. goto done;
  371. }
  372. rc = _cl5Open(dir, config, CL5_OPEN_NORMAL);
  373. if (rc != CL5_SUCCESS) {
  374. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
  375. "cl5Open - Failed to open changelog\n");
  376. goto done;
  377. }
  378. /* dispatch global threads like deadlock detection, trimming, etc */
  379. rc = _cl5DispatchDBThreads();
  380. if (rc != CL5_SUCCESS) {
  381. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
  382. "cl5Open - Failed to start database monitoring threads\n");
  383. _cl5Close();
  384. } else {
  385. s_cl5Desc.dbState = CL5_STATE_OPEN;
  386. clcache_set_config();
  387. /* Set the cl encryption algorithm (if configured) */
  388. rc = clcrypt_init(config, &s_cl5Desc.clcrypt_handle);
  389. }
  390. done:
  391. slapi_rwlock_unlock(s_cl5Desc.stLock);
  392. return rc;
  393. }
  394. /* Name: cl5Close
  395. Description: closes changelog; waits until all threads are done using changelog;
  396. call is ignored if changelog is already closed.
  397. Parameters: none
  398. Return: CL5_SUCCESS if successful;
  399. CL5_BAD_STATE if db is not in the open or closed state;
  400. CL5_SYSTEM_ERROR if NSPR call fails;
  401. CL5_DB_ERROR if db shutdown fails
  402. */
  403. int
  404. cl5Close()
  405. {
  406. int rc = CL5_SUCCESS;
  407. if (s_cl5Desc.dbState == CL5_STATE_NONE) {
  408. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
  409. "cl5Close - Changelog is not initialized\n");
  410. return CL5_BAD_STATE;
  411. }
  412. slapi_rwlock_wrlock(s_cl5Desc.stLock);
  413. /* already closed - ignore */
  414. if (s_cl5Desc.dbState == CL5_STATE_CLOSED) {
  415. slapi_log_err(SLAPI_LOG_PLUGIN, repl_plugin_name_cl,
  416. "cl5Close - Changelog closed; request ignored\n");
  417. slapi_rwlock_unlock(s_cl5Desc.stLock);
  418. return CL5_SUCCESS;
  419. } else if (s_cl5Desc.dbState != CL5_STATE_OPEN) {
  420. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
  421. "cl5Close - Invalid state - %d\n", s_cl5Desc.dbState);
  422. slapi_rwlock_unlock(s_cl5Desc.stLock);
  423. return CL5_BAD_STATE;
  424. }
  425. /* signal changelog closing to all threads */
  426. s_cl5Desc.dbState = CL5_STATE_CLOSING;
  427. PR_Lock(s_cl5Desc.clLock);
  428. PR_NotifyCondVar(s_cl5Desc.clCvar);
  429. PR_Unlock(s_cl5Desc.clLock);
  430. _cl5Close();
  431. s_cl5Desc.dbState = CL5_STATE_CLOSED;
  432. slapi_rwlock_unlock(s_cl5Desc.stLock);
  433. return rc;
  434. }
  435. /* Name: cl5Delete
  436. Description: removes changelog; changelog must be in the closed state.
  437. Parameters: dir - changelog directory
  438. Return: CL5_SUCCESS if successful;
  439. CL5_BAD_STATE if the changelog is not in closed state;
  440. CL5_BAD_DATA if invalid directory supplied
  441. CL5_SYSTEM_ERROR if NSPR call fails
  442. */
  443. int
  444. cl5Delete(const char *dir)
  445. {
  446. int rc;
  447. if (dir == NULL) {
  448. slapi_log_err(SLAPI_LOG_PLUGIN, repl_plugin_name_cl, "cl5Delete - NULL directory\n");
  449. return CL5_BAD_DATA;
  450. }
  451. if (s_cl5Desc.dbState == CL5_STATE_NONE) {
  452. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
  453. "cl5Delete - Changelog is not initialized\n");
  454. return CL5_BAD_STATE;
  455. }
  456. slapi_rwlock_wrlock(s_cl5Desc.stLock);
  457. if (s_cl5Desc.dbState != CL5_STATE_CLOSED) {
  458. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
  459. "cl5Delete - Invalid state - %d\n", s_cl5Desc.dbState);
  460. slapi_rwlock_unlock(s_cl5Desc.stLock);
  461. return CL5_BAD_STATE;
  462. }
  463. rc = _cl5Delete(dir, PR_TRUE /* remove changelog dir */);
  464. if (rc != CL5_SUCCESS) {
  465. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
  466. "cl5Delete - Failed to remove changelog\n");
  467. }
  468. slapi_rwlock_unlock(s_cl5Desc.stLock);
  469. return rc;
  470. }
  471. /* Name: cl5DeleteDBSync
  472. Description: The same as cl5DeleteDB except the function does not return
  473. until the file is removed.
  474. */
  475. int
  476. cl5DeleteDBSync(Object *replica)
  477. {
  478. Object *obj;
  479. int rc;
  480. CL5DBFile *file;
  481. if (replica == NULL) {
  482. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
  483. "cl5DeleteDBSync - invalid database id\n");
  484. return CL5_BAD_DATA;
  485. }
  486. /* changelog is not initialized */
  487. if (s_cl5Desc.dbState == CL5_STATE_NONE) {
  488. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl, "cl5DeleteDBSync - "
  489. "Changelog is not initialized\n");
  490. return CL5_BAD_STATE;
  491. }
  492. /* make sure that changelog stays open while operation is in progress */
  493. rc = _cl5AddThread();
  494. if (rc != CL5_SUCCESS)
  495. return rc;
  496. rc = _cl5GetDBFile(replica, &obj);
  497. if (rc == CL5_SUCCESS) {
  498. char *filename = NULL;
  499. file = (CL5DBFile *)object_get_data(obj);
  500. PR_ASSERT(file);
  501. /* file->name is freed in _cl5DBDeleteFile */
  502. filename = slapi_ch_strdup(file->name);
  503. _cl5DBDeleteFile(obj);
  504. /* wait until the file is gone */
  505. while (PR_Access(filename, PR_ACCESS_EXISTS) == PR_SUCCESS) {
  506. DS_Sleep(PR_MillisecondsToInterval(100));
  507. }
  508. slapi_ch_free_string(&filename);
  509. } else {
  510. Replica *r = (Replica *)object_get_data(replica);
  511. PR_ASSERT(r);
  512. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl, "cl5DeleteDBSync - "
  513. "File for replica at (%s) not found\n",
  514. slapi_sdn_get_dn(replica_get_root(r)));
  515. }
  516. _cl5RemoveThread();
  517. return rc;
  518. }
  519. /* Name: cl5GetUpperBoundRUV
  520. Description: retrieves vector for that represents the upper bound of the changes for a replica.
  521. Parameters: r - replica for which the purge vector is requested
  522. ruv - contains a copy of the purge ruv if function is successful;
  523. unchanged otherwise. It is responsibility of the caller to free
  524. the ruv when it is no longer is in use
  525. Return: CL5_SUCCESS if function is successful
  526. CL5_BAD_STATE if the changelog is not initialized;
  527. CL5_BAD_DATA - if NULL id is supplied
  528. CL5_NOTFOUND, if changelog file for replica is not found
  529. */
  530. int
  531. cl5GetUpperBoundRUV(Replica *r, RUV **ruv)
  532. {
  533. int rc;
  534. Object *r_obj, *file_obj;
  535. CL5DBFile *file;
  536. if (r == NULL || ruv == NULL) {
  537. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
  538. "cl5GetUpperBoundRUV - Invalid parameters\n");
  539. return CL5_BAD_DATA;
  540. }
  541. /* changelog is not initialized */
  542. if (s_cl5Desc.dbState == CL5_STATE_NONE) {
  543. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl, "cl5GetUpperBoundRUV - "
  544. "Changelog is not initialized\n");
  545. return CL5_BAD_STATE;
  546. }
  547. /* make sure that changelog stays open while operation is in progress */
  548. rc = _cl5AddThread();
  549. if (rc != CL5_SUCCESS)
  550. return rc;
  551. /* create a temporary replica object because of the interface we have */
  552. r_obj = object_new(r, NULL);
  553. rc = _cl5GetDBFile(r_obj, &file_obj);
  554. if (rc == CL5_SUCCESS) {
  555. file = (CL5DBFile *)object_get_data(file_obj);
  556. PR_ASSERT(file && file->maxRUV);
  557. *ruv = ruv_dup(file->maxRUV);
  558. object_release(file_obj);
  559. } else {
  560. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl, "cl5GetUpperBoundRUV - "
  561. "Could not find DB object for replica\n");
  562. }
  563. object_release(r_obj);
  564. _cl5RemoveThread();
  565. return rc;
  566. }
  567. /* Name: cl5ExportLDIF
  568. Description: dumps changelog to an LDIF file; changelog can be open or closed.
  569. Parameters: clDir - changelog dir
  570. ldifFile - full path to ldif file to write
  571. replicas - optional list of replicas whose changes should be exported;
  572. if the list is NULL, entire changelog is exported.
  573. Return: CL5_SUCCESS if function is successfull;
  574. CL5_BAD_DATA if invalid parameter is passed;
  575. CL5_BAD_STATE if changelog is not initialized;
  576. CL5_DB_ERROR if db api fails;
  577. CL5_SYSTEM_ERROR if NSPR call fails;
  578. CL5_MEMORY_ERROR if memory allocation fials.
  579. */
  580. int
  581. cl5ExportLDIF(const char *ldifFile, Object **replicas)
  582. {
  583. int i;
  584. int rc;
  585. PRFileDesc *prFile = NULL;
  586. Object *obj;
  587. if (ldifFile == NULL) {
  588. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
  589. "cl5ExportLDIF - null ldif file name\n");
  590. return CL5_BAD_DATA;
  591. }
  592. if (s_cl5Desc.dbState == CL5_STATE_NONE) {
  593. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
  594. "cl5ExportLDIF - Changelog is not initialized\n");
  595. return CL5_BAD_STATE;
  596. }
  597. /* make sure that changelog is open while operation is in progress */
  598. rc = _cl5AddThread();
  599. if (rc != CL5_SUCCESS)
  600. return rc;
  601. prFile = PR_Open(ldifFile, PR_WRONLY | PR_CREATE_FILE | PR_TRUNCATE, 0600);
  602. if (prFile == NULL) {
  603. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
  604. "cl5ExportLDIF - Failed to open (%s) file; NSPR error - %d\n",
  605. ldifFile, PR_GetError());
  606. rc = CL5_SYSTEM_ERROR;
  607. goto done;
  608. }
  609. slapi_log_err(SLAPI_LOG_PLUGIN, repl_plugin_name_cl,
  610. "cl5ExportLDIF: starting changelog export to (%s) ...\n", ldifFile);
  611. if (replicas) /* export only selected files */
  612. {
  613. for (i = 0; replicas[i]; i++) {
  614. rc = _cl5GetDBFile(replicas[i], &obj);
  615. if (rc == CL5_SUCCESS) {
  616. rc = _cl5ExportFile(prFile, obj);
  617. object_release(obj);
  618. } else {
  619. Replica *r = (Replica *)object_get_data(replicas[i]);
  620. PR_ASSERT(r);
  621. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl, "cl5ExportLDIF - "
  622. "Failed to locate changelog file for replica at (%s)\n",
  623. slapi_sdn_get_dn(replica_get_root(r)));
  624. }
  625. }
  626. } else /* export all files */
  627. {
  628. for (obj = objset_first_obj(s_cl5Desc.dbFiles); obj;
  629. obj = objset_next_obj(s_cl5Desc.dbFiles, obj)) {
  630. rc = _cl5ExportFile(prFile, obj);
  631. }
  632. }
  633. rc = CL5_SUCCESS;
  634. done:;
  635. _cl5RemoveThread();
  636. if (rc == CL5_SUCCESS)
  637. slapi_log_err(SLAPI_LOG_PLUGIN, repl_plugin_name_cl,
  638. "cl5ExportLDIF - Changelog export is finished.\n");
  639. if (prFile)
  640. PR_Close(prFile);
  641. return rc;
  642. }
  643. /* Name: cl5ImportLDIF
  644. Description: imports ldif file into changelog; changelog must be in the closed state
  645. Parameters: clDir - changelog dir
  646. ldifFile - absolute path to the ldif file to import
  647. replicas - list of replicas whose data should be imported.
  648. Return: CL5_SUCCESS if function is successfull;
  649. CL5_BAD_DATA if invalid parameter is passed;
  650. CL5_BAD_STATE if changelog is open or not inititalized;
  651. CL5_DB_ERROR if db api fails;
  652. CL5_SYSTEM_ERROR if NSPR call fails;
  653. CL5_MEMORY_ERROR if memory allocation fials.
  654. */
  655. int
  656. cl5ImportLDIF(const char *clDir, const char *ldifFile, Object **replicas)
  657. {
  658. #if defined(USE_OPENLDAP)
  659. LDIFFP *file = NULL;
  660. int buflen;
  661. ldif_record_lineno_t lineno = 0;
  662. #else
  663. FILE *file = NULL;
  664. int lineno = 0;
  665. #endif
  666. int rc;
  667. char *buff = NULL;
  668. slapi_operation_parameters op;
  669. Object *prim_replica_obj = NULL;
  670. Object *replica_obj = NULL;
  671. Object *file_obj = NULL;
  672. Replica *prim_replica = NULL;
  673. char *replGen = NULL;
  674. CL5DBFile *dbfile = NULL;
  675. struct berval **purgevals = NULL;
  676. struct berval **maxvals = NULL;
  677. int purgeidx = 0;
  678. int maxidx = 0;
  679. int maxpurgesz = 0;
  680. int maxmaxsz = 0;
  681. int entryCount = 0;
  682. /* validate params */
  683. if (ldifFile == NULL) {
  684. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
  685. "cl5ImportLDIF - null ldif file name\n");
  686. return CL5_BAD_DATA;
  687. }
  688. if (s_cl5Desc.dbState == CL5_STATE_NONE) {
  689. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
  690. "cl5ImportLDIF - Changelog is not initialized\n");
  691. return CL5_BAD_STATE;
  692. }
  693. if (replicas == NULL) {
  694. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
  695. "cl5ImportLDIF - null list of replicas\n");
  696. return CL5_BAD_DATA;
  697. }
  698. prim_replica_obj = replicas[0];
  699. if (NULL == prim_replica_obj) {
  700. /* Never happens for now. (see replica_execute_ldif2cl_task) */
  701. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
  702. "cl5ImportLDIF - empty replica list\n");
  703. return CL5_BAD_DATA;
  704. }
  705. prim_replica = (Replica *)object_get_data(prim_replica_obj);
  706. /* make sure that nobody change changelog state while import is in progress */
  707. slapi_rwlock_wrlock(s_cl5Desc.stLock);
  708. /* make sure changelog is closed */
  709. if (s_cl5Desc.dbState != CL5_STATE_CLOSED) {
  710. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
  711. "cl5ImportLDIF - Invalid state - %d \n", s_cl5Desc.dbState);
  712. slapi_rwlock_unlock(s_cl5Desc.stLock);
  713. return CL5_BAD_STATE;
  714. }
  715. /* open LDIF file */
  716. #if defined(USE_OPENLDAP)
  717. file = ldif_open(ldifFile, "r");
  718. #else
  719. file = fopen(ldifFile, "r"); /* XXXggood Does fopen reliably work if > 255 files open? */
  720. #endif
  721. if (file == NULL) {
  722. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
  723. "cl5ImportLDIF - Failed to open (%s) ldif file; system error - %d\n",
  724. ldifFile, errno);
  725. rc = CL5_SYSTEM_ERROR;
  726. goto done;
  727. }
  728. /* remove changelog */
  729. rc = _cl5Delete(clDir, PR_FALSE);
  730. if (rc != CL5_SUCCESS) {
  731. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
  732. "cl5ImportLDIF - Failed to remove changelog\n");
  733. goto done;
  734. }
  735. /* open changelog */
  736. rc = _cl5Open(clDir, NULL, CL5_OPEN_LDIF2CL);
  737. if (rc != CL5_SUCCESS) {
  738. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
  739. "cl5ImportLDIF - Failed to open changelog\n");
  740. goto done;
  741. }
  742. s_cl5Desc.dbState = CL5_STATE_OPEN; /* force to change the state */
  743. /* read entries and write them to changelog */
  744. #if defined(USE_OPENLDAP)
  745. while (ldif_read_record(file, &lineno, &buff, &buflen))
  746. #else
  747. while ((buff = ldif_get_entry(file, &lineno)) != NULL)
  748. #endif
  749. {
  750. rc = _cl5LDIF2Operation(buff, &op, &replGen);
  751. if (rc != CL5_SUCCESS) {
  752. /*
  753. * clpurgeruv: {replicageneration} 4d13a124000000010000
  754. * clpurgeruv: {replica 2 ldap://host:port}
  755. * clpurgeruv: {replica 1 ldap://host:port}
  756. * clmaxruv: {replicageneration} 4d13a124000000010000
  757. * clmaxruv: {replica 2} <mincsn> <maxcsn> <timestamp>
  758. * clmaxruv: {replica 1} <mincsn> <maxcsn> <timestamp>
  759. */
  760. char *line;
  761. char *next = buff;
  762. struct berval type, value;
  763. int freeval = 0;
  764. while ((line = ldif_getline(&next)) != NULL) {
  765. rc = slapi_ldif_parse_line(line, &type, &value, &freeval);
  766. /* ruv_dump (dbfile->purgeRUV, "clpurgeruv", prFile); */
  767. if (0 == strcasecmp(type.bv_val, "clpurgeruv")) {
  768. if (maxpurgesz < purgeidx + 2) {
  769. if (!maxpurgesz) {
  770. maxpurgesz = 4 * (purgeidx + 2);
  771. } else {
  772. maxpurgesz *= 2;
  773. }
  774. purgevals = (struct berval **)slapi_ch_realloc(
  775. (char *)purgevals,
  776. sizeof(struct berval *) * maxpurgesz);
  777. }
  778. purgevals[purgeidx++] = slapi_ch_bvdup(&value);
  779. purgevals[purgeidx] = NULL; /* make sure NULL terminated */
  780. }
  781. /* ruv_dump (dbfile->maxRUV, "clmaxruv", prFile); */
  782. else if (0 == strcasecmp(type.bv_val, "clmaxruv")) {
  783. if (maxmaxsz < maxidx + 2) {
  784. if (!maxmaxsz) {
  785. maxmaxsz = 4 * (maxidx + 2);
  786. } else {
  787. maxmaxsz *= 2;
  788. }
  789. maxvals = (struct berval **)slapi_ch_realloc(
  790. (char *)maxvals,
  791. sizeof(struct berval *) * maxmaxsz);
  792. }
  793. /* {replica #} min_csn csn [last_modified] */
  794. /* get rid of last_modified, if any */
  795. maxvals[maxidx++] = slapi_ch_bvdup(&value);
  796. maxvals[maxidx] = NULL; /* make sure NULL terminated */
  797. }
  798. if (freeval) {
  799. slapi_ch_free_string(&value.bv_val);
  800. }
  801. }
  802. slapi_ch_free_string(&buff);
  803. #if defined(USE_OPENLDAP)
  804. buflen = 0;
  805. #endif
  806. goto next;
  807. }
  808. slapi_ch_free_string(&buff);
  809. #if defined(USE_OPENLDAP)
  810. buflen = 0;
  811. #endif
  812. /* if we perform selective import, check if the operation should be wriiten to changelog */
  813. replica_obj = _cl5GetReplica(&op, replGen);
  814. if (replica_obj == NULL) {
  815. /*
  816. * changetype: delete
  817. * replgen: 4d13a124000000010000
  818. * csn: 4d23b909000000020000
  819. * nsuniqueid: 00000000-00000000-00000000-00000000
  820. * dn: cn=start iteration
  821. */
  822. rc = _cl5WriteOperation(replica_get_name(prim_replica),
  823. replGen, &op, 1);
  824. if (rc != CL5_SUCCESS) {
  825. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
  826. "cl5ImportLDIF - "
  827. "Failed to write operation to the changelog: "
  828. "type: %lu, dn: %s\n",
  829. op.operation_type, REPL_GET_DN(&op.target_address));
  830. slapi_ch_free_string(&replGen);
  831. operation_parameters_done(&op);
  832. goto done;
  833. }
  834. entryCount++;
  835. goto next;
  836. }
  837. if (!replicas || _cl5ReplicaInList(replica_obj, replicas)) {
  838. /* write operation creates the file if it does not exist */
  839. rc = _cl5WriteOperation(replica_get_name((Replica *)object_get_data(replica_obj)),
  840. replGen, &op, 1);
  841. if (rc != CL5_SUCCESS) {
  842. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
  843. "cl5ImportLDIF - "
  844. "Failed to write operation to the changelog: "
  845. "type: %lu, dn: %s\n",
  846. op.operation_type, REPL_GET_DN(&op.target_address));
  847. object_release(replica_obj);
  848. slapi_ch_free_string(&replGen);
  849. operation_parameters_done(&op);
  850. goto done;
  851. }
  852. entryCount++;
  853. }
  854. next:
  855. if (replica_obj) {
  856. object_release(replica_obj);
  857. }
  858. slapi_ch_free_string(&replGen);
  859. operation_parameters_done(&op);
  860. }
  861. /* Set RUVs and entry count */
  862. file_obj = objset_first_obj(s_cl5Desc.dbFiles);
  863. while (file_obj) {
  864. dbfile = (CL5DBFile *)object_get_data(file_obj);
  865. if (0 == strcasecmp(dbfile->replName, replica_get_name(prim_replica))) {
  866. break;
  867. }
  868. dbfile = NULL;
  869. file_obj = objset_next_obj(s_cl5Desc.dbFiles, file_obj);
  870. }
  871. if (dbfile) {
  872. if (purgeidx > 0) {
  873. ruv_destroy(&dbfile->purgeRUV);
  874. rc = ruv_init_from_bervals(purgevals, &dbfile->purgeRUV);
  875. }
  876. if (maxidx > 0) {
  877. ruv_destroy(&dbfile->maxRUV);
  878. rc = ruv_init_from_bervals(maxvals, &dbfile->maxRUV);
  879. }
  880. dbfile->entryCount = entryCount;
  881. }
  882. if (file_obj) {
  883. object_release(file_obj);
  884. }
  885. done:
  886. for (purgeidx = 0; purgevals && purgevals[purgeidx]; purgeidx++) {
  887. slapi_ch_bvfree(&purgevals[purgeidx]);
  888. }
  889. slapi_ch_free((void **)&purgevals);
  890. for (maxidx = 0; maxvals && maxvals[maxidx]; maxidx++) {
  891. slapi_ch_bvfree(&maxvals[maxidx]);
  892. }
  893. slapi_ch_free((void **)&maxvals);
  894. if (file) {
  895. #if defined(USE_OPENLDAP)
  896. ldif_close(file);
  897. #else
  898. fclose(file);
  899. #endif
  900. }
  901. if (CL5_STATE_OPEN == s_cl5Desc.dbState) {
  902. _cl5Close();
  903. s_cl5Desc.dbState = CL5_STATE_CLOSED; /* force to change the state */
  904. }
  905. slapi_rwlock_unlock(s_cl5Desc.stLock);
  906. return rc;
  907. }
  908. /* Name: cl5GetState
  909. Description: returns database state
  910. Parameters: none
  911. Return: changelog state
  912. */
  913. int
  914. cl5GetState()
  915. {
  916. return s_cl5Desc.dbState;
  917. }
  918. /* Name: cl5ConfigTrimming
  919. Description: sets changelog trimming parameters; changelog must be open.
  920. Parameters: maxEntries - maximum number of entries in the chnagelog (in all files);
  921. maxAge - maximum entry age;
  922. compactInterval - interval to compact changelog db;
  923. trimInterval - changelog trimming interval.
  924. Return: CL5_SUCCESS if successful;
  925. CL5_BAD_STATE if changelog is not open
  926. */
  927. int
  928. cl5ConfigTrimming(int maxEntries, const char *maxAge, int compactInterval, int trimInterval)
  929. {
  930. if (s_cl5Desc.dbState == CL5_STATE_NONE) {
  931. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
  932. "cl5ConfigTrimming - Changelog is not initialized\n");
  933. return CL5_BAD_STATE;
  934. }
  935. /* make sure changelog is not closed while trimming configuration
  936. is updated.*/
  937. if (CL5_SUCCESS != _cl5AddThread()) {
  938. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
  939. "cl5ConfigTrimming - Could not start changelog trimming thread\n");
  940. return CL5_BAD_STATE;
  941. }
  942. PR_Lock(s_cl5Desc.dbTrim.lock);
  943. if (maxAge) {
  944. /* don't ignore this argument */
  945. if (strcmp(maxAge, CL5_STR_IGNORE) != 0) {
  946. s_cl5Desc.dbTrim.maxAge = slapi_parse_duration(maxAge);
  947. }
  948. } else {
  949. /* unlimited */
  950. s_cl5Desc.dbTrim.maxAge = 0;
  951. }
  952. if (maxEntries != CL5_NUM_IGNORE) {
  953. s_cl5Desc.dbTrim.maxEntries = maxEntries;
  954. }
  955. if (compactInterval != CL5_NUM_IGNORE) {
  956. s_cl5Desc.dbTrim.compactInterval = compactInterval;
  957. }
  958. if (trimInterval != CL5_NUM_IGNORE) {
  959. s_cl5Desc.dbTrim.trimInterval = trimInterval;
  960. }
  961. /* The config was updated, notify the changelog trimming thread */
  962. PR_Lock(s_cl5Desc.clLock);
  963. PR_NotifyCondVar(s_cl5Desc.clCvar);
  964. PR_Unlock(s_cl5Desc.clLock);
  965. PR_Unlock(s_cl5Desc.dbTrim.lock);
  966. _cl5RemoveThread();
  967. return CL5_SUCCESS;
  968. }
  969. /* Name: cl5GetOperation
  970. Description: retireves operation specified by its csn and databaseid
  971. Parameters: op - must contain csn and databaseid; the rest of data is
  972. filled if function is successfull
  973. Return: CL5_SUCCESS if function is successfull;
  974. CL5_BAD_DATA if invalid op is passed;
  975. CL5_BAD_STATE if db has not been initialized;
  976. CL5_NOTFOUND if entry was not found;
  977. CL5_DB_ERROR if any other db error occured;
  978. CL5_BADFORMAT if db data format does not match entry format.
  979. */
  980. int
  981. cl5GetOperation(Object *replica, slapi_operation_parameters *op)
  982. {
  983. int rc;
  984. char *agmt_name;
  985. agmt_name = get_thread_private_agmtname();
  986. if (replica == NULL) {
  987. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl, "cl5GetOperation - NULL replica\n");
  988. return CL5_BAD_DATA;
  989. }
  990. if (op == NULL) {
  991. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl, "cl5GetOperation - NULL operation\n");
  992. return CL5_BAD_DATA;
  993. }
  994. if (op->csn == NULL) {
  995. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl, "%s: cl5GetOperation - operation contains no CSN\n", agmt_name);
  996. return CL5_BAD_DATA;
  997. }
  998. if (s_cl5Desc.dbState == CL5_STATE_NONE) {
  999. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
  1000. "cl5GetOperation - %s - Changelog is not initialized\n", agmt_name);
  1001. return CL5_BAD_STATE;
  1002. }
  1003. /* make sure that changelog is open while operation is in progress */
  1004. rc = _cl5AddThread();
  1005. if (rc != CL5_SUCCESS)
  1006. return rc;
  1007. rc = _cl5GetOperation(replica, op);
  1008. _cl5RemoveThread();
  1009. return rc;
  1010. }
  1011. /* Name: cl5GetFirstOperation
  1012. Description: retrieves first operation for a particular database
  1013. replica - replica for which the operation should be retrieved.
  1014. Parameters: op - buffer to store the operation;
  1015. iterator - to be passed to the call to cl5GetNextOperation
  1016. Return: CL5_SUCCESS, if successful
  1017. CL5_BADDATA, if operation is NULL
  1018. CL5_BAD_STATE, if changelog is not open
  1019. CL5_DB_ERROR, if db call fails
  1020. */
  1021. int
  1022. cl5GetFirstOperation(Object *replica, slapi_operation_parameters *op, void **iterator)
  1023. {
  1024. int rc;
  1025. CL5Entry entry;
  1026. Object *obj;
  1027. char *agmt_name;
  1028. if (replica == NULL || op == NULL || iterator == NULL) {
  1029. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
  1030. "cl5GetFirstOperation - Invalid argument\n");
  1031. return CL5_BAD_DATA;
  1032. }
  1033. *iterator = NULL;
  1034. if (s_cl5Desc.dbState == CL5_STATE_NONE) {
  1035. agmt_name = get_thread_private_agmtname();
  1036. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
  1037. "cl5GetFirstOperation - %s - Changelog is not initialized\n", agmt_name);
  1038. return CL5_BAD_STATE;
  1039. }
  1040. /* make sure that changelog stays open while operation is in progress */
  1041. rc = _cl5AddThread();
  1042. if (rc != CL5_SUCCESS)
  1043. return rc;
  1044. rc = _cl5GetDBFile(replica, &obj);
  1045. if (rc != CL5_SUCCESS) {
  1046. _cl5RemoveThread();
  1047. return rc;
  1048. }
  1049. entry.op = op;
  1050. /* Callers of this function should cl5_operation_parameters_done(op) */
  1051. rc = _cl5GetFirstEntry(obj, &entry, iterator, NULL);
  1052. object_release(obj);
  1053. _cl5RemoveThread();
  1054. return rc;
  1055. }
  1056. /* Name: cl5GetNextOperation
  1057. Description: retrieves the next op from the changelog as defined by the iterator;
  1058. changelog must be open.
  1059. Parameters: op - returned operation, if function is successful
  1060. iterator - in: identifies op to retrieve; out: identifies next op
  1061. Return: CL5_SUCCESS, if successful
  1062. CL5_BADDATA, if op is NULL
  1063. CL5_BAD_STATE, if changelog is not open
  1064. CL5_NOTFOUND, empty changelog
  1065. CL5_DB_ERROR, if db call fails
  1066. */
  1067. int
  1068. cl5GetNextOperation(slapi_operation_parameters *op, void *iterator)
  1069. {
  1070. CL5Entry entry;
  1071. if (op == NULL || iterator == NULL || !_cl5IsValidIterator(iterator)) {
  1072. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
  1073. "cl5GetNextOperation - Invalid argument\n");
  1074. return CL5_BAD_DATA;
  1075. }
  1076. if (s_cl5Desc.dbState != CL5_STATE_OPEN) {
  1077. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
  1078. "cl5GetNextOperation - Changelog is not open\n");
  1079. return CL5_BAD_STATE;
  1080. }
  1081. /* we don't need to increment thread count since cl5GetFirstOperation
  1082. locked the file through which we are iterating */
  1083. entry.op = op;
  1084. /* Callers of this function should cl5_operation_parameters_done(op) */
  1085. return _cl5GetNextEntry(&entry, iterator);
  1086. }
  1087. /* Name: cl5DestroyIterator
  1088. Description: destroys iterator once iteration through changelog is done
  1089. Parameters: iterator - iterator to destroy
  1090. Return: none
  1091. */
  1092. void
  1093. cl5DestroyIterator(void *iterator)
  1094. {
  1095. CL5Iterator *it = (CL5Iterator *)iterator;
  1096. if (it == NULL)
  1097. return;
  1098. /* close cursor */
  1099. if (it->cursor)
  1100. it->cursor->c_close(it->cursor);
  1101. if (it->file)
  1102. object_release(it->file);
  1103. slapi_ch_free((void **)&it);
  1104. }
  1105. /* Name: cl5WriteOperationTxn
  1106. Description: writes operation to changelog
  1107. Parameters: replName - name of the replica to which operation applies
  1108. replGen - replica generation for the operation
  1109. !!!Note that we pass name and generation rather than
  1110. replica object since generation can change while operation
  1111. is in progress (if the data is reloaded). !!!
  1112. op - operation to write
  1113. local - this is a non-replicated operation
  1114. txn - the transaction containing this operation
  1115. Return: CL5_SUCCESS if function is successfull;
  1116. CL5_BAD_DATA if invalid op is passed;
  1117. CL5_BAD_STATE if db has not been initialized;
  1118. CL5_MEMORY_ERROR if memory allocation failed;
  1119. CL5_DB_ERROR if any other db error occured;
  1120. */
  1121. int
  1122. cl5WriteOperationTxn(const char *replName, const char *replGen, const slapi_operation_parameters *op, PRBool local, void *txn)
  1123. {
  1124. int rc;
  1125. if (op == NULL) {
  1126. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
  1127. "cl5WriteOperationTxn - NULL operation passed\n");
  1128. return CL5_BAD_DATA;
  1129. }
  1130. if (!IsValidOperation(op)) {
  1131. return CL5_BAD_DATA;
  1132. }
  1133. if (s_cl5Desc.dbState == CL5_STATE_NONE) {
  1134. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
  1135. "cl5WriteOperationTxn - Changelog is not initialized\n");
  1136. return CL5_BAD_STATE;
  1137. }
  1138. /* make sure that changelog is open while operation is in progress */
  1139. rc = _cl5AddThread();
  1140. if (rc != CL5_SUCCESS)
  1141. return rc;
  1142. rc = _cl5WriteOperationTxn(replName, replGen, op, local, txn);
  1143. /* update the upper bound ruv vector */
  1144. if (rc == CL5_SUCCESS) {
  1145. Object *file_obj = NULL;
  1146. if (_cl5GetDBFileByReplicaName(replName, replGen, &file_obj) == CL5_SUCCESS) {
  1147. rc = _cl5UpdateRUV(file_obj, op->csn, PR_FALSE, PR_FALSE);
  1148. object_release(file_obj);
  1149. }
  1150. }
  1151. _cl5RemoveThread();
  1152. return rc;
  1153. }
  1154. /* Name: cl5WriteOperation
  1155. Description: writes operation to changelog
  1156. Parameters: replName - name of the replica to which operation applies
  1157. replGen - replica generation for the operation
  1158. !!!Note that we pass name and generation rather than
  1159. replica object since generation can change while operation
  1160. is in progress (if the data is reloaded). !!!
  1161. op - operation to write
  1162. local - this is a non-replicated operation
  1163. Return: CL5_SUCCESS if function is successfull;
  1164. CL5_BAD_DATA if invalid op is passed;
  1165. CL5_BAD_STATE if db has not been initialized;
  1166. CL5_MEMORY_ERROR if memory allocation failed;
  1167. CL5_DB_ERROR if any other db error occured;
  1168. */
  1169. int
  1170. cl5WriteOperation(const char *replName, const char *replGen, const slapi_operation_parameters *op, PRBool local)
  1171. {
  1172. return cl5WriteOperationTxn(replName, replGen, op, local, NULL);
  1173. }
  1174. /* Name: cl5CreateReplayIterator
  1175. Description: creates an iterator that allows to retrieve changes that should
  1176. to be sent to the consumer identified by ruv. The iteration is performed by
  1177. repeated calls to cl5GetNextOperationToReplay.
  1178. Parameters: replica - replica whose data we wish to iterate;
  1179. ruv - consumer ruv;
  1180. iterator - iterator to be passed to cl5GetNextOperationToReplay call
  1181. Return: CL5_SUCCESS, if function is successful;
  1182. CL5_MISSING_DATA, if data that should be in the changelog is missing
  1183. CL5_PURGED_DATA, if some data that consumer needs has been purged.
  1184. Note that the iterator can be non null if the supplier contains
  1185. some data that needs to be sent to the consumer
  1186. CL5_NOTFOUND if the consumer is up to data with respect to the supplier
  1187. CL5_BAD_DATA if invalid parameter is passed;
  1188. CL5_BAD_STATE if db has not been open;
  1189. CL5_DB_ERROR if any other db error occurred;
  1190. CL5_MEMORY_ERROR if memory allocation fails.
  1191. Algorithm: Build a list of csns from consumer's and supplier's ruv. For each element
  1192. of the consumer's ruv put max csn into the csn list. For each element
  1193. of the supplier's ruv not in the consumer's ruv put min csn from the
  1194. supplier's ruv into the list. The list contains, for each known replica,
  1195. the starting point for changes to be sent to the consumer.
  1196. Sort the list in ascending order.
  1197. Build a hash which contains, for each known replica, whether the
  1198. supplier can bring the consumer up to data with respect to that replica.
  1199. The hash is used to decide whether a change can be sent to the consumer
  1200. Find the replica with the smallest csn in the list for which
  1201. we can bring the consumer up to date.
  1202. Position the db cursor on the change entry that corresponds to this csn.
  1203. Hash entries are created for each replica traversed so far. sendChanges
  1204. flag is set to FALSE for all replicas except the last traversed.
  1205. */
  1206. int
  1207. cl5CreateReplayIteratorEx(Private_Repl_Protocol *prp, const RUV *consumerRuv, CL5ReplayIterator **iterator, ReplicaId consumerRID)
  1208. {
  1209. int rc;
  1210. Object *replica;
  1211. Object *obj = NULL;
  1212. replica = prp->replica_object;
  1213. if (replica == NULL || consumerRuv == NULL || iterator == NULL) {
  1214. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
  1215. "cl5CreateReplayIteratorEx - Invalid parameter\n");
  1216. return CL5_BAD_DATA;
  1217. }
  1218. *iterator = NULL;
  1219. if (s_cl5Desc.dbState == CL5_STATE_NONE) {
  1220. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
  1221. "cl5CreateReplayIteratorEx - Changelog is not initialized\n");
  1222. return CL5_BAD_STATE;
  1223. }
  1224. /* make sure that changelog is open while operation is in progress */
  1225. rc = _cl5AddThread();
  1226. if (rc != CL5_SUCCESS)
  1227. return rc;
  1228. rc = _cl5GetDBFile(replica, &obj);
  1229. if (rc == CL5_SUCCESS && obj) {
  1230. /* iterate through the ruv in csn order to find first master for which
  1231. we can replay changes */
  1232. rc = _cl5PositionCursorForReplay(consumerRID, consumerRuv, replica, obj, iterator, NULL);
  1233. } else {
  1234. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
  1235. "cl5CreateReplayIteratorEx - Could not find DB object for replica\n");
  1236. }
  1237. if (rc != CL5_SUCCESS) {
  1238. if (obj)
  1239. object_release(obj);
  1240. /* release the thread */
  1241. _cl5RemoveThread();
  1242. }
  1243. return rc;
  1244. }
  1245. /* cl5CreateReplayIterator is now a wrapper for cl5CreateReplayIteratorEx */
  1246. int
  1247. cl5CreateReplayIterator(Private_Repl_Protocol *prp, const RUV *consumerRuv, CL5ReplayIterator **iterator)
  1248. {
  1249. /* DBDB : I thought it should be possible to refactor this like so, but it seems to not work.
  1250. Possibly the ordering of the calls is significant.
  1251. ReplicaId consumerRID = agmt_get_consumer_rid ( prp->agmt, prp->conn );
  1252. return cl5CreateReplayIteratorEx(prp,consumerRuv,iterator,consumerRID); */
  1253. int rc;
  1254. Object *replica;
  1255. Object *obj = NULL;
  1256. replica = prp->replica_object;
  1257. if (replica == NULL || consumerRuv == NULL || iterator == NULL) {
  1258. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
  1259. "cl5CreateReplayIterator - Invalid parameter\n");
  1260. return CL5_BAD_DATA;
  1261. }
  1262. *iterator = NULL;
  1263. if (s_cl5Desc.dbState == CL5_STATE_NONE) {
  1264. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
  1265. "cl5CreateReplayIterator - Changelog is not initialized\n");
  1266. return CL5_BAD_STATE;
  1267. }
  1268. /* make sure that changelog is open while operation is in progress */
  1269. rc = _cl5AddThread();
  1270. if (rc != CL5_SUCCESS)
  1271. return rc;
  1272. rc = _cl5GetDBFile(replica, &obj);
  1273. if (rc == CL5_SUCCESS && obj) {
  1274. /* iterate through the ruv in csn order to find first master for which
  1275. we can replay changes */
  1276. ReplicaId consumerRID = agmt_get_consumer_rid(prp->agmt, prp->conn);
  1277. int continue_on_missing = agmt_get_ignoremissing(prp->agmt);
  1278. int save_cont_miss = continue_on_missing;
  1279. rc = _cl5PositionCursorForReplay(consumerRID, consumerRuv, replica, obj, iterator, &continue_on_missing);
  1280. if (save_cont_miss == 1 && continue_on_missing == 0) {
  1281. /* the option to continue once on a missing csn was used, rest */
  1282. agmt_set_ignoremissing(prp->agmt, 0);
  1283. }
  1284. } else {
  1285. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
  1286. "cl5CreateReplayIterator - Could not find DB object for replica\n");
  1287. }
  1288. if (rc != CL5_SUCCESS) {
  1289. if (obj)
  1290. object_release(obj);
  1291. /* release the thread */
  1292. _cl5RemoveThread();
  1293. }
  1294. return rc;
  1295. }
  1296. /* Name: cl5GetNextOperationToReplay
  1297. Description: retrieves next operation to be sent to a particular consumer and
  1298. that was created on a particular master. Consumer and master info
  1299. is encoded in the iterator parameter that must be created by call
  1300. to cl5CreateReplayIterator.
  1301. Parameters: iterator - iterator that identifies next entry to retrieve;
  1302. op - operation retrieved if function is successful
  1303. Return: CL5_SUCCESS if function is successfull;
  1304. CL5_BAD_DATA if invalid parameter is passed;
  1305. CL5_NOTFOUND if end of iteration list is reached
  1306. CL5_DB_ERROR if any other db error occured;
  1307. CL5_BADFORMAT if data in db is of unrecognized format;
  1308. CL5_MEMORY_ERROR if memory allocation fails.
  1309. Algorithm: Iterate through changelog entries until a change is found that
  1310. originated at the replica for which we are sending changes
  1311. (based on the information in the iteration hash) and
  1312. whose csn is larger than the csn already seen by the consumer
  1313. If change originated at the replica not in the hash,
  1314. determine whether we should send changes originated at the replica
  1315. and add replica entry into the hash. We can send the changes for
  1316. the replica if the current csn is smaller or equal to the csn
  1317. in the consumer's ruv (if present) or if it is equal to the min
  1318. csn in the supplier's ruv.
  1319. */
  1320. int
  1321. cl5GetNextOperationToReplay(CL5ReplayIterator *iterator, CL5Entry *entry)
  1322. {
  1323. CSN *csn;
  1324. char *key, *data;
  1325. size_t keylen, datalen;
  1326. char *agmt_name;
  1327. int rc = 0;
  1328. agmt_name = get_thread_private_agmtname();
  1329. if (entry == NULL) {
  1330. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
  1331. "cl5GetNextOperationToReplay - %s - Invalid parameter passed\n", agmt_name);
  1332. return CL5_BAD_DATA;
  1333. }
  1334. rc = clcache_get_next_change(iterator->clcache, (void **)&key, &keylen, (void **)&data, &datalen, &csn);
  1335. if (rc == DB_NOTFOUND) {
  1336. /*
  1337. * Abort means we've figured out that we've passed the replica Min CSN,
  1338. * so we should stop looping through the changelog
  1339. */
  1340. return CL5_NOTFOUND;
  1341. }
  1342. if (rc != 0) {
  1343. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl, "cl5GetNextOperationToReplay - %s - "
  1344. "Failed to read next entry; DB error %d\n",
  1345. agmt_name, rc);
  1346. return CL5_DB_ERROR;
  1347. }
  1348. if (is_cleaned_rid(csn_get_replicaid(csn))) {
  1349. /*
  1350. * This operation is from a deleted replica. During the cleanallruv task the
  1351. * replicas are cleaned first before this instance is. This can cause the
  1352. * server to basically do a full update over and over. So we have to watch for
  1353. * this, and not send these operations out.
  1354. */
  1355. return CL5_IGNORE_OP;
  1356. }
  1357. /* there is an entry we should return */
  1358. /* Callers of this function should cl5_operation_parameters_done(op) */
  1359. if (0 != cl5DBData2Entry(data, datalen, entry)) {
  1360. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
  1361. "cl5GetNextOperationToReplay - %s - Failed to format entry rc=%d\n", agmt_name, rc);
  1362. return rc;
  1363. }
  1364. return CL5_SUCCESS;
  1365. }
  1366. /* Name: cl5DestroyReplayIterator
  1367. Description: destorys iterator
  1368. Parameters: iterator - iterator to destory
  1369. Return: none
  1370. */
  1371. void
  1372. cl5DestroyReplayIterator(CL5ReplayIterator **iterator)
  1373. {
  1374. if (iterator == NULL) {
  1375. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
  1376. "cl5DestroyReplayIterator - Invalid iterator passed\n");
  1377. return;
  1378. }
  1379. clcache_return_buffer(&(*iterator)->clcache);
  1380. if ((*iterator)->fileObj) {
  1381. object_release((*iterator)->fileObj);
  1382. (*iterator)->fileObj = NULL;
  1383. }
  1384. /* release supplier's ruv */
  1385. if ((*iterator)->supplierRuvObj) {
  1386. object_release((*iterator)->supplierRuvObj);
  1387. (*iterator)->supplierRuvObj = NULL;
  1388. }
  1389. slapi_ch_free((void **)iterator);
  1390. /* this thread no longer holds a db reference, release it */
  1391. _cl5RemoveThread();
  1392. }
  1393. /* Name: cl5DeleteOnClose
  1394. Description: marks changelog for deletion when it is closed
  1395. Parameters: flag; if flag = 1 then delete else don't
  1396. Return: none
  1397. */
  1398. void
  1399. cl5DeleteOnClose(PRBool rm)
  1400. {
  1401. s_cl5Desc.dbRmOnClose = rm;
  1402. }
  1403. /* Name: cl5GetDir
  1404. Description: returns changelog directory
  1405. Parameters: none
  1406. Return: copy of the directory; caller needs to free the string
  1407. */
  1408. char *
  1409. cl5GetDir()
  1410. {
  1411. if (s_cl5Desc.dbDir == NULL) {
  1412. return NULL;
  1413. } else {
  1414. return slapi_ch_strdup(s_cl5Desc.dbDir);
  1415. }
  1416. }
  1417. /* Name: cl5Exist
  1418. Description: checks if a changelog exists in the specified directory;
  1419. We consider changelog to exist if it contains the dbversion file.
  1420. Parameters: clDir - directory to check
  1421. Return: 1 - if changelog exists; 0 - otherwise
  1422. */
  1423. PRBool
  1424. cl5Exist(const char *clDir)
  1425. {
  1426. char fName[MAXPATHLEN + 1];
  1427. int rc;
  1428. PR_snprintf(fName, MAXPATHLEN, "%s/%s", clDir, VERSION_FILE);
  1429. rc = PR_Access(fName, PR_ACCESS_EXISTS);
  1430. return (rc == PR_SUCCESS);
  1431. }
  1432. /* Name: cl5GetOperationCount
  1433. Description: returns number of entries in the changelog. The changelog must be
  1434. open for the value to be meaningful.
  1435. Parameters: replica - optional parameter that specifies the replica whose operations
  1436. we wish to count; if NULL all changelog entries are counted
  1437. Return: number of entries in the changelog
  1438. */
  1439. int
  1440. cl5GetOperationCount(Object *replica)
  1441. {
  1442. Object *obj;
  1443. CL5DBFile *file;
  1444. int count = 0;
  1445. int rc;
  1446. if (s_cl5Desc.dbState == CL5_STATE_NONE) {
  1447. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
  1448. "cl5GetOperationCount - Changelog is not initialized\n");
  1449. return -1;
  1450. }
  1451. /* make sure that changelog is open while operation is in progress */
  1452. rc = _cl5AddThread();
  1453. if (rc != CL5_SUCCESS)
  1454. return -1;
  1455. if (replica == NULL) /* compute total entry count */
  1456. {
  1457. obj = objset_first_obj(s_cl5Desc.dbFiles);
  1458. while (obj) {
  1459. file = (CL5DBFile *)object_get_data(obj);
  1460. PR_ASSERT(file);
  1461. count += file->entryCount;
  1462. obj = objset_next_obj(s_cl5Desc.dbFiles, obj);
  1463. }
  1464. } else /* return count for particular db */
  1465. {
  1466. /* select correct db file */
  1467. rc = _cl5GetDBFile(replica, &obj);
  1468. if (rc == CL5_SUCCESS) {
  1469. file = (CL5DBFile *)object_get_data(obj);
  1470. PR_ASSERT(file);
  1471. count = file->entryCount;
  1472. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
  1473. "cl5GetOperationCount - Found DB object %p\n", obj);
  1474. object_release(obj);
  1475. } else {
  1476. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
  1477. "cl5GetOperationCount - Could not get DB object for replica\n");
  1478. count = 0;
  1479. }
  1480. }
  1481. _cl5RemoveThread();
  1482. return count;
  1483. }
  1484. /***** Helper Functions *****/
  1485. /* this call happens under state lock */
  1486. static int
  1487. _cl5Open(const char *dir, const CL5DBConfig *config, CL5OpenMode openMode)
  1488. {
  1489. int rc;
  1490. PR_ASSERT(dir);
  1491. /* setup db configuration parameters */
  1492. if (config) {
  1493. _cl5SetDBConfig(config);
  1494. } else {
  1495. _cl5SetDefaultDBConfig();
  1496. }
  1497. /* initialize trimming */
  1498. rc = _cl5TrimInit();
  1499. if (rc != CL5_SUCCESS) {
  1500. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
  1501. "_cl5Open - Failed to initialize trimming\n");
  1502. goto done;
  1503. }
  1504. /* create the changelog directory if it does not exist */
  1505. rc = cl5CreateDirIfNeeded(dir);
  1506. if (rc != CL5_SUCCESS) {
  1507. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
  1508. "_cl5Open - Failed to create changelog directory (%s)\n", dir);
  1509. goto done;
  1510. }
  1511. if (s_cl5Desc.dbDir) {
  1512. slapi_ch_free_string(&s_cl5Desc.dbDir);
  1513. }
  1514. s_cl5Desc.dbDir = slapi_ch_strdup(dir);
  1515. /* check database version */
  1516. rc = _cl5CheckDBVersion();
  1517. if (rc != CL5_SUCCESS) {
  1518. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl, "_cl5Open - Invalid db version\n");
  1519. goto done;
  1520. }
  1521. s_cl5Desc.dbOpenMode = openMode;
  1522. /* initialize db environment */
  1523. rc = _cl5AppInit();
  1524. if (rc != CL5_SUCCESS) {
  1525. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
  1526. "_cl5Open - Failed to initialize db environment\n");
  1527. goto done;
  1528. }
  1529. /* init the clcache */
  1530. if ((clcache_init(&s_cl5Desc.dbEnv) != 0)) {
  1531. rc = CL5_SYSTEM_ERROR;
  1532. goto done;
  1533. }
  1534. /* open database files */
  1535. rc = _cl5DBOpen();
  1536. if (rc != CL5_SUCCESS) {
  1537. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
  1538. "_cl5Open - Failed to open changelog database\n");
  1539. goto done;
  1540. }
  1541. done:;
  1542. if (rc != CL5_SUCCESS) {
  1543. _cl5Close();
  1544. }
  1545. return rc;
  1546. }
  1547. int
  1548. cl5CreateDirIfNeeded(const char *dirName)
  1549. {
  1550. int rc;
  1551. char buff[MAXPATHLEN + 1];
  1552. char *t;
  1553. PR_ASSERT(dirName);
  1554. rc = PR_Access(dirName, PR_ACCESS_EXISTS);
  1555. if (rc == PR_SUCCESS) {
  1556. return CL5_SUCCESS;
  1557. }
  1558. /* directory does not exist - try to create */
  1559. PL_strncpyz(buff, dirName, sizeof(buff) - 1);
  1560. t = strchr(buff, '/');
  1561. /* skip first slash */
  1562. if (t) {
  1563. t = strchr(t + 1, '/');
  1564. }
  1565. while (t) {
  1566. *t = '\0';
  1567. if (PR_Access(buff, PR_ACCESS_EXISTS) != PR_SUCCESS) {
  1568. rc = PR_MkDir(buff, DIR_CREATE_MODE);
  1569. if (rc != PR_SUCCESS) {
  1570. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
  1571. "cl5CreateDirIfNeeded - Failed to create dir (%s); NSPR error - %d\n",
  1572. dirName, PR_GetError());
  1573. return CL5_SYSTEM_ERROR;
  1574. }
  1575. }
  1576. *t++ = FILE_PATHSEP;
  1577. t = strchr(t, '/');
  1578. }
  1579. /* last piece */
  1580. rc = PR_MkDir(buff, DIR_CREATE_MODE);
  1581. if (rc != PR_SUCCESS) {
  1582. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
  1583. "cl5CreateDirIfNeeded - Failed to create dir; NSPR error - %d\n",
  1584. PR_GetError());
  1585. return CL5_SYSTEM_ERROR;
  1586. }
  1587. return CL5_SUCCESS;
  1588. }
  1589. static int
  1590. _cl5AppInit(void)
  1591. {
  1592. int rc = -1; /* initialize to failure */
  1593. DB_ENV *dbEnv = NULL;
  1594. size_t pagesize = 0;
  1595. int openflags = 0;
  1596. char *cookie = NULL;
  1597. Slapi_Backend *be = slapi_get_first_backend(&cookie);
  1598. while (be) {
  1599. rc = slapi_back_get_info(be, BACK_INFO_DBENV, (void **)&dbEnv);
  1600. if ((LDAP_SUCCESS == rc) && dbEnv) {
  1601. rc = slapi_back_get_info(be,
  1602. BACK_INFO_INDEXPAGESIZE, (void **)&pagesize);
  1603. if ((LDAP_SUCCESS == rc) && pagesize) {
  1604. rc = slapi_back_get_info(be,
  1605. BACK_INFO_DBENV_OPENFLAGS, (void **)&openflags);
  1606. if (LDAP_SUCCESS == rc) {
  1607. break; /* Successfully fetched */
  1608. }
  1609. }
  1610. }
  1611. be = slapi_get_next_backend(cookie);
  1612. }
  1613. slapi_ch_free((void **)&cookie);
  1614. if (rc == 0 && dbEnv && pagesize) {
  1615. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
  1616. "_cl5AppInit - Fetched backend dbEnv (%p)\n", dbEnv);
  1617. s_cl5Desc.dbEnv = dbEnv;
  1618. s_cl5Desc.dbEnvOpenFlags = openflags;
  1619. s_cl5Desc.dbConfig.pageSize = pagesize;
  1620. return CL5_SUCCESS;
  1621. } else {
  1622. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
  1623. "_cl5AppInit - Failed to fetch backend dbenv (%p) and/or "
  1624. "index page size (%lu)\n",
  1625. dbEnv, (long unsigned int)pagesize);
  1626. return CL5_DB_ERROR;
  1627. }
  1628. }
  1629. static int
  1630. _cl5DBOpen()
  1631. {
  1632. PRBool dbFile;
  1633. PRDir *dir;
  1634. PRDirEntry *entry = NULL;
  1635. int rc = -1; /* initialize to failure */
  1636. Object *replica;
  1637. int count = 0;
  1638. /* create lock that guarantees that each file is only added once to the list */
  1639. s_cl5Desc.fileLock = PR_NewLock();
  1640. /* loop over all db files and open them; file name format is cl5_<dbid>.<dbext> */
  1641. dir = PR_OpenDir(s_cl5Desc.dbDir);
  1642. if (dir == NULL) {
  1643. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
  1644. "_cl5DBOpen - Failed to open changelog dir; NSPR error - %d\n",
  1645. PR_GetError());
  1646. return CL5_SYSTEM_ERROR;
  1647. }
  1648. /* initialize set of db file objects */
  1649. s_cl5Desc.dbFiles = objset_new(NULL);
  1650. while (NULL != (entry = PR_ReadDir(dir, PR_SKIP_DOT | PR_SKIP_DOT_DOT))) {
  1651. if (NULL == entry->name) {
  1652. break;
  1653. }
  1654. dbFile = _cl5FileName2Replica(entry->name, &replica);
  1655. if (dbFile) /* this is db file, not a log or dbversion; those are just skipped */
  1656. {
  1657. /* we only open files for existing replicas */
  1658. if (replica) {
  1659. rc = _cl5DBOpenFile(replica, NULL /* file object */,
  1660. PR_FALSE /* check for duplicates */);
  1661. if (rc != CL5_SUCCESS) {
  1662. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl, "_cl5DBOpen - "
  1663. "Error opening file %s\n",
  1664. entry->name);
  1665. return rc;
  1666. }
  1667. object_release(replica);
  1668. count++;
  1669. } else /* there is no matching replica for the file - remove */
  1670. {
  1671. char fullpathname[MAXPATHLEN];
  1672. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl, "_cl5DBOpen - "
  1673. "File %s has no matching replica; removing\n",
  1674. entry->name);
  1675. PR_snprintf(fullpathname, MAXPATHLEN, "%s/%s", s_cl5Desc.dbDir, entry->name);
  1676. rc = s_cl5Desc.dbEnv->dbremove(s_cl5Desc.dbEnv,
  1677. 0, fullpathname, 0,
  1678. DEFAULT_DB_ENV_OP_FLAGS);
  1679. if (rc != 0) {
  1680. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
  1681. "_cl5DBOpen - Failed to remove (%s) file; "
  1682. "libdb error - %d (%s)\n",
  1683. fullpathname, rc, db_strerror(rc));
  1684. if (PR_Delete(fullpathname) != PR_SUCCESS) {
  1685. PRErrorCode prerr = PR_GetError();
  1686. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
  1687. "_cl5DBOpen - Failed to remove (%s) file; "
  1688. "nspr error - %d (%s)\n",
  1689. fullpathname, prerr, slapd_pr_strerror(prerr));
  1690. }
  1691. }
  1692. }
  1693. }
  1694. }
  1695. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl, "_cl5DBOpen - "
  1696. "Opened %d existing databases in %s\n",
  1697. count, s_cl5Desc.dbDir);
  1698. PR_CloseDir(dir);
  1699. return CL5_SUCCESS;
  1700. }
  1701. /* this function assumes that the entry was validated
  1702. using IsValidOperation
  1703. Data in db format:
  1704. ------------------
  1705. <1 byte version><1 byte change_type><sizeof time_t time><null terminated csn>
  1706. <null terminated uniqueid><null terminated targetdn>
  1707. [<null terminated newrdn><1 byte deleteoldrdn>][<4 byte mod count><mod1><mod2>....]
  1708. mod format:
  1709. -----------
  1710. <1 byte modop><null terminated attr name><4 byte value count>
  1711. <4 byte value size><value1><4 byte value size><value2>
  1712. */
  1713. static int
  1714. _cl5Entry2DBData(const CL5Entry *entry, char **data, PRUint32 *len)
  1715. {
  1716. int size = 1 /* version */ + 1 /* operation type */ + sizeof(time_t);
  1717. char *pos;
  1718. PRUint32 t;
  1719. slapi_operation_parameters *op;
  1720. LDAPMod **add_mods = NULL;
  1721. char *rawDN = NULL;
  1722. char s[CSN_STRSIZE];
  1723. PR_ASSERT(entry && entry->op && data && len);
  1724. op = entry->op;
  1725. PR_ASSERT(op->target_address.uniqueid);
  1726. /* compute size of the buffer needed to hold the data */
  1727. size += CSN_STRSIZE;
  1728. size += strlen(op->target_address.uniqueid) + 1;
  1729. switch (op->operation_type) {
  1730. case SLAPI_OPERATION_ADD:
  1731. if (op->p.p_add.parentuniqueid)
  1732. size += strlen(op->p.p_add.parentuniqueid) + 1;
  1733. else
  1734. size++; /* we just store NULL char */
  1735. slapi_entry2mods(op->p.p_add.target_entry, &rawDN /* dn */, &add_mods);
  1736. size += strlen(rawDN) + 1;
  1737. /* Need larger buffer for the encrypted changelog */
  1738. if (s_cl5Desc.clcrypt_handle) {
  1739. size += (_cl5GetModsSize(add_mods) * (1 + BACK_CRYPT_OUTBUFF_EXTLEN));
  1740. } else {
  1741. size += _cl5GetModsSize(add_mods);
  1742. }
  1743. break;
  1744. case SLAPI_OPERATION_MODIFY:
  1745. size += REPL_GET_DN_LEN(&op->target_address) + 1;
  1746. /* Need larger buffer for the encrypted changelog */
  1747. if (s_cl5Desc.clcrypt_handle) {
  1748. size += (_cl5GetModsSize(op->p.p_modify.modify_mods) * (1 + BACK_CRYPT_OUTBUFF_EXTLEN));
  1749. } else {
  1750. size += _cl5GetModsSize(op->p.p_modify.modify_mods);
  1751. }
  1752. break;
  1753. case SLAPI_OPERATION_MODRDN:
  1754. size += REPL_GET_DN_LEN(&op->target_address) + 1;
  1755. /* 1 for deleteoldrdn */
  1756. size += strlen(op->p.p_modrdn.modrdn_newrdn) + 2;
  1757. if (REPL_GET_DN(&op->p.p_modrdn.modrdn_newsuperior_address))
  1758. size += REPL_GET_DN_LEN(&op->p.p_modrdn.modrdn_newsuperior_address) + 1;
  1759. else
  1760. size++; /* for NULL char */
  1761. if (op->p.p_modrdn.modrdn_newsuperior_address.uniqueid)
  1762. size += strlen(op->p.p_modrdn.modrdn_newsuperior_address.uniqueid) + 1;
  1763. else
  1764. size++; /* for NULL char */
  1765. /* Need larger buffer for the encrypted changelog */
  1766. if (s_cl5Desc.clcrypt_handle) {
  1767. size += (_cl5GetModsSize(op->p.p_modrdn.modrdn_mods) * (1 + BACK_CRYPT_OUTBUFF_EXTLEN));
  1768. } else {
  1769. size += _cl5GetModsSize(op->p.p_modrdn.modrdn_mods);
  1770. }
  1771. break;
  1772. case SLAPI_OPERATION_DELETE:
  1773. size += REPL_GET_DN_LEN(&op->target_address) + 1;
  1774. break;
  1775. }
  1776. /* allocate data buffer */
  1777. (*data) = slapi_ch_malloc(size);
  1778. if ((*data) == NULL) {
  1779. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
  1780. "_cl5Entry2DBData - Failed to allocate data buffer\n");
  1781. return CL5_MEMORY_ERROR;
  1782. }
  1783. /* fill in the data buffer */
  1784. pos = *data;
  1785. /* write a byte of version */
  1786. (*pos) = V_5;
  1787. pos++;
  1788. /* write change type */
  1789. (*pos) = (unsigned char)op->operation_type;
  1790. pos++;
  1791. /* write time */
  1792. t = PR_htonl((PRUint32)entry->time);
  1793. memcpy(pos, &t, sizeof(t));
  1794. pos += sizeof(t);
  1795. /* write csn */
  1796. _cl5WriteString(csn_as_string(op->csn, PR_FALSE, s), &pos);
  1797. /* write UniqueID */
  1798. _cl5WriteString(op->target_address.uniqueid, &pos);
  1799. /* figure out what else we need to write depending on the operation type */
  1800. switch (op->operation_type) {
  1801. case SLAPI_OPERATION_ADD:
  1802. _cl5WriteString(op->p.p_add.parentuniqueid, &pos);
  1803. _cl5WriteString(rawDN, &pos);
  1804. _cl5WriteMods(add_mods, &pos);
  1805. slapi_ch_free((void **)&rawDN);
  1806. ldap_mods_free(add_mods, 1);
  1807. break;
  1808. case SLAPI_OPERATION_MODIFY:
  1809. _cl5WriteString(REPL_GET_DN(&op->target_address), &pos);
  1810. _cl5WriteMods(op->p.p_modify.modify_mods, &pos);
  1811. break;
  1812. case SLAPI_OPERATION_MODRDN:
  1813. _cl5WriteString(REPL_GET_DN(&op->target_address), &pos);
  1814. _cl5WriteString(op->p.p_modrdn.modrdn_newrdn, &pos);
  1815. *pos = (PRUint8)op->p.p_modrdn.modrdn_deloldrdn;
  1816. pos++;
  1817. _cl5WriteString(REPL_GET_DN(&op->p.p_modrdn.modrdn_newsuperior_address), &pos);
  1818. _cl5WriteString(op->p.p_modrdn.modrdn_newsuperior_address.uniqueid, &pos);
  1819. _cl5WriteMods(op->p.p_modrdn.modrdn_mods, &pos);
  1820. break;
  1821. case SLAPI_OPERATION_DELETE:
  1822. _cl5WriteString(REPL_GET_DN(&op->target_address), &pos);
  1823. break;
  1824. }
  1825. /* (*len) != size in case encrypted */
  1826. (*len) = pos - *data;
  1827. if (*len > size) {
  1828. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
  1829. "_cl5Entry2DBData - real len %d > estimated size %d\n",
  1830. *len, size);
  1831. return CL5_MEMORY_ERROR;
  1832. }
  1833. return CL5_SUCCESS;
  1834. }
  1835. /*
  1836. Data in db format:
  1837. ------------------
  1838. <1 byte version><1 byte change_type><sizeof time_t time><null terminated dbid>
  1839. <null terminated csn><null terminated uniqueid><null terminated targetdn>
  1840. [<null terminated newrdn><1 byte deleteoldrdn>][<4 byte mod count><mod1><mod2>....]
  1841. mod format:
  1842. -----------
  1843. <1 byte modop><null terminated attr name><4 byte value count>
  1844. <4 byte value size><value1><4 byte value size><value2>
  1845. */
  1846. int
  1847. cl5DBData2Entry(const char *data, PRUint32 len __attribute__((unused)), CL5Entry *entry)
  1848. {
  1849. int rc;
  1850. PRUint8 version;
  1851. char *pos = (char *)data;
  1852. char *strCSN;
  1853. PRUint32 thetime;
  1854. slapi_operation_parameters *op;
  1855. LDAPMod **add_mods;
  1856. char *rawDN = NULL;
  1857. char s[CSN_STRSIZE];
  1858. PR_ASSERT(data && entry && entry->op);
  1859. /* ONREPL - check that we do not go beyond the end of the buffer */
  1860. /* read byte of version */
  1861. version = (PRUint8)(*pos);
  1862. if (version != V_5) {
  1863. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
  1864. "cl5DBData2Entry - Invalid data version\n");
  1865. return CL5_BAD_FORMAT;
  1866. }
  1867. op = entry->op;
  1868. pos += sizeof(version);
  1869. /* read change type */
  1870. op->operation_type = (PRUint8)(*pos);
  1871. pos++;
  1872. /* need to do the copy first, to skirt around alignment problems on
  1873. certain architectures */
  1874. memcpy((char *)&thetime, pos, sizeof(thetime));
  1875. entry->time = (time_t)PR_ntohl(thetime);
  1876. pos += sizeof(thetime);
  1877. /* read csn */
  1878. _cl5ReadString(&strCSN, &pos);
  1879. if (op->csn == NULL || strcmp(strCSN, csn_as_string(op->csn, PR_FALSE, s)) != 0) {
  1880. op->csn = csn_new_by_string(strCSN);
  1881. }
  1882. slapi_ch_free((void **)&strCSN);
  1883. /* read UniqueID */
  1884. _cl5ReadString(&op->target_address.uniqueid, &pos);
  1885. /* figure out what else we need to read depending on the operation type */
  1886. switch (op->operation_type) {
  1887. case SLAPI_OPERATION_ADD:
  1888. _cl5ReadString(&op->p.p_add.parentuniqueid, &pos);
  1889. /* richm: need to free parentuniqueid */
  1890. _cl5ReadString(&rawDN, &pos);
  1891. op->target_address.sdn = slapi_sdn_new_dn_passin(rawDN);
  1892. /* convert mods to entry */
  1893. rc = _cl5ReadMods(&add_mods, &pos);
  1894. slapi_mods2entry(&(op->p.p_add.target_entry), rawDN, add_mods);
  1895. ldap_mods_free(add_mods, 1);
  1896. break;
  1897. case SLAPI_OPERATION_MODIFY:
  1898. _cl5ReadString(&rawDN, &pos);
  1899. op->target_address.sdn = slapi_sdn_new_dn_passin(rawDN);
  1900. rc = _cl5ReadMods(&op->p.p_modify.modify_mods, &pos);
  1901. break;
  1902. case SLAPI_OPERATION_MODRDN:
  1903. _cl5ReadString(&rawDN, &pos);
  1904. op->target_address.sdn = slapi_sdn_new_dn_passin(rawDN);
  1905. _cl5ReadString(&op->p.p_modrdn.modrdn_newrdn, &pos);
  1906. op->p.p_modrdn.modrdn_deloldrdn = *pos;
  1907. pos++;
  1908. _cl5ReadString(&rawDN, &pos);
  1909. op->p.p_modrdn.modrdn_newsuperior_address.sdn = slapi_sdn_new_dn_passin(rawDN);
  1910. _cl5ReadString(&op->p.p_modrdn.modrdn_newsuperior_address.uniqueid, &pos);
  1911. rc = _cl5ReadMods(&op->p.p_modrdn.modrdn_mods, &pos);
  1912. break;
  1913. case SLAPI_OPERATION_DELETE:
  1914. _cl5ReadString(&rawDN, &pos);
  1915. op->target_address.sdn = slapi_sdn_new_dn_passin(rawDN);
  1916. rc = CL5_SUCCESS;
  1917. break;
  1918. default:
  1919. rc = CL5_BAD_FORMAT;
  1920. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
  1921. "cl5DBData2Entry - Failed to format entry\n");
  1922. break;
  1923. }
  1924. return rc;
  1925. }
  1926. /* thread management functions */
  1927. static int
  1928. _cl5DispatchDBThreads(void)
  1929. {
  1930. PRThread *pth = NULL;
  1931. pth = PR_CreateThread(PR_USER_THREAD, (VFP)(void *)_cl5TrimMain,
  1932. NULL, PR_PRIORITY_NORMAL, PR_GLOBAL_THREAD,
  1933. PR_UNJOINABLE_THREAD, DEFAULT_THREAD_STACKSIZE);
  1934. if (NULL == pth) {
  1935. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
  1936. "_cl5DispatchDBThreads - Failed to create trimming thread"
  1937. "; NSPR error - %d\n",
  1938. PR_GetError());
  1939. return CL5_SYSTEM_ERROR;
  1940. }
  1941. return CL5_SUCCESS;
  1942. }
  1943. static int
  1944. _cl5AddThread(void)
  1945. {
  1946. /* lock the state lock so that nobody can change the state
  1947. while backup is in progress
  1948. */
  1949. slapi_rwlock_rdlock(s_cl5Desc.stLock);
  1950. /* open changelog if it is not already open */
  1951. if (s_cl5Desc.dbState != CL5_STATE_OPEN) {
  1952. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
  1953. "_cl5AddThread - Invalid changelog state - %d\n", s_cl5Desc.dbState);
  1954. slapi_rwlock_unlock(s_cl5Desc.stLock);
  1955. return CL5_BAD_STATE;
  1956. }
  1957. slapi_rwlock_unlock(s_cl5Desc.stLock);
  1958. /* increment global thread count to make sure that changelog does not close while
  1959. backup is in progress */
  1960. PR_AtomicIncrement(&s_cl5Desc.threadCount);
  1961. return CL5_SUCCESS;
  1962. }
  1963. static void
  1964. _cl5RemoveThread(void)
  1965. {
  1966. PR_ASSERT(s_cl5Desc.threadCount > 0);
  1967. PR_AtomicDecrement(&s_cl5Desc.threadCount);
  1968. }
  1969. /* data conversion functions */
  1970. static void
  1971. _cl5WriteString(const char *str, char **buff)
  1972. {
  1973. if (str) {
  1974. strcpy(*buff, str);
  1975. (*buff) += strlen(str) + 1;
  1976. } else /* just write NULL char */
  1977. {
  1978. (**buff) = '\0';
  1979. (*buff)++;
  1980. }
  1981. }
  1982. static void
  1983. _cl5ReadString(char **str, char **buff)
  1984. {
  1985. if (str) {
  1986. int len = strlen(*buff);
  1987. if (len) {
  1988. *str = slapi_ch_strdup(*buff);
  1989. (*buff) += len + 1;
  1990. } else /* just null char - skip it */
  1991. {
  1992. *str = NULL;
  1993. (*buff)++;
  1994. }
  1995. } else /* just skip this string */
  1996. {
  1997. (*buff) += strlen(*buff) + 1;
  1998. }
  1999. }
  2000. /* mods format:
  2001. -----------
  2002. <4 byte mods count><mod1><mod2>...
  2003. mod format:
  2004. -----------
  2005. <1 byte modop><null terminated attr name><4 byte count>
  2006. <4 byte size><value1><4 byte size><value2>...
  2007. */
  2008. static void
  2009. _cl5WriteMods(LDAPMod **mods, char **buff)
  2010. {
  2011. PRInt32 i;
  2012. char *mod_start;
  2013. PRInt32 count = 0;
  2014. if (mods == NULL)
  2015. return;
  2016. /* skip mods count */
  2017. mod_start = (*buff) + sizeof(count);
  2018. /* write mods*/
  2019. for (i = 0; mods[i]; i++) {
  2020. if (0 <= _cl5WriteMod(mods[i], &mod_start)) {
  2021. count++;
  2022. }
  2023. }
  2024. count = PR_htonl(count);
  2025. memcpy(*buff, &count, sizeof(count));
  2026. (*buff) = mod_start;
  2027. }
  2028. /*
  2029. * return values:
  2030. * positive: no need to encrypt && succeeded to write a mod
  2031. * 0: succeeded to encrypt && write a mod
  2032. * netative: failed to encrypt && no write to the changelog
  2033. */
  2034. static int
  2035. _cl5WriteMod(LDAPMod *mod, char **buff)
  2036. {
  2037. char *orig_pos;
  2038. char *pos;
  2039. PRInt32 count;
  2040. struct berval *bv;
  2041. struct berval *encbv;
  2042. struct berval *bv_to_use;
  2043. Slapi_Mod smod;
  2044. int rc = -1;
  2045. if (NULL == mod) {
  2046. return rc;
  2047. }
  2048. if (SLAPD_UNHASHED_PW_NOLOG == slapi_config_get_unhashed_pw_switch()) {
  2049. if (0 == strcasecmp(mod->mod_type, PSEUDO_ATTR_UNHASHEDUSERPASSWORD)) {
  2050. /* If nsslapd-unhashed-pw-switch == nolog, skip writing it to cl. */
  2051. return rc;
  2052. }
  2053. }
  2054. slapi_mod_init_byref(&smod, mod);
  2055. orig_pos = pos = *buff;
  2056. /* write mod op */
  2057. *pos = (PRUint8)slapi_mod_get_operation(&smod);
  2058. pos++;
  2059. /* write attribute name */
  2060. _cl5WriteString(slapi_mod_get_type(&smod), &pos);
  2061. /* write value count */
  2062. count = PR_htonl(slapi_mod_get_num_values(&smod));
  2063. memcpy(pos, &count, sizeof(count));
  2064. pos += sizeof(PRInt32);
  2065. /* if the mod has no values, eg delete attr or replace attr without values
  2066. * do not reset buffer
  2067. */
  2068. rc = 0;
  2069. bv = slapi_mod_get_first_value(&smod);
  2070. while (bv) {
  2071. encbv = NULL;
  2072. rc = clcrypt_encrypt_value(s_cl5Desc.clcrypt_handle,
  2073. bv, &encbv);
  2074. if (rc > 0) {
  2075. /* no encryption needed. use the original bv */
  2076. bv_to_use = bv;
  2077. } else if ((0 == rc) && encbv) {
  2078. /* successfully encrypted. use the encrypted bv */
  2079. bv_to_use = encbv;
  2080. } else { /* failed */
  2081. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
  2082. "_cl5WriteMod - Encrypting \"%s: %s\" failed\n",
  2083. slapi_mod_get_type(&smod), bv->bv_val);
  2084. bv_to_use = NULL;
  2085. rc = -1;
  2086. break;
  2087. }
  2088. if (bv_to_use) {
  2089. _cl5WriteBerval(bv_to_use, &pos);
  2090. }
  2091. slapi_ch_bvfree(&encbv);
  2092. bv = slapi_mod_get_next_value(&smod);
  2093. }
  2094. if (rc < 0) {
  2095. (*buff) = orig_pos;
  2096. } else {
  2097. (*buff) = pos;
  2098. }
  2099. slapi_mod_done(&smod);
  2100. return rc;
  2101. }
  2102. /* mods format:
  2103. -----------
  2104. <4 byte mods count><mod1><mod2>...
  2105. mod format:
  2106. -----------
  2107. <1 byte modop><null terminated attr name><4 byte count>
  2108. {<4 byte size><value1><4 byte size><value2>... ||
  2109. <null terminated str1> <null terminated str2>...}
  2110. */
  2111. static int
  2112. _cl5ReadMods(LDAPMod ***mods, char **buff)
  2113. {
  2114. char *pos = *buff;
  2115. int i;
  2116. int rc;
  2117. PRInt32 mod_count;
  2118. Slapi_Mods smods;
  2119. Slapi_Mod smod;
  2120. /* need to copy first, to skirt around alignment problems on certain
  2121. architectures */
  2122. memcpy((char *)&mod_count, *buff, sizeof(mod_count));
  2123. mod_count = PR_ntohl(mod_count);
  2124. pos += sizeof(mod_count);
  2125. slapi_mods_init(&smods, mod_count);
  2126. for (i = 0; i < mod_count; i++) {
  2127. rc = _cl5ReadMod(&smod, &pos);
  2128. if (rc != CL5_SUCCESS) {
  2129. slapi_mods_done(&smods);
  2130. return rc;
  2131. }
  2132. slapi_mods_add_smod(&smods, &smod);
  2133. }
  2134. *buff = pos;
  2135. *mods = slapi_mods_get_ldapmods_passout(&smods);
  2136. slapi_mods_done(&smods);
  2137. return CL5_SUCCESS;
  2138. }
  2139. static int
  2140. _cl5ReadMod(Slapi_Mod *smod, char **buff)
  2141. {
  2142. char *pos = *buff;
  2143. int i;
  2144. PRInt32 val_count;
  2145. char *type;
  2146. int op;
  2147. struct berval bv;
  2148. struct berval *decbv;
  2149. struct berval *bv_to_use;
  2150. int rc = 0;
  2151. op = (*pos) & 0x000000FF;
  2152. pos++;
  2153. _cl5ReadString(&type, &pos);
  2154. /* need to do the copy first, to skirt around alignment problems on
  2155. certain architectures */
  2156. memcpy((char *)&val_count, pos, sizeof(val_count));
  2157. val_count = PR_ntohl(val_count);
  2158. pos += sizeof(PRInt32);
  2159. slapi_mod_init(smod, val_count);
  2160. slapi_mod_set_operation(smod, op | LDAP_MOD_BVALUES);
  2161. slapi_mod_set_type(smod, type);
  2162. slapi_ch_free((void **)&type);
  2163. for (i = 0; i < val_count; i++) {
  2164. _cl5ReadBerval(&bv, &pos);
  2165. decbv = NULL;
  2166. rc = 0;
  2167. rc = clcrypt_decrypt_value(s_cl5Desc.clcrypt_handle,
  2168. &bv, &decbv);
  2169. if (rc > 0) {
  2170. /* not encrypted. use the original bv */
  2171. bv_to_use = &bv;
  2172. } else if ((0 == rc) && decbv) {
  2173. /* successfully decrypted. use the decrypted bv */
  2174. bv_to_use = decbv;
  2175. } else { /* failed */
  2176. char encstr[128];
  2177. char *encend = encstr + 128;
  2178. char *ptr;
  2179. int i;
  2180. for (i = 0, ptr = encstr; (i < bv.bv_len) && (ptr < encend - 4);
  2181. i++, ptr += 3) {
  2182. sprintf(ptr, "%x", 0xff & bv.bv_val[i]);
  2183. }
  2184. if (ptr >= encend - 4) {
  2185. sprintf(ptr, "...");
  2186. ptr += 3;
  2187. }
  2188. *ptr = '\0';
  2189. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
  2190. "_cl5ReadMod - Decrypting \"%s: %s\" failed\n",
  2191. slapi_mod_get_type(smod), encstr);
  2192. bv_to_use = NULL;
  2193. }
  2194. if (bv_to_use) {
  2195. slapi_mod_add_value(smod, bv_to_use);
  2196. }
  2197. slapi_ch_bvfree(&decbv);
  2198. slapi_ch_free((void **)&bv.bv_val);
  2199. }
  2200. (*buff) = pos;
  2201. return CL5_SUCCESS;
  2202. }
  2203. static int
  2204. _cl5GetModsSize(LDAPMod **mods)
  2205. {
  2206. int size;
  2207. int i;
  2208. if (mods == NULL)
  2209. return 0;
  2210. size = sizeof(PRInt32);
  2211. for (i = 0; mods[i]; i++) {
  2212. size += _cl5GetModSize(mods[i]);
  2213. }
  2214. return size;
  2215. }
  2216. static int
  2217. _cl5GetModSize(LDAPMod *mod)
  2218. {
  2219. int size;
  2220. int i;
  2221. size = 1 + strlen(mod->mod_type) + 1 + sizeof(mod->mod_op);
  2222. i = 0;
  2223. if (mod->mod_op & LDAP_MOD_BVALUES) /* values are in binary form */
  2224. {
  2225. while (mod->mod_bvalues != NULL && mod->mod_bvalues[i] != NULL) {
  2226. size += (PRInt32)mod->mod_bvalues[i]->bv_len + sizeof(PRInt32);
  2227. i++;
  2228. }
  2229. } else /* string data */
  2230. {
  2231. PR_ASSERT(0); /* ggood string values should never be used in the server */
  2232. }
  2233. return size;
  2234. }
  2235. static void
  2236. _cl5ReadBerval(struct berval *bv, char **buff)
  2237. {
  2238. PRUint32 length = 0;
  2239. PRUint32 net_length = 0;
  2240. PR_ASSERT(bv && buff);
  2241. /***PINAKI need to do the copy first, to skirt around alignment problems on
  2242. certain architectures */
  2243. /* DBDB : struct berval.bv_len is defined as unsigned long
  2244. * But code here expects it to be 32-bits in size.
  2245. * On 64-bit machines, this is not the case.
  2246. * I changed the code to consistently use 32-bit (4-byte)
  2247. * values on the encoded side. This means that it's
  2248. * possible to generate a huge berval that will not
  2249. * be encoded properly. However, this seems unlikely
  2250. * to happen in reality, and I felt that retaining the
  2251. * old on-disk format for the changely in the 64-bit
  2252. * version of the server was important.
  2253. */
  2254. memcpy((char *)&net_length, *buff, sizeof(net_length));
  2255. length = PR_ntohl(net_length);
  2256. *buff += sizeof(net_length);
  2257. bv->bv_len = length;
  2258. if (bv->bv_len > 0) {
  2259. bv->bv_val = slapi_ch_malloc(bv->bv_len);
  2260. memcpy(bv->bv_val, *buff, bv->bv_len);
  2261. *buff += bv->bv_len;
  2262. } else {
  2263. bv->bv_val = NULL;
  2264. }
  2265. }
  2266. static void
  2267. _cl5WriteBerval(struct berval *bv, char **buff)
  2268. {
  2269. PRUint32 length = 0;
  2270. PRUint32 net_length = 0;
  2271. length = (PRUint32)bv->bv_len;
  2272. net_length = PR_htonl(length);
  2273. memcpy(*buff, &net_length, sizeof(net_length));
  2274. *buff += sizeof(net_length);
  2275. memcpy(*buff, bv->bv_val, length);
  2276. *buff += length;
  2277. }
  2278. /* data format: <value count> <value size> <value> <value size> <value> ..... */
  2279. static int
  2280. _cl5ReadBervals(struct berval ***bv, char **buff, unsigned int size __attribute__((unused)))
  2281. {
  2282. PRInt32 count;
  2283. int i;
  2284. char *pos;
  2285. PR_ASSERT(bv && buff);
  2286. /* ONREPL - need to check that we don't go beyond the end of the buffer */
  2287. pos = *buff;
  2288. memcpy((char *)&count, pos, sizeof(count));
  2289. count = PR_htonl(count);
  2290. pos += sizeof(count);
  2291. /* allocate bervals */
  2292. *bv = (struct berval **)slapi_ch_malloc((count + 1) * sizeof(struct berval *));
  2293. if (*bv == NULL) {
  2294. return CL5_MEMORY_ERROR;
  2295. }
  2296. for (i = 0; i < count; i++) {
  2297. (*bv)[i] = (struct berval *)slapi_ch_malloc(sizeof(struct berval));
  2298. if ((*bv)[i] == NULL) {
  2299. ber_bvecfree(*bv);
  2300. return CL5_MEMORY_ERROR;
  2301. }
  2302. _cl5ReadBerval((*bv)[i], &pos);
  2303. }
  2304. (*bv)[count] = NULL;
  2305. *buff = pos;
  2306. return CL5_SUCCESS;
  2307. }
  2308. /* data format: <value count> <value size> <value> <value size> <value> ..... */
  2309. static int
  2310. _cl5WriteBervals(struct berval **bv, char **buff, u_int32_t *size)
  2311. {
  2312. PRInt32 count, net_count;
  2313. char *pos;
  2314. int i;
  2315. PR_ASSERT(bv && buff && size);
  2316. /* compute number of values and size of the buffer to hold them */
  2317. *size = sizeof(count);
  2318. for (count = 0; bv[count]; count++) {
  2319. *size += (u_int32_t)(sizeof(PRInt32) + (PRInt32)bv[count]->bv_len);
  2320. }
  2321. /* allocate buffer */
  2322. *buff = (char *)slapi_ch_malloc(*size);
  2323. if (*buff == NULL) {
  2324. *size = 0;
  2325. return CL5_MEMORY_ERROR;
  2326. }
  2327. /* fill the buffer */
  2328. pos = *buff;
  2329. net_count = PR_htonl(count);
  2330. memcpy(pos, &net_count, sizeof(net_count));
  2331. pos += sizeof(net_count);
  2332. for (i = 0; i < count; i++) {
  2333. _cl5WriteBerval(bv[i], &pos);
  2334. }
  2335. return CL5_SUCCESS;
  2336. }
  2337. /* upgrade from db33 to db41
  2338. * 1. Run recovery on the database environment using the DB_ENV->open method
  2339. * 2. Remove any Berkeley DB environment using the DB_ENV->remove method
  2340. * 3. Remove any Berkeley DB transaction log files
  2341. * 4. extention .db3 -> .db4
  2342. */
  2343. static int
  2344. _cl5UpgradeMajor(char *fromVersion, char *toVersion)
  2345. {
  2346. PRDir *dir = NULL;
  2347. PRDirEntry *entry = NULL;
  2348. DB *thisdb = NULL;
  2349. CL5OpenMode backup;
  2350. int rc = 0;
  2351. backup = s_cl5Desc.dbOpenMode;
  2352. s_cl5Desc.dbOpenMode = CL5_OPEN_CLEAN_RECOVER;
  2353. /* CL5_OPEN_CLEAN_RECOVER does 1 and 2 */
  2354. rc = _cl5AppInit();
  2355. if (rc != CL5_SUCCESS) {
  2356. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
  2357. "_cl5UpgradeMajor - Failed to open the db env\n");
  2358. return rc;
  2359. }
  2360. s_cl5Desc.dbOpenMode = backup;
  2361. dir = PR_OpenDir(s_cl5Desc.dbDir);
  2362. if (dir == NULL) {
  2363. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
  2364. "_cl5UpgradeMajor - Failed to open changelog dir %s; NSPR error - %d\n",
  2365. s_cl5Desc.dbDir, PR_GetError());
  2366. goto out;
  2367. }
  2368. while (NULL != (entry = PR_ReadDir(dir, PR_SKIP_DOT | PR_SKIP_DOT_DOT))) {
  2369. if (NULL == entry->name) {
  2370. break;
  2371. }
  2372. if (_cl5FileEndsWith(entry->name, DB_EXTENSION_DB3) ||
  2373. _cl5FileEndsWith(entry->name, DB_EXTENSION_DB4)) {
  2374. char oName[MAXPATHLEN + 1];
  2375. char nName[MAXPATHLEN + 1];
  2376. char *p = NULL;
  2377. char c;
  2378. int baselen = 0;
  2379. PR_snprintf(oName, MAXPATHLEN, "%s/%s", s_cl5Desc.dbDir, entry->name);
  2380. p = strstr(oName, DB_EXTENSION_DB3);
  2381. if (NULL == p) {
  2382. p = strstr(oName, DB_EXTENSION_DB4);
  2383. if (NULL == p) {
  2384. continue;
  2385. }
  2386. }
  2387. /* db->rename closes DB; need to create every time */
  2388. rc = db_create(&thisdb, s_cl5Desc.dbEnv, 0);
  2389. if (0 != rc) {
  2390. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
  2391. "_cl5UpgradeMajor - Failed to get db handle\n");
  2392. goto out;
  2393. }
  2394. baselen = p - oName;
  2395. c = *p;
  2396. *p = '\0';
  2397. PR_snprintf(nName, MAXPATHLEN + 1, "%s", oName);
  2398. PR_snprintf(nName + baselen, MAXPATHLEN + 1 - baselen, "%s", DB_EXTENSION);
  2399. *p = c;
  2400. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
  2401. "_cl5UpgradeMajor - Renaming %s to %s\n", oName, nName);
  2402. rc = thisdb->rename(thisdb, (const char *)oName, NULL /* subdb */,
  2403. (const char *)nName, 0);
  2404. if (rc != PR_SUCCESS) {
  2405. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
  2406. "_cl5UpgradeMajor - Failed to rename file (%s -> %s); "
  2407. "db error - %d %s\n",
  2408. oName, nName, rc, db_strerror(rc));
  2409. break;
  2410. }
  2411. }
  2412. }
  2413. /* update the version file */
  2414. _cl5WriteDBVersion();
  2415. slapi_log_err(SLAPI_LOG_INFO, repl_plugin_name_cl,
  2416. "_cl5UpgradeMajor - Upgrading from %s to %s is successfully done (%s)\n",
  2417. fromVersion, toVersion, s_cl5Desc.dbDir);
  2418. out:
  2419. if (NULL != dir) {
  2420. PR_CloseDir(dir);
  2421. }
  2422. return rc;
  2423. }
  2424. /* upgrade from db41 -> db42 -> db43 -> db44 -> db45
  2425. * 1. Run recovery on the database environment using the DB_ENV->open method
  2426. * 2. Remove any Berkeley DB environment using the DB_ENV->remove method
  2427. * 3. Remove any Berkeley DB transaction log files
  2428. */
  2429. static int
  2430. _cl5UpgradeMinor(char *fromVersion, char *toVersion)
  2431. {
  2432. CL5OpenMode backup;
  2433. int rc = 0;
  2434. backup = s_cl5Desc.dbOpenMode;
  2435. s_cl5Desc.dbOpenMode = CL5_OPEN_CLEAN_RECOVER;
  2436. /* CL5_OPEN_CLEAN_RECOVER does 1 and 2 */
  2437. rc = _cl5AppInit();
  2438. if (rc != CL5_SUCCESS) {
  2439. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
  2440. "_cl5UpgradeMinor - Failed to open the db env\n");
  2441. return rc;
  2442. }
  2443. s_cl5Desc.dbOpenMode = backup;
  2444. /* update the version file */
  2445. _cl5WriteDBVersion();
  2446. slapi_log_err(SLAPI_LOG_INFO, repl_plugin_name_cl,
  2447. "_cl5UpgradeMinor - Upgrading from %s to %s is successfully done (%s)\n",
  2448. fromVersion, toVersion, s_cl5Desc.dbDir);
  2449. return rc;
  2450. }
  2451. static int
  2452. _cl5CheckDBVersion(void)
  2453. {
  2454. char clVersion[VERSION_SIZE + 1];
  2455. char dbVersion[VERSION_SIZE + 1];
  2456. int rc;
  2457. if (!cl5Exist(s_cl5Desc.dbDir)) {
  2458. /* this is new changelog - write DB version and guardian file */
  2459. rc = _cl5WriteDBVersion();
  2460. } else {
  2461. char *versionp = NULL;
  2462. char *versionendp = NULL;
  2463. char *dotp = NULL;
  2464. int dbmajor = 0;
  2465. int dbminor = 0;
  2466. PR_snprintf(clVersion, VERSION_SIZE, "%s/%d.%d/%s",
  2467. BDB_IMPL, DB_VERSION_MAJOR, DB_VERSION_MINOR, BDB_REPLPLUGIN);
  2468. rc = _cl5ReadDBVersion(s_cl5Desc.dbDir, dbVersion, sizeof(dbVersion));
  2469. if (rc != CL5_SUCCESS) {
  2470. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
  2471. "_cl5CheckDBVersion - Invalid dbversion\n");
  2472. rc = CL5_BAD_DBVERSION;
  2473. goto bailout;
  2474. }
  2475. versionendp = dbVersion + strlen(dbVersion);
  2476. /* get the version number */
  2477. /* old DBVERSION string: CL5_TYPE/REPL_PLUGIN_NAME/#.# */
  2478. if (PL_strncmp(dbVersion, CL5_TYPE, strlen(CL5_TYPE)) == 0) {
  2479. versionp = strrchr(dbVersion, '/');
  2480. }
  2481. /* new DBVERSION string: bdb/#.#/libreplication-plugin */
  2482. else if (PL_strncmp(dbVersion, BDB_IMPL, strlen(BDB_IMPL)) == 0) {
  2483. versionp = strchr(dbVersion, '/');
  2484. }
  2485. if (NULL == versionp || versionp == versionendp) {
  2486. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
  2487. "_cl5CheckDBVersion - Invalid dbversion: %s\n", dbVersion);
  2488. rc = CL5_BAD_DBVERSION;
  2489. goto bailout;
  2490. }
  2491. dotp = strchr(++versionp, '.');
  2492. if (NULL != dotp) {
  2493. *dotp = '\0';
  2494. dbmajor = strtol(versionp, (char **)NULL, 10);
  2495. dbminor = strtol(dotp + 1, (char **)NULL, 10);
  2496. *dotp = '.';
  2497. } else {
  2498. dbmajor = strtol(versionp, (char **)NULL, 10);
  2499. }
  2500. if (dbmajor < DB_VERSION_MAJOR) {
  2501. /* upgrade */
  2502. rc = _cl5UpgradeMajor(dbVersion, clVersion);
  2503. if (rc != CL5_SUCCESS) {
  2504. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
  2505. "_cl5CheckDBVersion - Upgrade %s -> %s failed\n",
  2506. dbVersion, clVersion);
  2507. rc = CL5_BAD_DBVERSION;
  2508. }
  2509. } else if (dbminor < DB_VERSION_MINOR) {
  2510. /* minor upgrade */
  2511. rc = _cl5UpgradeMinor(dbVersion, clVersion);
  2512. if (rc != CL5_SUCCESS) {
  2513. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
  2514. "_cl5CheckDBVersion - Upgrade %s -> %s failed\n",
  2515. dbVersion, clVersion);
  2516. rc = CL5_BAD_DBVERSION;
  2517. }
  2518. }
  2519. }
  2520. bailout:
  2521. return rc;
  2522. }
  2523. static int
  2524. _cl5ReadDBVersion(const char *dir, char *clVersion, int buflen)
  2525. {
  2526. int rc;
  2527. PRFileDesc *file;
  2528. char fName[MAXPATHLEN + 1];
  2529. char buff[BUFSIZ];
  2530. PRInt32 size;
  2531. char *tok;
  2532. char *iter = NULL;
  2533. if (clVersion) {
  2534. clVersion[0] = '\0';
  2535. }
  2536. PR_snprintf(fName, MAXPATHLEN, "%s/%s", dir, VERSION_FILE);
  2537. file = PR_Open(fName, PR_RDONLY, 777);
  2538. if (file == NULL) {
  2539. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
  2540. "_cl5ReadDBVersion - Failed to open DBVERSION; NSPR error - %d\n",
  2541. PR_GetError());
  2542. return CL5_SYSTEM_ERROR;
  2543. }
  2544. size = slapi_read_buffer(file, buff, BUFSIZ);
  2545. if (size < 0) {
  2546. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
  2547. "_cl5ReadDBVersion - Failed to read DBVERSION; NSPR error - %d\n",
  2548. PR_GetError());
  2549. PR_Close(file);
  2550. return CL5_SYSTEM_ERROR;
  2551. }
  2552. /* parse the data */
  2553. buff[size] = '\0';
  2554. tok = ldap_utf8strtok_r(buff, "\n", &iter);
  2555. if (tok) {
  2556. if (clVersion) {
  2557. PL_strncpyz(clVersion, tok, buflen);
  2558. }
  2559. }
  2560. rc = PR_Close(file);
  2561. if (rc != PR_SUCCESS) {
  2562. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
  2563. "_cl5ReadDBVersion - Failed to close DBVERSION; NSPR error - %d\n",
  2564. PR_GetError());
  2565. return CL5_SYSTEM_ERROR;
  2566. }
  2567. return CL5_SUCCESS;
  2568. }
  2569. static int
  2570. _cl5WriteDBVersion(void)
  2571. {
  2572. int rc;
  2573. PRFileDesc *file;
  2574. char fName[MAXPATHLEN + 1];
  2575. char clVersion[VERSION_SIZE + 1];
  2576. PRInt32 len, size;
  2577. PR_snprintf(fName, MAXPATHLEN, "%s/%s", s_cl5Desc.dbDir, VERSION_FILE);
  2578. file = PR_Open(fName, PR_WRONLY | PR_CREATE_FILE | PR_TRUNCATE,
  2579. s_cl5Desc.dbConfig.fileMode);
  2580. if (file == NULL) {
  2581. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
  2582. "_cl5WriteDBVersion - Failed to open DBVERSION; NSPR error - %d\n",
  2583. PR_GetError());
  2584. return CL5_SYSTEM_ERROR;
  2585. }
  2586. /* write changelog version */
  2587. PR_snprintf(clVersion, VERSION_SIZE, "%s/%d.%d/%s\n",
  2588. BDB_IMPL, DB_VERSION_MAJOR, DB_VERSION_MINOR, BDB_REPLPLUGIN);
  2589. len = strlen(clVersion);
  2590. size = slapi_write_buffer(file, clVersion, len);
  2591. if (size != len) {
  2592. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
  2593. "_cl5WriteDBVersion - Failed to write DBVERSION; NSPR error - %d\n",
  2594. PR_GetError());
  2595. PR_Close(file);
  2596. return CL5_SYSTEM_ERROR;
  2597. }
  2598. rc = PR_Close(file);
  2599. if (rc != PR_SUCCESS) {
  2600. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
  2601. "_cl5WriteDBVersion - Failed to close DBVERSION; NSPR error - %d\n",
  2602. PR_GetError());
  2603. return CL5_SYSTEM_ERROR;
  2604. }
  2605. return CL5_SUCCESS;
  2606. }
  2607. /* must be called under the state lock */
  2608. static void
  2609. _cl5Close(void)
  2610. {
  2611. PRIntervalTime interval;
  2612. if (s_cl5Desc.dbState != CL5_STATE_CLOSED) /* Don't try to close twice */
  2613. {
  2614. /* cl5Close() set the state flag to CL5_STATE_CLOSING, which should
  2615. trigger all of the db housekeeping threads to exit, and which will
  2616. eventually cause no new update threads to start - so we wait here
  2617. for those other threads to finish before we proceed */
  2618. interval = PR_MillisecondsToInterval(100);
  2619. while (s_cl5Desc.threadCount > 0) {
  2620. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
  2621. "_cl5Close -Waiting for threads to exit: %d thread(s) still active\n",
  2622. s_cl5Desc.threadCount);
  2623. DS_Sleep(interval);
  2624. }
  2625. /* There should now be no threads accessing any of the changelog databases -
  2626. it is safe to remove those databases */
  2627. _cl5DBClose();
  2628. /* cleanup trimming */
  2629. _cl5TrimCleanup();
  2630. /* remove changelog if requested */
  2631. if (s_cl5Desc.dbRmOnClose) {
  2632. if (_cl5Delete(s_cl5Desc.dbDir, 1) != CL5_SUCCESS) {
  2633. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
  2634. "cl5Close - Failed to remove changelog\n");
  2635. }
  2636. s_cl5Desc.dbRmOnClose = PR_FALSE;
  2637. }
  2638. slapi_ch_free((void **)&s_cl5Desc.dbDir);
  2639. memset(&s_cl5Desc.dbConfig, 0, sizeof(s_cl5Desc.dbConfig));
  2640. s_cl5Desc.fatalError = PR_FALSE;
  2641. s_cl5Desc.threadCount = 0;
  2642. s_cl5Desc.dbOpenMode = CL5_OPEN_NONE;
  2643. }
  2644. }
  2645. static void
  2646. _cl5DBClose(void)
  2647. {
  2648. if (NULL != s_cl5Desc.dbFiles) {
  2649. Object *obj;
  2650. for (obj = objset_first_obj(s_cl5Desc.dbFiles); obj;
  2651. obj = objset_next_obj(s_cl5Desc.dbFiles, obj)) {
  2652. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
  2653. "_cl5DBClose - Deleting DB object %p\n", obj);
  2654. }
  2655. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
  2656. "_cl5DBClose - Closing databases in %s\n", s_cl5Desc.dbDir);
  2657. objset_delete(&s_cl5Desc.dbFiles);
  2658. }
  2659. if (NULL != s_cl5Desc.fileLock) {
  2660. PR_DestroyLock(s_cl5Desc.fileLock);
  2661. s_cl5Desc.fileLock = NULL;
  2662. }
  2663. }
  2664. /* see if the given file is a changelog db file */
  2665. static int
  2666. _cl5IsDbFile(const char *fname)
  2667. {
  2668. if (!fname || !*fname) {
  2669. return 0;
  2670. }
  2671. if (!strcmp(fname, VERSION_FILE)) {
  2672. return 1;
  2673. }
  2674. if (_cl5FileEndsWith(fname, DB_EXTENSION)) {
  2675. return 1;
  2676. }
  2677. return 0; /* not a filename we recognize as being associated with the db */
  2678. }
  2679. /* state lock must be locked */
  2680. static int
  2681. _cl5Delete(const char *clDir, int rmDir)
  2682. {
  2683. PRDir *dir;
  2684. char filename[MAXPATHLEN + 1];
  2685. PRDirEntry *entry = NULL;
  2686. int rc;
  2687. int dirisempty = 1;
  2688. /* remove all files in the directory and the directory */
  2689. dir = PR_OpenDir(clDir);
  2690. if (dir == NULL) {
  2691. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
  2692. "_cl5Delete - Failed to open changelog dir; NSPR error - %d\n",
  2693. PR_GetError());
  2694. return CL5_SYSTEM_ERROR;
  2695. }
  2696. while (NULL != (entry = PR_ReadDir(dir, PR_SKIP_DOT | PR_SKIP_DOT_DOT))) {
  2697. if (NULL == entry->name) {
  2698. break;
  2699. }
  2700. if (!_cl5IsDbFile(entry->name)) {
  2701. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
  2702. "_cl5Delete - Skipping file [%s/%s] because it is not a changelogdb file.\n",
  2703. clDir, entry->name);
  2704. dirisempty = 0; /* skipped at least one file - dir not empty */
  2705. continue;
  2706. }
  2707. PR_snprintf(filename, MAXPATHLEN, "%s/%s", clDir, entry->name);
  2708. /* _cl5Delete deletes the whole changelog directory with all the files
  2709. * underneath. Thus, we can just remove them physically. */
  2710. if (0 == strcmp(entry->name, VERSION_FILE)) {
  2711. /* DBVERSION */
  2712. rc = PR_Delete(filename) != PR_SUCCESS;
  2713. if (PR_SUCCESS != rc) {
  2714. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
  2715. "_cl5Delete - Failed to remove \"%s\"; NSPR error - %d\n",
  2716. filename, PR_GetError());
  2717. }
  2718. } else {
  2719. /* DB files */
  2720. rc = s_cl5Desc.dbEnv->dbremove(s_cl5Desc.dbEnv, 0, filename, 0,
  2721. DEFAULT_DB_ENV_OP_FLAGS);
  2722. if (rc) {
  2723. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
  2724. "_cl5Delete - Failed to remove \"%s\"; "
  2725. "libdb error - %d (%s)\n",
  2726. filename, rc, db_strerror(rc));
  2727. }
  2728. }
  2729. }
  2730. rc = PR_CloseDir(dir);
  2731. if (rc != PR_SUCCESS) {
  2732. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
  2733. "_cl5Delete - Failed to close changelog dir (%s); NSPR error - %d\n",
  2734. clDir, PR_GetError());
  2735. return CL5_SYSTEM_ERROR;
  2736. }
  2737. if (rmDir && dirisempty) {
  2738. rc = PR_RmDir(clDir);
  2739. if (rc != 0) {
  2740. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
  2741. "_cl5Delete - Failed to remove changelog dir (%s); errno = %d\n",
  2742. clDir, errno);
  2743. return CL5_SYSTEM_ERROR;
  2744. }
  2745. } else if (rmDir && !dirisempty) {
  2746. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
  2747. "_cl5Delete - Changelog dir (%s) is not empty - cannot remove\n",
  2748. clDir);
  2749. }
  2750. /* invalidate the clcache */
  2751. clcache_destroy();
  2752. return CL5_SUCCESS;
  2753. }
  2754. static void
  2755. _cl5SetDefaultDBConfig(void)
  2756. {
  2757. s_cl5Desc.dbConfig.fileMode = FILE_CREATE_MODE;
  2758. }
  2759. static void
  2760. _cl5SetDBConfig(const CL5DBConfig *config)
  2761. {
  2762. /* s_cl5Desc.dbConfig.pageSize is retrieved from backend */
  2763. /* Some other configuration parameters are hardcoded... */
  2764. s_cl5Desc.dbConfig.fileMode = FILE_CREATE_MODE;
  2765. }
  2766. /* Trimming helper functions */
  2767. static int
  2768. _cl5TrimInit(void)
  2769. {
  2770. /* just create the lock while we are singlethreaded */
  2771. s_cl5Desc.dbTrim.lock = PR_NewLock();
  2772. if (s_cl5Desc.dbTrim.lock == NULL) {
  2773. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
  2774. "_cl5InitTrimming - Failed to create lock; NSPR error - %d\n",
  2775. PR_GetError());
  2776. return CL5_SYSTEM_ERROR;
  2777. } else {
  2778. return CL5_SUCCESS;
  2779. }
  2780. }
  2781. static void
  2782. _cl5TrimCleanup(void)
  2783. {
  2784. if (s_cl5Desc.dbTrim.lock)
  2785. PR_DestroyLock(s_cl5Desc.dbTrim.lock);
  2786. memset(&s_cl5Desc.dbTrim, 0, sizeof(s_cl5Desc.dbTrim));
  2787. }
  2788. static int
  2789. _cl5TrimMain(void *param __attribute__((unused)))
  2790. {
  2791. time_t timePrev = slapi_current_utc_time();
  2792. time_t timeCompactPrev = slapi_current_utc_time();
  2793. time_t timeNow;
  2794. PR_AtomicIncrement(&s_cl5Desc.threadCount);
  2795. while (s_cl5Desc.dbState != CL5_STATE_CLOSING) {
  2796. timeNow = slapi_current_utc_time();
  2797. if (timeNow - timePrev >= s_cl5Desc.dbTrim.trimInterval) {
  2798. /* time to trim */
  2799. timePrev = timeNow;
  2800. _cl5DoTrimming();
  2801. }
  2802. if ((s_cl5Desc.dbTrim.compactInterval > 0) &&
  2803. (timeNow - timeCompactPrev >= s_cl5Desc.dbTrim.compactInterval)) {
  2804. /* time to trim */
  2805. timeCompactPrev = timeNow;
  2806. _cl5CompactDBs();
  2807. }
  2808. if (NULL == s_cl5Desc.clLock) {
  2809. /* most likely, emergency */
  2810. break;
  2811. }
  2812. PR_Lock(s_cl5Desc.clLock);
  2813. PR_WaitCondVar(s_cl5Desc.clCvar, PR_SecondsToInterval(s_cl5Desc.dbTrim.trimInterval));
  2814. PR_Unlock(s_cl5Desc.clLock);
  2815. }
  2816. PR_AtomicDecrement(&s_cl5Desc.threadCount);
  2817. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl, "_cl5TrimMain - Exiting\n");
  2818. return 0;
  2819. }
  2820. /*
  2821. * We remove an entry if it has been replayed to all consumers and the number
  2822. * of entries in the changelog is larger than maxEntries or age of the entry
  2823. * is larger than maxAge. Also we can't purge entries which correspond to max
  2824. * csns in the supplier's ruv. Here is a example where we can get into trouble:
  2825. *
  2826. * The server is setup with time based trimming and no consumer's
  2827. * At some point all the entries are trimmed from the changelog.
  2828. * At a later point a consumer is added and initialized online.
  2829. * Then a change is made on the supplier.
  2830. * To update the consumer, the supplier would attempt to locate the last
  2831. * change sent to the consumer in the changelog and will fail because the
  2832. * change was removed.
  2833. */
  2834. static void
  2835. _cl5DoTrimming(void)
  2836. {
  2837. Object *obj;
  2838. long numToTrim;
  2839. PR_Lock(s_cl5Desc.dbTrim.lock);
  2840. /*
  2841. * We are trimming all the changelogs. We trim file by file which
  2842. * means that some files will be trimmed more often than other. We
  2843. * might have to fix that by, for example, randomizing the starting
  2844. * point.
  2845. */
  2846. obj = objset_first_obj(s_cl5Desc.dbFiles);
  2847. while (obj && _cl5CanTrim((time_t)0, &numToTrim)) {
  2848. _cl5TrimFile(obj, &numToTrim);
  2849. obj = objset_next_obj(s_cl5Desc.dbFiles, obj);
  2850. }
  2851. if (obj)
  2852. object_release(obj);
  2853. PR_Unlock(s_cl5Desc.dbTrim.lock);
  2854. return;
  2855. }
  2856. /*
  2857. * We are purging a changelog after a cleanAllRUV task. Find the specific
  2858. * changelog for the backend that is being cleaned, and purge all the records
  2859. * with the cleaned rid.
  2860. */
  2861. static void
  2862. _cl5DoPurging(cleanruv_purge_data *purge_data)
  2863. {
  2864. ReplicaId rid = purge_data->cleaned_rid;
  2865. const Slapi_DN *suffix_sdn = purge_data->suffix_sdn;
  2866. const char *replName = purge_data->replName;
  2867. char *replGen = purge_data->replGen;
  2868. char *fileName;
  2869. Object *obj;
  2870. PR_Lock(s_cl5Desc.dbTrim.lock);
  2871. fileName = _cl5MakeFileName(replName, replGen);
  2872. obj = objset_find(s_cl5Desc.dbFiles, _cl5CompareDBFile, fileName);
  2873. if (obj) {
  2874. /* We found our changelog, now purge it */
  2875. _cl5PurgeRID(obj, rid);
  2876. object_release(obj);
  2877. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
  2878. "_cl5DoPurging - Purged rid (%d) from suffix (%s)\n",
  2879. rid, slapi_sdn_get_dn(suffix_sdn));
  2880. } else {
  2881. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
  2882. "_cl5DoPurging - Purge rid (%d) failed to find changelog file (%s) for suffix (%s)\n",
  2883. rid, fileName, slapi_sdn_get_dn(suffix_sdn));
  2884. }
  2885. PR_Unlock(s_cl5Desc.dbTrim.lock);
  2886. return;
  2887. }
  2888. /* clear free page files to reduce changelog */
  2889. static void
  2890. _cl5CompactDBs(void)
  2891. {
  2892. int rc;
  2893. Object *fileObj = NULL;
  2894. CL5DBFile *dbFile = NULL;
  2895. DB *db = NULL;
  2896. DB_TXN *txnid = NULL;
  2897. DB_COMPACT c_data = {0};
  2898. PR_Lock(s_cl5Desc.dbTrim.lock);
  2899. rc = TXN_BEGIN(s_cl5Desc.dbEnv, NULL, &txnid, 0);
  2900. if (rc) {
  2901. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
  2902. "_cl5CompactDBs - Failed to begin transaction; db error - %d %s\n",
  2903. rc, db_strerror(rc));
  2904. goto bail;
  2905. }
  2906. for (fileObj = objset_first_obj(s_cl5Desc.dbFiles);
  2907. fileObj;
  2908. fileObj = objset_next_obj(s_cl5Desc.dbFiles, fileObj)) {
  2909. dbFile = (CL5DBFile *)object_get_data(fileObj);
  2910. if (!dbFile) {
  2911. continue;
  2912. }
  2913. db = dbFile->db;
  2914. rc = db->compact(db, txnid, NULL /*start*/, NULL /*stop*/,
  2915. &c_data, DB_FREE_SPACE, NULL /*end*/);
  2916. if (rc) {
  2917. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
  2918. "_cl5CompactDBs - Failed to compact %s; db error - %d %s\n",
  2919. dbFile->replName, rc, db_strerror(rc));
  2920. goto bail;
  2921. }
  2922. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
  2923. "_cl5CompactDBs - %s - %d pages freed\n",
  2924. dbFile->replName, c_data.compact_pages_free);
  2925. }
  2926. bail:
  2927. if (fileObj) {
  2928. object_release(fileObj);
  2929. }
  2930. if (rc) {
  2931. rc = TXN_ABORT(txnid);
  2932. if (rc) {
  2933. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
  2934. "_cl5CompactDBs - Failed to abort transaction; db error - %d %s\n",
  2935. rc, db_strerror(rc));
  2936. }
  2937. } else {
  2938. rc = TXN_COMMIT(txnid);
  2939. if (rc) {
  2940. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
  2941. "_cl5CompactDBs - Failed to commit transaction; db error - %d %s\n",
  2942. rc, db_strerror(rc));
  2943. }
  2944. }
  2945. PR_Unlock(s_cl5Desc.dbTrim.lock);
  2946. return;
  2947. }
  2948. /*
  2949. * If the rid is not set it is the very first iteration of the changelog.
  2950. * If the rid is set, we are doing another pass, and we have a key as our
  2951. * starting point.
  2952. */
  2953. static int
  2954. _cl5PurgeGetFirstEntry(Object *obj, CL5Entry *entry, void **iterator, DB_TXN *txnid, int rid, DBT *key)
  2955. {
  2956. DBC *cursor = NULL;
  2957. DBT data = {0};
  2958. CL5Iterator *it;
  2959. CL5DBFile *file;
  2960. int rc;
  2961. file = (CL5DBFile *)object_get_data(obj);
  2962. /* create cursor */
  2963. rc = file->db->cursor(file->db, txnid, &cursor, 0);
  2964. if (rc != 0) {
  2965. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
  2966. "_cl5PurgeGetFirstEntry - Failed to create cursor; db error - %d %s\n", rc, db_strerror(rc));
  2967. rc = CL5_DB_ERROR;
  2968. goto done;
  2969. }
  2970. key->flags = DB_DBT_MALLOC;
  2971. data.flags = DB_DBT_MALLOC;
  2972. while ((rc = cursor->c_get(cursor, key, &data, rid ? DB_SET : DB_NEXT)) == 0) {
  2973. /* skip service entries on the first pass (rid == 0)*/
  2974. if (!rid && cl5HelperEntry((char *)key->data, NULL)) {
  2975. slapi_ch_free(&key->data);
  2976. slapi_ch_free(&(data.data));
  2977. continue;
  2978. }
  2979. /* format entry */
  2980. rc = cl5DBData2Entry(data.data, data.size, entry);
  2981. slapi_ch_free(&(data.data));
  2982. if (rc != 0) {
  2983. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
  2984. "_cl5PurgeGetFirstEntry - Failed to format entry: %d\n", rc);
  2985. goto done;
  2986. }
  2987. it = (CL5Iterator *)slapi_ch_malloc(sizeof(CL5Iterator));
  2988. it->cursor = cursor;
  2989. object_acquire(obj);
  2990. it->file = obj;
  2991. *(CL5Iterator **)iterator = it;
  2992. return CL5_SUCCESS;
  2993. }
  2994. slapi_ch_free(&key->data);
  2995. slapi_ch_free(&(data.data));
  2996. /* walked of the end of the file */
  2997. if (rc == DB_NOTFOUND) {
  2998. rc = CL5_NOTFOUND;
  2999. goto done;
  3000. }
  3001. /* db error occured while iterating */
  3002. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
  3003. "_cl5PurgeGetFirstEntry - Failed to get entry; db error - %d %s\n",
  3004. rc, db_strerror(rc));
  3005. rc = CL5_DB_ERROR;
  3006. done:
  3007. /*
  3008. * We didn't success in assigning this cursor to the iterator,
  3009. * so we need to free the cursor here.
  3010. */
  3011. if (cursor)
  3012. cursor->c_close(cursor);
  3013. return rc;
  3014. }
  3015. /*
  3016. * Get the next entry. If we get a lock error we will restart the process
  3017. * starting at the current key.
  3018. */
  3019. static int
  3020. _cl5PurgeGetNextEntry(CL5Entry *entry, void *iterator, DBT *key)
  3021. {
  3022. CL5Iterator *it;
  3023. DBT data = {0};
  3024. int rc;
  3025. it = (CL5Iterator *)iterator;
  3026. key->flags = DB_DBT_MALLOC;
  3027. data.flags = DB_DBT_MALLOC;
  3028. while ((rc = it->cursor->c_get(it->cursor, key, &data, DB_NEXT)) == 0) {
  3029. if (cl5HelperEntry((char *)key->data, NULL)) {
  3030. slapi_ch_free(&key->data);
  3031. slapi_ch_free(&(data.data));
  3032. continue;
  3033. }
  3034. /* format entry */
  3035. rc = cl5DBData2Entry(data.data, data.size, entry);
  3036. slapi_ch_free(&(data.data));
  3037. if (rc != 0) {
  3038. if (rc != CL5_DB_LOCK_ERROR) {
  3039. /* Not a lock error, free the key */
  3040. slapi_ch_free(&key->data);
  3041. }
  3042. slapi_log_err(rc == CL5_DB_LOCK_ERROR ? SLAPI_LOG_REPL : SLAPI_LOG_ERR,
  3043. repl_plugin_name_cl,
  3044. "_cl5PurgeGetNextEntry - Failed to format entry: %d\n",
  3045. rc);
  3046. }
  3047. return rc;
  3048. }
  3049. slapi_ch_free(&(data.data));
  3050. /* walked of the end of the file or entry is out of range */
  3051. if (rc == 0 || rc == DB_NOTFOUND) {
  3052. slapi_ch_free(&key->data);
  3053. return CL5_NOTFOUND;
  3054. }
  3055. if (rc != CL5_DB_LOCK_ERROR) {
  3056. /* Not a lock error, free the key */
  3057. slapi_ch_free(&key->data);
  3058. }
  3059. /* cursor operation failed */
  3060. slapi_log_err(rc == CL5_DB_LOCK_ERROR ? SLAPI_LOG_REPL : SLAPI_LOG_ERR,
  3061. repl_plugin_name_cl,
  3062. "_cl5PurgeGetNextEntry - Failed to get entry; db error - %d %s\n",
  3063. rc, db_strerror(rc));
  3064. return rc;
  3065. }
  3066. #define MAX_RETRIES 10
  3067. /*
  3068. * _cl5PurgeRID(Object *obj, ReplicaId cleaned_rid)
  3069. *
  3070. * Clean the entire changelog of updates from the "cleaned rid" via CLEANALLRUV
  3071. * Delete entries in batches so we don't consume too many db locks, and we don't
  3072. * lockup the changelog during the entire purging process using one transaction.
  3073. * We save the key from the last iteration so we don't have to start from the
  3074. * beginning for each new iteration.
  3075. */
  3076. static void
  3077. _cl5PurgeRID(Object *obj, ReplicaId cleaned_rid)
  3078. {
  3079. slapi_operation_parameters op = {0};
  3080. ReplicaId csn_rid;
  3081. CL5Entry entry;
  3082. DB_TXN *txnid = NULL;
  3083. DBT key = {0};
  3084. void *iterator = NULL;
  3085. long totalTrimmed = 0;
  3086. long trimmed = 0;
  3087. char *starting_key = NULL;
  3088. int batch_count = 0;
  3089. int db_lock_retry_count = 0;
  3090. int first_pass = 1;
  3091. int finished = 0;
  3092. int rc = 0;
  3093. PR_ASSERT(obj);
  3094. entry.op = &op;
  3095. /*
  3096. * Keep processing the changelog until we are done, shutting down, or we
  3097. * maxed out on the db lock retries.
  3098. */
  3099. while (!finished && db_lock_retry_count < MAX_RETRIES && !slapi_is_shutting_down()) {
  3100. trimmed = 0;
  3101. /*
  3102. * Sleep a bit to allow others to use the changelog - we can't hog the
  3103. * changelog for the entire purge.
  3104. */
  3105. DS_Sleep(PR_MillisecondsToInterval(100));
  3106. rc = TXN_BEGIN(s_cl5Desc.dbEnv, NULL, &txnid, 0);
  3107. if (rc != 0) {
  3108. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
  3109. "_cl5PurgeRID - Failed to begin transaction; db error - %d %s. "
  3110. "Changelog was not purged of rid(%d)\n",
  3111. rc, db_strerror(rc), cleaned_rid);
  3112. return;
  3113. }
  3114. /*
  3115. * Check every changelog entry for the cleaned rid
  3116. */
  3117. rc = _cl5PurgeGetFirstEntry(obj, &entry, &iterator, txnid, first_pass ? 0 : cleaned_rid, &key);
  3118. first_pass = 0;
  3119. while (rc == CL5_SUCCESS && !slapi_is_shutting_down()) {
  3120. /*
  3121. * Store the new starting key - we need this starting key in case
  3122. * we run out of locks and have to start the transaction over.
  3123. */
  3124. slapi_ch_free_string(&starting_key);
  3125. starting_key = slapi_ch_strdup((char *)key.data);
  3126. if (trimmed == 10000 || (batch_count && trimmed == batch_count)) {
  3127. /*
  3128. * Break out, and commit these deletes. Do not free the key,
  3129. * we need it for the next pass.
  3130. */
  3131. cl5_operation_parameters_done(&op);
  3132. db_lock_retry_count = 0; /* reset the retry count */
  3133. break;
  3134. }
  3135. if (op.csn) {
  3136. csn_rid = csn_get_replicaid(op.csn);
  3137. if (csn_rid == cleaned_rid) {
  3138. rc = _cl5CurrentDeleteEntry(iterator);
  3139. if (rc != CL5_SUCCESS) {
  3140. /* log error */
  3141. cl5_operation_parameters_done(&op);
  3142. if (rc == CL5_DB_LOCK_ERROR) {
  3143. /*
  3144. * Ran out of locks, need to restart the transaction.
  3145. * Reduce the the batch count and reset the key to
  3146. * the starting point
  3147. */
  3148. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
  3149. "_cl5PurgeRID - Ran out of db locks deleting entry. "
  3150. "Reduce the batch value and restart.\n");
  3151. batch_count = trimmed - 10;
  3152. if (batch_count < 10) {
  3153. batch_count = 10;
  3154. }
  3155. trimmed = 0;
  3156. slapi_ch_free(&(key.data));
  3157. key.data = starting_key;
  3158. starting_key = NULL;
  3159. db_lock_retry_count++;
  3160. break;
  3161. } else {
  3162. /* fatal error */
  3163. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
  3164. "_cl5PurgeRID - Fatal error (%d)\n", rc);
  3165. slapi_ch_free(&(key.data));
  3166. finished = 1;
  3167. break;
  3168. }
  3169. }
  3170. trimmed++;
  3171. }
  3172. }
  3173. slapi_ch_free(&(key.data));
  3174. cl5_operation_parameters_done(&op);
  3175. rc = _cl5PurgeGetNextEntry(&entry, iterator, &key);
  3176. if (rc == CL5_DB_LOCK_ERROR) {
  3177. /*
  3178. * Ran out of locks, need to restart the transaction.
  3179. * Reduce the the batch count and reset the key to the starting
  3180. * point.
  3181. */
  3182. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
  3183. "_cl5PurgeRID - Ran out of db locks getting the next entry. "
  3184. "Reduce the batch value and restart.\n");
  3185. batch_count = trimmed - 10;
  3186. if (batch_count < 10) {
  3187. batch_count = 10;
  3188. }
  3189. trimmed = 0;
  3190. cl5_operation_parameters_done(&op);
  3191. slapi_ch_free(&(key.data));
  3192. key.data = starting_key;
  3193. starting_key = NULL;
  3194. db_lock_retry_count++;
  3195. break;
  3196. }
  3197. }
  3198. if (rc == CL5_NOTFOUND) {
  3199. /* Scanned the entire changelog, we're done */
  3200. finished = 1;
  3201. }
  3202. /* Destroy the iterator before we finish with the txn */
  3203. cl5DestroyIterator(iterator);
  3204. /*
  3205. * Commit or abort the txn
  3206. */
  3207. if (rc == CL5_SUCCESS || rc == CL5_NOTFOUND) {
  3208. rc = TXN_COMMIT(txnid);
  3209. if (rc != 0) {
  3210. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
  3211. "_cl5PurgeRID - Failed to commit transaction; db error - %d %s. "
  3212. "Changelog was not completely purged of rid (%d)\n",
  3213. rc, db_strerror(rc), cleaned_rid);
  3214. break;
  3215. } else if (finished) {
  3216. /* We're done */
  3217. totalTrimmed += trimmed;
  3218. break;
  3219. } else {
  3220. /* Not done yet */
  3221. totalTrimmed += trimmed;
  3222. trimmed = 0;
  3223. }
  3224. } else {
  3225. rc = TXN_ABORT(txnid);
  3226. if (rc != 0) {
  3227. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
  3228. "_cl5PurgeRID - Failed to abort transaction; db error - %d %s. "
  3229. "Changelog was not completely purged of rid (%d)\n",
  3230. rc, db_strerror(rc), cleaned_rid);
  3231. }
  3232. if (batch_count == 0) {
  3233. /* This was not a retry. Fatal error, break out */
  3234. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
  3235. "_cl5PurgeRID - Changelog was not purged of rid (%d)\n",
  3236. cleaned_rid);
  3237. break;
  3238. }
  3239. }
  3240. }
  3241. slapi_ch_free_string(&starting_key);
  3242. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
  3243. "_cl5PurgeRID - Removed (%ld entries) that originated from rid (%d)\n",
  3244. totalTrimmed, cleaned_rid);
  3245. }
  3246. /* Note that each file contains changes for a single replicated area.
  3247. trimming algorithm:
  3248. */
  3249. #define CL5_TRIM_MAX_PER_TRANSACTION 10
  3250. static void
  3251. _cl5TrimFile(Object *obj, long *numToTrim)
  3252. {
  3253. DB_TXN *txnid;
  3254. RUV *ruv = NULL;
  3255. CL5Entry entry;
  3256. slapi_operation_parameters op = {0};
  3257. ReplicaId csn_rid;
  3258. void *it;
  3259. int finished = 0, totalTrimmed = 0, count;
  3260. PRBool abort;
  3261. char strCSN[CSN_STRSIZE];
  3262. int rc;
  3263. PR_ASSERT(obj);
  3264. /* construct the ruv up to which we can purge */
  3265. rc = _cl5GetRUV2Purge2(obj, &ruv);
  3266. if (rc != CL5_SUCCESS || ruv == NULL) {
  3267. return;
  3268. }
  3269. entry.op = &op;
  3270. while (!finished && !slapi_is_shutting_down()) {
  3271. it = NULL;
  3272. count = 0;
  3273. txnid = NULL;
  3274. abort = PR_FALSE;
  3275. /* DB txn lock accessed pages until the end of the transaction. */
  3276. rc = TXN_BEGIN(s_cl5Desc.dbEnv, NULL, &txnid, 0);
  3277. if (rc != 0) {
  3278. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
  3279. "_cl5TrimFile - Failed to begin transaction; db error - %d %s\n",
  3280. rc, db_strerror(rc));
  3281. finished = PR_TRUE;
  3282. break;
  3283. }
  3284. finished = _cl5GetFirstEntry(obj, &entry, &it, txnid);
  3285. while (!finished && !slapi_is_shutting_down()) {
  3286. /*
  3287. * This change can be trimmed if it exceeds purge
  3288. * parameters and has been seen by all consumers.
  3289. */
  3290. if (op.csn == NULL) {
  3291. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl, "_cl5TrimFile - "
  3292. "Operation missing csn, moving on to next entry.\n");
  3293. cl5_operation_parameters_done(&op);
  3294. finished = _cl5GetNextEntry(&entry, it);
  3295. continue;
  3296. }
  3297. csn_rid = csn_get_replicaid(op.csn);
  3298. if ((*numToTrim > 0 || _cl5CanTrim(entry.time, numToTrim)) &&
  3299. ruv_covers_csn_strict(ruv, op.csn)) {
  3300. rc = _cl5CurrentDeleteEntry(it);
  3301. if (rc == CL5_SUCCESS) {
  3302. rc = _cl5UpdateRUV(obj, op.csn, PR_FALSE, PR_TRUE);
  3303. }
  3304. if (rc == CL5_SUCCESS) {
  3305. if (*numToTrim > 0)
  3306. (*numToTrim)--;
  3307. count++;
  3308. } else {
  3309. /* The above two functions have logged the error */
  3310. abort = PR_TRUE;
  3311. }
  3312. } else {
  3313. /* The changelog DB is time ordered. If we can not trim
  3314. * a CSN, we will not be allowed to trim the rest of the
  3315. * CSNs generally. However, the maxcsn of each replica ID
  3316. * is always kept in the changelog as an anchor for
  3317. * replaying future changes. We have to skip those anchor
  3318. * CSNs, otherwise a non-active replica ID could block
  3319. * the trim forever.
  3320. */
  3321. CSN *maxcsn = NULL;
  3322. ruv_get_largest_csn_for_replica(ruv, csn_rid, &maxcsn);
  3323. if (csn_compare(op.csn, maxcsn) != 0) {
  3324. /* op.csn is not anchor CSN */
  3325. finished = 1;
  3326. } else {
  3327. if (slapi_is_loglevel_set(SLAPI_LOG_REPL)) {
  3328. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
  3329. "_cl5TrimFile - Changelog purge skipped anchor csn %s\n",
  3330. csn_as_string(maxcsn, PR_FALSE, strCSN));
  3331. }
  3332. /* extra read to skip the current record */
  3333. cl5_operation_parameters_done(&op);
  3334. finished = _cl5GetNextEntry(&entry, it);
  3335. }
  3336. if (maxcsn)
  3337. csn_free(&maxcsn);
  3338. }
  3339. cl5_operation_parameters_done(&op);
  3340. if (finished || abort || count >= CL5_TRIM_MAX_PER_TRANSACTION) {
  3341. /* If we reach CL5_TRIM_MAX_PER_TRANSACTION,
  3342. * we close the cursor,
  3343. * commit the transaction and restart a new transaction
  3344. */
  3345. break;
  3346. }
  3347. finished = _cl5GetNextEntry(&entry, it);
  3348. }
  3349. /* MAB: We need to close the cursor BEFORE the txn commits/aborts.
  3350. * If we don't respect this order, we'll screw up the database,
  3351. * placing it in DB_RUNRECOVERY mode
  3352. */
  3353. cl5DestroyIterator(it);
  3354. if (abort) {
  3355. finished = 1;
  3356. rc = TXN_ABORT(txnid);
  3357. if (rc != 0) {
  3358. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
  3359. "_cl5TrimFile - Failed to abort transaction; db error - %d %s\n",
  3360. rc, db_strerror(rc));
  3361. }
  3362. } else {
  3363. rc = TXN_COMMIT(txnid);
  3364. if (rc != 0) {
  3365. finished = 1;
  3366. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
  3367. "_cl5TrimFile - Failed to commit transaction; db error - %d %s\n",
  3368. rc, db_strerror(rc));
  3369. } else {
  3370. totalTrimmed += count;
  3371. }
  3372. }
  3373. } /* While (!finished) */
  3374. if (ruv)
  3375. ruv_destroy(&ruv);
  3376. if (totalTrimmed) {
  3377. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl, "_cl5TrimFile - Trimmed %d changes from the changelog\n",
  3378. totalTrimmed);
  3379. }
  3380. }
  3381. static PRBool
  3382. _cl5CanTrim(time_t time, long *numToTrim)
  3383. {
  3384. *numToTrim = 0;
  3385. if (s_cl5Desc.dbTrim.maxAge == 0 && s_cl5Desc.dbTrim.maxEntries == 0) {
  3386. return PR_FALSE;
  3387. }
  3388. if (s_cl5Desc.dbTrim.maxAge == 0) {
  3389. *numToTrim = cl5GetOperationCount(NULL) - s_cl5Desc.dbTrim.maxEntries;
  3390. return (*numToTrim > 0);
  3391. }
  3392. if (s_cl5Desc.dbTrim.maxEntries > 0 &&
  3393. (*numToTrim = cl5GetOperationCount(NULL) - s_cl5Desc.dbTrim.maxEntries) > 0) {
  3394. return PR_TRUE;
  3395. }
  3396. if (time) {
  3397. return (slapi_current_utc_time() - time > s_cl5Desc.dbTrim.maxAge);
  3398. } else {
  3399. return PR_TRUE;
  3400. }
  3401. }
  3402. static int
  3403. _cl5ReadRUV(const char *replGen, Object *obj, PRBool purge)
  3404. {
  3405. int rc;
  3406. char csnStr[CSN_STRSIZE];
  3407. DBT key = {0}, data = {0};
  3408. struct berval **vals = NULL;
  3409. CL5DBFile *file;
  3410. char *pos;
  3411. char *agmt_name;
  3412. PR_ASSERT(replGen && obj);
  3413. file = (CL5DBFile *)object_get_data(obj);
  3414. PR_ASSERT(file);
  3415. agmt_name = get_thread_private_agmtname();
  3416. if (purge) { /* read purge vector entry */
  3417. key.data = _cl5GetHelperEntryKey(PURGE_RUV_TIME, csnStr);
  3418. } else { /* read upper bound vector */
  3419. key.data = _cl5GetHelperEntryKey(MAX_RUV_TIME, csnStr);
  3420. }
  3421. key.size = CSN_STRSIZE;
  3422. data.flags = DB_DBT_MALLOC;
  3423. rc = file->db->get(file->db, NULL /*txn*/, &key, &data, 0);
  3424. switch (rc) {
  3425. case 0:
  3426. pos = data.data;
  3427. rc = _cl5ReadBervals(&vals, &pos, data.size);
  3428. slapi_ch_free(&(data.data));
  3429. if (rc != CL5_SUCCESS)
  3430. goto done;
  3431. if (purge) {
  3432. rc = ruv_init_from_bervals(vals, &file->purgeRUV);
  3433. } else {
  3434. rc = ruv_init_from_bervals(vals, &file->maxRUV);
  3435. }
  3436. if (rc != RUV_SUCCESS) {
  3437. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
  3438. "_cl5ReadRUV - %s - Failed to initialize %s ruv; "
  3439. "RUV error %d\n",
  3440. agmt_name, purge ? "purge" : "upper bound", rc);
  3441. rc = CL5_RUV_ERROR;
  3442. goto done;
  3443. }
  3444. /* delete the entry; it is re-added when file
  3445. is successfully closed */
  3446. file->db->del(file->db, NULL, &key, 0);
  3447. rc = CL5_SUCCESS;
  3448. goto done;
  3449. case DB_NOTFOUND: /* RUV is lost - need to construct */
  3450. rc = _cl5ConstructRUV(replGen, obj, purge);
  3451. goto done;
  3452. default:
  3453. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
  3454. "_cl5ReadRUV - %s - Failed to get purge RUV; "
  3455. "db error - %d %s\n",
  3456. agmt_name, rc, db_strerror(rc));
  3457. rc = CL5_DB_ERROR;
  3458. goto done;
  3459. }
  3460. done:
  3461. ber_bvecfree(vals);
  3462. return rc;
  3463. }
  3464. static int
  3465. _cl5WriteRUV(CL5DBFile *file, PRBool purge)
  3466. {
  3467. int rc;
  3468. DBT key = {0}, data = {0};
  3469. char csnStr[CSN_STRSIZE];
  3470. struct berval **vals;
  3471. DB_TXN *txnid = NULL;
  3472. char *buff;
  3473. if ((purge && file->purgeRUV == NULL) || (!purge && file->maxRUV == NULL))
  3474. return CL5_SUCCESS;
  3475. if (purge) {
  3476. /* Set the minimum CSN of each vector to a dummy CSN that contains
  3477. * just a replica ID, e.g. 00000000000000010000.
  3478. * The minimum CSN in a purge RUV is not used so the value doesn't
  3479. * matter, but it needs to be set to something so that it can be
  3480. * flushed to changelog at shutdown and parsed at startup with the
  3481. * regular string-to-RUV parsing routines. */
  3482. ruv_insert_dummy_min_csn(file->purgeRUV);
  3483. key.data = _cl5GetHelperEntryKey(PURGE_RUV_TIME, csnStr);
  3484. rc = ruv_to_bervals(file->purgeRUV, &vals);
  3485. } else {
  3486. key.data = _cl5GetHelperEntryKey(MAX_RUV_TIME, csnStr);
  3487. rc = ruv_to_bervals(file->maxRUV, &vals);
  3488. }
  3489. key.size = CSN_STRSIZE;
  3490. rc = _cl5WriteBervals(vals, &buff, &data.size);
  3491. data.data = buff;
  3492. ber_bvecfree(vals);
  3493. if (rc != CL5_SUCCESS) {
  3494. return rc;
  3495. }
  3496. rc = file->db->put(file->db, txnid, &key, &data, 0);
  3497. slapi_ch_free(&(data.data));
  3498. if (rc == 0) {
  3499. return CL5_SUCCESS;
  3500. } else {
  3501. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
  3502. "_cl5WriteRUV - Failed to write %s RUV for file %s; db error - %d (%s)\n",
  3503. purge ? "purge" : "upper bound", file->name, rc, db_strerror(rc));
  3504. if (CL5_OS_ERR_IS_DISKFULL(rc)) {
  3505. cl5_set_diskfull();
  3506. return CL5_DB_ERROR;
  3507. }
  3508. return CL5_DB_ERROR;
  3509. }
  3510. }
  3511. /* This is a very slow process since we have to read every changelog entry.
  3512. Hopefully, this function is not called too often */
  3513. static int
  3514. _cl5ConstructRUV(const char *replGen, Object *obj, PRBool purge)
  3515. {
  3516. int rc;
  3517. CL5Entry entry;
  3518. void *iterator = NULL;
  3519. slapi_operation_parameters op = {0};
  3520. CL5DBFile *file;
  3521. ReplicaId rid;
  3522. PR_ASSERT(replGen && obj);
  3523. file = (CL5DBFile *)object_get_data(obj);
  3524. PR_ASSERT(file);
  3525. /* construct the RUV */
  3526. if (purge)
  3527. rc = ruv_init_new(replGen, 0, NULL, &file->purgeRUV);
  3528. else
  3529. rc = ruv_init_new(replGen, 0, NULL, &file->maxRUV);
  3530. if (rc != RUV_SUCCESS) {
  3531. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl, "_cl5ConstructRUV - "
  3532. "Failed to initialize %s RUV for file %s; ruv error - %d\n",
  3533. purge ? "purge" : "upper bound", file->name, rc);
  3534. return CL5_RUV_ERROR;
  3535. }
  3536. slapi_log_err(SLAPI_LOG_NOTICE, repl_plugin_name_cl,
  3537. "_cl5ConstructRUV - Rebuilding the replication changelog RUV, "
  3538. "this may take several minutes...\n");
  3539. entry.op = &op;
  3540. rc = _cl5GetFirstEntry(obj, &entry, &iterator, NULL);
  3541. while (rc == CL5_SUCCESS) {
  3542. if (op.csn) {
  3543. rid = csn_get_replicaid(op.csn);
  3544. } else {
  3545. slapi_log_err(SLAPI_LOG_WARNING, repl_plugin_name_cl, "_cl5ConstructRUV - "
  3546. "Operation missing csn, moving on to next entry.\n");
  3547. cl5_operation_parameters_done(&op);
  3548. rc = _cl5GetNextEntry(&entry, iterator);
  3549. continue;
  3550. }
  3551. if (is_cleaned_rid(rid)) {
  3552. /* skip this entry as the rid is invalid */
  3553. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl, "_cl5ConstructRUV - "
  3554. "Skipping entry because its csn contains a cleaned rid(%d)\n",
  3555. rid);
  3556. cl5_operation_parameters_done(&op);
  3557. rc = _cl5GetNextEntry(&entry, iterator);
  3558. continue;
  3559. }
  3560. if (purge)
  3561. rc = ruv_set_csns_keep_smallest(file->purgeRUV, op.csn);
  3562. else
  3563. rc = ruv_set_csns(file->maxRUV, op.csn, NULL);
  3564. cl5_operation_parameters_done(&op);
  3565. if (rc != RUV_SUCCESS) {
  3566. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl, "_cl5ConstructRUV - "
  3567. "Failed to update %s RUV for file %s; ruv error - %d\n",
  3568. purge ? "purge" : "upper bound", file->name, rc);
  3569. rc = CL5_RUV_ERROR;
  3570. continue;
  3571. }
  3572. rc = _cl5GetNextEntry(&entry, iterator);
  3573. }
  3574. cl5_operation_parameters_done(&op);
  3575. if (iterator)
  3576. cl5DestroyIterator(iterator);
  3577. if (rc == CL5_NOTFOUND) {
  3578. rc = CL5_SUCCESS;
  3579. } else {
  3580. if (purge)
  3581. ruv_destroy(&file->purgeRUV);
  3582. else
  3583. ruv_destroy(&file->maxRUV);
  3584. }
  3585. slapi_log_err(SLAPI_LOG_NOTICE, repl_plugin_name_cl,
  3586. "_cl5ConstructRUV - Rebuilding replication changelog RUV complete. Result %d (%s)\n",
  3587. rc, rc ? "Failed to rebuild changelog RUV" : "Success");
  3588. return rc;
  3589. }
  3590. static int
  3591. _cl5UpdateRUV(Object *obj, CSN *csn, PRBool newReplica, PRBool purge)
  3592. {
  3593. ReplicaId rid;
  3594. int rc = RUV_SUCCESS; /* initialize rc to avoid erroneous logs */
  3595. CL5DBFile *file;
  3596. PR_ASSERT(obj && csn);
  3597. file = (CL5DBFile *)object_get_data(obj);
  3598. /*
  3599. * if purge is TRUE, file->purgeRUV must be set;
  3600. * if purge is FALSE, maxRUV must be set
  3601. */
  3602. PR_ASSERT(file && ((purge && file->purgeRUV) || (!purge && file->maxRUV)));
  3603. rid = csn_get_replicaid(csn);
  3604. /* update vector only if this replica is not yet part of RUV */
  3605. if (purge && newReplica) {
  3606. if (ruv_contains_replica(file->purgeRUV, rid)) {
  3607. return CL5_SUCCESS;
  3608. } else {
  3609. /* if the replica is not part of the purgeRUV yet, add it unless it's from a cleaned rid */
  3610. ruv_add_replica(file->purgeRUV, rid, multimaster_get_local_purl());
  3611. }
  3612. } else {
  3613. if (purge) {
  3614. rc = ruv_set_csns(file->purgeRUV, csn, NULL);
  3615. } else {
  3616. rc = ruv_set_csns(file->maxRUV, csn, NULL);
  3617. }
  3618. }
  3619. if (rc != RUV_SUCCESS) {
  3620. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl, "_cl5UpdatePurgeRUV - "
  3621. "Failed to update %s RUV for file %s; ruv error - %d\n",
  3622. purge ? "purge" : "upper bound", file->name, rc);
  3623. return CL5_RUV_ERROR;
  3624. }
  3625. return CL5_SUCCESS;
  3626. }
  3627. static int
  3628. _cl5EnumConsumerRUV(const ruv_enum_data *element, void *arg)
  3629. {
  3630. int rc;
  3631. RUV *ruv;
  3632. CSN *csn = NULL;
  3633. PR_ASSERT(element && element->csn && arg);
  3634. ruv = (RUV *)arg;
  3635. rc = ruv_get_largest_csn_for_replica(ruv, csn_get_replicaid(element->csn), &csn);
  3636. if (rc != RUV_SUCCESS || csn == NULL || csn_compare(element->csn, csn) < 0) {
  3637. ruv_set_max_csn(ruv, element->csn, NULL);
  3638. }
  3639. if (csn)
  3640. csn_free(&csn);
  3641. return 0;
  3642. }
  3643. static int
  3644. _cl5GetRUV2Purge2(Object *fileObj, RUV **ruv)
  3645. {
  3646. int rc = CL5_SUCCESS;
  3647. CL5DBFile *dbFile;
  3648. Object *rObj = NULL;
  3649. Replica *r = NULL;
  3650. Object *agmtObj = NULL;
  3651. Repl_Agmt *agmt;
  3652. Object *consRUVObj, *supRUVObj;
  3653. RUV *consRUV, *supRUV;
  3654. CSN *csn;
  3655. PR_ASSERT(fileObj && ruv);
  3656. if (!ruv) {
  3657. rc = CL5_UNKNOWN_ERROR;
  3658. goto done;
  3659. }
  3660. dbFile = (CL5DBFile *)object_get_data(fileObj);
  3661. PR_ASSERT(dbFile);
  3662. rObj = replica_get_by_name(dbFile->replName);
  3663. PR_ASSERT(rObj);
  3664. if (!rObj) {
  3665. rc = CL5_NOTFOUND;
  3666. goto done;
  3667. }
  3668. r = (Replica *)object_get_data(rObj);
  3669. PR_ASSERT(r);
  3670. /* We start with this replica's RUV. See note in _cl5DoTrimming */
  3671. supRUVObj = replica_get_ruv(r);
  3672. PR_ASSERT(supRUVObj);
  3673. supRUV = (RUV *)object_get_data(supRUVObj);
  3674. PR_ASSERT(supRUV);
  3675. *ruv = ruv_dup(supRUV);
  3676. object_release(supRUVObj);
  3677. agmtObj = agmtlist_get_first_agreement_for_replica(r);
  3678. while (agmtObj) {
  3679. agmt = (Repl_Agmt *)object_get_data(agmtObj);
  3680. PR_ASSERT(agmt);
  3681. if (!agmt_is_enabled(agmt)) {
  3682. agmtObj = agmtlist_get_next_agreement_for_replica(r, agmtObj);
  3683. continue;
  3684. }
  3685. consRUVObj = agmt_get_consumer_ruv(agmt);
  3686. if (consRUVObj) {
  3687. consRUV = (RUV *)object_get_data(consRUVObj);
  3688. rc = ruv_enumerate_elements(consRUV, _cl5EnumConsumerRUV, *ruv);
  3689. if (rc != RUV_SUCCESS) {
  3690. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl, "_cl5GetRUV2Purge2 - "
  3691. "Failed to construct ruv; ruv error - %d\n",
  3692. rc);
  3693. rc = CL5_RUV_ERROR;
  3694. object_release(consRUVObj);
  3695. object_release(agmtObj);
  3696. break;
  3697. }
  3698. object_release(consRUVObj);
  3699. }
  3700. agmtObj = agmtlist_get_next_agreement_for_replica(r, agmtObj);
  3701. }
  3702. /* check if there is any data in the constructed ruv - otherwise get rid of it */
  3703. if (ruv_get_max_csn(*ruv, &csn) != RUV_SUCCESS || csn == NULL) {
  3704. ruv_destroy(ruv);
  3705. } else {
  3706. csn_free(&csn);
  3707. }
  3708. done:
  3709. if (rObj)
  3710. object_release(rObj);
  3711. if (rc != CL5_SUCCESS && ruv)
  3712. ruv_destroy(ruv);
  3713. return rc;
  3714. }
  3715. static int
  3716. _cl5GetEntryCount(CL5DBFile *file)
  3717. {
  3718. int rc;
  3719. char csnStr[CSN_STRSIZE];
  3720. DBT key = {0}, data = {0};
  3721. DB_BTREE_STAT *stats = NULL;
  3722. PR_ASSERT(file);
  3723. /* read entry count. if the entry is there - the file was successfully closed
  3724. last time it was used */
  3725. key.data = _cl5GetHelperEntryKey(ENTRY_COUNT_TIME, csnStr);
  3726. key.size = CSN_STRSIZE;
  3727. data.flags = DB_DBT_MALLOC;
  3728. rc = file->db->get(file->db, NULL /*txn*/, &key, &data, 0);
  3729. switch (rc) {
  3730. case 0:
  3731. file->entryCount = *(int *)data.data;
  3732. slapi_ch_free(&(data.data));
  3733. /* delete the entry. the entry is re-added when file
  3734. is successfully closed */
  3735. file->db->del(file->db, NULL, &key, 0);
  3736. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
  3737. "_cl5GetEntryCount - %d changes for replica %s\n",
  3738. file->entryCount, file->replName);
  3739. return CL5_SUCCESS;
  3740. case DB_NOTFOUND:
  3741. file->entryCount = 0;
  3742. rc = file->db->stat(file->db, NULL, (void *)&stats, 0);
  3743. if (rc != 0) {
  3744. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
  3745. "_cl5GetEntryCount - Failed to get changelog statistics; "
  3746. "db error - %d %s\n",
  3747. rc, db_strerror(rc));
  3748. return CL5_DB_ERROR;
  3749. }
  3750. #ifdef DB30
  3751. file->entryCount = stats->bt_nrecs;
  3752. #else /* DB31 */
  3753. file->entryCount = stats->bt_ndata;
  3754. #endif
  3755. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
  3756. "_cl5GetEntryCount - %d changes for replica %s\n",
  3757. file->entryCount, file->replName);
  3758. slapi_ch_free((void **)&stats);
  3759. return CL5_SUCCESS;
  3760. default:
  3761. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
  3762. "_cl5GetEntryCount - Failed to get count entry; "
  3763. "db error - %d %s\n",
  3764. rc, db_strerror(rc));
  3765. return CL5_DB_ERROR;
  3766. }
  3767. }
  3768. static int
  3769. _cl5WriteEntryCount(CL5DBFile *file)
  3770. {
  3771. int rc;
  3772. DBT key = {0}, data = {0};
  3773. char csnStr[CSN_STRSIZE];
  3774. DB_TXN *txnid = NULL;
  3775. key.data = _cl5GetHelperEntryKey(ENTRY_COUNT_TIME, csnStr);
  3776. key.size = CSN_STRSIZE;
  3777. data.data = (void *)&file->entryCount;
  3778. data.size = sizeof(file->entryCount);
  3779. rc = file->db->put(file->db, txnid, &key, &data, 0);
  3780. if (rc == 0) {
  3781. return CL5_SUCCESS;
  3782. } else {
  3783. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
  3784. "_cl5WriteEntryCount - "
  3785. "Failed to write count entry for file %s; db error - %d %s\n",
  3786. file->name, rc, db_strerror(rc));
  3787. if (CL5_OS_ERR_IS_DISKFULL(rc)) {
  3788. cl5_set_diskfull();
  3789. return CL5_DB_ERROR;
  3790. }
  3791. return CL5_DB_ERROR;
  3792. }
  3793. }
  3794. static const char *
  3795. _cl5OperationType2Str(int type)
  3796. {
  3797. switch (type) {
  3798. case SLAPI_OPERATION_ADD:
  3799. return T_ADDCTSTR;
  3800. case SLAPI_OPERATION_MODIFY:
  3801. return T_MODIFYCTSTR;
  3802. case SLAPI_OPERATION_MODRDN:
  3803. return T_MODRDNCTSTR;
  3804. case SLAPI_OPERATION_DELETE:
  3805. return T_DELETECTSTR;
  3806. default:
  3807. return NULL;
  3808. }
  3809. }
  3810. static int
  3811. _cl5Str2OperationType(const char *str)
  3812. {
  3813. if (strcasecmp(str, T_ADDCTSTR) == 0)
  3814. return SLAPI_OPERATION_ADD;
  3815. if (strcasecmp(str, T_MODIFYCTSTR) == 0)
  3816. return SLAPI_OPERATION_MODIFY;
  3817. if (strcasecmp(str, T_MODRDNCTSTR) == 0)
  3818. return SLAPI_OPERATION_MODRDN;
  3819. if (strcasecmp(str, T_DELETECTSTR) == 0)
  3820. return SLAPI_OPERATION_DELETE;
  3821. return -1;
  3822. }
  3823. static int
  3824. _cl5Operation2LDIF(const slapi_operation_parameters *op, const char *replGen, char **ldifEntry, PRInt32 *lenLDIF)
  3825. {
  3826. int len = 2;
  3827. lenstr *l = NULL;
  3828. const char *strType;
  3829. const char *strDeleteOldRDN = "false";
  3830. char *buff, *start;
  3831. LDAPMod **add_mods;
  3832. char *rawDN = NULL;
  3833. char strCSN[CSN_STRSIZE];
  3834. PR_ASSERT(op && replGen && ldifEntry && IsValidOperation(op));
  3835. strType = _cl5OperationType2Str(op->operation_type);
  3836. csn_as_string(op->csn, PR_FALSE, strCSN);
  3837. /* find length of the buffer */
  3838. len += LDIF_SIZE_NEEDED(strlen(T_CHANGETYPESTR), strlen(strType));
  3839. len += LDIF_SIZE_NEEDED(strlen(T_REPLGEN), strlen(replGen));
  3840. len += LDIF_SIZE_NEEDED(strlen(T_CSNSTR), strlen(strCSN));
  3841. len += LDIF_SIZE_NEEDED(strlen(T_UNIQUEIDSTR), strlen(op->target_address.uniqueid));
  3842. switch (op->operation_type) {
  3843. case SLAPI_OPERATION_ADD:
  3844. if (NULL == op->p.p_add.target_entry) {
  3845. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
  3846. "_cl5Operation2LDIF - ADD - entry is NULL\n");
  3847. return CL5_BAD_FORMAT;
  3848. }
  3849. if (op->p.p_add.parentuniqueid)
  3850. len += LDIF_SIZE_NEEDED(strlen(T_PARENTIDSTR), strlen(op->p.p_add.parentuniqueid));
  3851. slapi_entry2mods(op->p.p_add.target_entry, &rawDN, &add_mods);
  3852. len += LDIF_SIZE_NEEDED(strlen(T_DNSTR), strlen(rawDN));
  3853. l = make_changes_string(add_mods, NULL);
  3854. len += LDIF_SIZE_NEEDED(strlen(T_CHANGESTR), l->ls_len);
  3855. ldap_mods_free(add_mods, 1);
  3856. break;
  3857. case SLAPI_OPERATION_MODIFY:
  3858. if (NULL == op->p.p_modify.modify_mods) {
  3859. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
  3860. "_cl5Operation2LDIF - MODIFY - mods are NULL\n");
  3861. return CL5_BAD_FORMAT;
  3862. }
  3863. len += LDIF_SIZE_NEEDED(strlen(T_DNSTR), REPL_GET_DN_LEN(&op->target_address));
  3864. l = make_changes_string(op->p.p_modify.modify_mods, NULL);
  3865. len += LDIF_SIZE_NEEDED(strlen(T_CHANGESTR), l->ls_len);
  3866. break;
  3867. case SLAPI_OPERATION_MODRDN:
  3868. if (NULL == op->p.p_modrdn.modrdn_mods) {
  3869. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
  3870. "_cl5Operation2LDIF - MODRDN - mods are NULL\n");
  3871. return CL5_BAD_FORMAT;
  3872. }
  3873. len += LDIF_SIZE_NEEDED(strlen(T_DNSTR), REPL_GET_DN_LEN(&op->target_address));
  3874. len += LDIF_SIZE_NEEDED(strlen(T_NEWRDNSTR), strlen(op->p.p_modrdn.modrdn_newrdn));
  3875. strDeleteOldRDN = (op->p.p_modrdn.modrdn_deloldrdn ? "true" : "false");
  3876. len += LDIF_SIZE_NEEDED(strlen(T_DRDNFLAGSTR),
  3877. strlen(strDeleteOldRDN));
  3878. if (REPL_GET_DN(&op->p.p_modrdn.modrdn_newsuperior_address))
  3879. len += LDIF_SIZE_NEEDED(strlen(T_NEWSUPERIORDNSTR),
  3880. REPL_GET_DN_LEN(&op->p.p_modrdn.modrdn_newsuperior_address));
  3881. if (op->p.p_modrdn.modrdn_newsuperior_address.uniqueid)
  3882. len += LDIF_SIZE_NEEDED(strlen(T_NEWSUPERIORIDSTR),
  3883. strlen(op->p.p_modrdn.modrdn_newsuperior_address.uniqueid));
  3884. l = make_changes_string(op->p.p_modrdn.modrdn_mods, NULL);
  3885. len += LDIF_SIZE_NEEDED(strlen(T_CHANGESTR), l->ls_len);
  3886. break;
  3887. case SLAPI_OPERATION_DELETE:
  3888. if (NULL == REPL_GET_DN(&op->target_address)) {
  3889. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
  3890. "_cl5Operation2LDIF - DELETE - target dn is NULL\n");
  3891. return CL5_BAD_FORMAT;
  3892. }
  3893. len += LDIF_SIZE_NEEDED(strlen(T_DNSTR), REPL_GET_DN_LEN(&op->target_address));
  3894. break;
  3895. default:
  3896. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
  3897. "_cl5Operation2LDIF - Invalid operation type - %lu\n", op->operation_type);
  3898. return CL5_BAD_FORMAT;
  3899. }
  3900. /* allocate buffer */
  3901. buff = slapi_ch_malloc(len);
  3902. start = buff;
  3903. if (buff == NULL) {
  3904. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
  3905. "_cl5Operation2LDIF: memory allocation failed\n");
  3906. return CL5_MEMORY_ERROR;
  3907. }
  3908. /* fill buffer */
  3909. slapi_ldif_put_type_and_value_with_options(&buff, T_CHANGETYPESTR, (char *)strType, strlen(strType), 0);
  3910. slapi_ldif_put_type_and_value_with_options(&buff, T_REPLGEN, (char *)replGen, strlen(replGen), 0);
  3911. slapi_ldif_put_type_and_value_with_options(&buff, T_CSNSTR, (char *)strCSN, strlen(strCSN), 0);
  3912. slapi_ldif_put_type_and_value_with_options(&buff, T_UNIQUEIDSTR, op->target_address.uniqueid,
  3913. strlen(op->target_address.uniqueid), 0);
  3914. switch (op->operation_type) {
  3915. case SLAPI_OPERATION_ADD:
  3916. if (op->p.p_add.parentuniqueid)
  3917. slapi_ldif_put_type_and_value_with_options(&buff, T_PARENTIDSTR,
  3918. op->p.p_add.parentuniqueid, strlen(op->p.p_add.parentuniqueid), 0);
  3919. slapi_ldif_put_type_and_value_with_options(&buff, T_DNSTR, rawDN, strlen(rawDN), 0);
  3920. slapi_ldif_put_type_and_value_with_options(&buff, T_CHANGESTR, l->ls_buf, l->ls_len, 0);
  3921. slapi_ch_free((void **)&rawDN);
  3922. break;
  3923. case SLAPI_OPERATION_MODIFY:
  3924. slapi_ldif_put_type_and_value_with_options(&buff, T_DNSTR, REPL_GET_DN(&op->target_address),
  3925. REPL_GET_DN_LEN(&op->target_address), 0);
  3926. slapi_ldif_put_type_and_value_with_options(&buff, T_CHANGESTR, l->ls_buf, l->ls_len, 0);
  3927. break;
  3928. case SLAPI_OPERATION_MODRDN:
  3929. slapi_ldif_put_type_and_value_with_options(&buff, T_DNSTR, REPL_GET_DN(&op->target_address),
  3930. REPL_GET_DN_LEN(&op->target_address), 0);
  3931. slapi_ldif_put_type_and_value_with_options(&buff, T_NEWRDNSTR, op->p.p_modrdn.modrdn_newrdn,
  3932. strlen(op->p.p_modrdn.modrdn_newrdn), 0);
  3933. slapi_ldif_put_type_and_value_with_options(&buff, T_DRDNFLAGSTR, strDeleteOldRDN,
  3934. strlen(strDeleteOldRDN), 0);
  3935. if (REPL_GET_DN(&op->p.p_modrdn.modrdn_newsuperior_address))
  3936. slapi_ldif_put_type_and_value_with_options(&buff, T_NEWSUPERIORDNSTR,
  3937. REPL_GET_DN(&op->p.p_modrdn.modrdn_newsuperior_address),
  3938. REPL_GET_DN_LEN(&op->p.p_modrdn.modrdn_newsuperior_address), 0);
  3939. if (op->p.p_modrdn.modrdn_newsuperior_address.uniqueid)
  3940. slapi_ldif_put_type_and_value_with_options(&buff, T_NEWSUPERIORIDSTR,
  3941. op->p.p_modrdn.modrdn_newsuperior_address.uniqueid,
  3942. strlen(op->p.p_modrdn.modrdn_newsuperior_address.uniqueid), 0);
  3943. slapi_ldif_put_type_and_value_with_options(&buff, T_CHANGESTR, l->ls_buf, l->ls_len, 0);
  3944. break;
  3945. case SLAPI_OPERATION_DELETE:
  3946. slapi_ldif_put_type_and_value_with_options(&buff, T_DNSTR, REPL_GET_DN(&op->target_address),
  3947. REPL_GET_DN_LEN(&op->target_address), 0);
  3948. break;
  3949. }
  3950. *buff = '\n';
  3951. buff++;
  3952. *buff = '\0';
  3953. *ldifEntry = start;
  3954. *lenLDIF = buff - start;
  3955. if (l)
  3956. lenstr_free(&l);
  3957. return CL5_SUCCESS;
  3958. }
  3959. static int
  3960. _cl5LDIF2Operation(char *ldifEntry, slapi_operation_parameters *op, char **replGen)
  3961. {
  3962. int rc;
  3963. int rval = CL5_BAD_FORMAT;
  3964. char *next, *line;
  3965. struct berval type, value;
  3966. struct berval bv_null = {0, NULL};
  3967. int freeval = 0;
  3968. Slapi_Mods *mods;
  3969. char *rawDN = NULL;
  3970. char *ldifEntryWork = slapi_ch_strdup(ldifEntry);
  3971. PR_ASSERT(op && ldifEntry && replGen);
  3972. memset(op, 0, sizeof(*op));
  3973. next = ldifEntryWork;
  3974. while ((line = ldif_getline(&next)) != NULL) {
  3975. if (*line == '\n' || *line == '\0') {
  3976. break;
  3977. }
  3978. /* this call modifies ldifEntry */
  3979. type = bv_null;
  3980. value = bv_null;
  3981. rc = slapi_ldif_parse_line(line, &type, &value, &freeval);
  3982. if (rc != 0) {
  3983. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
  3984. "_cl5LDIF2Operation - Failed to parse ldif line, moving on...\n");
  3985. continue;
  3986. }
  3987. if (strncasecmp(type.bv_val, T_CHANGETYPESTR,
  3988. strlen(T_CHANGETYPESTR) > type.bv_len ? strlen(T_CHANGETYPESTR) : type.bv_len) == 0) {
  3989. op->operation_type = _cl5Str2OperationType(value.bv_val);
  3990. } else if (strncasecmp(type.bv_val, T_REPLGEN, type.bv_len) == 0) {
  3991. *replGen = slapi_ch_strdup(value.bv_val);
  3992. } else if (strncasecmp(type.bv_val, T_CSNSTR, type.bv_len) == 0) {
  3993. op->csn = csn_new_by_string(value.bv_val);
  3994. } else if (strncasecmp(type.bv_val, T_UNIQUEIDSTR, type.bv_len) == 0) {
  3995. op->target_address.uniqueid = slapi_ch_strdup(value.bv_val);
  3996. } else if (strncasecmp(type.bv_val, T_DNSTR, type.bv_len) == 0) {
  3997. PR_ASSERT(op->operation_type);
  3998. if (op->operation_type == SLAPI_OPERATION_ADD) {
  3999. rawDN = slapi_ch_strdup(value.bv_val);
  4000. op->target_address.sdn = slapi_sdn_new_dn_byval(rawDN);
  4001. } else
  4002. op->target_address.sdn = slapi_sdn_new_dn_byval(value.bv_val);
  4003. } else if (strncasecmp(type.bv_val, T_PARENTIDSTR, type.bv_len) == 0) {
  4004. op->p.p_add.parentuniqueid = slapi_ch_strdup(value.bv_val);
  4005. } else if (strncasecmp(type.bv_val, T_NEWRDNSTR, type.bv_len) == 0) {
  4006. op->p.p_modrdn.modrdn_newrdn = slapi_ch_strdup(value.bv_val);
  4007. } else if (strncasecmp(type.bv_val, T_DRDNFLAGSTR, type.bv_len) == 0) {
  4008. op->p.p_modrdn.modrdn_deloldrdn = (strncasecmp(value.bv_val, "true", value.bv_len) ? PR_FALSE : PR_TRUE);
  4009. } else if (strncasecmp(type.bv_val, T_NEWSUPERIORDNSTR, type.bv_len) == 0) {
  4010. op->p.p_modrdn.modrdn_newsuperior_address.sdn = slapi_sdn_new_dn_byval(value.bv_val);
  4011. } else if (strncasecmp(type.bv_val, T_NEWSUPERIORIDSTR, type.bv_len) == 0) {
  4012. op->p.p_modrdn.modrdn_newsuperior_address.uniqueid = slapi_ch_strdup(value.bv_val);
  4013. } else if (strncasecmp(type.bv_val, T_CHANGESTR,
  4014. strlen(T_CHANGESTR) > type.bv_len ? strlen(T_CHANGESTR) : type.bv_len) == 0) {
  4015. PR_ASSERT(op->operation_type);
  4016. switch (op->operation_type) {
  4017. case SLAPI_OPERATION_ADD:
  4018. /*
  4019. * When it comes here, case T_DNSTR is already
  4020. * passed and rawDN is supposed to set.
  4021. * But it's a good idea to make sure it is
  4022. * not NULL.
  4023. */
  4024. if (NULL == rawDN) {
  4025. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
  4026. "_cl5LDIF2Operation - corrupted format "
  4027. "for operation type - %lu\n",
  4028. op->operation_type);
  4029. slapi_ch_free_string(&ldifEntryWork);
  4030. return CL5_BAD_FORMAT;
  4031. }
  4032. mods = parse_changes_string(value.bv_val);
  4033. PR_ASSERT(mods);
  4034. slapi_mods2entry(&(op->p.p_add.target_entry), rawDN,
  4035. slapi_mods_get_ldapmods_byref(mods));
  4036. slapi_ch_free((void **)&rawDN);
  4037. slapi_mods_free(&mods);
  4038. break;
  4039. case SLAPI_OPERATION_MODIFY:
  4040. mods = parse_changes_string(value.bv_val);
  4041. PR_ASSERT(mods);
  4042. op->p.p_modify.modify_mods = slapi_mods_get_ldapmods_passout(mods);
  4043. slapi_mods_free(&mods);
  4044. break;
  4045. case SLAPI_OPERATION_MODRDN:
  4046. mods = parse_changes_string(value.bv_val);
  4047. PR_ASSERT(mods);
  4048. op->p.p_modrdn.modrdn_mods = slapi_mods_get_ldapmods_passout(mods);
  4049. slapi_mods_free(&mods);
  4050. break;
  4051. default:
  4052. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
  4053. "_cl5LDIF2Operation - Invalid operation type - %lu\n",
  4054. op->operation_type);
  4055. if (freeval) {
  4056. slapi_ch_free_string(&value.bv_val);
  4057. }
  4058. slapi_ch_free_string(&ldifEntryWork);
  4059. return CL5_BAD_FORMAT;
  4060. }
  4061. }
  4062. if (freeval) {
  4063. slapi_ch_free_string(&value.bv_val);
  4064. }
  4065. }
  4066. if ((0 != strncmp(ldifEntryWork, "clpurgeruv", 10)) && /* skip RUV; */
  4067. (0 != strncmp(ldifEntryWork, "clmaxruv", 8))) /* RUV has NULL op */
  4068. {
  4069. if (IsValidOperation(op)) {
  4070. rval = CL5_SUCCESS;
  4071. } else {
  4072. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
  4073. "_cl5LDIF2Operation - Invalid data format\n");
  4074. }
  4075. }
  4076. slapi_ch_free_string(&ldifEntryWork);
  4077. return rval;
  4078. }
  4079. static int
  4080. _cl5WriteOperationTxn(const char *replName, const char *replGen, const slapi_operation_parameters *op, PRBool local __attribute__((unused)), void *txn)
  4081. {
  4082. int rc;
  4083. int cnt;
  4084. DBT key = {0};
  4085. DBT *data = NULL;
  4086. char csnStr[CSN_STRSIZE];
  4087. PRIntervalTime interval;
  4088. CL5Entry entry;
  4089. CL5DBFile *file = NULL;
  4090. Object *file_obj = NULL;
  4091. DB_TXN *txnid = NULL;
  4092. DB_TXN *parent_txnid = (DB_TXN *)txn;
  4093. rc = _cl5GetDBFileByReplicaName(replName, replGen, &file_obj);
  4094. if (rc == CL5_NOTFOUND) {
  4095. rc = _cl5DBOpenFileByReplicaName(replName, replGen, &file_obj,
  4096. PR_TRUE /* check for duplicates */);
  4097. if (rc != CL5_SUCCESS) {
  4098. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
  4099. "_cl5WriteOperationTxn - Failed to find or open DB object for replica %s\n", replName);
  4100. return rc;
  4101. }
  4102. } else if (rc != CL5_SUCCESS) {
  4103. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
  4104. "_cl5WriteOperationTxn - Failed to get db file for target dn (%s)",
  4105. REPL_GET_DN(&op->target_address));
  4106. return CL5_OBJSET_ERROR;
  4107. }
  4108. /* assign entry time - used for trimming */
  4109. entry.time = slapi_current_utc_time();
  4110. entry.op = (slapi_operation_parameters *)op;
  4111. /* construct the key */
  4112. key.data = csn_as_string(op->csn, PR_FALSE, csnStr);
  4113. key.size = CSN_STRSIZE;
  4114. /* construct the data */
  4115. data = (DBT *)slapi_ch_calloc(1, sizeof(DBT));
  4116. rc = _cl5Entry2DBData(&entry, (char **)&data->data, &data->size);
  4117. if (rc != CL5_SUCCESS) {
  4118. char s[CSN_STRSIZE];
  4119. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
  4120. "_cl5WriteOperationTxn - Failed to convert entry with csn (%s) "
  4121. "to db format\n",
  4122. csn_as_string(op->csn, PR_FALSE, s));
  4123. goto done;
  4124. }
  4125. file = (CL5DBFile *)object_get_data(file_obj);
  4126. PR_ASSERT(file);
  4127. /* if this is part of ldif2cl - just write the entry without transaction */
  4128. if (s_cl5Desc.dbOpenMode == CL5_OPEN_LDIF2CL) {
  4129. rc = file->db->put(file->db, NULL, &key, data, 0);
  4130. if (rc != 0) {
  4131. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
  4132. "_cl5WriteOperationTxn - Failed to write entry; db error - %d %s\n",
  4133. rc, db_strerror(rc));
  4134. if (CL5_OS_ERR_IS_DISKFULL(rc)) {
  4135. cl5_set_diskfull();
  4136. }
  4137. rc = CL5_DB_ERROR;
  4138. }
  4139. goto done;
  4140. }
  4141. /* write the entry */
  4142. rc = EAGAIN;
  4143. cnt = 0;
  4144. while ((rc == EAGAIN || rc == DB_LOCK_DEADLOCK) && cnt < MAX_TRIALS) {
  4145. if (cnt != 0) {
  4146. /* abort previous transaction */
  4147. rc = TXN_ABORT(txnid);
  4148. if (rc != 0) {
  4149. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
  4150. "_cl5WriteOperationTxn - Failed to abort transaction; db error - %d %s\n",
  4151. rc, db_strerror(rc));
  4152. rc = CL5_DB_ERROR;
  4153. goto done;
  4154. }
  4155. /* back off */
  4156. interval = PR_MillisecondsToInterval(slapi_rand() % 100);
  4157. DS_Sleep(interval);
  4158. }
  4159. /* begin transaction */
  4160. rc = TXN_BEGIN(s_cl5Desc.dbEnv, parent_txnid, &txnid, 0);
  4161. if (rc != 0) {
  4162. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
  4163. "_cl5WriteOperationTxn - Failed to start transaction; db error - %d %s\n",
  4164. rc, db_strerror(rc));
  4165. rc = CL5_DB_ERROR;
  4166. goto done;
  4167. }
  4168. rc = file->db->put(file->db, txnid, &key, data, 0);
  4169. if (CL5_OS_ERR_IS_DISKFULL(rc)) {
  4170. slapi_log_err(SLAPI_LOG_CRIT, repl_plugin_name_cl,
  4171. "_cl5WriteOperationTxn - Changelog (%s) DISK FULL; db error - %d %s\n",
  4172. s_cl5Desc.dbDir, rc, db_strerror(rc));
  4173. cl5_set_diskfull();
  4174. rc = CL5_DB_ERROR;
  4175. goto done;
  4176. }
  4177. if (cnt != 0) {
  4178. if (rc == 0) {
  4179. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl, "_cl5WriteOperationTxn - "
  4180. "retry (%d) the transaction (csn=%s) succeeded\n",
  4181. cnt, (char *)key.data);
  4182. } else if ((cnt + 1) >= MAX_TRIALS) {
  4183. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl, "_cl5WriteOperationTxn - "
  4184. "retry (%d) the transaction (csn=%s) failed (rc=%d (%s))\n",
  4185. cnt, (char *)key.data, rc, db_strerror(rc));
  4186. }
  4187. }
  4188. cnt++;
  4189. }
  4190. if (rc == 0) /* we successfully added entry */
  4191. {
  4192. rc = TXN_COMMIT(txnid);
  4193. } else {
  4194. char s[CSN_STRSIZE];
  4195. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
  4196. "_cl5WriteOperationTxn - Failed to write entry with csn (%s); "
  4197. "db error - %d %s\n",
  4198. csn_as_string(op->csn, PR_FALSE, s),
  4199. rc, db_strerror(rc));
  4200. rc = TXN_ABORT(txnid);
  4201. if (rc != 0) {
  4202. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
  4203. "_cl5WriteOperationTxn - Failed to abort transaction; db error - %d %s\n",
  4204. rc, db_strerror(rc));
  4205. }
  4206. rc = CL5_DB_ERROR;
  4207. goto done;
  4208. }
  4209. /* update entry count - we assume that all entries are new */
  4210. PR_AtomicIncrement(&file->entryCount);
  4211. /* update purge vector if we have not seen any changes from this replica before */
  4212. _cl5UpdateRUV(file_obj, op->csn, PR_TRUE, PR_TRUE);
  4213. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
  4214. "cl5WriteOperationTxn - Successfully written entry with csn (%s)\n", csnStr);
  4215. rc = CL5_SUCCESS;
  4216. done:
  4217. if (data->data)
  4218. slapi_ch_free(&(data->data));
  4219. slapi_ch_free((void **)&data);
  4220. if (file_obj)
  4221. object_release(file_obj);
  4222. return rc;
  4223. }
  4224. static int
  4225. _cl5WriteOperation(const char *replName, const char *replGen, const slapi_operation_parameters *op, PRBool local)
  4226. {
  4227. return _cl5WriteOperationTxn(replName, replGen, op, local, NULL);
  4228. }
  4229. static int
  4230. _cl5GetFirstEntry(Object *obj, CL5Entry *entry, void **iterator, DB_TXN *txnid)
  4231. {
  4232. int rc;
  4233. DBC *cursor = NULL;
  4234. DBT key = {0}, data = {0};
  4235. CL5Iterator *it;
  4236. CL5DBFile *file;
  4237. PR_ASSERT(obj && entry && iterator);
  4238. file = (CL5DBFile *)object_get_data(obj);
  4239. PR_ASSERT(file);
  4240. /* create cursor */
  4241. rc = file->db->cursor(file->db, txnid, &cursor, 0);
  4242. if (rc != 0) {
  4243. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
  4244. "_cl5GetFirstEntry - Failed to create cursor; db error - %d %s\n", rc, db_strerror(rc));
  4245. rc = CL5_DB_ERROR;
  4246. goto done;
  4247. }
  4248. key.flags = DB_DBT_MALLOC;
  4249. data.flags = DB_DBT_MALLOC;
  4250. while ((rc = cursor->c_get(cursor, &key, &data, DB_NEXT)) == 0) {
  4251. /* skip service entries */
  4252. if (cl5HelperEntry((char *)key.data, NULL)) {
  4253. slapi_ch_free(&(key.data));
  4254. slapi_ch_free(&(data.data));
  4255. continue;
  4256. }
  4257. /* format entry */
  4258. slapi_ch_free(&(key.data));
  4259. rc = cl5DBData2Entry(data.data, data.size, entry);
  4260. slapi_ch_free(&(data.data));
  4261. if (rc != 0) {
  4262. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
  4263. "_cl5GetFirstOperation - Failed to format entry: %d\n", rc);
  4264. goto done;
  4265. }
  4266. it = (CL5Iterator *)slapi_ch_malloc(sizeof(CL5Iterator));
  4267. it->cursor = cursor;
  4268. object_acquire(obj);
  4269. it->file = obj;
  4270. *(CL5Iterator **)iterator = it;
  4271. return CL5_SUCCESS;
  4272. }
  4273. /*
  4274. * Bug 430172 - memory leaks after db "get" deadlocks, e.g. in CL5 trim
  4275. * Even when db->c_get() does not return success, memory may have been
  4276. * allocated in the DBT. This seems to happen when DB_DBT_MALLOC was set,
  4277. * the data being retrieved is larger than the page size, and we got
  4278. * DB_LOCK_DEADLOCK. libdb allocates the memory and then finds itself
  4279. * deadlocked trying to go through the overflow page list. It returns
  4280. * DB_LOCK_DEADLOCK which we've assumed meant that no memory was allocated
  4281. * for the DBT.
  4282. *
  4283. * The following slapi_ch_free frees the memory only when the value is
  4284. * non NULL, which is true if the situation described above occurs.
  4285. */
  4286. slapi_ch_free((void **)&key.data);
  4287. slapi_ch_free((void **)&data.data);
  4288. /* walked of the end of the file */
  4289. if (rc == DB_NOTFOUND) {
  4290. rc = CL5_NOTFOUND;
  4291. goto done;
  4292. }
  4293. /* db error occured while iterating */
  4294. /* On this path, the condition "rc != 0" cannot be false */
  4295. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
  4296. "_cl5GetFirstEntry - Failed to get entry; db error - %d %s\n",
  4297. rc, db_strerror(rc));
  4298. rc = CL5_DB_ERROR;
  4299. done:
  4300. /* error occured */
  4301. /* We didn't success in assigning this cursor to the iterator,
  4302. * so we need to free the cursor here */
  4303. if (cursor)
  4304. cursor->c_close(cursor);
  4305. return rc;
  4306. }
  4307. static int
  4308. _cl5GetNextEntry(CL5Entry *entry, void *iterator)
  4309. {
  4310. int rc;
  4311. CL5Iterator *it;
  4312. DBT key = {0}, data = {0};
  4313. PR_ASSERT(entry && iterator);
  4314. it = (CL5Iterator *)iterator;
  4315. key.flags = DB_DBT_MALLOC;
  4316. data.flags = DB_DBT_MALLOC;
  4317. while ((rc = it->cursor->c_get(it->cursor, &key, &data, DB_NEXT)) == 0) {
  4318. if (cl5HelperEntry((char *)key.data, NULL)) {
  4319. slapi_ch_free(&(key.data));
  4320. slapi_ch_free(&(data.data));
  4321. continue;
  4322. }
  4323. slapi_ch_free(&(key.data));
  4324. /* format entry */
  4325. rc = cl5DBData2Entry(data.data, data.size, entry);
  4326. slapi_ch_free(&(data.data));
  4327. if (rc != 0) {
  4328. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
  4329. "_cl5GetNextEntry - Failed to format entry: %d\n", rc);
  4330. }
  4331. return rc;
  4332. }
  4333. /*
  4334. * Bug 430172 - memory leaks after db "get" deadlocks, e.g. in CL5 trim
  4335. * Even when db->c_get() does not return success, memory may have been
  4336. * allocated in the DBT. This seems to happen when DB_DBT_MALLOC was set,
  4337. * the data being retrieved is larger than the page size, and we got
  4338. * DB_LOCK_DEADLOCK. libdb allocates the memory and then finds itself
  4339. * deadlocked trying to go through the overflow page list. It returns
  4340. * DB_LOCK_DEADLOCK which we've assumed meant that no memory was allocated
  4341. * for the DBT.
  4342. *
  4343. * The following slapi_ch_free frees the memory only when the value is
  4344. * non NULL, which is true if the situation described above occurs.
  4345. */
  4346. slapi_ch_free((void **)&key.data);
  4347. slapi_ch_free((void **)&data.data);
  4348. /* walked of the end of the file or entry is out of range */
  4349. if (rc == 0 || rc == DB_NOTFOUND) {
  4350. return CL5_NOTFOUND;
  4351. }
  4352. /* cursor operation failed */
  4353. slapi_log_err(rc == CL5_DB_LOCK_ERROR ? SLAPI_LOG_REPL : SLAPI_LOG_ERR,
  4354. repl_plugin_name_cl,
  4355. "_cl5GetNextEntry - Failed to get entry; db error - %d %s\n",
  4356. rc, db_strerror(rc));
  4357. return rc;
  4358. }
  4359. static int
  4360. _cl5CurrentDeleteEntry(void *iterator)
  4361. {
  4362. int rc;
  4363. CL5Iterator *it;
  4364. CL5DBFile *file;
  4365. PR_ASSERT(iterator);
  4366. it = (CL5Iterator *)iterator;
  4367. rc = it->cursor->c_del(it->cursor, 0);
  4368. if (rc == 0) {
  4369. /* decrement entry count */
  4370. file = (CL5DBFile *)object_get_data(it->file);
  4371. PR_AtomicDecrement(&file->entryCount);
  4372. return CL5_SUCCESS;
  4373. } else {
  4374. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
  4375. "_cl5CurrentDeleteEntry - Failed, err=%d %s\n",
  4376. rc, db_strerror(rc));
  4377. /*
  4378. * We don't free(close) the cursor here, as the caller will free it by
  4379. * a call to cl5DestroyIterator. Freeing it here is a potential bug,
  4380. * as the cursor can't be referenced later once freed.
  4381. */
  4382. return rc;
  4383. }
  4384. }
  4385. static PRBool
  4386. _cl5IsValidIterator(const CL5Iterator *iterator)
  4387. {
  4388. return (iterator && iterator->cursor && iterator->file);
  4389. }
  4390. static int
  4391. _cl5GetOperation(Object *replica, slapi_operation_parameters *op)
  4392. {
  4393. int rc;
  4394. DBT key = {0}, data = {0};
  4395. CL5DBFile *file;
  4396. CL5Entry entry;
  4397. Object *obj = NULL;
  4398. char csnStr[CSN_STRSIZE];
  4399. rc = _cl5GetDBFile(replica, &obj);
  4400. if (rc != CL5_SUCCESS || !obj) {
  4401. goto done;
  4402. }
  4403. file = (CL5DBFile *)object_get_data(obj);
  4404. PR_ASSERT(file);
  4405. /* construct the key */
  4406. key.data = csn_as_string(op->csn, PR_FALSE, csnStr);
  4407. key.size = CSN_STRSIZE;
  4408. data.flags = DB_DBT_MALLOC;
  4409. rc = file->db->get(file->db, NULL /*txn*/, &key, &data, 0);
  4410. switch (rc) {
  4411. case 0:
  4412. entry.op = op;
  4413. /* Callers of this function should cl5_operation_parameters_done(op) */
  4414. rc = cl5DBData2Entry(data.data, data.size, &entry);
  4415. if (rc == CL5_SUCCESS) {
  4416. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
  4417. "_cl5GetOperation - Successfully retrieved operation with csn (%s)\n",
  4418. csnStr);
  4419. } else {
  4420. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
  4421. "_cl5GetOperation - Failed to convert db data to operation;"
  4422. " csn - %s\n",
  4423. csnStr);
  4424. }
  4425. goto done;
  4426. case DB_NOTFOUND:
  4427. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
  4428. "_cl5GetOperation - Operation for csn (%s) is not found in db that should contain dn (%s)\n",
  4429. csnStr, REPL_GET_DN(&op->target_address));
  4430. rc = CL5_NOTFOUND;
  4431. goto done;
  4432. default:
  4433. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
  4434. "_cl5GetOperation - Failed to get entry for csn (%s); "
  4435. "db error - %d %s\n",
  4436. csnStr, rc, db_strerror(rc));
  4437. rc = CL5_DB_ERROR;
  4438. goto done;
  4439. }
  4440. done:
  4441. if (obj)
  4442. object_release(obj);
  4443. slapi_ch_free(&(data.data));
  4444. return rc;
  4445. }
  4446. PRBool
  4447. cl5HelperEntry(const char *csnstr, CSN *csnp)
  4448. {
  4449. CSN *csn;
  4450. time_t csnTime;
  4451. PRBool retval = PR_FALSE;
  4452. if (csnp) {
  4453. csn = csnp;
  4454. } else {
  4455. csn = csn_new_by_string(csnstr);
  4456. }
  4457. if (csn == NULL) {
  4458. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
  4459. "cl5HelperEntry - Failed to get csn time; csn error\n");
  4460. return PR_FALSE;
  4461. }
  4462. csnTime = csn_get_time(csn);
  4463. if (csnTime == ENTRY_COUNT_TIME || csnTime == PURGE_RUV_TIME) {
  4464. retval = PR_TRUE;
  4465. }
  4466. if (NULL == csnp)
  4467. csn_free(&csn);
  4468. return retval;
  4469. }
  4470. #ifdef FOR_DEBUGGING
  4471. /* Replay iteration helper functions */
  4472. static PRBool
  4473. _cl5ValidReplayIterator(const CL5ReplayIterator *iterator)
  4474. {
  4475. if (iterator == NULL ||
  4476. iterator->consumerRuv == NULL || iterator->supplierRuvObj == NULL ||
  4477. iterator->fileObj == NULL)
  4478. return PR_FALSE;
  4479. return PR_TRUE;
  4480. }
  4481. #endif
  4482. /* Algorithm: ONREPL!!!
  4483. */
  4484. struct replica_hash_entry
  4485. {
  4486. ReplicaId rid; /* replica id */
  4487. PRBool sendChanges; /* indicates whether changes should be sent for this replica */
  4488. };
  4489. static int
  4490. _cl5PositionCursorForReplay(ReplicaId consumerRID, const RUV *consumerRuv, Object *replica, Object *fileObj, CL5ReplayIterator **iterator, int *continue_on_missing)
  4491. {
  4492. CLC_Buffer *clcache = NULL;
  4493. CL5DBFile *file;
  4494. CSN *startCSN = NULL;
  4495. char csnStr[CSN_STRSIZE];
  4496. int rc = CL5_SUCCESS;
  4497. Object *supplierRuvObj = NULL;
  4498. RUV *supplierRuv = NULL;
  4499. PRBool haveChanges = PR_FALSE;
  4500. char *agmt_name;
  4501. PR_ASSERT(consumerRuv && replica && fileObj && iterator);
  4502. csnStr[0] = '\0';
  4503. file = (CL5DBFile *)object_get_data(fileObj);
  4504. /* get supplier's RUV */
  4505. supplierRuvObj = replica_get_ruv((Replica *)object_get_data(replica));
  4506. PR_ASSERT(supplierRuvObj);
  4507. if (!supplierRuvObj) {
  4508. rc = CL5_UNKNOWN_ERROR;
  4509. goto done;
  4510. }
  4511. supplierRuv = (RUV *)object_get_data(supplierRuvObj);
  4512. PR_ASSERT(supplierRuv);
  4513. agmt_name = get_thread_private_agmtname();
  4514. if (slapi_is_loglevel_set(SLAPI_LOG_REPL)) {
  4515. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl, "_cl5PositionCursorForReplay - (%s): Consumer RUV:\n", agmt_name);
  4516. ruv_dump(consumerRuv, agmt_name, NULL);
  4517. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl, "_cl5PositionCursorForReplay - (%s): Supplier RUV:\n", agmt_name);
  4518. ruv_dump(supplierRuv, agmt_name, NULL);
  4519. }
  4520. /* initialize the changelog buffer and do the initial load */
  4521. rc = clcache_get_buffer(&clcache, file->db, consumerRID, consumerRuv, supplierRuv);
  4522. if (rc != 0)
  4523. goto done;
  4524. rc = clcache_load_buffer(clcache, &startCSN, continue_on_missing);
  4525. if (rc == 0) {
  4526. haveChanges = PR_TRUE;
  4527. rc = CL5_SUCCESS;
  4528. if (slapi_is_loglevel_set(SLAPI_LOG_REPL)) {
  4529. csn_as_string(startCSN, PR_FALSE, csnStr);
  4530. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
  4531. "%s: CSN %s found, position set for replay\n", agmt_name, csnStr);
  4532. }
  4533. } else if (rc == DB_NOTFOUND) {
  4534. /* buffer not loaded.
  4535. * either because no changes have to be sent ==> startCSN is NULL
  4536. * or the calculated startCSN cannot be found in the changelog
  4537. */
  4538. if (startCSN == NULL) {
  4539. rc = CL5_NOTFOUND;
  4540. goto done;
  4541. }
  4542. /* check whether this csn should be present */
  4543. rc = _cl5CheckMissingCSN(startCSN, supplierRuv, file);
  4544. if (rc == CL5_MISSING_DATA) /* we should have had the change but we don't */
  4545. {
  4546. if (slapi_is_loglevel_set(SLAPI_LOG_REPL)) {
  4547. csn_as_string(startCSN, PR_FALSE, csnStr);
  4548. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
  4549. "repl_plugin_name_cl - %s: CSN %s not found, seems to be missing\n", agmt_name, csnStr);
  4550. }
  4551. } else /* we are not as up to date or we purged */
  4552. {
  4553. csn_as_string(startCSN, PR_FALSE, csnStr);
  4554. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
  4555. "repl_plugin_name_cl - %s: CSN %s not found, we aren't as up to date, or we purged\n",
  4556. agmt_name, csnStr);
  4557. }
  4558. } else {
  4559. csn_as_string(startCSN, PR_FALSE, csnStr);
  4560. /* db error */
  4561. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
  4562. "repl_plugin_name_cl - %s: Failed to retrieve change with CSN %s; db error - %d %s\n",
  4563. agmt_name, csnStr, rc, db_strerror(rc));
  4564. rc = CL5_DB_ERROR;
  4565. }
  4566. /* setup the iterator */
  4567. if (haveChanges) {
  4568. *iterator = (CL5ReplayIterator *)slapi_ch_calloc(1, sizeof(CL5ReplayIterator));
  4569. if (*iterator == NULL) {
  4570. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
  4571. "_cl5PositionCursorForReplay - %s - Failed to allocate iterator\n", agmt_name);
  4572. rc = CL5_MEMORY_ERROR;
  4573. goto done;
  4574. }
  4575. /* ONREPL - should we make a copy of both RUVs here ?*/
  4576. (*iterator)->fileObj = fileObj;
  4577. (*iterator)->clcache = clcache;
  4578. clcache = NULL;
  4579. (*iterator)->consumerRID = consumerRID;
  4580. (*iterator)->consumerRuv = consumerRuv;
  4581. (*iterator)->supplierRuvObj = supplierRuvObj;
  4582. } else if (rc == CL5_SUCCESS) {
  4583. /* we have no changes to send */
  4584. rc = CL5_NOTFOUND;
  4585. }
  4586. done:
  4587. if (clcache)
  4588. clcache_return_buffer(&clcache);
  4589. if (rc != CL5_SUCCESS) {
  4590. if (supplierRuvObj)
  4591. object_release(supplierRuvObj);
  4592. }
  4593. return rc;
  4594. }
  4595. struct ruv_it
  4596. {
  4597. CSN **csns; /* csn list */
  4598. int alloc; /* allocated size */
  4599. int pos; /* position in the list */
  4600. };
  4601. static int
  4602. ruv_consumer_iterator(const ruv_enum_data *enum_data, void *arg)
  4603. {
  4604. struct ruv_it *data = (struct ruv_it *)arg;
  4605. PR_ASSERT(data);
  4606. /* check if we have space for one more element */
  4607. if (data->pos >= data->alloc - 2) {
  4608. data->alloc += 4;
  4609. data->csns = (CSN **)slapi_ch_realloc((void *)data->csns, data->alloc * sizeof(CSN *));
  4610. }
  4611. data->csns[data->pos] = csn_dup(enum_data->csn);
  4612. data->pos++;
  4613. return 0;
  4614. }
  4615. static int
  4616. ruv_supplier_iterator(const ruv_enum_data *enum_data, void *arg)
  4617. {
  4618. int i;
  4619. PRBool found = PR_FALSE;
  4620. ReplicaId rid;
  4621. struct ruv_it *data = (struct ruv_it *)arg;
  4622. PR_ASSERT(data);
  4623. rid = csn_get_replicaid(enum_data->min_csn);
  4624. /* check if the replica that generated the csn is already in the list */
  4625. for (i = 0; i < data->pos; i++) {
  4626. if (rid == csn_get_replicaid(data->csns[i])) {
  4627. found = PR_TRUE;
  4628. /* remove datacsn[i] if it is greater or equal to the supplier's maxcsn */
  4629. if (csn_compare(data->csns[i], enum_data->csn) >= 0) {
  4630. int j;
  4631. csn_free(&data->csns[i]);
  4632. for (j = i + 1; j < data->pos; j++) {
  4633. data->csns[j - 1] = data->csns[j];
  4634. }
  4635. data->pos--;
  4636. }
  4637. break;
  4638. }
  4639. }
  4640. if (!found) {
  4641. /* check if we have space for one more element */
  4642. if (data->pos >= data->alloc - 2) {
  4643. data->alloc += 4;
  4644. data->csns = (CSN **)slapi_ch_realloc((void *)data->csns,
  4645. data->alloc * sizeof(CSN *));
  4646. }
  4647. data->csns[data->pos] = csn_dup(enum_data->min_csn);
  4648. data->pos++;
  4649. }
  4650. return 0;
  4651. }
  4652. static int
  4653. my_csn_compare(const void *arg1, const void *arg2)
  4654. {
  4655. return (csn_compare(*((CSN **)arg1), *((CSN **)arg2)));
  4656. }
  4657. /* builds CSN ordered list of all csns in the RUV */
  4658. CSN **
  4659. cl5BuildCSNList(const RUV *consRuv, const RUV *supRuv)
  4660. {
  4661. struct ruv_it data;
  4662. int count, rc;
  4663. CSN **csns;
  4664. PR_ASSERT(consRuv);
  4665. count = ruv_replica_count(consRuv);
  4666. csns = (CSN **)slapi_ch_calloc(count + 1, sizeof(CSN *));
  4667. data.csns = csns;
  4668. data.alloc = count + 1;
  4669. data.pos = 0;
  4670. /* add consumer elements to the list */
  4671. rc = ruv_enumerate_elements(consRuv, ruv_consumer_iterator, &data);
  4672. if (rc == 0 && supRuv) {
  4673. /* add supplier elements to the list */
  4674. rc = ruv_enumerate_elements(supRuv, ruv_supplier_iterator, &data);
  4675. }
  4676. /* we have no csns */
  4677. if (data.csns[0] == NULL) {
  4678. /* csns might have been realloced in ruv_supplier_iterator() */
  4679. slapi_ch_free((void **)&data.csns);
  4680. csns = NULL;
  4681. } else {
  4682. csns = data.csns;
  4683. data.csns[data.pos] = NULL;
  4684. if (rc == 0) {
  4685. qsort(csns, data.pos, sizeof(CSN *), my_csn_compare);
  4686. } else {
  4687. cl5DestroyCSNList(&csns);
  4688. }
  4689. }
  4690. return csns;
  4691. }
  4692. void
  4693. cl5DestroyCSNList(CSN ***csns)
  4694. {
  4695. if (csns && *csns) {
  4696. int i;
  4697. for (i = 0; (*csns)[i]; i++) {
  4698. csn_free(&(*csns)[i]);
  4699. }
  4700. slapi_ch_free((void **)csns);
  4701. }
  4702. }
  4703. /* A csn should be in the changelog if it is larger than purge vector csn for the same
  4704. replica and is smaller than the csn in supplier's ruv for the same replica.
  4705. The functions returns
  4706. CL5_PURGED if data was purged from the changelog or was never logged
  4707. because it was loaded as part of replica initialization
  4708. CL5_MISSING if the data erouneously missing
  4709. CL5_SUCCESS if that has not and should not been seen by the server
  4710. */
  4711. static int
  4712. _cl5CheckMissingCSN(const CSN *csn, const RUV *supplierRuv, CL5DBFile *file)
  4713. {
  4714. ReplicaId rid;
  4715. CSN *supplierCsn = NULL;
  4716. CSN *purgeCsn = NULL;
  4717. int rc = CL5_SUCCESS;
  4718. char csnStr[CSN_STRSIZE];
  4719. PR_ASSERT(csn && supplierRuv && file);
  4720. rid = csn_get_replicaid(csn);
  4721. ruv_get_largest_csn_for_replica(supplierRuv, rid, &supplierCsn);
  4722. if (supplierCsn == NULL) {
  4723. /* we have not seen any changes from this replica so it is
  4724. ok not to have this csn */
  4725. if (slapi_is_loglevel_set(SLAPI_LOG_REPL)) {
  4726. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl, "_cl5CheckMissingCSN - "
  4727. "can't locate %s csn: we have not seen any changes for replica %d\n",
  4728. csn_as_string(csn, PR_FALSE, csnStr), rid);
  4729. }
  4730. return CL5_SUCCESS;
  4731. }
  4732. ruv_get_largest_csn_for_replica(file->purgeRUV, rid, &purgeCsn);
  4733. if (purgeCsn == NULL) {
  4734. /* changelog never contained any changes for this replica */
  4735. if (csn_compare(csn, supplierCsn) <= 0) {
  4736. if (slapi_is_loglevel_set(SLAPI_LOG_REPL)) {
  4737. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl, "_cl5CheckMissingCSN - "
  4738. "the change with %s csn was never logged because it was imported "
  4739. "during replica initialization\n",
  4740. csn_as_string(csn, PR_FALSE, csnStr));
  4741. }
  4742. rc = CL5_PURGED_DATA; /* XXXggood is that the correct return value? */
  4743. } else {
  4744. if (slapi_is_loglevel_set(SLAPI_LOG_REPL)) {
  4745. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl, "_cl5CheckMissingCSN - "
  4746. "change with %s csn has not yet been seen by this server; "
  4747. " last csn seen from that replica is %s\n",
  4748. csn_as_string(csn, PR_FALSE, csnStr),
  4749. csn_as_string(supplierCsn, PR_FALSE, csnStr));
  4750. }
  4751. rc = CL5_SUCCESS;
  4752. }
  4753. } else /* we have both purge and supplier csn */
  4754. {
  4755. if (csn_compare(csn, purgeCsn) < 0) /* the csn is below the purge point */
  4756. {
  4757. rc = CL5_PURGED_DATA;
  4758. } else {
  4759. if (csn_compare(csn, supplierCsn) <= 0) /* we should have the data but we don't */
  4760. {
  4761. if (slapi_is_loglevel_set(SLAPI_LOG_REPL)) {
  4762. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl, "_cl5CheckMissingCSN - "
  4763. "change with %s csn has been purged by this server; "
  4764. "the current purge point for that replica is %s\n",
  4765. csn_as_string(csn, PR_FALSE, csnStr),
  4766. csn_as_string(purgeCsn, PR_FALSE, csnStr));
  4767. }
  4768. rc = CL5_MISSING_DATA;
  4769. } else {
  4770. if (slapi_is_loglevel_set(SLAPI_LOG_REPL)) {
  4771. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl, "_cl5CheckMissingCSN - "
  4772. "change with %s csn has not yet been seen by this server; "
  4773. " last csn seen from that replica is %s\n",
  4774. csn_as_string(csn, PR_FALSE, csnStr),
  4775. csn_as_string(supplierCsn, PR_FALSE, csnStr));
  4776. }
  4777. rc = CL5_SUCCESS;
  4778. }
  4779. }
  4780. }
  4781. if (supplierCsn)
  4782. csn_free(&supplierCsn);
  4783. if (purgeCsn)
  4784. csn_free(&purgeCsn);
  4785. return rc;
  4786. }
  4787. /* Helper functions that work with individual changelog files */
  4788. /* file name format : <replica name>_<replica generation>db{2,3,...} */
  4789. static PRBool
  4790. _cl5FileName2Replica(const char *file_name, Object **replica)
  4791. {
  4792. Replica *r;
  4793. char *repl_name, *file_gen, *repl_gen;
  4794. int len;
  4795. PR_ASSERT(file_name && replica);
  4796. *replica = NULL;
  4797. /* this is database file */
  4798. if (_cl5FileEndsWith(file_name, DB_EXTENSION) ||
  4799. _cl5FileEndsWith(file_name, DB_EXTENSION_DB4) ||
  4800. _cl5FileEndsWith(file_name, DB_EXTENSION_DB3)) {
  4801. repl_name = slapi_ch_strdup(file_name);
  4802. file_gen = strstr(repl_name, FILE_SEP);
  4803. if (file_gen) {
  4804. int extlen = strlen(DB_EXTENSION);
  4805. *file_gen = '\0';
  4806. file_gen += strlen(FILE_SEP);
  4807. len = strlen(file_gen);
  4808. if (len <= extlen + 1) {
  4809. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
  4810. "_cl5FileName2Replica - "
  4811. "Invalid file name (%s)\n",
  4812. file_name);
  4813. } else {
  4814. /* get rid of the file extension */
  4815. file_gen[len - extlen - 1] = '\0';
  4816. *replica = replica_get_by_name(repl_name);
  4817. if (*replica) {
  4818. /* check that generation matches the one in replica object */
  4819. r = (Replica *)object_get_data(*replica);
  4820. repl_gen = replica_get_generation(r);
  4821. PR_ASSERT(repl_gen);
  4822. if (strcmp(file_gen, repl_gen) != 0) {
  4823. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
  4824. "_cl5FileName2Replica - "
  4825. "Replica generation mismatch for replica at (%s), "
  4826. "file generation %s, new replica generation %s\n",
  4827. slapi_sdn_get_dn(replica_get_root(r)), file_gen, repl_gen);
  4828. object_release(*replica);
  4829. *replica = NULL;
  4830. }
  4831. slapi_ch_free((void **)&repl_gen);
  4832. }
  4833. }
  4834. slapi_ch_free((void **)&repl_name);
  4835. } else {
  4836. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl, "_cl5FileName2Replica - "
  4837. "Malformed file name - %s\n",
  4838. file_name);
  4839. }
  4840. return PR_TRUE;
  4841. } else
  4842. return PR_FALSE;
  4843. }
  4844. /* file name format : <replica name>_<replica generation>db{2,3} */
  4845. static char *
  4846. _cl5Replica2FileName(Object *replica)
  4847. {
  4848. const char *replName;
  4849. char *replGen, *fileName;
  4850. Replica *r;
  4851. PR_ASSERT(replica);
  4852. r = (Replica *)object_get_data(replica);
  4853. PR_ASSERT(r);
  4854. replName = replica_get_name(r);
  4855. replGen = replica_get_generation(r);
  4856. fileName = _cl5MakeFileName(replName, replGen);
  4857. slapi_ch_free((void **)&replGen);
  4858. return fileName;
  4859. }
  4860. static char *
  4861. _cl5MakeFileName(const char *replName, const char *replGen)
  4862. {
  4863. char *fileName = slapi_ch_smprintf("%s/%s%s%s.%s",
  4864. s_cl5Desc.dbDir, replName,
  4865. FILE_SEP, replGen, DB_EXTENSION);
  4866. return fileName;
  4867. }
  4868. /* open file that corresponds to a particular database */
  4869. static int
  4870. _cl5DBOpenFile(Object *replica, Object **obj, PRBool checkDups)
  4871. {
  4872. int rc;
  4873. const char *replName;
  4874. char *replGen;
  4875. Replica *r;
  4876. PR_ASSERT(replica);
  4877. r = (Replica *)object_get_data(replica);
  4878. replName = replica_get_name(r);
  4879. PR_ASSERT(replName);
  4880. replGen = replica_get_generation(r);
  4881. PR_ASSERT(replGen);
  4882. rc = _cl5DBOpenFileByReplicaName(replName, replGen, obj, checkDups);
  4883. slapi_ch_free((void **)&replGen);
  4884. return rc;
  4885. }
  4886. static int
  4887. _cl5DBOpenFileByReplicaName(const char *replName, const char *replGen, Object **obj, PRBool checkDups)
  4888. {
  4889. int rc = CL5_SUCCESS;
  4890. Object *tmpObj;
  4891. CL5DBFile *file;
  4892. char *file_name;
  4893. PR_ASSERT(replName && replGen);
  4894. if (checkDups) {
  4895. PR_Lock(s_cl5Desc.fileLock);
  4896. file_name = _cl5MakeFileName(replName, replGen);
  4897. tmpObj = objset_find(s_cl5Desc.dbFiles, _cl5CompareDBFile, file_name);
  4898. slapi_ch_free((void **)&file_name);
  4899. if (tmpObj) /* this file already exist */
  4900. {
  4901. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
  4902. "_cl5DBOpenFileByReplicaName - Found DB object %p for replica %s\n", tmpObj, replName);
  4903. /* if we were asked for file handle - keep the handle */
  4904. if (obj) {
  4905. *obj = tmpObj;
  4906. } else {
  4907. object_release(tmpObj);
  4908. }
  4909. rc = CL5_SUCCESS;
  4910. goto done;
  4911. }
  4912. }
  4913. rc = _cl5NewDBFile(replName, replGen, &file);
  4914. if (rc == CL5_SUCCESS) {
  4915. /* This creates the file but doesn't set the init flag
  4916. * The flag is set later when the purge and max ruvs are set.
  4917. * This is to prevent some thread to get file access before the
  4918. * structure is fully initialized */
  4919. rc = _cl5AddDBFile(file, &tmpObj);
  4920. if (rc == CL5_SUCCESS) {
  4921. /* read purge RUV - done here because it needs file object rather than file pointer */
  4922. rc = _cl5ReadRUV(replGen, tmpObj, PR_TRUE);
  4923. if (rc != CL5_SUCCESS) {
  4924. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
  4925. "_cl5DBOpenFileByReplicaName - Failed to get purge RUV\n");
  4926. goto done;
  4927. }
  4928. /* read ruv that represents the upper bound of the changes stored in the file */
  4929. rc = _cl5ReadRUV(replGen, tmpObj, PR_FALSE);
  4930. if (rc != CL5_SUCCESS) {
  4931. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
  4932. "_cl5DBOpenFileByReplicaName - Failed to get upper bound RUV\n");
  4933. goto done;
  4934. }
  4935. /* Mark the DB File initialize */
  4936. _cl5DBFileInitialized(tmpObj);
  4937. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
  4938. "_cl5DBOpenFileByReplicaName - Created new DB object %p\n", tmpObj);
  4939. if (obj) {
  4940. *obj = tmpObj;
  4941. } else {
  4942. object_release(tmpObj);
  4943. }
  4944. }
  4945. }
  4946. done:;
  4947. if (rc != CL5_SUCCESS) {
  4948. if (file)
  4949. _cl5DBCloseFile((void **)&file);
  4950. }
  4951. if (checkDups) {
  4952. PR_Unlock(s_cl5Desc.fileLock);
  4953. }
  4954. return rc;
  4955. }
  4956. /* adds file to the db file list */
  4957. static int
  4958. _cl5AddDBFile(CL5DBFile *file, Object **obj)
  4959. {
  4960. int rc;
  4961. Object *tmpObj;
  4962. PR_ASSERT(file);
  4963. tmpObj = object_new(file, _cl5DBCloseFile);
  4964. rc = objset_add_obj(s_cl5Desc.dbFiles, tmpObj);
  4965. if (rc != OBJSET_SUCCESS) {
  4966. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
  4967. "_cl5AddDBFile - Failed to add db file to the list; "
  4968. "repl_objset error - %d\n",
  4969. rc);
  4970. object_release(tmpObj);
  4971. return CL5_OBJSET_ERROR;
  4972. } else {
  4973. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
  4974. "_cl5AddDBFile - Added new DB object %p\n", tmpObj);
  4975. }
  4976. if (obj) {
  4977. *obj = tmpObj;
  4978. } else
  4979. object_release(tmpObj);
  4980. return CL5_SUCCESS;
  4981. }
  4982. static int
  4983. _cl5NewDBFile(const char *replName, const char *replGen, CL5DBFile **dbFile)
  4984. {
  4985. int rc;
  4986. DB *db = NULL;
  4987. char *name;
  4988. #ifdef HPUX
  4989. char cwd[PATH_MAX + 1];
  4990. #endif
  4991. PR_ASSERT(replName && replGen && dbFile);
  4992. if (!dbFile) {
  4993. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
  4994. "_cl5NewDBFile - NULL dbFile\n");
  4995. return CL5_UNKNOWN_ERROR;
  4996. }
  4997. (*dbFile) = (CL5DBFile *)slapi_ch_calloc(1, sizeof(CL5DBFile));
  4998. if (*dbFile == NULL) {
  4999. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
  5000. "_cl5NewDBFile - memory allocation failed\n");
  5001. return CL5_MEMORY_ERROR;
  5002. }
  5003. name = _cl5MakeFileName(replName, replGen);
  5004. {
  5005. /* The subname argument allows applications to have
  5006. * subdatabases, i.e., multiple databases inside of a single
  5007. * physical file. This is useful when the logical databases
  5008. * are both numerous and reasonably small, in order to
  5009. * avoid creating a large number of underlying files.
  5010. */
  5011. char *subname = NULL;
  5012. DB_ENV *dbEnv = s_cl5Desc.dbEnv;
  5013. rc = db_create(&db, dbEnv, 0);
  5014. if (0 != rc) {
  5015. goto out;
  5016. }
  5017. rc = db->set_pagesize(
  5018. db,
  5019. s_cl5Desc.dbConfig.pageSize);
  5020. if (0 != rc) {
  5021. goto out;
  5022. }
  5023. DB_OPEN(s_cl5Desc.dbEnvOpenFlags,
  5024. db, NULL /* txnid */, name, subname, DB_BTREE,
  5025. DB_CREATE | DB_THREAD, s_cl5Desc.dbConfig.fileMode, rc);
  5026. }
  5027. out:
  5028. if (rc != 0) {
  5029. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
  5030. "_cl5NewDBFile - db_open failed; db error - %d %s\n",
  5031. rc, db_strerror(rc));
  5032. rc = CL5_DB_ERROR;
  5033. goto done;
  5034. }
  5035. (*dbFile)->db = db;
  5036. (*dbFile)->name = name;
  5037. name = NULL; /* transfer ownership to dbFile struct */
  5038. (*dbFile)->replName = slapi_ch_strdup(replName);
  5039. (*dbFile)->replGen = slapi_ch_strdup(replGen);
  5040. /* compute number of entries in the file */
  5041. /* ONREPL - to improve performance, we keep entry count in memory
  5042. and write it down during shutdown. Problem: this will not
  5043. work with multiple processes. Do we have to worry about that?
  5044. */
  5045. if (s_cl5Desc.dbOpenMode == CL5_OPEN_NORMAL) {
  5046. rc = _cl5GetEntryCount(*dbFile);
  5047. if (rc != CL5_SUCCESS) {
  5048. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
  5049. "_cl5NewDBFile - Failed to get entry count\n");
  5050. goto done;
  5051. }
  5052. }
  5053. done:
  5054. if (rc != CL5_SUCCESS) {
  5055. _cl5DBCloseFile((void **)dbFile);
  5056. /* slapi_ch_free accepts NULL pointer */
  5057. slapi_ch_free((void **)&name);
  5058. slapi_ch_free((void **)dbFile);
  5059. }
  5060. return rc;
  5061. }
  5062. static void
  5063. _cl5DBCloseFile(void **data)
  5064. {
  5065. CL5DBFile *file;
  5066. int rc = 0;
  5067. PR_ASSERT(data);
  5068. file = *(CL5DBFile **)data;
  5069. PR_ASSERT(file);
  5070. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl, "_cl5DBCloseFile - "
  5071. "Closing database %s\n",
  5072. file->name);
  5073. /* close the file */
  5074. /* if this is normal close or close after import, update entry count */
  5075. if ((s_cl5Desc.dbOpenMode == CL5_OPEN_NORMAL && s_cl5Desc.dbState == CL5_STATE_CLOSING) ||
  5076. s_cl5Desc.dbOpenMode == CL5_OPEN_LDIF2CL) {
  5077. _cl5WriteEntryCount(file);
  5078. _cl5WriteRUV(file, PR_TRUE);
  5079. _cl5WriteRUV(file, PR_FALSE);
  5080. }
  5081. /* close the db */
  5082. if (file->db) {
  5083. rc = file->db->close(file->db, 0);
  5084. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
  5085. "_cl5DBCloseFile - "
  5086. "Closed the changelog database handle for %s "
  5087. "(rc: %d)\n",
  5088. file->name, rc);
  5089. file->db = NULL;
  5090. }
  5091. if (file->flags & DB_FILE_DELETED) {
  5092. /* We need to use the libdb API to delete the files, otherwise we'll
  5093. * run into problems when we try to checkpoint transactions later. */
  5094. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl, "_cl5DBCloseFile - "
  5095. "removing the changelog %s (flag %d)\n",
  5096. file->name, DEFAULT_DB_ENV_OP_FLAGS);
  5097. rc = s_cl5Desc.dbEnv->dbremove(s_cl5Desc.dbEnv, 0, file->name, 0,
  5098. DEFAULT_DB_ENV_OP_FLAGS);
  5099. if (rc != 0) {
  5100. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl, "_cl5DBCloseFile - "
  5101. "failed to remove (%s) file; libdb error - %d (%s)\n",
  5102. file->name, rc, db_strerror(rc));
  5103. } else {
  5104. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl, "_cl5DBCloseFile - "
  5105. "Deleted the changelog database file %s\n",
  5106. file->name);
  5107. }
  5108. }
  5109. /* slapi_ch_free accepts NULL pointer */
  5110. slapi_ch_free((void **)&file->name);
  5111. slapi_ch_free((void **)&file->replName);
  5112. slapi_ch_free((void **)&file->replGen);
  5113. ruv_destroy(&file->maxRUV);
  5114. ruv_destroy(&file->purgeRUV);
  5115. file->db = NULL;
  5116. slapi_ch_free(data);
  5117. }
  5118. static int
  5119. _cl5GetDBFile(Object *replica, Object **obj)
  5120. {
  5121. char *fileName;
  5122. PR_ASSERT(replica && obj);
  5123. fileName = _cl5Replica2FileName(replica);
  5124. *obj = objset_find(s_cl5Desc.dbFiles, _cl5CompareDBFile, fileName);
  5125. if (*obj) {
  5126. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl, "_cl5GetDBFile - "
  5127. "found DB object %p for database %s\n",
  5128. *obj, fileName);
  5129. slapi_ch_free_string(&fileName);
  5130. return CL5_SUCCESS;
  5131. } else {
  5132. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl, "_cl5GetDBFile - "
  5133. "no DB object found for database %s\n",
  5134. fileName);
  5135. slapi_ch_free_string(&fileName);
  5136. return CL5_NOTFOUND;
  5137. }
  5138. }
  5139. static int
  5140. _cl5GetDBFileByReplicaName(const char *replName, const char *replGen, Object **obj)
  5141. {
  5142. char *fileName;
  5143. PR_ASSERT(replName && replGen && obj);
  5144. fileName = _cl5MakeFileName(replName, replGen);
  5145. *obj = objset_find(s_cl5Desc.dbFiles, _cl5CompareDBFile, fileName);
  5146. if (*obj) {
  5147. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl, "_cl5GetDBFileByReplicaName - "
  5148. "found DB object %p for database %s\n",
  5149. *obj, fileName);
  5150. slapi_ch_free_string(&fileName);
  5151. return CL5_SUCCESS;
  5152. } else {
  5153. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl, "_cl5GetDBFileByReplicaName - "
  5154. "no DB object found for database %s\n",
  5155. fileName);
  5156. slapi_ch_free_string(&fileName);
  5157. return CL5_NOTFOUND;
  5158. }
  5159. }
  5160. static void
  5161. _cl5DBDeleteFile(Object *obj)
  5162. {
  5163. CL5DBFile *file;
  5164. int rc = 0;
  5165. PR_ASSERT(obj);
  5166. file = (CL5DBFile *)object_get_data(obj);
  5167. PR_ASSERT(file);
  5168. file->flags |= DB_FILE_DELETED;
  5169. rc = objset_remove_obj(s_cl5Desc.dbFiles, obj);
  5170. if (rc != OBJSET_SUCCESS) {
  5171. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl, "_cl5DBDeleteFile - "
  5172. "could not find DB object %p\n",
  5173. obj);
  5174. } else {
  5175. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl, "_cl5DBDeleteFile - "
  5176. "removed DB object %p\n",
  5177. obj);
  5178. }
  5179. object_release(obj);
  5180. }
  5181. static void
  5182. _cl5DBFileInitialized(Object *obj)
  5183. {
  5184. CL5DBFile *file;
  5185. PR_ASSERT(obj);
  5186. file = (CL5DBFile *)object_get_data(obj);
  5187. PR_ASSERT(file);
  5188. file->flags |= DB_FILE_INIT;
  5189. }
  5190. static int
  5191. _cl5CompareDBFile(Object *el1, const void *el2)
  5192. {
  5193. CL5DBFile *file;
  5194. const char *name;
  5195. PR_ASSERT(el1 && el2);
  5196. file = (CL5DBFile *)object_get_data(el1);
  5197. name = (const char *)el2;
  5198. return ((file->flags & DB_FILE_INIT) ? strcmp(file->name, name) : 1);
  5199. }
  5200. /*
  5201. * return 1: true (the "filename" ends with "ext")
  5202. * return 0: false
  5203. */
  5204. static int
  5205. _cl5FileEndsWith(const char *filename, const char *ext)
  5206. {
  5207. char *p = NULL;
  5208. int flen = strlen(filename);
  5209. int elen = strlen(ext);
  5210. if (0 == flen || 0 == elen) {
  5211. return 0;
  5212. }
  5213. p = PL_strrstr(filename, ext);
  5214. if (NULL == p) {
  5215. return 0;
  5216. }
  5217. if (p - filename + elen == flen) {
  5218. return 1;
  5219. }
  5220. return 0;
  5221. }
  5222. static int
  5223. _cl5ExportFile(PRFileDesc *prFile, Object *obj)
  5224. {
  5225. int rc;
  5226. void *iterator = NULL;
  5227. slapi_operation_parameters op = {0};
  5228. char *buff;
  5229. PRInt32 len, wlen;
  5230. CL5Entry entry;
  5231. CL5DBFile *file;
  5232. PR_ASSERT(prFile && obj);
  5233. file = (CL5DBFile *)object_get_data(obj);
  5234. PR_ASSERT(file);
  5235. if (slapi_is_loglevel_set(SLAPI_LOG_REPL)) {
  5236. ruv_dump(file->purgeRUV, "clpurgeruv", prFile);
  5237. ruv_dump(file->maxRUV, "clmaxruv", prFile);
  5238. }
  5239. slapi_write_buffer(prFile, "\n", strlen("\n"));
  5240. entry.op = &op;
  5241. rc = _cl5GetFirstEntry(obj, &entry, &iterator, NULL);
  5242. while (rc == CL5_SUCCESS) {
  5243. rc = _cl5Operation2LDIF(&op, file->replGen, &buff, &len);
  5244. if (rc != CL5_SUCCESS) {
  5245. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
  5246. "_cl5ExportFile - Failed to convert operation to ldif\n");
  5247. operation_parameters_done(&op);
  5248. break;
  5249. }
  5250. wlen = slapi_write_buffer(prFile, buff, len);
  5251. slapi_ch_free((void **)&buff);
  5252. if (wlen < len) {
  5253. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
  5254. "_cl5ExportFile - Failed to write to ldif file\n");
  5255. rc = CL5_SYSTEM_ERROR;
  5256. operation_parameters_done(&op);
  5257. break;
  5258. }
  5259. cl5_operation_parameters_done(&op);
  5260. rc = _cl5GetNextEntry(&entry, iterator);
  5261. }
  5262. cl5_operation_parameters_done(&op);
  5263. if (iterator)
  5264. cl5DestroyIterator(iterator);
  5265. if (rc != CL5_NOTFOUND) {
  5266. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
  5267. "_cl5ExportFile - Failed to retrieve changelog entry\n");
  5268. } else {
  5269. rc = CL5_SUCCESS;
  5270. }
  5271. return rc;
  5272. }
  5273. static PRBool
  5274. _cl5ReplicaInList(Object *replica, Object **replicas)
  5275. {
  5276. int i;
  5277. PR_ASSERT(replica && replicas);
  5278. /* ONREPL I think it should be sufficient to just compare replica pointers */
  5279. for (i = 0; replicas[i]; i++) {
  5280. if (replica == replicas[i])
  5281. return PR_TRUE;
  5282. }
  5283. return PR_FALSE;
  5284. }
  5285. static char *
  5286. _cl5GetHelperEntryKey(int type, char *csnStr)
  5287. {
  5288. CSN *csn = csn_new();
  5289. char *rt;
  5290. csn_set_time(csn, (time_t)type);
  5291. csn_set_replicaid(csn, 0);
  5292. rt = csn_as_string(csn, PR_FALSE, csnStr);
  5293. csn_free(&csn);
  5294. return rt;
  5295. }
  5296. static Object *
  5297. _cl5GetReplica(const slapi_operation_parameters *op, const char *replGen)
  5298. {
  5299. Slapi_DN *sdn;
  5300. Object *replObj;
  5301. Replica *replica;
  5302. char *newGen;
  5303. PR_ASSERT(op && replGen);
  5304. sdn = op->target_address.sdn;
  5305. replObj = replica_get_replica_from_dn(sdn);
  5306. if (replObj) {
  5307. /* check to see if replica generation has not change */
  5308. replica = (Replica *)object_get_data(replObj);
  5309. PR_ASSERT(replica);
  5310. newGen = replica_get_generation(replica);
  5311. PR_ASSERT(newGen);
  5312. if (strcmp(replGen, newGen) != 0) {
  5313. object_release(replObj);
  5314. replObj = NULL;
  5315. }
  5316. slapi_ch_free((void **)&newGen);
  5317. }
  5318. return replObj;
  5319. }
  5320. int
  5321. cl5_is_diskfull()
  5322. {
  5323. int rc;
  5324. PR_Lock(cl5_diskfull_lock);
  5325. rc = cl5_diskfull_flag;
  5326. PR_Unlock(cl5_diskfull_lock);
  5327. return rc;
  5328. }
  5329. static void
  5330. cl5_set_diskfull(void)
  5331. {
  5332. PR_Lock(cl5_diskfull_lock);
  5333. cl5_diskfull_flag = 1;
  5334. PR_Unlock(cl5_diskfull_lock);
  5335. }
  5336. static void
  5337. cl5_set_no_diskfull(void)
  5338. {
  5339. PR_Lock(cl5_diskfull_lock);
  5340. cl5_diskfull_flag = 0;
  5341. PR_Unlock(cl5_diskfull_lock);
  5342. }
  5343. int
  5344. cl5_diskspace_is_available()
  5345. {
  5346. int rval = 1;
  5347. #if defined(OS_solaris) || defined(hpux)
  5348. struct statvfs fsbuf;
  5349. if (statvfs(s_cl5Desc.dbDir, &fsbuf) < 0) {
  5350. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
  5351. "cl5_diskspace_is_available - Cannot get file system info\n");
  5352. rval = 0;
  5353. } else {
  5354. unsigned long fsiz = fsbuf.f_bavail * fsbuf.f_frsize;
  5355. if (fsiz < NO_DISK_SPACE) {
  5356. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
  5357. "cl5_diskspace_is_available - No enough diskspace for changelog: (%u bytes free)\n", fsiz);
  5358. rval = 0;
  5359. } else if (fsiz > MIN_DISK_SPACE) {
  5360. /* assume recovered */
  5361. cl5_set_no_diskfull();
  5362. }
  5363. }
  5364. #endif
  5365. #if defined(linux)
  5366. struct statfs fsbuf;
  5367. if (statfs(s_cl5Desc.dbDir, &fsbuf) < 0) {
  5368. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
  5369. "cl5_diskspace_is_available - Cannot get file system info\n");
  5370. rval = 0;
  5371. } else {
  5372. unsigned long fsiz = fsbuf.f_bavail * fsbuf.f_bsize;
  5373. if (fsiz < NO_DISK_SPACE) {
  5374. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
  5375. "cl5_diskspace_is_available - No enough diskspace for changelog: (%lu bytes free)\n", fsiz);
  5376. rval = 0;
  5377. } else if (fsiz > MIN_DISK_SPACE) {
  5378. /* assume recovered */
  5379. cl5_set_no_diskfull();
  5380. }
  5381. }
  5382. #endif
  5383. return rval;
  5384. }
  5385. int
  5386. cl5DbDirIsEmpty(const char *dir)
  5387. {
  5388. PRDir *prDir;
  5389. PRDirEntry *prDirEntry;
  5390. int isempty = 1;
  5391. if (!dir || !*dir) {
  5392. return isempty;
  5393. }
  5394. /* assume failure means it does not exist - other failure
  5395. cases will be handled by code which attempts to create the
  5396. db in this directory */
  5397. if (PR_Access(dir, PR_ACCESS_EXISTS)) {
  5398. return isempty;
  5399. }
  5400. prDir = PR_OpenDir(dir);
  5401. if (prDir == NULL) {
  5402. return isempty; /* assume failure means does not exist */
  5403. }
  5404. while (NULL != (prDirEntry = PR_ReadDir(prDir, PR_SKIP_DOT | PR_SKIP_DOT_DOT))) {
  5405. if (NULL == prDirEntry->name) { /* NSPR doesn't behave like the docs say it should */
  5406. break;
  5407. }
  5408. isempty = 0; /* found at least one "real" file */
  5409. break;
  5410. }
  5411. PR_CloseDir(prDir);
  5412. return isempty;
  5413. }
  5414. /*
  5415. * Write RUVs into the changelog;
  5416. * implemented for backup to make sure the backed up changelog contains RUVs
  5417. * Return values: 0 -- success
  5418. * 1 -- failure
  5419. */
  5420. int
  5421. cl5WriteRUV()
  5422. {
  5423. int rc = 0;
  5424. Object *file_obj = NULL;
  5425. CL5DBFile *dbfile = NULL;
  5426. int closeit = 0;
  5427. int slapd_pid = 0;
  5428. changelog5Config config;
  5429. /* read changelog configuration */
  5430. changelog5_read_config(&config);
  5431. if (config.dir == NULL) {
  5432. /* Changelog is not configured; Replication is not enabled.
  5433. * we don't have to update RUVs.
  5434. * bail out - return success */
  5435. goto bail;
  5436. }
  5437. slapd_pid = is_slapd_running();
  5438. if (slapd_pid <= 0) {
  5439. /* I'm not a server, rather a utility.
  5440. * And the server is NOT running.
  5441. * RUVs should be in the changelog.
  5442. * we don't have to update RUVs.
  5443. * bail out - return success */
  5444. goto bail;
  5445. }
  5446. if (getpid() != slapd_pid) {
  5447. /* I'm not a server, rather a utility.
  5448. * And the server IS running.
  5449. * RUVs are not in the changelog and no easy way to retrieve them.
  5450. * bail out - return failure */
  5451. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
  5452. "cl5WriteRUV - server (pid %d) is already running; bail.\n",
  5453. slapd_pid);
  5454. rc = 1;
  5455. goto bail;
  5456. }
  5457. /* file is stored in the changelog directory and is named
  5458. * <replica name>.ldif */
  5459. if (CL5_STATE_OPEN != s_cl5Desc.dbState) {
  5460. rc = _cl5Open(config.dir, &config.dbconfig, CL5_OPEN_NORMAL);
  5461. if (rc != CL5_SUCCESS) {
  5462. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
  5463. "cl5WriteRUV - Failed to open changelog\n");
  5464. goto bail;
  5465. }
  5466. s_cl5Desc.dbState = CL5_STATE_OPEN; /* force to change the state */
  5467. closeit = 1; /* It had not been opened; close it */
  5468. }
  5469. file_obj = objset_first_obj(s_cl5Desc.dbFiles);
  5470. while (file_obj) {
  5471. dbfile = (CL5DBFile *)object_get_data(file_obj);
  5472. if (dbfile) {
  5473. _cl5WriteEntryCount(dbfile);
  5474. _cl5WriteRUV(dbfile, PR_TRUE);
  5475. _cl5WriteRUV(dbfile, PR_FALSE);
  5476. }
  5477. file_obj = objset_next_obj(s_cl5Desc.dbFiles, file_obj);
  5478. }
  5479. bail:
  5480. if (closeit && (CL5_STATE_OPEN == s_cl5Desc.dbState)) {
  5481. _cl5Close();
  5482. s_cl5Desc.dbState = CL5_STATE_CLOSED; /* force to change the state */
  5483. }
  5484. changelog5_config_done(&config);
  5485. return rc;
  5486. }
  5487. /*
  5488. * Delete RUVs from the changelog;
  5489. * implemented for backup to clean up RUVs
  5490. * Return values: 0 -- success
  5491. * 1 -- failure
  5492. */
  5493. int
  5494. cl5DeleteRUV()
  5495. {
  5496. int rc = 0;
  5497. Object *file_obj = NULL;
  5498. CL5DBFile *dbfile = NULL;
  5499. int slapd_pid = 0;
  5500. int closeit = 0;
  5501. changelog5Config config;
  5502. /* read changelog configuration */
  5503. changelog5_read_config(&config);
  5504. if (config.dir == NULL) {
  5505. /* Changelog is not configured; Replication is not enabled.
  5506. * we don't have to update RUVs.
  5507. * bail out - return success */
  5508. goto bail;
  5509. }
  5510. slapd_pid = is_slapd_running();
  5511. if (slapd_pid <= 0) {
  5512. /* I'm not a server, rather a utility.
  5513. * And the server is NOT running.
  5514. * RUVs should be in the changelog.
  5515. * we don't have to update RUVs.
  5516. * bail out - return success */
  5517. goto bail;
  5518. }
  5519. if (getpid() != slapd_pid) {
  5520. /* I'm not a server, rather a utility.
  5521. * And the server IS running.
  5522. * RUVs are not in the changelog.
  5523. * bail out - return success */
  5524. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
  5525. "cl5DeleteRUV - server (pid %d) is already running; bail.\n",
  5526. slapd_pid);
  5527. goto bail;
  5528. }
  5529. /* file is stored in the changelog directory and is named
  5530. * <replica name>.ldif */
  5531. if (CL5_STATE_OPEN != s_cl5Desc.dbState) {
  5532. rc = _cl5Open(config.dir, &config.dbconfig, CL5_OPEN_NORMAL);
  5533. if (rc != CL5_SUCCESS) {
  5534. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
  5535. "cl5DeleteRUV - Failed to open changelog\n");
  5536. goto bail;
  5537. }
  5538. s_cl5Desc.dbState = CL5_STATE_OPEN; /* force to change the state */
  5539. closeit = 1; /* It had been opened; no need to close */
  5540. }
  5541. file_obj = objset_first_obj(s_cl5Desc.dbFiles);
  5542. while (file_obj) {
  5543. dbfile = (CL5DBFile *)object_get_data(file_obj);
  5544. /* _cl5GetEntryCount deletes entry count after reading it */
  5545. rc = _cl5GetEntryCount(dbfile);
  5546. if (rc != CL5_SUCCESS) {
  5547. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
  5548. "cl5DeleteRUV - Failed to get/delete entry count\n");
  5549. goto bail;
  5550. }
  5551. /* _cl5ReadRUV deletes RUV after reading it */
  5552. rc = _cl5ReadRUV(dbfile->replGen, file_obj, PR_TRUE);
  5553. if (rc != CL5_SUCCESS) {
  5554. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
  5555. "cl5DeleteRUV - Failed to read/delete purge RUV\n");
  5556. goto bail;
  5557. }
  5558. rc = _cl5ReadRUV(dbfile->replGen, file_obj, PR_FALSE);
  5559. if (rc != CL5_SUCCESS) {
  5560. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
  5561. "cl5DeleteRUV - Failed to read/delete upper bound RUV\n");
  5562. goto bail;
  5563. }
  5564. file_obj = objset_next_obj(s_cl5Desc.dbFiles, file_obj);
  5565. }
  5566. bail:
  5567. if (file_obj) {
  5568. object_release(file_obj);
  5569. }
  5570. if (closeit && (CL5_STATE_OPEN == s_cl5Desc.dbState)) {
  5571. _cl5Close();
  5572. s_cl5Desc.dbState = CL5_STATE_CLOSED; /* force to change the state */
  5573. }
  5574. changelog5_config_done(&config);
  5575. return rc;
  5576. }
  5577. /*
  5578. * Clean the in memory RUV, at shutdown we will write the update to the db
  5579. */
  5580. void
  5581. cl5CleanRUV(ReplicaId rid)
  5582. {
  5583. CL5DBFile *file;
  5584. Object *obj = NULL;
  5585. slapi_rwlock_wrlock(s_cl5Desc.stLock);
  5586. obj = objset_first_obj(s_cl5Desc.dbFiles);
  5587. while (obj) {
  5588. file = (CL5DBFile *)object_get_data(obj);
  5589. ruv_delete_replica(file->purgeRUV, rid);
  5590. ruv_delete_replica(file->maxRUV, rid);
  5591. obj = objset_next_obj(s_cl5Desc.dbFiles, obj);
  5592. }
  5593. slapi_rwlock_unlock(s_cl5Desc.stLock);
  5594. }
  5595. static void
  5596. free_purge_data(cleanruv_purge_data *purge_data)
  5597. {
  5598. slapi_ch_free_string(&purge_data->replGen);
  5599. slapi_ch_free((void **)&purge_data);
  5600. }
  5601. /*
  5602. * Create a thread to purge a changelog of cleaned RIDs
  5603. */
  5604. void
  5605. trigger_cl_purging(cleanruv_purge_data *purge_data)
  5606. {
  5607. PRThread *trim_tid = NULL;
  5608. trim_tid = PR_CreateThread(PR_USER_THREAD, (VFP)(void *)trigger_cl_purging_thread,
  5609. (void *)purge_data, PR_PRIORITY_NORMAL, PR_GLOBAL_THREAD,
  5610. PR_UNJOINABLE_THREAD, DEFAULT_THREAD_STACKSIZE);
  5611. if (NULL == trim_tid) {
  5612. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
  5613. "trigger_cl_purging - Failed to create cl purging "
  5614. "thread; NSPR error - %d\n",
  5615. PR_GetError());
  5616. free_purge_data(purge_data);
  5617. } else {
  5618. /* need a little time for the thread to get started */
  5619. DS_Sleep(PR_SecondsToInterval(1));
  5620. }
  5621. }
  5622. /*
  5623. * Purge a changelog of entries that originated from a particular replica(rid)
  5624. */
  5625. void
  5626. trigger_cl_purging_thread(void *arg)
  5627. {
  5628. cleanruv_purge_data *purge_data = (cleanruv_purge_data *)arg;
  5629. /* Make sure we have a change log, and we aren't closing it */
  5630. if (s_cl5Desc.dbState == CL5_STATE_CLOSED ||
  5631. s_cl5Desc.dbState == CL5_STATE_CLOSING) {
  5632. goto free_and_return;
  5633. }
  5634. /* Bump the changelog thread count */
  5635. if (CL5_SUCCESS != _cl5AddThread()) {
  5636. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
  5637. "trigger_cl_purging_thread - Abort, failed to increment thread count "
  5638. "NSPR error - %d\n",
  5639. PR_GetError());
  5640. goto free_and_return;
  5641. }
  5642. /* Purge the changelog */
  5643. _cl5DoPurging(purge_data);
  5644. _cl5RemoveThread();
  5645. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
  5646. "trigger_cl_purging_thread - purged changelog for (%s) rid (%d)\n",
  5647. slapi_sdn_get_dn(purge_data->suffix_sdn), purge_data->cleaned_rid);
  5648. free_and_return:
  5649. free_purge_data(purge_data);
  5650. }