repl5_replica.c 143 KB


  1. /** BEGIN COPYRIGHT BLOCK
  2. * Copyright (C) 2001 Sun Microsystems, Inc. Used by permission.
  3. * Copyright (C) 2005 Red Hat, Inc.
  4. * Copyright (C) 2009 Hewlett-Packard Development Company, L.P.
  5. * All rights reserved.
  6. *
  7. * License: GPL (version 3 or any later version).
  8. * See LICENSE for details.
  9. * END COPYRIGHT BLOCK **/
  10. #ifdef HAVE_CONFIG_H
  11. #include <config.h>
  12. #endif
  13. /* repl5_replica.c */
  14. #include "slapi-plugin.h"
  15. #include "repl5.h"
  16. #include "repl_shared.h"
  17. #include "csnpl.h"
  18. #include "cl5_api.h"
  19. #include "slap.h"
/* Repeat interval passed to slapi_eq_repeat() for the event that saves the RUV. */
#define RUV_SAVE_INTERVAL (30 * 1000) /* 30 seconds */
/* RDN string "cn=replica"; NOTE(review): presumably the RDN of the replica config entry - confirm against _replica_get_config_dn(). */
#define REPLICA_RDN "cn=replica"
/*
 * A replica is a locally-held copy of a portion of the DIT.
 * This structure holds the in-memory state and configuration of one replica.
 */
struct replica
{
    Slapi_DN *repl_root;                   /* top of the replicated area */
    char *repl_name;                       /* unique replica name */
    PRBool new_name;                       /* new name was generated - needs to be saved */
    ReplicaUpdateDNList updatedn_list;     /* list of DNs with which a supplier should bind to update this replica */
    Slapi_ValueSet *updatedn_groups;       /* set of groups whose members are allowed to update the replica */
    ReplicaUpdateDNList groupdn_list;      /* exploded list of DNs from the update groups */
    uint32_t updatedn_group_last_check;    /* the time of the last group check */
    int64_t updatedn_group_check_interval; /* the group check interval */
    ReplicaType repl_type;                 /* is this replica read-only ? */
    ReplicaId repl_rid;                    /* replicaID */
    Object *repl_ruv;                      /* replica update vector */
    CSNPL *min_csn_pl;                     /* Pending list for minimal CSN */
    void *csn_pl_reg_id;                   /* registration assignment for csn callbacks */
    unsigned long repl_state_flags;        /* state flags */
    uint32_t repl_flags;                   /* persistent, externally visible flags */
    PRMonitor *repl_lock;                  /* protects entire structure */
    Slapi_Eq_Context repl_eqcxt_rs;        /* context to cancel event that saves ruv */
    Slapi_Eq_Context repl_eqcxt_tr;        /* context to cancel event that reaps tombstones */
    Object *repl_csngen;                   /* CSN generator for this replica */
    PRBool repl_csn_assigned;              /* Flag set when new csn is assigned. */
    int64_t repl_purge_delay;              /* When purgeable, CSNs are held on to for this many extra seconds */
    PRBool tombstone_reap_stop;            /* TRUE when the tombstone reaper should stop */
    PRBool tombstone_reap_active;          /* TRUE when the tombstone reaper is running */
    int64_t tombstone_reap_interval;       /* Time in seconds between tombstone reaping */
    Slapi_ValueSet *repl_referral;         /* A list of administrator provided referral URLs */
    PRBool state_update_inprogress;        /* replica state is being updated */
    PRLock *agmt_lock;                     /* protects agreement creation, start and stop */
    char *locking_purl;                    /* supplier who has exclusive access */
    uint64_t locking_conn;                 /* The supplier's connection id */
    Slapi_Counter *protocol_timeout;       /* protocol shutdown timeout */
    Slapi_Counter *backoff_min;            /* backoff retry minimum */
    Slapi_Counter *backoff_max;            /* backoff retry maximum */
    Slapi_Counter *precise_purging;        /* Enable precise tombstone purging */
    uint64_t agmt_count;                   /* Number of agmts */
    Slapi_Counter *release_timeout;        /* The amount of time to wait before releasing active replica */
    uint64_t abort_session;                /* Abort the current replica session */
    cldb_Handle *cldb;                     /* database info for the changelog */
};
/*
 * Data bundle named for the tombstone reap callback.
 * NOTE(review): the field meanings below are inferred from the names only;
 * the code that fills and reads them is not visible here - confirm.
 */
typedef struct reap_callback_data
{
    int rc;                      /* presumably the result code of the reap pass */
    uint64_t num_entries;        /* presumably the number of entries examined */
    uint64_t num_purged_entries; /* presumably the number of entries purged */
    CSN *purge_csn;              /* presumably the CSN used as purge threshold */
    PRBool *tombstone_reap_stop; /* pointer to a stop flag (struct replica has a field of the same name) */
} reap_callback_data;
/* Forward declarations of file-local helper functions (defined elsewhere in this file) */
static Slapi_Entry *_replica_get_config_entry(const Slapi_DN *root, const char **attrs);
static int _replica_check_validity(const Replica *r);
static int _replica_init_from_config(Replica *r, Slapi_Entry *e, char *errortext);
static int _replica_update_entry(Replica *r, Slapi_Entry *e, char *errortext);
static int _replica_config_changelog(Replica *r);
static int _replica_configure_ruv(Replica *r, PRBool isLocked);
static char *_replica_get_config_dn(const Slapi_DN *root);
static char *_replica_type_as_string(const Replica *r);
/* DBDB, I think this is probably bogus : */
static int replica_create_ruv_tombstone(Replica *r);
static void assign_csn_callback(const CSN *csn, void *data);
static void abort_csn_callback(const CSN *csn, void *data);
static void eq_cb_reap_tombstones(time_t when, void *arg);
static CSN *_replica_get_purge_csn_nolock(const Replica *r);
static void replica_get_referrals_nolock(const Replica *r, char ***referrals);
static int replica_log_ruv_elements_nolock(const Replica *r);
static void replica_replace_ruv_tombstone(Replica *r);
static void start_agreements_for_replica(Replica *r, PRBool start);
static void _delete_tombstone(const char *tombstone_dn, const char *uniqueid, int ext_op_flags);
static void replica_strip_cleaned_rids(Replica *r);
  94. static void
  95. replica_lock(PRMonitor *lock)
  96. {
  97. PR_EnterMonitor(lock);
  98. }
  99. static void
  100. replica_unlock(PRMonitor *lock)
  101. {
  102. PR_ExitMonitor(lock);
  103. }
  104. /*
  105. * Allocates new replica and reads its state and state of its component from
  106. * various parts of the DIT.
  107. */
  108. Replica *
  109. replica_new(const Slapi_DN *root)
  110. {
  111. Replica *r = NULL;
  112. Slapi_Entry *e = NULL;
  113. char errorbuf[SLAPI_DSE_RETURNTEXT_SIZE];
  114. PR_ASSERT(root);
  115. /* check if there is a replica associated with the tree */
  116. e = _replica_get_config_entry(root, NULL);
  117. if (e) {
  118. errorbuf[0] = '\0';
  119. replica_new_from_entry(e, errorbuf,
  120. PR_FALSE, /* not a newly added entry */
  121. &r);
  122. if (NULL == r) {
  123. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name, "replica_new - "
  124. "Unable to configure replica %s: %s\n",
  125. slapi_sdn_get_dn(root), errorbuf);
  126. }
  127. slapi_entry_free(e);
  128. }
  129. return r;
  130. }
  131. /* constructs the replica object from the newly added entry */
  132. int
  133. replica_new_from_entry(Slapi_Entry *e, char *errortext, PRBool is_add_operation, Replica **rp)
  134. {
  135. Replica *r;
  136. int rc = LDAP_SUCCESS;
  137. if (e == NULL) {
  138. if (NULL != errortext) {
  139. PR_snprintf(errortext, SLAPI_DSE_RETURNTEXT_SIZE, "NULL entry");
  140. }
  141. return LDAP_OTHER;
  142. }
  143. r = (Replica *)slapi_ch_calloc(1, sizeof(Replica));
  144. if (!r) {
  145. if (NULL != errortext) {
  146. PR_snprintf(errortext, SLAPI_DSE_RETURNTEXT_SIZE, "Out of memory");
  147. }
  148. rc = LDAP_OTHER;
  149. goto done;
  150. }
  151. if ((r->repl_lock = PR_NewMonitor()) == NULL) {
  152. if (NULL != errortext) {
  153. PR_snprintf(errortext, SLAPI_DSE_RETURNTEXT_SIZE, "failed to create replica lock");
  154. }
  155. rc = LDAP_OTHER;
  156. goto done;
  157. }
  158. if ((r->agmt_lock = PR_NewLock()) == NULL) {
  159. if (NULL != errortext) {
  160. PR_snprintf(errortext, SLAPI_DSE_RETURNTEXT_SIZE, "failed to create replica lock");
  161. }
  162. rc = LDAP_OTHER;
  163. goto done;
  164. }
  165. /* init the slapi_counter/atomic settings */
  166. r->protocol_timeout = slapi_counter_new();
  167. r->release_timeout = slapi_counter_new();
  168. r->backoff_min = slapi_counter_new();
  169. r->backoff_max = slapi_counter_new();
  170. r->precise_purging = slapi_counter_new();
  171. /* read parameters from the replica config entry */
  172. rc = _replica_init_from_config(r, e, errortext);
  173. if (rc != LDAP_SUCCESS) {
  174. goto done;
  175. }
  176. /* configure ruv */
  177. rc = _replica_configure_ruv(r, PR_FALSE);
  178. if (rc != 0) {
  179. rc = LDAP_OTHER;
  180. goto done;
  181. } else {
  182. rc = LDAP_SUCCESS;
  183. }
  184. /* If smallest csn exists in RUV for our local replica, it's ok to begin iteration */
  185. PR_ASSERT(object_get_data(r->repl_ruv));
  186. if (is_add_operation) {
  187. /*
  188. * This is called by an ldap add operation.
  189. * Update the entry to contain information generated
  190. * during replica initialization
  191. */
  192. rc = _replica_update_entry(r, e, errortext);
  193. /* add changelog config entry to config
  194. * this is only needed for replicas logging changes,
  195. * but for now let it exist for all replicas. Makes handling
  196. * of changing replica flags easier
  197. */
  198. _replica_config_changelog(r);
  199. if (r->repl_flags & REPLICA_LOG_CHANGES) {
  200. /* Init changelog db file */
  201. cldb_SetReplicaDB(r, NULL);
  202. }
  203. } else {
  204. /*
  205. * Entry is already in dse.ldif - update it on the disk
  206. * (done by the update state event scheduled below)
  207. */
  208. }
  209. if (rc != 0) {
  210. rc = LDAP_OTHER;
  211. goto done;
  212. } else {
  213. rc = LDAP_SUCCESS;
  214. }
  215. /* ONREPL - the state update can occur before the entry is added to the DIT.
  216. In that case the updated would fail but nothing bad would happen. The next
  217. scheduled update would save the state */
  218. r->repl_eqcxt_rs = slapi_eq_repeat(replica_update_state, r->repl_name,
  219. slapi_current_utc_time() + START_UPDATE_DELAY, RUV_SAVE_INTERVAL);
  220. if (r->tombstone_reap_interval > 0) {
  221. /*
  222. * Reap Tombstone should be started some time after the plugin started.
  223. * This will allow the server to fully start before consuming resources.
  224. */
  225. r->repl_eqcxt_tr = slapi_eq_repeat(eq_cb_reap_tombstones, r->repl_name,
  226. slapi_current_utc_time() + r->tombstone_reap_interval,
  227. 1000 * r->tombstone_reap_interval);
  228. }
  229. done:
  230. if (rc != LDAP_SUCCESS && r) {
  231. replica_destroy((void **)&r);
  232. }
  233. *rp = r;
  234. return rc;
  235. }
  236. void
  237. replica_flush(Replica *r)
  238. {
  239. PR_ASSERT(NULL != r);
  240. if (NULL != r) {
  241. replica_lock(r->repl_lock);
  242. /* Make sure we dump the CSNGen state */
  243. r->repl_csn_assigned = PR_TRUE;
  244. replica_unlock(r->repl_lock);
  245. /* This function take the Lock Inside */
  246. /* And also write the RUV */
  247. replica_update_state((time_t)0, r->repl_name);
  248. }
  249. }
  250. void
  251. replica_set_csn_assigned(Replica *r)
  252. {
  253. replica_lock(r->repl_lock);
  254. r->repl_csn_assigned = PR_TRUE;
  255. replica_unlock(r->repl_lock);
  256. }
  257. /*
  258. * Deallocate a replica. arg should point to the address of a
  259. * pointer that points to a replica structure.
  260. */
  261. void
  262. replica_destroy(void **arg)
  263. {
  264. Replica *r;
  265. if (arg == NULL)
  266. return;
  267. r = *((Replica **)arg);
  268. PR_ASSERT(r);
  269. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name, "replica_destroy\n");
  270. /*
  271. * The function will not be called unless the refcnt of its
  272. * wrapper object is 0. Hopefully this refcnt could sync up
  273. * this destruction and the events such as tombstone reap
  274. * and ruv updates.
  275. */
  276. if (r->repl_eqcxt_rs) {
  277. slapi_eq_cancel(r->repl_eqcxt_rs);
  278. r->repl_eqcxt_rs = NULL;
  279. }
  280. if (r->repl_eqcxt_tr) {
  281. slapi_eq_cancel(r->repl_eqcxt_tr);
  282. r->repl_eqcxt_tr = NULL;
  283. }
  284. if (r->repl_root) {
  285. slapi_sdn_free(&r->repl_root);
  286. }
  287. slapi_ch_free_string(&r->locking_purl);
  288. if (r->updatedn_list) {
  289. replica_updatedn_list_free(r->updatedn_list);
  290. r->updatedn_list = NULL;
  291. }
  292. if (r->groupdn_list) {
  293. replica_updatedn_list_free(r->groupdn_list);
  294. r->groupdn_list = NULL;
  295. }
  296. if (r->updatedn_groups) {
  297. slapi_valueset_free(r->updatedn_groups);
  298. }
  299. /* slapi_ch_free accepts NULL pointer */
  300. slapi_ch_free((void **)&r->repl_name);
  301. if (r->repl_lock) {
  302. PR_DestroyMonitor(r->repl_lock);
  303. r->repl_lock = NULL;
  304. }
  305. if (r->agmt_lock) {
  306. PR_DestroyLock(r->agmt_lock);
  307. r->agmt_lock = NULL;
  308. }
  309. if (NULL != r->repl_ruv) {
  310. object_release(r->repl_ruv);
  311. }
  312. if (NULL != r->repl_csngen) {
  313. if (r->csn_pl_reg_id) {
  314. csngen_unregister_callbacks((CSNGen *)object_get_data(r->repl_csngen), r->csn_pl_reg_id);
  315. }
  316. object_release(r->repl_csngen);
  317. }
  318. if (NULL != r->repl_referral) {
  319. slapi_valueset_free(r->repl_referral);
  320. }
  321. if (NULL != r->min_csn_pl) {
  322. csnplFree(&r->min_csn_pl);
  323. ;
  324. }
  325. slapi_counter_destroy(&r->protocol_timeout);
  326. slapi_counter_destroy(&r->release_timeout);
  327. slapi_counter_destroy(&r->backoff_min);
  328. slapi_counter_destroy(&r->backoff_max);
  329. slapi_counter_destroy(&r->precise_purging);
  330. slapi_ch_free((void **)arg);
  331. }
/* Attribute replaced with the current timestamp by replica_subentry_update() */
#define KEEP_ALIVE_ATTR "keepalivetimestamp"
/* cn prefix of the keep alive entry; the replica id is appended after a space */
#define KEEP_ALIVE_ENTRY "repl keep alive"
/* printf format of the keep alive entry DN: KEEP_ALIVE_ENTRY, replica id, replica root DN */
#define KEEP_ALIVE_DN_FORMAT "cn=%s %d,%s"
  335. static int
  336. replica_subentry_create(Slapi_DN *repl_root, ReplicaId rid)
  337. {
  338. char *entry_string = NULL;
  339. Slapi_Entry *e = NULL;
  340. Slapi_PBlock *pb = NULL;
  341. int return_value;
  342. int rc = 0;
  343. entry_string = slapi_ch_smprintf("dn: cn=%s %d,%s\nobjectclass: top\nobjectclass: ldapsubentry\nobjectclass: extensibleObject\ncn: %s %d",
  344. KEEP_ALIVE_ENTRY, rid, slapi_sdn_get_dn(repl_root), KEEP_ALIVE_ENTRY, rid);
  345. if (entry_string == NULL) {
  346. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name,
  347. "replica_subentry_create - Failed in slapi_ch_smprintf\n");
  348. rc = -1;
  349. goto done;
  350. }
  351. slapi_log_err(SLAPI_LOG_INFO, repl_plugin_name,
  352. "replica_subentry_create - add %s\n", entry_string);
  353. e = slapi_str2entry(entry_string, 0);
  354. /* create the entry */
  355. pb = slapi_pblock_new();
  356. slapi_add_entry_internal_set_pb(pb, e, NULL, /* controls */
  357. repl_get_plugin_identity(PLUGIN_MULTIMASTER_REPLICATION), 0 /* flags */);
  358. slapi_add_internal_pb(pb);
  359. slapi_pblock_get(pb, SLAPI_PLUGIN_INTOP_RESULT, &return_value);
  360. if (return_value != LDAP_SUCCESS &&
  361. return_value != LDAP_ALREADY_EXISTS &&
  362. return_value != LDAP_REFERRAL /* CONSUMER */) {
  363. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name, "replica_subentry_create - Unable to "
  364. "create replication keep alive entry %s: error %d - %s\n",
  365. slapi_entry_get_dn_const(e),
  366. return_value, ldap_err2string(return_value));
  367. rc = -1;
  368. goto done;
  369. }
  370. done:
  371. slapi_pblock_destroy(pb);
  372. slapi_ch_free_string(&entry_string);
  373. return rc;
  374. }
  375. int
  376. replica_subentry_check(Slapi_DN *repl_root, ReplicaId rid)
  377. {
  378. Slapi_PBlock *pb;
  379. char *filter = NULL;
  380. Slapi_Entry **entries = NULL;
  381. int res;
  382. int rc = 0;
  383. pb = slapi_pblock_new();
  384. filter = slapi_ch_smprintf("(&(objectclass=ldapsubentry)(cn=%s %d))", KEEP_ALIVE_ENTRY, rid);
  385. slapi_search_internal_set_pb(pb, slapi_sdn_get_dn(repl_root), LDAP_SCOPE_ONELEVEL,
  386. filter, NULL, 0, NULL, NULL,
  387. repl_get_plugin_identity(PLUGIN_MULTIMASTER_REPLICATION), 0);
  388. slapi_search_internal_pb(pb);
  389. slapi_pblock_get(pb, SLAPI_PLUGIN_INTOP_RESULT, &res);
  390. if (res == LDAP_SUCCESS) {
  391. slapi_pblock_get(pb, SLAPI_PLUGIN_INTOP_SEARCH_ENTRIES, &entries);
  392. if (entries && (entries[0] == NULL)) {
  393. slapi_log_err(SLAPI_LOG_NOTICE, repl_plugin_name,
  394. "replica_subentry_check - Need to create replication keep alive entry <cn=%s %d,%s>\n", KEEP_ALIVE_ENTRY, rid, slapi_sdn_get_dn(repl_root));
  395. rc = replica_subentry_create(repl_root, rid);
  396. } else {
  397. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name,
  398. "replica_subentry_check - replication keep alive entry <cn=%s %d,%s> already exists\n", KEEP_ALIVE_ENTRY, rid, slapi_sdn_get_dn(repl_root));
  399. rc = 0;
  400. }
  401. } else {
  402. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name,
  403. "replica_subentry_check - Error accessing replication keep alive entry <cn=%s %d,%s> res=%d\n",
  404. KEEP_ALIVE_ENTRY, rid, slapi_sdn_get_dn(repl_root), res);
  405. /* The status of the entry is not clear, do not attempt to create it */
  406. rc = 1;
  407. }
  408. slapi_free_search_results_internal(pb);
  409. slapi_pblock_destroy(pb);
  410. slapi_ch_free_string(&filter);
  411. return rc;
  412. }
  413. int
  414. replica_subentry_update(Slapi_DN *repl_root, ReplicaId rid)
  415. {
  416. int ldrc;
  417. int rc = LDAP_SUCCESS; /* Optimistic default */
  418. LDAPMod *mods[2];
  419. LDAPMod mod;
  420. struct berval *vals[2];
  421. char buf[SLAPI_TIMESTAMP_BUFSIZE];
  422. struct berval val;
  423. Slapi_PBlock *modpb = NULL;
  424. char *dn;
  425. replica_subentry_check(repl_root, rid);
  426. slapi_timestamp_utc_hr(buf, SLAPI_TIMESTAMP_BUFSIZE);
  427. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name, "subentry_update called at %s\n", buf);
  428. val.bv_val = buf;
  429. val.bv_len = strlen(val.bv_val);
  430. vals[0] = &val;
  431. vals[1] = NULL;
  432. mod.mod_op = LDAP_MOD_REPLACE | LDAP_MOD_BVALUES;
  433. mod.mod_type = KEEP_ALIVE_ATTR;
  434. mod.mod_bvalues = vals;
  435. mods[0] = &mod;
  436. mods[1] = NULL;
  437. modpb = slapi_pblock_new();
  438. dn = slapi_ch_smprintf(KEEP_ALIVE_DN_FORMAT, KEEP_ALIVE_ENTRY, rid, slapi_sdn_get_dn(repl_root));
  439. slapi_modify_internal_set_pb(modpb, dn, mods, NULL, NULL,
  440. repl_get_plugin_identity(PLUGIN_MULTIMASTER_REPLICATION), 0);
  441. slapi_modify_internal_pb(modpb);
  442. slapi_pblock_get(modpb, SLAPI_PLUGIN_INTOP_RESULT, &ldrc);
  443. if (ldrc != LDAP_SUCCESS) {
  444. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name,
  445. "Failure (%d) to update replication keep alive entry \"%s: %s\"\n", ldrc, KEEP_ALIVE_ATTR, buf);
  446. rc = ldrc;
  447. } else {
  448. slapi_log_err(SLAPI_LOG_PLUGIN, repl_plugin_name,
  449. "Successful update of replication keep alive entry \"%s: %s\"\n", KEEP_ALIVE_ATTR, buf);
  450. }
  451. slapi_pblock_destroy(modpb);
  452. slapi_ch_free_string(&dn);
  453. return rc;
  454. }
/*
 * Attempt to obtain exclusive access to replica (advisory only)
 *
 * Returns PR_TRUE if exclusive access was granted,
 * PR_FALSE otherwise
 * The parameter isInc tells whether or not the replica is being
 * locked for an incremental update session - if the replica is
 * successfully locked, this value is used - if the replica is already
 * in use, this value will be set to TRUE or FALSE, depending on what
 * type of update session has the replica in use currently
 * locking_purl is the supplier who is attempting to acquire access
 * current_purl is the supplier who already has access, if any; when the
 * request is refused it receives a copy made with slapi_ch_strdup()
 * (NOTE(review): presumably to be freed by the caller - confirm)
 *
 * Special case: when the replica is in use by a non-total update and the
 * holder's connection id equals connid, the stored purl is replaced by
 * locking_purl and PR_TRUE is returned (current_purl is not set then).
 * The whole decision is made while holding the replica lock.
 */
PRBool
replica_get_exclusive_access(Replica *r, PRBool *isInc, uint64_t connid, int opid, const char *locking_purl, char **current_purl)
{
    PRBool rval = PR_TRUE;
    PR_ASSERT(r);
    replica_lock(r->repl_lock);
    if (r->repl_state_flags & REPLICA_IN_USE) {
        /* Report back which kind of session currently holds the replica */
        if (isInc)
            *isInc = (r->repl_state_flags & REPLICA_INCREMENTAL_IN_PROGRESS);
        slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name,
                      "replica_get_exclusive_access - "
                      "conn=%" PRIu64 " op=%d repl=\"%s\": "
                      "Replica in use locking_purl=%s\n",
                      connid, opid,
                      slapi_sdn_get_dn(r->repl_root),
                      r->locking_purl ? r->locking_purl : "unknown");
        rval = PR_FALSE;
        if (!(r->repl_state_flags & REPLICA_TOTAL_IN_PROGRESS)) {
            /* inc update */
            if (r->locking_purl && r->locking_conn == connid) {
                /* This is the same supplier connection, reset the replica
                 * purl, and return success */
                slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name,
                              "replica_get_exclusive_access - "
                              "This is a second acquire attempt from the same replica connection "
                              " - return success instead of busy\n");
                slapi_ch_free_string(&r->locking_purl);
                r->locking_purl = slapi_ch_strdup(locking_purl);
                rval = PR_TRUE;
                goto done;
            }
            if (replica_get_release_timeout(r)) {
                /*
                 * Abort the current session so other replicas can acquire
                 * this server.
                 */
                r->abort_session = ABORT_SESSION;
            }
        }
        /* Tell the refused caller who currently holds the replica */
        if (current_purl) {
            *current_purl = slapi_ch_strdup(r->locking_purl);
        }
    } else {
        /* Replica is free: mark it in use and record the new holder */
        slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name,
                      "replica_get_exclusive_access - "
                      "conn=%" PRIu64 " op=%d repl=\"%s\": Acquired replica\n",
                      connid, opid,
                      slapi_sdn_get_dn(r->repl_root));
        r->repl_state_flags |= REPLICA_IN_USE;
        r->abort_session = SESSION_ACQUIRED;
        if (isInc && *isInc) {
            r->repl_state_flags |= REPLICA_INCREMENTAL_IN_PROGRESS;
        } else {
            /*
             * If connid or opid != 0, it's a total update.
             * Both set to 0 means we're disabling replication
             */
            if (connid || opid) {
                r->repl_state_flags |= REPLICA_TOTAL_IN_PROGRESS;
            }
        }
        slapi_ch_free_string(&r->locking_purl);
        r->locking_purl = slapi_ch_strdup(locking_purl);
        r->locking_conn = connid;
    }
done:
    replica_unlock(r->repl_lock);
    return rval;
}
  537. /*
  538. * Relinquish exclusive access to the replica
  539. */
  540. void
  541. replica_relinquish_exclusive_access(Replica *r, uint64_t connid, int opid)
  542. {
  543. PRBool isInc;
  544. PR_ASSERT(r);
  545. replica_lock(r->repl_lock);
  546. isInc = (r->repl_state_flags & REPLICA_INCREMENTAL_IN_PROGRESS);
  547. /* check to see if the replica is in use and log a warning if not */
  548. if (!(r->repl_state_flags & REPLICA_IN_USE)) {
  549. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name,
  550. "replica_relinquish_exclusive_access - "
  551. "conn=%" PRIu64 " op=%d repl=\"%s\": "
  552. "Replica not in use\n",
  553. connid, opid, slapi_sdn_get_dn(r->repl_root));
  554. } else {
  555. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name,
  556. "replica_relinquish_exclusive_access - "
  557. "conn=%" PRIu64 " op=%d repl=\"%s\": "
  558. "Released replica held by locking_purl=%s\n",
  559. connid, opid,
  560. slapi_sdn_get_dn(r->repl_root), r->locking_purl);
  561. slapi_ch_free_string(&r->locking_purl);
  562. r->repl_state_flags &= ~(REPLICA_IN_USE);
  563. if (isInc)
  564. r->repl_state_flags &= ~(REPLICA_INCREMENTAL_IN_PROGRESS);
  565. else
  566. r->repl_state_flags &= ~(REPLICA_TOTAL_IN_PROGRESS);
  567. }
  568. replica_unlock(r->repl_lock);
  569. }
  570. /*
  571. * Returns root of the replicated area
  572. */
  573. PRBool
  574. replica_get_tombstone_reap_active(const Replica *r)
  575. {
  576. PR_ASSERT(r);
  577. return (r->tombstone_reap_active);
  578. }
  579. /*
  580. * Returns root of the replicated area
  581. */
  582. const Slapi_DN *
  583. replica_get_root(const Replica *r) /* ONREPL - should we return copy instead? */
  584. {
  585. PR_ASSERT(r);
  586. /* replica root never changes so we don't have to lock */
  587. return (r->repl_root);
  588. }
  589. /*
  590. * Returns normalized dn of the root of the replicated area
  591. */
  592. const char *
  593. replica_get_name(const Replica *r) /* ONREPL - should we return copy instead? */
  594. {
  595. PR_ASSERT(r);
  596. /* replica name never changes so we don't have to lock */
  597. return (r->repl_name);
  598. }
  599. /*
  600. * Returns locking_conn of this replica
  601. */
  602. uint64_t
  603. replica_get_locking_conn(const Replica *r)
  604. {
  605. uint64_t connid;
  606. replica_lock(r->repl_lock);
  607. connid = r->locking_conn;
  608. replica_unlock(r->repl_lock);
  609. return connid;
  610. }
  611. /*
  612. * Returns replicaid of this replica
  613. */
  614. ReplicaId
  615. replica_get_rid(const Replica *r)
  616. {
  617. ReplicaId rid;
  618. PR_ASSERT(r);
  619. replica_lock(r->repl_lock);
  620. rid = r->repl_rid;
  621. replica_unlock(r->repl_lock);
  622. return rid;
  623. }
  624. /*
  625. * Sets replicaid of this replica - should only be used when also changing the type
  626. */
  627. void
  628. replica_set_rid(Replica *r, ReplicaId rid)
  629. {
  630. PR_ASSERT(r);
  631. replica_lock(r->repl_lock);
  632. r->repl_rid = rid;
  633. replica_unlock(r->repl_lock);
  634. }
  635. /* Returns true if replica was initialized through ORC or import;
  636. * otherwise, false. An uninitialized replica should return
  637. * LDAP_UNWILLING_TO_PERFORM to all client requests
  638. */
  639. PRBool
  640. replica_is_initialized(const Replica *r)
  641. {
  642. PR_ASSERT(r);
  643. return (r->repl_ruv != NULL);
  644. }
  645. /*
  646. * Returns refcounted object that contains RUV. The caller should release the
  647. * object once it is no longer used. To release, call object_release
  648. */
  649. Object *
  650. replica_get_ruv(const Replica *r)
  651. {
  652. Object *ruv = NULL;
  653. PR_ASSERT(r);
  654. replica_lock(r->repl_lock);
  655. PR_ASSERT(r->repl_ruv);
  656. object_acquire(r->repl_ruv);
  657. ruv = r->repl_ruv;
  658. replica_unlock(r->repl_lock);
  659. return ruv;
  660. }
/*
 * Sets RUV vector. This function should be called during replica
 * (re)initialization. During normal operation, the RUV is read from
 * the root of the replicated area in the replica_new call.
 *
 * The previous RUV object (if any) is released and `ruv` is wrapped in a new
 * object whose destructor is ruv_destroy, so ownership of `ruv` passes to
 * the replica. All work is done under the replica lock.
 */
void
replica_set_ruv(Replica *r, RUV *ruv)
{
    PR_ASSERT(r && ruv);
    replica_lock(r->repl_lock);
    /* Drop our reference on the RUV being replaced */
    if (NULL != r->repl_ruv) {
        object_release(r->repl_ruv);
    }
    /* if the local replica is not in the RUV and it is writable - add it
       and reinitialize min_csn pending list */
    if (r->repl_type == REPLICA_TYPE_UPDATABLE) {
        CSN *csn = NULL;
        /* Always start from a clean pending list */
        if (r->min_csn_pl)
            csnplFree(&r->min_csn_pl);
        if (ruv_contains_replica(ruv, r->repl_rid)) {
            ruv_get_smallest_csn_for_replica(ruv, r->repl_rid, &csn);
            /* A pending list is only created when no smallest CSN was returned */
            if (csn)
                csn_free(&csn);
            else
                r->min_csn_pl = csnplNew();
            /* We need to make sure the local ruv element is the 1st. */
            ruv_move_local_supplier_to_first(ruv, r->repl_rid);
        } else {
            r->min_csn_pl = csnplNew();
            /* To be sure that the local is in first */
            ruv_add_index_replica(ruv, r->repl_rid, multimaster_get_local_purl(), 1);
        }
    }
    r->repl_ruv = object_new((void *)ruv, (FNFree)ruv_destroy);
    /* Replicas that log changes notify the changelog of the new RUV */
    if (r->repl_flags & REPLICA_LOG_CHANGES) {
        cl5NotifyRUVChange(r);
    }
    replica_unlock(r->repl_lock);
}
/*
 * Update one particular CSN in an RUV. This is meant to be called
 * whenever (a) the server has processed a client operation and
 * needs to update its CSN, or (b) the server is completing an
 * inbound replication session operation, and needs to update its
 * local RUV.
 *
 * Returns RUV_SUCCESS on success, RUV_BAD_DATA when r or updated_csn is
 * NULL, RUV_NOTFOUND when the replica has no usable RUV, or otherwise the
 * code returned by ruv_update_ruv() (RUV_COVERS_CSN is logged but still
 * returned to the caller).
 */
int
replica_update_ruv(Replica *r, const CSN *updated_csn, const char *replica_purl)
{
    char csn_str[CSN_STRSIZE];
    int rc = RUV_SUCCESS;
    PR_ASSERT(NULL != r);
    PR_ASSERT(NULL != updated_csn);
#ifdef DEBUG
    slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name,
                  "replica_update_ruv: csn %s\n",
                  csn_as_string(updated_csn, PR_FALSE, csn_str)); /* XXXggood remove debugging */
#endif
    /* The asserts above vanish in non-debug builds, so check again explicitly */
    if (NULL == r) {
        slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name, "replica_update_ruv - Replica "
                                                       "is NULL\n");
        rc = RUV_BAD_DATA;
    } else if (NULL == updated_csn) {
        slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name, "replica_update_ruv - csn "
                                                       "is NULL when updating replica %s\n",
                      slapi_sdn_get_dn(r->repl_root));
        rc = RUV_BAD_DATA;
    } else {
        RUV *ruv;
        replica_lock(r->repl_lock);
        if (r->repl_ruv != NULL) {
            ruv = object_get_data(r->repl_ruv);
            if (NULL != ruv) {
                ReplicaId rid = csn_get_replicaid(updated_csn);
                /* CSN generated by this replica: maintain the min CSN pending list */
                if (rid == r->repl_rid) {
                    if (NULL != r->min_csn_pl) {
                        CSN *min_csn;
                        PRBool committed;
                        (void)csnplCommit(r->min_csn_pl, updated_csn);
                        min_csn = csnplGetMinCSN(r->min_csn_pl, &committed);
                        if (NULL != min_csn) {
                            /* Once the minimal CSN is committed it is stored in
                             * the RUV and the pending list is discarded */
                            if (committed) {
                                ruv_set_min_csn(ruv, min_csn, replica_purl);
                                csnplFree(&r->min_csn_pl);
                            }
                            csn_free(&min_csn);
                        }
                    }
                }
                /* Update max csn for local and remote replicas */
                rc = ruv_update_ruv(ruv, updated_csn, replica_purl, r, r->repl_rid);
                if (RUV_COVERS_CSN == rc) {
                    slapi_log_err(SLAPI_LOG_REPL,
                                  repl_plugin_name, "replica_update_ruv - RUV "
                                                    "for replica %s already covers max_csn = %s\n",
                                  slapi_sdn_get_dn(r->repl_root),
                                  csn_as_string(updated_csn, PR_FALSE, csn_str));
                    /* RUV is not dirty - no write needed */
                } else if (RUV_SUCCESS != rc) {
                    slapi_log_err(SLAPI_LOG_ERR,
                                  repl_plugin_name, "replica_update_ruv - Unable "
                                                    "to update RUV for replica %s, csn = %s\n",
                                  slapi_sdn_get_dn(r->repl_root),
                                  csn_as_string(updated_csn, PR_FALSE, csn_str));
                }
            } else {
                /* The RUV wrapper object exists but carries no data */
                slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name,
                              "replica_update_ruv - Unable to get RUV object for replica "
                              "%s\n",
                              slapi_sdn_get_dn(r->repl_root));
                rc = RUV_NOTFOUND;
            }
        } else {
            /* The replica has no RUV object at all */
            slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name, "replica_update_ruv - "
                                                           "Unable to initialize RUV for replica %s\n",
                          slapi_sdn_get_dn(r->repl_root));
            rc = RUV_NOTFOUND;
        }
        replica_unlock(r->repl_lock);
    }
    return rc;
}
  783. /*
  784. * Returns refcounted object that contains csn generator. The caller should release the
  785. * object once it is no longer used. To release, call object_release
  786. */
  787. Object *
  788. replica_get_csngen(const Replica *r)
  789. {
  790. Object *csngen;
  791. PR_ASSERT(r);
  792. replica_lock(r->repl_lock);
  793. object_acquire(r->repl_csngen);
  794. csngen = r->repl_csngen;
  795. replica_unlock(r->repl_lock);
  796. return csngen;
  797. }
  798. /*
  799. * Returns the replica type.
  800. */
  801. ReplicaType
  802. replica_get_type(const Replica *r)
  803. {
  804. PR_ASSERT(r);
  805. return r->repl_type;
  806. }
  807. uint64_t
  808. replica_get_protocol_timeout(Replica *r)
  809. {
  810. if (r) {
  811. return slapi_counter_get_value(r->protocol_timeout);
  812. } else {
  813. return 0;
  814. }
  815. }
  816. uint64_t
  817. replica_get_release_timeout(Replica *r)
  818. {
  819. if (r) {
  820. return slapi_counter_get_value(r->release_timeout);
  821. } else {
  822. return 0;
  823. }
  824. }
  825. void
  826. replica_set_release_timeout(Replica *r, uint64_t limit)
  827. {
  828. if (r) {
  829. slapi_counter_set_value(r->release_timeout, limit);
  830. }
  831. }
  832. void
  833. replica_set_protocol_timeout(Replica *r, uint64_t timeout)
  834. {
  835. if (r) {
  836. slapi_counter_set_value(r->protocol_timeout, timeout);
  837. }
  838. }
  839. void
  840. replica_set_groupdn_checkinterval(Replica *r, int interval)
  841. {
  842. if (r) {
  843. r->updatedn_group_check_interval = interval;
  844. }
  845. }
  846. /*
  847. * Sets the replica type.
  848. */
  849. void
  850. replica_set_type(Replica *r, ReplicaType type)
  851. {
  852. PR_ASSERT(r);
  853. replica_lock(r->repl_lock);
  854. r->repl_type = type;
  855. replica_unlock(r->repl_lock);
  856. }
  857. static PRBool
  858. valuesets_equal(Slapi_ValueSet *new_dn_groups, Slapi_ValueSet *old_dn_groups)
  859. {
  860. Slapi_Attr *attr = NULL;
  861. Slapi_Value *val = NULL;
  862. int idx = 0;
  863. PRBool rc = PR_TRUE;
  864. if (new_dn_groups == NULL) {
  865. if (old_dn_groups == NULL)
  866. return PR_TRUE;
  867. else
  868. return PR_FALSE;
  869. }
  870. if (old_dn_groups == NULL) {
  871. return PR_FALSE;
  872. }
  873. /* if there is not the same number of value, no need to check the value themselves */
  874. if (new_dn_groups->num != old_dn_groups->num) {
  875. return PR_FALSE;
  876. }
  877. attr = slapi_attr_new();
  878. slapi_attr_init(attr, attr_replicaBindDnGroup);
  879. /* Check that all values in old_dn_groups also exist in new_dn_groups */
  880. for (idx = slapi_valueset_first_value(old_dn_groups, &val);
  881. val && (idx != -1);
  882. idx = slapi_valueset_next_value(old_dn_groups, idx, &val)) {
  883. if (slapi_valueset_find(attr, new_dn_groups, val) == NULL) {
  884. rc = PR_FALSE;
  885. break;
  886. }
  887. }
  888. slapi_attr_free(&attr);
  889. return rc;
  890. }
  891. /*
  892. * Returns true if sdn is the same as updatedn and false otherwise
  893. */
  894. PRBool
  895. replica_is_updatedn(Replica *r, const Slapi_DN *sdn)
  896. {
  897. PRBool result = PR_FALSE;
  898. PR_ASSERT(r);
  899. replica_lock(r->repl_lock);
  900. if ((r->updatedn_list == NULL) && (r->groupdn_list == NULL)) {
  901. if (sdn == NULL) {
  902. result = PR_TRUE;
  903. } else {
  904. result = PR_FALSE;
  905. }
  906. replica_unlock(r->repl_lock);
  907. return result;
  908. }
  909. if (r->updatedn_list) {
  910. result = replica_updatedn_list_ismember(r->updatedn_list, sdn);
  911. if (result == PR_TRUE) {
  912. /* sdn is present in the updatedn_list */
  913. replica_unlock(r->repl_lock);
  914. return result;
  915. }
  916. }
  917. if (r->groupdn_list) {
  918. /* check and rebuild groupdns */
  919. if (r->updatedn_group_check_interval > -1) {
  920. time_t now = slapi_current_utc_time();
  921. if (now - r->updatedn_group_last_check > r->updatedn_group_check_interval) {
  922. Slapi_ValueSet *updatedn_groups_copy = NULL;
  923. ReplicaUpdateDNList groupdn_list = replica_updatedn_list_new(NULL);
  924. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name, "Authorized replication managers is resync (%ld)\n", now);
  925. updatedn_groups_copy = slapi_valueset_new();
  926. slapi_valueset_set_valueset(updatedn_groups_copy, r->updatedn_groups);
  927. r->updatedn_group_last_check = now; /* Just to be sure no one will try to reload */
  928. /* It can do internal searches, to avoid deadlock release the replica lock
  929. * as we are working on local variables
  930. */
  931. replica_unlock(r->repl_lock);
  932. replica_updatedn_list_group_replace(groupdn_list, updatedn_groups_copy);
  933. replica_lock(r->repl_lock);
  934. if (valuesets_equal(r->updatedn_groups, updatedn_groups_copy)) {
  935. /* the updatedn_groups has not been updated while we release the replica
  936. * this is fine to apply the groupdn_list
  937. */
  938. replica_updatedn_list_delete(r->groupdn_list, NULL);
  939. replica_updatedn_list_free(r->groupdn_list);
  940. r->groupdn_list = groupdn_list;
  941. } else {
  942. /* the unpdatedn_groups has been updated while we released the replica
  943. * groupdn_list in the replica is up to date. Do not replace it
  944. */
  945. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name, "Authorized replication managers (%s) was updated during a refresh\n", attr_replicaBindDnGroup);
  946. replica_updatedn_list_delete(groupdn_list, NULL);
  947. replica_updatedn_list_free(groupdn_list);
  948. }
  949. slapi_valueset_free(updatedn_groups_copy);
  950. }
  951. }
  952. result = replica_updatedn_list_ismember(r->groupdn_list, sdn);
  953. }
  954. replica_unlock(r->repl_lock);
  955. return result;
  956. }
  957. /*
  958. * Sets updatedn list for this replica
  959. */
  960. void
  961. replica_set_updatedn(Replica *r, const Slapi_ValueSet *vs, int mod_op)
  962. {
  963. PR_ASSERT(r);
  964. replica_lock(r->repl_lock);
  965. if (!r->updatedn_list)
  966. r->updatedn_list = replica_updatedn_list_new(NULL);
  967. if (SLAPI_IS_MOD_DELETE(mod_op) || vs == NULL ||
  968. (0 == slapi_valueset_count(vs))) /* null value also causes list deletion */
  969. replica_updatedn_list_delete(r->updatedn_list, vs);
  970. else if (SLAPI_IS_MOD_REPLACE(mod_op))
  971. replica_updatedn_list_replace(r->updatedn_list, vs);
  972. else if (SLAPI_IS_MOD_ADD(mod_op))
  973. replica_updatedn_list_add(r->updatedn_list, vs);
  974. replica_unlock(r->repl_lock);
  975. }
  976. /*
  977. * Sets updatedn list for this replica
  978. */
  979. void
  980. replica_set_groupdn(Replica *r, const Slapi_ValueSet *vs, int mod_op)
  981. {
  982. PR_ASSERT(r);
  983. replica_lock(r->repl_lock);
  984. if (!r->groupdn_list)
  985. r->groupdn_list = replica_updatedn_list_new(NULL);
  986. if (!r->updatedn_groups)
  987. r->updatedn_groups = slapi_valueset_new();
  988. if (SLAPI_IS_MOD_DELETE(mod_op) || vs == NULL ||
  989. (0 == slapi_valueset_count(vs))) {
  990. /* null value also causes list deletion */
  991. slapi_valueset_free(r->updatedn_groups);
  992. r->updatedn_groups = NULL;
  993. replica_updatedn_list_delete(r->groupdn_list, vs);
  994. } else if (SLAPI_IS_MOD_REPLACE(mod_op)) {
  995. if (r->updatedn_groups) {
  996. slapi_valueset_done(r->updatedn_groups);
  997. } else {
  998. r->updatedn_groups = slapi_valueset_new();
  999. }
  1000. slapi_valueset_set_valueset(r->updatedn_groups, vs);
  1001. replica_updatedn_list_group_replace(r->groupdn_list, vs);
  1002. } else if (SLAPI_IS_MOD_ADD(mod_op)) {
  1003. if (!r->updatedn_groups) {
  1004. r->updatedn_groups = slapi_valueset_new();
  1005. }
  1006. slapi_valueset_join_attr_valueset(NULL, r->updatedn_groups, vs);
  1007. replica_updatedn_list_add_ext(r->groupdn_list, vs, 1);
  1008. }
  1009. replica_unlock(r->repl_lock);
  1010. }
  1011. void
  1012. replica_reset_csn_pl(Replica *r)
  1013. {
  1014. replica_lock(r->repl_lock);
  1015. if (NULL != r->min_csn_pl) {
  1016. csnplFree(&r->min_csn_pl);
  1017. }
  1018. r->min_csn_pl = csnplNew();
  1019. replica_unlock(r->repl_lock);
  1020. }
  1021. /* gets current replica generation for this replica */
  1022. char *
  1023. replica_get_generation(const Replica *r)
  1024. {
  1025. int rc = 0;
  1026. char *gen = NULL;
  1027. if (r && r->repl_ruv) {
  1028. replica_lock(r->repl_lock);
  1029. if (rc == 0)
  1030. gen = ruv_get_replica_generation((RUV *)object_get_data(r->repl_ruv));
  1031. replica_unlock(r->repl_lock);
  1032. }
  1033. return gen;
  1034. }
  1035. PRBool
  1036. replica_is_flag_set(const Replica *r, uint32_t flag)
  1037. {
  1038. if (r)
  1039. return (r->repl_flags & flag);
  1040. else
  1041. return PR_FALSE;
  1042. }
  1043. void
  1044. replica_set_flag(Replica *r, uint32_t flag, PRBool clear)
  1045. {
  1046. if (r == NULL)
  1047. return;
  1048. replica_lock(r->repl_lock);
  1049. if (clear) {
  1050. r->repl_flags &= ~flag;
  1051. } else {
  1052. r->repl_flags |= flag;
  1053. }
  1054. replica_unlock(r->repl_lock);
  1055. }
  1056. void
  1057. replica_replace_flags(Replica *r, uint32_t flags)
  1058. {
  1059. if (r) {
  1060. replica_lock(r->repl_lock);
  1061. r->repl_flags = flags;
  1062. replica_unlock(r->repl_lock);
  1063. }
  1064. }
  1065. void
  1066. replica_get_referrals(const Replica *r, char ***referrals)
  1067. {
  1068. replica_lock(r->repl_lock);
  1069. replica_get_referrals_nolock(r, referrals);
  1070. replica_unlock(r->repl_lock);
  1071. }
  1072. void
  1073. replica_set_referrals(Replica *r, const Slapi_ValueSet *vs)
  1074. {
  1075. int ii = 0;
  1076. Slapi_Value *vv = NULL;
  1077. if (r->repl_referral == NULL) {
  1078. r->repl_referral = slapi_valueset_new();
  1079. } else {
  1080. slapi_valueset_done(r->repl_referral);
  1081. }
  1082. slapi_valueset_set_valueset(r->repl_referral, vs);
  1083. /* make sure the DN is included in the referral LDAP URL */
  1084. if (r->repl_referral) {
  1085. Slapi_ValueSet *newvs = slapi_valueset_new();
  1086. const char *repl_root = slapi_sdn_get_dn(r->repl_root);
  1087. ii = slapi_valueset_first_value(r->repl_referral, &vv);
  1088. while (vv) {
  1089. const char *ref = slapi_value_get_string(vv);
  1090. LDAPURLDesc *lud = NULL;
  1091. (void)slapi_ldap_url_parse(ref, &lud, 0, NULL);
  1092. /* see if the dn is already in the referral URL */
  1093. if (!lud || !lud->lud_dn) {
  1094. /* add the dn */
  1095. Slapi_Value *newval = NULL;
  1096. int len = strlen(ref);
  1097. char *tmpref = NULL;
  1098. int need_slash = 0;
  1099. if (ref[len - 1] != '/') {
  1100. need_slash = 1;
  1101. }
  1102. tmpref = slapi_ch_smprintf("%s%s%s", ref, (need_slash ? "/" : ""),
  1103. repl_root);
  1104. newval = slapi_value_new_string(tmpref);
  1105. slapi_ch_free_string(&tmpref); /* sv_new_string makes a copy */
  1106. slapi_valueset_add_value(newvs, newval);
  1107. slapi_value_free(&newval); /* s_vs_add_value makes a copy */
  1108. }
  1109. if (lud)
  1110. ldap_free_urldesc(lud);
  1111. ii = slapi_valueset_next_value(r->repl_referral, ii, &vv);
  1112. }
  1113. if (slapi_valueset_count(newvs) > 0) {
  1114. slapi_valueset_done(r->repl_referral);
  1115. slapi_valueset_set_valueset(r->repl_referral, newvs);
  1116. }
  1117. slapi_valueset_free(newvs); /* s_vs_set_vs makes a copy */
  1118. }
  1119. }
  1120. int
  1121. replica_update_csngen_state_ext(Replica *r, const RUV *ruv, const CSN *extracsn)
  1122. {
  1123. int rc = 0;
  1124. CSNGen *gen;
  1125. CSN *csn = NULL;
  1126. PR_ASSERT(r && ruv);
  1127. rc = ruv_get_max_csn(ruv, &csn);
  1128. if (rc != RUV_SUCCESS) {
  1129. return -1;
  1130. }
  1131. if ((csn == NULL) && (extracsn == NULL)) /* ruv contains no csn and no extra - we are done */
  1132. {
  1133. return 0;
  1134. }
  1135. if (csn_compare(extracsn, csn) > 0) /* extracsn > csn */
  1136. {
  1137. csn_free(&csn); /* free */
  1138. csn = (CSN *)extracsn; /* use this csn to do the update */
  1139. }
  1140. replica_lock(r->repl_lock);
  1141. gen = (CSNGen *)object_get_data(r->repl_csngen);
  1142. PR_ASSERT(gen);
  1143. rc = csngen_adjust_time(gen, csn);
  1144. /* rc will be either CSN_SUCCESS (0) or clock skew */
  1145. /* done: */
  1146. replica_unlock(r->repl_lock);
  1147. if (csn != extracsn) /* do not free the given csn */
  1148. {
  1149. csn_free(&csn);
  1150. }
  1151. return rc;
  1152. }
  1153. int
  1154. replica_update_csngen_state(Replica *r, const RUV *ruv)
  1155. {
  1156. return replica_update_csngen_state_ext(r, ruv, NULL);
  1157. }
  1158. /*
  1159. * dumps replica state for debugging purpose
  1160. */
  1161. void
  1162. replica_dump(Replica *r)
  1163. {
  1164. char *updatedn_list = NULL;
  1165. PR_ASSERT(r);
  1166. replica_lock(r->repl_lock);
  1167. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name, "Replica state:\n");
  1168. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name, "\treplica root: %s\n",
  1169. slapi_sdn_get_ndn(r->repl_root));
  1170. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name, "\treplica type: %s\n",
  1171. _replica_type_as_string(r));
  1172. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name, "\treplica id: %d\n", r->repl_rid);
  1173. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name, "\tflags: %d\n", r->repl_flags);
  1174. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name, "\tstate flags: %lu\n", r->repl_state_flags);
  1175. if (r->updatedn_list)
  1176. updatedn_list = replica_updatedn_list_to_string(r->updatedn_list, "\n\t\t");
  1177. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name, "\tupdate dn: %s\n",
  1178. updatedn_list ? updatedn_list : "not configured");
  1179. slapi_ch_free_string(&updatedn_list);
  1180. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name, "\tCSN generator: %s configured\n",
  1181. r->repl_csngen ? "" : "not");
  1182. /* JCMREPL - Dump Referrals */
  1183. replica_unlock(r->repl_lock);
  1184. }
  1185. /*
  1186. * Return the CSN of the purge point. Any CSNs smaller than the
  1187. * purge point can be safely removed from entries within this
  1188. * this replica. Returns an allocated CSN that must be freed by
  1189. * the caller, or NULL if purging is disabled.
  1190. */
  1191. CSN *
  1192. replica_get_purge_csn(const Replica *r)
  1193. {
  1194. CSN *csn;
  1195. replica_lock(r->repl_lock);
  1196. csn = _replica_get_purge_csn_nolock(r);
  1197. replica_unlock(r->repl_lock);
  1198. return csn;
  1199. }
  1200. /*
  1201. * This function logs a dummy entry for the smallest csn in the RUV.
  1202. * This is necessary because, to get the next change, we need to position
  1203. * changelog on the previous change. So this function insures that we always have one.
  1204. */
  1205. /* ONREPL we will need to change this function to log all the
  1206. * ruv elements not just the smallest when changelog iteration
  1207. * algoritm changes to iterate replica by replica
  1208. */
  1209. int
  1210. replica_log_ruv_elements(const Replica *r)
  1211. {
  1212. int rc = 0;
  1213. PR_ASSERT(r);
  1214. replica_lock(r->repl_lock);
  1215. rc = replica_log_ruv_elements_nolock(r);
  1216. replica_unlock(r->repl_lock);
  1217. return rc;
  1218. }
  1219. void
  1220. consumer5_set_mapping_tree_state_for_replica(const Replica *r, RUV *supplierRuv)
  1221. {
  1222. const Slapi_DN *repl_root_sdn = replica_get_root(r);
  1223. char **ruv_referrals = NULL;
  1224. char **replica_referrals = NULL;
  1225. RUV *ruv;
  1226. int state_backend = -1;
  1227. const char *mtn_state = NULL;
  1228. replica_lock(r->repl_lock);
  1229. if (supplierRuv == NULL) {
  1230. ruv = (RUV *)object_get_data(r->repl_ruv);
  1231. PR_ASSERT(ruv);
  1232. ruv_referrals = ruv_get_referrals(ruv); /* ruv_referrals has to be free'd */
  1233. } else {
  1234. ruv_referrals = ruv_get_referrals(supplierRuv);
  1235. }
  1236. replica_get_referrals_nolock(r, &replica_referrals); /* replica_referrals has to be free'd */
  1237. /* JCMREPL - What if there's a Total update in progress? */
  1238. if (r->repl_type == REPLICA_TYPE_READONLY) {
  1239. state_backend = 0;
  1240. } else if (r->repl_type == REPLICA_TYPE_UPDATABLE) {
  1241. state_backend = 1;
  1242. }
  1243. /* Unlock to avoid changing MTN state under repl lock */
  1244. replica_unlock(r->repl_lock);
  1245. if (state_backend == 0) {
  1246. /* Read-Only - The mapping tree should be refering all update operations. */
  1247. mtn_state = STATE_UPDATE_REFERRAL;
  1248. } else if (state_backend == 1) {
  1249. /* Updatable - The mapping tree should be accepting all update operations. */
  1250. mtn_state = STATE_BACKEND;
  1251. }
  1252. /* JCMREPL - Check the return code. */
  1253. repl_set_mtn_state_and_referrals(repl_root_sdn, mtn_state, NULL,
  1254. ruv_referrals, replica_referrals);
  1255. charray_free(ruv_referrals);
  1256. charray_free(replica_referrals);
  1257. }
  1258. void
  1259. replica_set_enabled(Replica *r, PRBool enable)
  1260. {
  1261. PR_ASSERT(r);
  1262. replica_lock(r->repl_lock);
  1263. if (enable) {
  1264. if (r->repl_eqcxt_rs == NULL) /* event is not already registered */
  1265. {
  1266. r->repl_eqcxt_rs = slapi_eq_repeat(replica_update_state, r->repl_name,
  1267. slapi_current_utc_time() + START_UPDATE_DELAY, RUV_SAVE_INTERVAL);
  1268. }
  1269. } else /* disable */
  1270. {
  1271. if (r->repl_eqcxt_rs) /* event is still registerd */
  1272. {
  1273. slapi_eq_cancel(r->repl_eqcxt_rs);
  1274. r->repl_eqcxt_rs = NULL;
  1275. }
  1276. }
  1277. replica_unlock(r->repl_lock);
  1278. }
  1279. /* This function is generally called when replica's data store
  1280. is reloaded. It retrieves new RUV from the datastore. If new
  1281. RUV does not exist or if it is not as up to date as the purge RUV
  1282. of the corresponding changelog file, we need to remove */
  1283. /* the function minimizes the use of replica lock where ever possible.
  1284. Locking replica lock while calling changelog functions
  1285. causes a deadlock because changelog calls replica functions that
  1286. that lock the same lock */
  1287. int
  1288. replica_reload_ruv(Replica *r)
  1289. {
  1290. int rc = 0;
  1291. Object *old_ruv_obj = NULL, *new_ruv_obj = NULL;
  1292. RUV *upper_bound_ruv = NULL;
  1293. RUV *new_ruv = NULL;
  1294. PR_ASSERT(r);
  1295. replica_lock(r->repl_lock);
  1296. old_ruv_obj = r->repl_ruv;
  1297. r->repl_ruv = NULL;
  1298. rc = _replica_configure_ruv(r, PR_TRUE);
  1299. replica_unlock(r->repl_lock);
  1300. if (rc != 0) {
  1301. return rc;
  1302. }
  1303. /* check if there is a changelog and whether this replica logs changes */
  1304. if (cl5GetState() == CL5_STATE_OPEN && (r->repl_flags & REPLICA_LOG_CHANGES)) {
  1305. /* Compare new ruv to the changelog's upper bound ruv. We could only keep
  1306. the existing changelog if its upper bound is the same as replica's RUV.
  1307. This is because if changelog has changes not in RUV, they will be
  1308. eventually sent to the consumer's which will cause a state mismatch
  1309. (because the supplier does not actually contain the changes in its data store.
  1310. If, on the other hand, the changelog is not as up to date as the supplier,
  1311. it is not really useful since out of sync consumer's can't be brought
  1312. up to date using this changelog and hence will need to be reinitialized */
  1313. /* replace ruv to make sure we work with the correct changelog file */
  1314. replica_lock(r->repl_lock);
  1315. new_ruv_obj = r->repl_ruv;
  1316. r->repl_ruv = old_ruv_obj;
  1317. replica_unlock(r->repl_lock);
  1318. rc = cl5GetUpperBoundRUV(r, &upper_bound_ruv);
  1319. if (rc != CL5_SUCCESS && rc != CL5_NOTFOUND) {
  1320. return -1;
  1321. }
  1322. if (upper_bound_ruv) {
  1323. new_ruv = object_get_data(new_ruv_obj);
  1324. PR_ASSERT(new_ruv);
  1325. /* ONREPL - there are more efficient ways to establish RUV equality.
  1326. However, because this is not in the critical path and we at most
  1327. have 2 elements in the RUV, this will not effect performance */
  1328. if (!ruv_covers_ruv(new_ruv, upper_bound_ruv) ||
  1329. !ruv_covers_ruv(upper_bound_ruv, new_ruv)) {
  1330. /* We can't use existing changelog - remove existing file */
  1331. slapi_log_err(SLAPI_LOG_WARNING, repl_plugin_name, "replica_reload_ruv - "
  1332. "New data for replica %s does not match the data in the changelog.\n"
  1333. " Recreating the changelog file. This could affect replication with replica's "
  1334. " consumers in which case the consumers should be reinitialized.\n",
  1335. slapi_sdn_get_dn(r->repl_root));
  1336. /* need to reset changelog db */
  1337. rc = cldb_RemoveReplicaDB(r);
  1338. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
  1339. "replica_reload_ruv: reset cldb for replica\n");
  1340. /* reinstate new ruv */
  1341. replica_lock(r->repl_lock);
  1342. r->repl_ruv = new_ruv_obj;
  1343. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
  1344. "replica_reload_ruv: set cldb for replica\n");
  1345. cldb_SetReplicaDB(r, NULL);
  1346. if (rc == CL5_SUCCESS) {
  1347. /* log changes to mark starting point for replication */
  1348. rc = replica_log_ruv_elements_nolock(r);
  1349. }
  1350. replica_unlock(r->repl_lock);
  1351. } else {
  1352. /* we just need to reinstate new ruv */
  1353. replica_lock(r->repl_lock);
  1354. r->repl_ruv = new_ruv_obj;
  1355. replica_unlock(r->repl_lock);
  1356. }
  1357. } else /* upper bound vector is not there - we have no changes logged */
  1358. {
  1359. /* reinstate new ruv */
  1360. replica_lock(r->repl_lock);
  1361. r->repl_ruv = new_ruv_obj;
  1362. /* just log elements of the current RUV. This is to have
  1363. a starting point for iteration through the changes */
  1364. rc = replica_log_ruv_elements_nolock(r);
  1365. replica_unlock(r->repl_lock);
  1366. }
  1367. }
  1368. if (rc == 0) {
  1369. consumer5_set_mapping_tree_state_for_replica(r, NULL);
  1370. /* reset mapping tree referrals based on new local RUV */
  1371. }
  1372. if (old_ruv_obj)
  1373. object_release(old_ruv_obj);
  1374. if (upper_bound_ruv)
  1375. ruv_destroy(&upper_bound_ruv);
  1376. return rc;
  1377. }
  1378. /* this function is called during server startup for each replica
  1379. to check whether the replica's data was reloaded offline and
  1380. whether replica's changelog needs to be reinitialized */
  1381. /* the function does not use replica lock but all functions it calls are
  1382. thread safe. Locking replica lock while calling changelog functions
  1383. causes a deadlock because changelog calls replica functions that
  1384. that lock the same lock */
  1385. int
  1386. replica_check_for_data_reload(Replica *r, void *arg __attribute__((unused)))
  1387. {
  1388. int rc = 0;
  1389. RUV *upper_bound_ruv = NULL;
  1390. RUV *r_ruv = NULL;
  1391. Object *ruv_obj;
  1392. PR_ASSERT(r);
  1393. /* check that we have a changelog and if this replica logs changes */
  1394. if (cl5GetState() == CL5_STATE_OPEN && (r->repl_flags & REPLICA_LOG_CHANGES)) {
  1395. /* Compare new ruv to the purge ruv. If the new contains csns which
  1396. are smaller than those in purge ruv, we need to remove old and
  1397. create new changelog file for this replica. This is because we
  1398. will not have sufficient changes to incrementally update a consumer
  1399. to the current state of the supplier. */
  1400. rc = cl5GetUpperBoundRUV(r, &upper_bound_ruv);
  1401. if (rc != CL5_SUCCESS && rc != CL5_NOTFOUND) {
  1402. return -1;
  1403. }
  1404. if (upper_bound_ruv) {
  1405. ruv_obj = replica_get_ruv(r);
  1406. r_ruv = object_get_data(ruv_obj);
  1407. PR_ASSERT(r_ruv);
  1408. /* Compare new ruv to the changelog's upper bound ruv. We could only keep
  1409. the existing changelog if its upper bound is the same as replica's RUV.
  1410. This is because if changelog has changes not in RUV, they will be
  1411. eventually sent to the consumer's which will cause a state mismatch
  1412. (because the supplier does not actually contain the changes in its data store.
  1413. If, on the other hand, the changelog is not as up to date as the supplier,
  1414. it is not really useful since out of sync consumer's can't be brought
  1415. up to date using this changelog and hence will need to be reinitialized */
  1416. /*
  1417. * Actually we can ignore the scenario that the changelog's upper
  1418. * bound ruv covers data store's ruv for two reasons: (1) a change
  1419. * is always written to the changelog after it is committed to the
  1420. * data store; (2) a change will be ignored if the server has seen
  1421. * it before - this happens frequently at the beginning of replication
  1422. * sessions.
  1423. */
  1424. if (slapi_disorderly_shutdown(PR_FALSE)) {
  1425. slapi_log_err(SLAPI_LOG_WARNING, repl_plugin_name, "replica_check_for_data_reload - "
  1426. "Disorderly shutdown for replica %s. Check if DB RUV needs to be updated\n",
  1427. slapi_sdn_get_dn(r->repl_root));
  1428. if (ruv_covers_ruv(upper_bound_ruv, r_ruv) && !ruv_covers_ruv(r_ruv, upper_bound_ruv)) {
  1429. /*
  1430. * The Changelog RUV is ahead of the RUV in the DB.
  1431. * RUV DB was likely not flushed on disk.
  1432. */
  1433. ruv_force_csn_update_from_ruv(upper_bound_ruv, r_ruv,
  1434. "Force update of database RUV (from CL RUV) -> ", SLAPI_LOG_NOTICE);
  1435. }
  1436. } else {
  1437. rc = ruv_compare_ruv(upper_bound_ruv, "changelog max RUV", r_ruv, "database RUV", 0, SLAPI_LOG_ERR);
  1438. if (RUV_COMP_IS_FATAL(rc)) {
  1439. /* We can't use existing changelog - remove existing file */
  1440. slapi_log_err(SLAPI_LOG_WARNING, repl_plugin_name, "replica_check_for_data_reload - "
  1441. "Data for replica %s does not match the data in the changelog. "
  1442. "Recreating the changelog file. "
  1443. "This could affect replication with replica's consumers in which case the "
  1444. "consumers should be reinitialized.\n",
  1445. slapi_sdn_get_dn(r->repl_root));
  1446. /* need to reset changelog db */
  1447. rc = cldb_RemoveReplicaDB(r);
  1448. cldb_SetReplicaDB(r, NULL);
  1449. if (rc == CL5_SUCCESS) {
  1450. /* log changes to mark starting point for replication */
  1451. rc = replica_log_ruv_elements(r);
  1452. }
  1453. } else if (rc) {
  1454. slapi_log_err(SLAPI_LOG_WARNING, repl_plugin_name, "replica_check_for_data_reload - "
  1455. "For replica %s there were some differences between the changelog max RUV and the "
  1456. "database RUV. If there are obsolete elements in the database RUV, you "
  1457. "should remove them using the CLEANALLRUV task. If they are not obsolete, "
  1458. "you should check their status to see why there are no changes from those "
  1459. "servers in the changelog.\n",
  1460. slapi_sdn_get_dn(r->repl_root));
  1461. rc = 0;
  1462. }
  1463. } /* slapi_disorderly_shutdown */
  1464. object_release(ruv_obj);
  1465. } else /* we have no changes currently logged for this replica */
  1466. {
  1467. /* log changes to mark starting point for replication */
  1468. rc = replica_log_ruv_elements(r);
  1469. }
  1470. }
  1471. if (rc == 0) {
  1472. /* reset mapping tree referrals based on new local RUV */
  1473. consumer5_set_mapping_tree_state_for_replica(r, NULL);
  1474. }
  1475. if (upper_bound_ruv)
  1476. ruv_destroy(&upper_bound_ruv);
  1477. return rc;
  1478. }
  1479. /* Helper functions */
  1480. /* reads replica configuration entry. The entry is the child of the
  1481. mapping tree node for the replica's backend */
  1482. static Slapi_Entry *
  1483. _replica_get_config_entry(const Slapi_DN *root, const char **attrs)
  1484. {
  1485. int rc = 0;
  1486. char *dn = NULL;
  1487. Slapi_Entry **entries;
  1488. Slapi_Entry *e = NULL;
  1489. Slapi_PBlock *pb = NULL;
  1490. dn = _replica_get_config_dn(root);
  1491. if (NULL == dn) {
  1492. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name,
  1493. "_replica_get_config_entry - Failed to get the config dn for %s\n",
  1494. slapi_sdn_get_dn(root));
  1495. return NULL;
  1496. }
  1497. pb = slapi_pblock_new();
  1498. slapi_search_internal_set_pb(pb, dn, LDAP_SCOPE_BASE, "objectclass=*", (char **)attrs, 0, NULL,
  1499. NULL, repl_get_plugin_identity(PLUGIN_MULTIMASTER_REPLICATION), 0);
  1500. slapi_search_internal_pb(pb);
  1501. slapi_pblock_get(pb, SLAPI_PLUGIN_INTOP_RESULT, &rc);
  1502. if (rc == 0) {
  1503. slapi_pblock_get(pb, SLAPI_PLUGIN_INTOP_SEARCH_ENTRIES, &entries);
  1504. e = slapi_entry_dup(entries[0]);
  1505. }
  1506. slapi_free_search_results_internal(pb);
  1507. slapi_pblock_destroy(pb);
  1508. slapi_ch_free_string(&dn);
  1509. return e;
  1510. }
  1511. /* It does an internal search to read the in memory RUV
  1512. * of the provided suffix
  1513. */
  1514. Slapi_Entry *
  1515. get_in_memory_ruv(Slapi_DN *suffix_sdn)
  1516. {
  1517. const char *attrs[4];
  1518. /* these two attributes needs to be asked when reading the RUV */
  1519. attrs[0] = type_ruvElement;
  1520. attrs[1] = type_ruvElementUpdatetime;
  1521. attrs[2] = type_agmtMaxCSN;
  1522. attrs[3] = NULL;
  1523. return (_replica_get_config_entry(suffix_sdn, attrs));
  1524. }
  1525. char *
  1526. replica_get_dn(Replica *r)
  1527. {
  1528. return _replica_get_config_dn(r->repl_root);
  1529. }
  1530. static int
  1531. _replica_check_validity(const Replica *r)
  1532. {
  1533. PR_ASSERT(r);
  1534. if (r->repl_root == NULL || r->repl_type == 0 || r->repl_rid == 0 ||
  1535. r->repl_csngen == NULL || r->repl_name == NULL) {
  1536. return LDAP_OTHER;
  1537. } else {
  1538. return LDAP_SUCCESS;
  1539. }
  1540. }
  1541. /* replica configuration entry has the following format:
  1542. dn: cn=replica,<mapping tree node dn>
  1543. objectclass: top
  1544. objectclass: nsds5Replica
  1545. objectclass: extensibleObject
  1546. nsds5ReplicaRoot: <root of the replica>
  1547. nsds5ReplicaId: <replica id>
  1548. nsds5ReplicaType: <type of the replica: primary, read-write or read-only>
  1549. nsState: <state of the csn generator> missing the first time replica is started
  1550. nsds5ReplicaBindDN: <supplier update dn> consumers only
  1551. nsds5ReplicaBindDNGroup: group, containing replicaBindDNs
  1552. nsds5ReplicaBindDNGroupCheckInterval: defines how frequently to check for update of bindGroup
  1553. nsds5ReplicaReferral: <referral URL to updatable replica> consumers only
  1554. nsds5ReplicaPurgeDelay: <time, in seconds, to keep purgeable CSNs, 0 == keep forever>
  1555. nsds5ReplicaTombstonePurgeInterval: <time, in seconds, between tombstone purge runs, 0 == don't reap>
  1556. richm: changed slapi entry from const to editable - if the replica id is supplied for a read
  1557. only replica, we ignore it and replace the value with the READ_ONLY_REPLICA_ID
  1558. */
  1559. static int
  1560. _replica_init_from_config(Replica *r, Slapi_Entry *e, char *errortext)
  1561. {
  1562. Slapi_Attr *attr;
  1563. CSNGen *gen;
  1564. char *precise_purging = NULL;
  1565. char buf[SLAPI_DSE_RETURNTEXT_SIZE];
  1566. char *errormsg = errortext ? errortext : buf;
  1567. char *val;
  1568. int64_t backoff_min;
  1569. int64_t backoff_max;
  1570. int64_t ptimeout = 0;
  1571. int64_t release_timeout = 0;
  1572. int64_t interval = 0;
  1573. int64_t rtype = 0;
  1574. int rc;
  1575. PR_ASSERT(r && e);
  1576. /* get replica root */
  1577. val = slapi_entry_attr_get_charptr(e, attr_replicaRoot);
  1578. if (val == NULL) {
  1579. PR_snprintf(errormsg, SLAPI_DSE_RETURNTEXT_SIZE, "Failed to retrieve %s attribute from (%s)",
  1580. attr_replicaRoot,
  1581. (char *)slapi_entry_get_dn((Slapi_Entry *)e));
  1582. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name, "_replica_init_from_config - %s\n",
  1583. errormsg);
  1584. return LDAP_OTHER;
  1585. }
  1586. r->repl_root = slapi_sdn_new_dn_passin(val);
  1587. /* get replica type */
  1588. if (slapi_entry_attr_exists(e, attr_replicaType)) {
  1589. if ((val = (char*)slapi_entry_attr_get_ref(e, attr_replicaType))) {
  1590. if (repl_config_valid_num(attr_replicaType, (char *)val, 0, REPLICA_TYPE_UPDATABLE, &rc, errormsg, &rtype) != 0) {
  1591. return LDAP_UNWILLING_TO_PERFORM;
  1592. }
  1593. r->repl_type = rtype;
  1594. } else {
  1595. r->repl_type = REPLICA_TYPE_READONLY;
  1596. }
  1597. } else {
  1598. r->repl_type = REPLICA_TYPE_READONLY;
  1599. }
  1600. /* grab and validate the backoff min retry settings */
  1601. if (slapi_entry_attr_exists(e, type_replicaBackoffMin)) {
  1602. if ((val = (char*)slapi_entry_attr_get_ref(e, type_replicaBackoffMin))) {
  1603. if (repl_config_valid_num(type_replicaBackoffMin, val, 1, INT_MAX, &rc, errormsg, &backoff_min) != 0) {
  1604. return LDAP_UNWILLING_TO_PERFORM;
  1605. }
  1606. } else {
  1607. backoff_min = PROTOCOL_BACKOFF_MINIMUM;
  1608. }
  1609. } else {
  1610. backoff_min = PROTOCOL_BACKOFF_MINIMUM;
  1611. }
  1612. /* grab and validate the backoff max retry settings */
  1613. if (slapi_entry_attr_exists(e, type_replicaBackoffMax)) {
  1614. if ((val = (char*)slapi_entry_attr_get_ref(e, type_replicaBackoffMax))) {
  1615. if (repl_config_valid_num(type_replicaBackoffMax, val, 1, INT_MAX, &rc, errormsg, &backoff_max) != 0) {
  1616. return LDAP_UNWILLING_TO_PERFORM;
  1617. }
  1618. } else {
  1619. backoff_max = PROTOCOL_BACKOFF_MAXIMUM;
  1620. }
  1621. } else {
  1622. backoff_max = PROTOCOL_BACKOFF_MAXIMUM;
  1623. }
  1624. /* Verify backoff min and max work together */
  1625. if (backoff_min > backoff_max) {
  1626. PR_snprintf(errormsg, SLAPI_DSE_RETURNTEXT_SIZE,
  1627. "Backoff minimum (%" PRId64 ") can not be greater than the backoff maximum (%" PRId64 ").",
  1628. backoff_min, backoff_max);
  1629. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name, "_replica_init_from_config - "
  1630. "%s\n", errormsg);
  1631. return LDAP_UNWILLING_TO_PERFORM;
  1632. } else {
  1633. slapi_counter_set_value(r->backoff_min, backoff_min);
  1634. slapi_counter_set_value(r->backoff_max, backoff_max);
  1635. }
  1636. /* get the protocol timeout */
  1637. if (slapi_entry_attr_exists(e, type_replicaProtocolTimeout)) {
  1638. if ((val = (char*)slapi_entry_attr_get_ref(e, type_replicaProtocolTimeout))) {
  1639. if (repl_config_valid_num(type_replicaProtocolTimeout, val, 0, INT_MAX, &rc, errormsg, &ptimeout) != 0) {
  1640. return LDAP_UNWILLING_TO_PERFORM;
  1641. }
  1642. slapi_counter_set_value(r->protocol_timeout, ptimeout);
  1643. } else {
  1644. slapi_counter_set_value(r->protocol_timeout, DEFAULT_PROTOCOL_TIMEOUT);
  1645. }
  1646. } else {
  1647. slapi_counter_set_value(r->protocol_timeout, DEFAULT_PROTOCOL_TIMEOUT);
  1648. }
  1649. /* Get the release timeout */
  1650. if (slapi_entry_attr_exists(e, type_replicaReleaseTimeout)) {
  1651. if ((val = (char*)slapi_entry_attr_get_ref(e, type_replicaReleaseTimeout))) {
  1652. if (repl_config_valid_num(type_replicaReleaseTimeout, val, 0, INT_MAX, &rc, errortext, &release_timeout) != 0) {
  1653. return LDAP_UNWILLING_TO_PERFORM;
  1654. }
  1655. slapi_counter_set_value(r->release_timeout, release_timeout);
  1656. } else {
  1657. slapi_counter_set_value(r->release_timeout, 0);
  1658. }
  1659. } else {
  1660. slapi_counter_set_value(r->release_timeout, 0);
  1661. }
  1662. /* check for precise tombstone purging */
  1663. precise_purging = (char*)slapi_entry_attr_get_ref(e, type_replicaPrecisePurge);
  1664. if (precise_purging) {
  1665. if (strcasecmp(precise_purging, "on") == 0) {
  1666. slapi_counter_set_value(r->precise_purging, 1);
  1667. } else if (strcasecmp(precise_purging, "off") == 0) {
  1668. slapi_counter_set_value(r->precise_purging, 0);
  1669. } else {
  1670. /* Invalid value */
  1671. PR_snprintf(errormsg, SLAPI_DSE_RETURNTEXT_SIZE, "Invalid value for %s: %s",
  1672. type_replicaPrecisePurge, precise_purging);
  1673. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name, "_replica_init_from_config - "
  1674. "%s\n", errormsg);
  1675. return LDAP_UNWILLING_TO_PERFORM;
  1676. }
  1677. } else {
  1678. slapi_counter_set_value(r->precise_purging, 0);
  1679. }
  1680. /* get replica flags */
  1681. if (slapi_entry_attr_exists(e, attr_flags)) {
  1682. int64_t rflags;
  1683. if((val = (char*)slapi_entry_attr_get_ref(e, attr_flags))) {
  1684. if (repl_config_valid_num(attr_flags, val, 0, 1, &rc, errortext, &rflags) != 0) {
  1685. return LDAP_UNWILLING_TO_PERFORM;
  1686. }
  1687. r->repl_flags = (uint32_t)rflags;
  1688. } else {
  1689. r->repl_flags = 0;
  1690. }
  1691. } else {
  1692. r->repl_flags = 0;
  1693. }
  1694. /*
  1695. * Get replicaid
  1696. * The replica id is ignored for read only replicas and is set to the
  1697. * special value READ_ONLY_REPLICA_ID
  1698. */
  1699. if (r->repl_type == REPLICA_TYPE_READONLY) {
  1700. r->repl_rid = READ_ONLY_REPLICA_ID;
  1701. slapi_entry_attr_set_uint(e, attr_replicaId, (unsigned int)READ_ONLY_REPLICA_ID);
  1702. }
  1703. /* a replica id is required for updatable and primary replicas */
  1704. else if (r->repl_type == REPLICA_TYPE_UPDATABLE ||
  1705. r->repl_type == REPLICA_TYPE_PRIMARY) {
  1706. if ((val = (char*)slapi_entry_attr_get_ref(e, attr_replicaId))) {
  1707. int64_t rid;
  1708. if (repl_config_valid_num(attr_replicaId, val, 1, 65534, &rc, errormsg, &rid) != 0) {
  1709. return LDAP_UNWILLING_TO_PERFORM;
  1710. }
  1711. r->repl_rid = (ReplicaId)rid;
  1712. } else {
  1713. PR_snprintf(errormsg, SLAPI_DSE_RETURNTEXT_SIZE,
  1714. "Failed to retrieve required %s attribute from %s",
  1715. attr_replicaId, (char *)slapi_entry_get_dn((Slapi_Entry *)e));
  1716. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name,
  1717. "_replica_init_from_config - %s\n", errormsg);
  1718. return LDAP_OTHER;
  1719. }
  1720. }
  1721. attr = NULL;
  1722. rc = slapi_entry_attr_find(e, attr_state, &attr);
  1723. gen = csngen_new(r->repl_rid, attr);
  1724. if (gen == NULL) {
  1725. PR_snprintf(errormsg, SLAPI_DSE_RETURNTEXT_SIZE,
  1726. "Failed to create csn generator for replica (%s)",
  1727. (char *)slapi_entry_get_dn((Slapi_Entry *)e));
  1728. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name,
  1729. "_replica_init_from_config - %s\n", errormsg);
  1730. return LDAP_OTHER;
  1731. }
  1732. r->repl_csngen = object_new((void *)gen, (FNFree)csngen_free);
  1733. /* Hook generator so we can maintain min/max CSN info */
  1734. r->csn_pl_reg_id = csngen_register_callbacks(gen, assign_csn_callback, r, abort_csn_callback, r);
  1735. /* get replication bind dn */
  1736. r->updatedn_list = replica_updatedn_list_new(e);
  1737. /* get replication bind dn groups */
  1738. r->updatedn_groups = replica_updatedn_group_new(e);
  1739. r->groupdn_list = replica_groupdn_list_new(r->updatedn_groups);
  1740. r->updatedn_group_last_check = 0;
  1741. /* get groupdn check interval */
  1742. if ((val = (char*)slapi_entry_attr_get_ref(e, attr_replicaBindDnGroupCheckInterval))) {
  1743. if (repl_config_valid_num(attr_replicaBindDnGroupCheckInterval, val, -1, INT_MAX, &rc, errormsg, &interval) != 0) {
  1744. return LDAP_UNWILLING_TO_PERFORM;
  1745. }
  1746. r->updatedn_group_check_interval = interval;
  1747. } else {
  1748. r->updatedn_group_check_interval = -1;
  1749. }
  1750. /* get replica name */
  1751. val = slapi_entry_attr_get_charptr(e, attr_replicaName);
  1752. if (val) {
  1753. r->repl_name = val;
  1754. } else {
  1755. rc = slapi_uniqueIDGenerateString(&r->repl_name);
  1756. if (rc != UID_SUCCESS) {
  1757. PR_snprintf(errormsg, SLAPI_DSE_RETURNTEXT_SIZE,
  1758. "Failed to assign replica name for replica (%s); uuid generator error - %d ",
  1759. (char *)slapi_entry_get_dn((Slapi_Entry *)e), rc);
  1760. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name, "_replica_init_from_config - %s\n",
  1761. errormsg);
  1762. return LDAP_OTHER;
  1763. } else
  1764. r->new_name = PR_TRUE;
  1765. }
  1766. /* get the list of referrals */
  1767. slapi_entry_attr_find(e, attr_replicaReferral, &attr);
  1768. if (attr != NULL) {
  1769. slapi_attr_get_valueset(attr, &r->repl_referral);
  1770. }
  1771. /*
  1772. * Set the purge offset (default 7 days). This is the extra
  1773. * time we allow purgeable CSNs to stick around, in case a
  1774. * replica regresses. Could also be useful when LCUP happens,
  1775. * since we don't know about LCUP replicas, and they can just
  1776. * turn up whenever they want to.
  1777. */
  1778. if ((val = (char*)slapi_entry_attr_get_ref(e, type_replicaPurgeDelay))) {
  1779. if (repl_config_valid_num(type_replicaPurgeDelay, val, -1, INT_MAX, &rc, errormsg, &interval) != 0) {
  1780. return LDAP_UNWILLING_TO_PERFORM;
  1781. }
  1782. r->repl_purge_delay = interval;
  1783. } else {
  1784. r->repl_purge_delay = 60 * 60 * 24 * 7; /* One week, in seconds */
  1785. }
  1786. if ((val = (char*)slapi_entry_attr_get_ref(e, type_replicaTombstonePurgeInterval))) {
  1787. if (repl_config_valid_num(type_replicaTombstonePurgeInterval, val, -1, INT_MAX, &rc, errormsg, &interval) != 0) {
  1788. return LDAP_UNWILLING_TO_PERFORM;
  1789. }
  1790. r->tombstone_reap_interval = interval;
  1791. } else {
  1792. r->tombstone_reap_interval = 3600 * 24; /* One week, in seconds */
  1793. }
  1794. r->tombstone_reap_stop = r->tombstone_reap_active = PR_FALSE;
  1795. /* No supplier holding the replica */
  1796. r->locking_conn = ULONG_MAX;
  1797. return (_replica_check_validity(r));
  1798. }
  1799. static void
  1800. replica_delete_task_config(Slapi_Entry *e, char *attr, char *value)
  1801. {
  1802. Slapi_PBlock *modpb;
  1803. struct berval *vals[2];
  1804. struct berval val[1];
  1805. LDAPMod *mods[2];
  1806. LDAPMod mod;
  1807. int32_t rc = 0;
  1808. val[0].bv_len = strlen(value);
  1809. val[0].bv_val = value;
  1810. vals[0] = &val[0];
  1811. vals[1] = NULL;
  1812. mod.mod_op = LDAP_MOD_DELETE | LDAP_MOD_BVALUES;
  1813. mod.mod_type = attr;
  1814. mod.mod_bvalues = vals;
  1815. mods[0] = &mod;
  1816. mods[1] = NULL;
  1817. modpb = slapi_pblock_new();
  1818. slapi_modify_internal_set_pb(modpb, slapi_entry_get_dn(e), mods, NULL, NULL,
  1819. repl_get_plugin_identity(PLUGIN_MULTIMASTER_REPLICATION), 0);
  1820. slapi_modify_internal_pb(modpb);
  1821. slapi_pblock_get(modpb, SLAPI_PLUGIN_INTOP_RESULT, &rc);
  1822. slapi_pblock_destroy(modpb);
  1823. if (rc != LDAP_SUCCESS && rc != LDAP_NO_SUCH_OBJECT) {
  1824. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name,
  1825. "delete_cleaned_rid_config - Failed to remove task data from (%s) error (%d)\n",
  1826. slapi_entry_get_dn(e), rc);
  1827. }
  1828. }
  1829. void
  1830. replica_check_for_tasks(time_t when __attribute__((unused)), void *arg)
  1831. {
  1832. const Slapi_DN *repl_root = (Slapi_DN *)arg;
  1833. Slapi_Entry *e = NULL;
  1834. Replica *r = NULL;
  1835. char **clean_vals;
  1836. e = _replica_get_config_entry(repl_root, NULL);
  1837. r = replica_get_replica_from_dn(repl_root);
  1838. if (e == NULL || r == NULL || ldif_dump_is_running() == PR_TRUE) {
  1839. /* If db2ldif is being run, do not check if there are incomplete tasks */
  1840. return;
  1841. }
  1842. /*
  1843. * check if we are in the middle of a CLEANALLRUV task,
  1844. * if so set the cleaned rid, and fire off the thread
  1845. */
  1846. if ((clean_vals = slapi_entry_attr_get_charray(e, type_replicaCleanRUV)) != NULL) {
  1847. for (size_t i = 0; i < CLEANRIDSIZ && clean_vals[i]; i++) {
  1848. struct timespec ts = slapi_current_rel_time_hr();
  1849. PRBool original_task = PR_TRUE;
  1850. Slapi_Entry *task_entry = NULL;
  1851. Slapi_PBlock *add_pb = NULL;
  1852. int32_t result = 0;
  1853. ReplicaId rid;
  1854. char *token = NULL;
  1855. char *forcing;
  1856. char *iter = NULL;
  1857. char *repl_root = NULL;
  1858. char *ridstr = NULL;
  1859. char *rdn = NULL;
  1860. char *dn = NULL;
  1861. char *orig_val = slapi_ch_strdup(clean_vals[i]);
  1862. /*
  1863. * Get all the needed from
  1864. */
  1865. token = ldap_utf8strtok_r(clean_vals[i], ":", &iter);
  1866. if (token) {
  1867. rid = atoi(token);
  1868. if (rid <= 0 || rid >= READ_ONLY_REPLICA_ID) {
  1869. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name, "CleanAllRUV Task - "
  1870. "Invalid replica id(%d) aborting task. Aborting cleaning task!\n", rid);
  1871. replica_delete_task_config(e, (char *)type_replicaCleanRUV, orig_val);
  1872. goto done;
  1873. }
  1874. } else {
  1875. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name, "CleanAllRUV Task - "
  1876. "Unable to parse cleanallruv data (%s), missing rid, aborting task. Aborting cleaning task!\n",
  1877. clean_vals[i]);
  1878. replica_delete_task_config(e, (char *)type_replicaCleanRUV, orig_val);
  1879. goto done;
  1880. }
  1881. /* Get forcing */
  1882. forcing = ldap_utf8strtok_r(iter, ":", &iter);
  1883. if (forcing == NULL || strlen(forcing) > 3) {
  1884. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name, "CleanAllRUV Task - "
  1885. "Unable to parse cleanallruv data (%s), missing/invalid force option (%s). Aborting cleaning task!\n",
  1886. clean_vals[i], forcing ? forcing : "missing force option");
  1887. replica_delete_task_config(e, (char *)type_replicaCleanRUV, orig_val);
  1888. goto done;
  1889. }
  1890. /* Get original task flag */
  1891. token = ldap_utf8strtok_r(iter, ":", &iter);
  1892. if (token) {
  1893. if (!atoi(token)) {
  1894. original_task = PR_FALSE;
  1895. }
  1896. } else {
  1897. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name, "CleanAllRUV Task - "
  1898. "Unable to parse cleanallruv data (%s), missing original task flag. Aborting cleaning task!\n",
  1899. clean_vals[i]);
  1900. replica_delete_task_config(e, (char *)type_replicaCleanRUV, orig_val);
  1901. goto done;
  1902. }
  1903. /* Get repl root */
  1904. token = ldap_utf8strtok_r(iter, ":", &iter);
  1905. if (token) {
  1906. repl_root = token;
  1907. } else {
  1908. /* no repl root, have to void task */
  1909. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name, "CleanAllRUV Task - "
  1910. "Unable to parse cleanallruv data (%s), missing replication root aborting task. Aborting cleaning task!\n",
  1911. clean_vals[i]);
  1912. replica_delete_task_config(e, (char *)type_replicaCleanRUV, orig_val);
  1913. goto done;
  1914. }
  1915. /*
  1916. * We have all our data, now add the task....
  1917. */
  1918. slapi_log_err(SLAPI_LOG_NOTICE, repl_plugin_name, "CleanAllRUV Task - "
  1919. "CleanAllRUV task found, resuming the cleaning of rid(%d)...\n", rid);
  1920. add_pb = slapi_pblock_new();
  1921. task_entry = slapi_entry_alloc();
  1922. rdn = slapi_ch_smprintf("restarted-%ld", ts.tv_sec);
  1923. dn = slapi_create_dn_string("cn=%s,cn=cleanallruv, cn=tasks, cn=config", rdn, ts.tv_sec);
  1924. slapi_entry_init(task_entry, dn, NULL);
  1925. ridstr = slapi_ch_smprintf("%d", rid);
  1926. slapi_entry_add_string(task_entry, "objectclass", "top");
  1927. slapi_entry_add_string(task_entry, "objectclass", "extensibleObject");
  1928. slapi_entry_add_string(task_entry, "cn", rdn);
  1929. slapi_entry_add_string(task_entry, "replica-base-dn", repl_root);
  1930. slapi_entry_add_string(task_entry, "replica-id", ridstr);
  1931. slapi_entry_add_string(task_entry, "replica-force-cleaning", forcing);
  1932. slapi_entry_add_string(task_entry, "replica-original-task", original_task ? "1" : "0");
  1933. slapi_add_entry_internal_set_pb(add_pb, task_entry, NULL,
  1934. repl_get_plugin_identity(PLUGIN_MULTIMASTER_REPLICATION), 0);
  1935. slapi_add_internal_pb(add_pb);
  1936. slapi_pblock_get(add_pb, SLAPI_PLUGIN_INTOP_RESULT, &result);
  1937. slapi_pblock_destroy(add_pb);
  1938. if (result != LDAP_SUCCESS) {
  1939. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name,
  1940. "replica_check_for_tasks - failed to add cleanallruv task entry: %s\n",
  1941. ldap_err2string(result));
  1942. }
  1943. done:
  1944. slapi_ch_free_string(&orig_val);
  1945. slapi_ch_free_string(&ridstr);
  1946. slapi_ch_free_string(&rdn);
  1947. }
  1948. slapi_ch_array_free(clean_vals);
  1949. }
  1950. if ((clean_vals = slapi_entry_attr_get_charray(e, type_replicaAbortCleanRUV)) != NULL) {
  1951. for (size_t i = 0; clean_vals[i]; i++) {
  1952. struct timespec ts = slapi_current_rel_time_hr();
  1953. PRBool original_task = PR_TRUE;
  1954. Slapi_Entry *task_entry = NULL;
  1955. Slapi_PBlock *add_pb = NULL;
  1956. ReplicaId rid;
  1957. char *certify = NULL;
  1958. char *ridstr = NULL;
  1959. char *token = NULL;
  1960. char *repl_root;
  1961. char *iter = NULL;
  1962. char *rdn = NULL;
  1963. char *dn = NULL;
  1964. char *orig_val = slapi_ch_strdup(clean_vals[i]);
  1965. int32_t result = 0;
  1966. token = ldap_utf8strtok_r(clean_vals[i], ":", &iter);
  1967. if (token) {
  1968. rid = atoi(token);
  1969. if (rid <= 0 || rid >= READ_ONLY_REPLICA_ID) {
  1970. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name, "Abort CleanAllRUV Task - "
  1971. "Invalid replica id(%d) aborting abort task.\n", rid);
  1972. replica_delete_task_config(e, (char *)type_replicaAbortCleanRUV, orig_val);
  1973. goto done2;
  1974. }
  1975. } else {
  1976. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name, "Abort CleanAllRUV Task - "
  1977. "Unable to parse cleanallruv data (%s), aborting abort task.\n", clean_vals[i]);
  1978. replica_delete_task_config(e, (char *)type_replicaAbortCleanRUV, orig_val);
  1979. goto done2;
  1980. }
  1981. repl_root = ldap_utf8strtok_r(iter, ":", &iter);
  1982. certify = ldap_utf8strtok_r(iter, ":", &iter);
  1983. /* Get original task flag */
  1984. token = ldap_utf8strtok_r(iter, ":", &iter);
  1985. if (token) {
  1986. if (!atoi(token)) {
  1987. original_task = PR_FALSE;
  1988. }
  1989. } else {
  1990. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name,
  1991. "Abort CleanAllRUV Task - Unable to parse cleanallruv data (%s), "
  1992. "missing original task flag. Aborting abort task!\n",
  1993. clean_vals[i]);
  1994. replica_delete_task_config(e, (char *)type_replicaAbortCleanRUV, orig_val);
  1995. goto done2;
  1996. }
  1997. if (!is_cleaned_rid(rid)) {
  1998. slapi_log_err(SLAPI_LOG_NOTICE, repl_plugin_name, "Abort CleanAllRUV Task - "
  1999. "Replica id(%d) is not being cleaned, nothing to abort. Aborting abort task.\n", rid);
  2000. replica_delete_task_config(e, (char *)type_replicaAbortCleanRUV, orig_val);
  2001. goto done2;
  2002. }
  2003. add_aborted_rid(rid, r, repl_root, certify, original_task);
  2004. stop_ruv_cleaning();
  2005. slapi_log_err(SLAPI_LOG_NOTICE, repl_plugin_name, "Abort CleanAllRUV Task - "
  2006. "Abort task found, resuming abort of rid(%d).\n", rid);
  2007. add_pb = slapi_pblock_new();
  2008. task_entry = slapi_entry_alloc();
  2009. rdn = slapi_ch_smprintf("restarted-abort-%ld", ts.tv_sec);
  2010. dn = slapi_create_dn_string("cn=%s,cn=abort cleanallruv, cn=tasks, cn=config", rdn, ts.tv_sec);
  2011. slapi_entry_init(task_entry, dn, NULL);
  2012. ridstr = slapi_ch_smprintf("%d", rid);
  2013. slapi_entry_add_string(task_entry, "objectclass", "top");
  2014. slapi_entry_add_string(task_entry, "objectclass", "extensibleObject");
  2015. slapi_entry_add_string(task_entry, "cn", rdn);
  2016. slapi_entry_add_string(task_entry, "replica-base-dn", repl_root);
  2017. slapi_entry_add_string(task_entry, "replica-id", ridstr);
  2018. slapi_entry_add_string(task_entry, "replica-certify-all", certify);
  2019. slapi_entry_add_string(task_entry, "replica-original-task", original_task ? "1" : "0");
  2020. slapi_add_entry_internal_set_pb(add_pb, task_entry, NULL,
  2021. repl_get_plugin_identity(PLUGIN_MULTIMASTER_REPLICATION), 0);
  2022. slapi_add_internal_pb(add_pb);
  2023. slapi_pblock_get(add_pb, SLAPI_PLUGIN_INTOP_RESULT, &result);
  2024. slapi_pblock_destroy(add_pb);
  2025. if (result != LDAP_SUCCESS) {
  2026. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name,
  2027. "replica_check_for_tasks - failed to add cleanallruv abort task entry: %s\n",
  2028. ldap_err2string(result));
  2029. }
  2030. done2:
  2031. slapi_ch_free_string(&orig_val);
  2032. slapi_ch_free_string(&ridstr);
  2033. slapi_ch_free_string(&rdn);
  2034. }
  2035. slapi_ch_array_free(clean_vals);
  2036. }
  2037. slapi_entry_free(e);
  2038. }
  2039. /* This function updates the entry to contain information generated
  2040. during replica initialization.
  2041. Returns 0 if successful and -1 otherwise */
  2042. static int
  2043. _replica_update_entry(Replica *r, Slapi_Entry *e, char *errortext)
  2044. {
  2045. int rc;
  2046. Slapi_Mod smod;
  2047. Slapi_Value *val;
  2048. PR_ASSERT(r);
  2049. /* add attribute that stores state of csn generator */
  2050. rc = csngen_get_state((CSNGen *)object_get_data(r->repl_csngen), &smod);
  2051. if (rc != CSN_SUCCESS) {
  2052. PR_snprintf(errortext, SLAPI_DSE_RETURNTEXT_SIZE, "Failed to get csn generator's state; csn error - %d", rc);
  2053. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name,
  2054. "_replica_update_entry - %s\n", errortext);
  2055. return -1;
  2056. }
  2057. val = slapi_value_new_berval(slapi_mod_get_first_value(&smod));
  2058. rc = slapi_entry_add_value(e, slapi_mod_get_type(&smod), val);
  2059. slapi_value_free(&val);
  2060. slapi_mod_done(&smod);
  2061. if (rc != 0) {
  2062. PR_snprintf(errortext, SLAPI_DSE_RETURNTEXT_SIZE, "Failed to update replica entry");
  2063. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name,
  2064. "_replica_update_entry - %s\n", errortext);
  2065. return -1;
  2066. }
  2067. /* add attribute that stores replica name */
  2068. rc = slapi_entry_add_string(e, attr_replicaName, r->repl_name);
  2069. if (rc != 0) {
  2070. PR_snprintf(errortext, SLAPI_DSE_RETURNTEXT_SIZE, "Failed to update replica entry");
  2071. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name,
  2072. "_replica_update_entry - %s\n", errortext);
  2073. return -1;
  2074. } else
  2075. r->new_name = PR_FALSE;
  2076. return 0;
  2077. }
  2078. /* DN format: cn=replica,cn=\"<root>\",cn=mapping tree,cn=config */
  2079. static char *
  2080. _replica_get_config_dn(const Slapi_DN *root)
  2081. {
  2082. char *dn;
  2083. /* "cn=mapping tree,cn=config" */
  2084. const char *mp_base = slapi_get_mapping_tree_config_root();
  2085. PR_ASSERT(root);
  2086. /* This function converts the old style DN to the new style. */
  2087. dn = slapi_ch_smprintf("%s,cn=\"%s\",%s",
  2088. REPLICA_RDN, slapi_sdn_get_dn(root), mp_base);
  2089. return dn;
  2090. }
  2091. /* when a replica is added the changelog config entry is created
  2092. * it will only the container entry, specifications for trimming
  2093. * or encyrption need to be added separately
  2094. */
  2095. static int
  2096. _replica_config_changelog(Replica *replica)
  2097. {
  2098. int rc = 0;
  2099. Slapi_Backend *be = slapi_be_select(replica_get_root(replica));
  2100. Slapi_Entry *config_entry = slapi_entry_alloc();
  2101. slapi_entry_init(config_entry, slapi_ch_strdup("cn=changelog"), NULL);
  2102. slapi_entry_add_string(config_entry, "objectclass", "top");
  2103. slapi_entry_add_string(config_entry, "objectclass", "extensibleObject");
  2104. rc = slapi_back_ctrl_info(be, BACK_INFO_CLDB_SET_CONFIG, (void *)config_entry);
  2105. return rc;
  2106. }
  2107. /* This function retrieves RUV from the root of the replicated tree.
  2108. * The attribute can be missing if
  2109. * (1) this replica is the first supplier and replica generation has not been assigned
  2110. * or
  2111. * (2) this is a consumer that has not been yet initialized
  2112. * In either case, replica_set_ruv should be used to further initialize the replica.
  2113. * Returns 0 on success, -1 on failure. If 0 is returned, the RUV is present in the replica.
  2114. */
  2115. static int
  2116. _replica_configure_ruv(Replica *r, PRBool isLocked __attribute__((unused)))
  2117. {
  2118. Slapi_PBlock *pb = NULL;
  2119. char *attrs[2];
  2120. int rc;
  2121. int return_value = -1;
  2122. Slapi_Entry **entries = NULL;
  2123. Slapi_Attr *attr;
  2124. RUV *ruv = NULL;
  2125. CSN *csn = NULL;
  2126. ReplicaId rid = 0;
  2127. /* read ruv state from the ruv tombstone entry */
  2128. pb = slapi_pblock_new();
  2129. if (!pb) {
  2130. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name,
  2131. "_replica_configure_ruv - Out of memory\n");
  2132. goto done;
  2133. }
  2134. attrs[0] = (char *)type_ruvElement;
  2135. attrs[1] = NULL;
  2136. slapi_search_internal_set_pb(
  2137. pb,
  2138. slapi_sdn_get_dn(r->repl_root),
  2139. LDAP_SCOPE_BASE,
  2140. "objectclass=*",
  2141. attrs,
  2142. 0, /* attrsonly */
  2143. NULL, /* controls */
  2144. RUV_STORAGE_ENTRY_UNIQUEID,
  2145. repl_get_plugin_identity(PLUGIN_MULTIMASTER_REPLICATION),
  2146. OP_FLAG_REPLICATED); /* flags */
  2147. slapi_search_internal_pb(pb);
  2148. slapi_pblock_get(pb, SLAPI_PLUGIN_INTOP_RESULT, &rc);
  2149. if (rc == LDAP_SUCCESS) {
  2150. /* get RUV attributes and construct the RUV */
  2151. slapi_pblock_get(pb, SLAPI_PLUGIN_INTOP_SEARCH_ENTRIES, &entries);
  2152. if (NULL == entries || NULL == entries[0]) {
  2153. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name,
  2154. "_replica_configure_ruv - Replica ruv tombstone entry for "
  2155. "replica %s not found\n",
  2156. slapi_sdn_get_dn(r->repl_root));
  2157. goto done;
  2158. }
  2159. rc = slapi_entry_attr_find(entries[0], type_ruvElement, &attr);
  2160. if (rc != 0) /* ruv attribute is missing - this not allowed */
  2161. {
  2162. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name,
  2163. "_replica_configure_ruv - Replica ruv tombstone entry for "
  2164. "replica %s does not contain %s\n",
  2165. slapi_sdn_get_dn(r->repl_root), type_ruvElement);
  2166. goto done;
  2167. }
  2168. /* Check in the tombstone we have retrieved if the local purl is
  2169. already present:
  2170. rid == 0: the local purl is not present
  2171. rid != 0: the local purl is present ==> nothing to do
  2172. */
  2173. ruv_init_from_slapi_attr_and_check_purl(attr, &ruv, &rid);
  2174. if (ruv) {
  2175. char *generation = NULL;
  2176. generation = ruv_get_replica_generation(ruv);
  2177. if (NULL != generation) {
  2178. r->repl_ruv = object_new((void *)ruv, (FNFree)ruv_destroy);
  2179. /* Is the local purl in the ruv? (the port or the host could have
  2180. changed)
  2181. */
  2182. /* A consumer only doesn't have its purl in its ruv */
  2183. if (r->repl_type == REPLICA_TYPE_UPDATABLE) {
  2184. int need_update = 0;
  2185. #define RUV_UPDATE_PARTIAL 1
  2186. #define RUV_UPDATE_FULL 2
  2187. if (rid == 0) {
  2188. /* We can not have more than 1 ruv with the same rid
  2189. so we replace it */
  2190. const char *purl = NULL;
  2191. purl = multimaster_get_local_purl();
  2192. ruv_delete_replica(ruv, r->repl_rid);
  2193. ruv_add_index_replica(ruv, r->repl_rid, purl, 1);
  2194. need_update = RUV_UPDATE_PARTIAL; /* ruv changed, so write tombstone */
  2195. } else /* bug 540844: make sure the local supplier rid is first in the ruv */
  2196. {
  2197. /* make sure local supplier is first in list */
  2198. ReplicaId first_rid = 0;
  2199. char *first_purl = NULL;
  2200. ruv_get_first_id_and_purl(ruv, &first_rid, &first_purl);
  2201. /* if the local supplier is not first in the list . . . */
  2202. /* rid is from changelog; first_rid is from backend */
  2203. if (rid != first_rid) {
  2204. /* . . . move the local supplier to the beginning of the list */
  2205. ruv_move_local_supplier_to_first(ruv, rid);
  2206. need_update = RUV_UPDATE_PARTIAL; /* must update tombstone also */
  2207. }
  2208. /* r->repl_rid is from config; rid is from changelog */
  2209. if (r->repl_rid != rid) {
  2210. /* Most likely, the replica was once deleted
  2211. * and recreated with a different rid from the
  2212. * previous. */
  2213. /* must recreate ruv tombstone */
  2214. need_update = RUV_UPDATE_FULL;
  2215. if (NULL != r->repl_ruv) {
  2216. object_release(r->repl_ruv);
  2217. r->repl_ruv = NULL;
  2218. }
  2219. }
  2220. }
  2221. /* Update also the directory entry */
  2222. if (RUV_UPDATE_PARTIAL == need_update) {
  2223. replica_replace_ruv_tombstone(r);
  2224. } else if (RUV_UPDATE_FULL == need_update) {
  2225. _delete_tombstone(slapi_sdn_get_dn(r->repl_root),
  2226. RUV_STORAGE_ENTRY_UNIQUEID,
  2227. OP_FLAG_REPL_RUV);
  2228. rc = replica_create_ruv_tombstone(r);
  2229. if (rc) {
  2230. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name,
  2231. "_replica_configure_ruv - "
  2232. "Failed to recreate replica ruv tombstone entry"
  2233. " (%s); LDAP error - %d\n",
  2234. slapi_sdn_get_dn(r->repl_root), rc);
  2235. slapi_ch_free_string(&generation);
  2236. goto done;
  2237. }
  2238. }
  2239. #undef RUV_UPDATE_PARTIAL
  2240. #undef RUV_UPDATE_FULL
  2241. }
  2242. slapi_ch_free_string(&generation);
  2243. return_value = 0;
  2244. } else {
  2245. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name, "_replica_configure_ruv - "
  2246. "RUV for replica %s is missing replica generation\n",
  2247. slapi_sdn_get_dn(r->repl_root));
  2248. goto done;
  2249. }
  2250. } else {
  2251. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name, "_replica_configure_ruv - "
  2252. "Unable to convert %s attribute in entry %s to a replica update vector.\n",
  2253. type_ruvElement, slapi_sdn_get_dn(r->repl_root));
  2254. goto done;
  2255. }
  2256. } else /* search failed */
  2257. {
  2258. if (LDAP_NO_SUCH_OBJECT == rc) {
  2259. /* The entry doesn't exist: create it */
  2260. rc = replica_create_ruv_tombstone(r);
  2261. if (LDAP_SUCCESS != rc) {
  2262. /*
  2263. * XXXggood - the following error appears on startup if we try
  2264. * to initialize replica RUVs before the backend instance is up.
  2265. * It's alarming to see this error, and we should suppress it
  2266. * (or avoid trying to configure it) if the backend instance is
  2267. * not yet online.
  2268. */
  2269. /*
  2270. * XXXrichm - you can also get this error when the backend is in
  2271. * read only mode c.f. bug 539782
  2272. */
  2273. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name,
  2274. "_replica_configure_ruv - Failed to create replica ruv tombstone "
  2275. "entry (%s); LDAP error - %d\n",
  2276. slapi_sdn_get_dn(r->repl_root), rc);
  2277. goto done;
  2278. } else {
  2279. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name,
  2280. "_replica_configure_ruv - No ruv tombstone found for replica %s. "
  2281. "Created a new one\n",
  2282. slapi_sdn_get_dn(r->repl_root));
  2283. return_value = 0;
  2284. }
  2285. } else {
  2286. /* see if the suffix is disabled */
  2287. char *state = slapi_mtn_get_state(r->repl_root);
  2288. if (state && !strcasecmp(state, "disabled")) {
  2289. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name,
  2290. "_replica_configure_ruv - Replication disabled for "
  2291. "entry (%s); LDAP error - %d\n",
  2292. slapi_sdn_get_dn(r->repl_root), rc);
  2293. slapi_ch_free_string(&state);
  2294. goto done;
  2295. } else if (!r->repl_ruv) /* other error */
  2296. {
  2297. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name,
  2298. "_replica_configure_ruv - Replication broken for "
  2299. "entry (%s); LDAP error - %d\n",
  2300. slapi_sdn_get_dn(r->repl_root), rc);
  2301. slapi_ch_free_string(&state);
  2302. goto done;
  2303. } else /* some error but continue anyway? */
  2304. {
  2305. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name,
  2306. "_replica_configure_ruv - Error %d reading tombstone for replica %s.\n",
  2307. rc, slapi_sdn_get_dn(r->repl_root));
  2308. return_value = 0;
  2309. }
  2310. slapi_ch_free_string(&state);
  2311. }
  2312. }
  2313. if (NULL != r->min_csn_pl) {
  2314. csnplFree(&r->min_csn_pl);
  2315. }
  2316. /* create pending list for min csn if necessary */
  2317. if (ruv_get_smallest_csn_for_replica((RUV *)object_get_data(r->repl_ruv),
  2318. r->repl_rid, &csn) == RUV_SUCCESS) {
  2319. csn_free(&csn);
  2320. r->min_csn_pl = NULL;
  2321. } else {
  2322. /*
  2323. * The local replica has not generated any of its own CSNs yet.
  2324. * We need to watch CSNs being generated and note the first
  2325. * locally-generated CSN that's committed. Once that event occurs,
  2326. * the RUV is suitable for iteration over locally generated
  2327. * changes.
  2328. */
  2329. r->min_csn_pl = csnplNew();
  2330. }
  2331. done:
  2332. if (NULL != pb) {
  2333. slapi_free_search_results_internal(pb);
  2334. slapi_pblock_destroy(pb);
  2335. }
  2336. if (return_value != 0) {
  2337. if (ruv)
  2338. ruv_destroy(&ruv);
  2339. }
  2340. return return_value;
  2341. }
  2342. /* NOTE - this is the only non-api function that performs locking because
  2343. it is called by the event queue */
  2344. void
  2345. replica_update_state(time_t when __attribute__((unused)), void *arg)
  2346. {
  2347. int rc;
  2348. const char *replica_name = (const char *)arg;
  2349. Replica *r;
  2350. Slapi_Mod smod;
  2351. LDAPMod *mods[3];
  2352. Slapi_PBlock *pb = NULL;
  2353. char *dn = NULL;
  2354. struct berval *vals[2];
  2355. struct berval val;
  2356. LDAPMod mod;
  2357. if (NULL == replica_name)
  2358. return;
  2359. r = replica_get_by_name(replica_name);
  2360. if (NULL == r) {
  2361. return;
  2362. }
  2363. replica_lock(r->repl_lock);
  2364. /* replica state is currently being updated
  2365. or no CSN was assigned - bail out */
  2366. if (r->state_update_inprogress) {
  2367. replica_unlock(r->repl_lock);
  2368. return;
  2369. }
  2370. /* This might be a consumer */
  2371. if (!r->repl_csn_assigned) {
  2372. /* EY: the consumer needs to flush ruv to disk. */
  2373. replica_unlock(r->repl_lock);
  2374. if (replica_write_ruv(r)) {
  2375. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name,
  2376. "replica_update_state - Failed write RUV for %s\n",
  2377. slapi_sdn_get_dn(r->repl_root));
  2378. }
  2379. return;
  2380. }
  2381. /* ONREPL update csn generator state of an updatable replica only */
  2382. /* ONREPL state always changes because we update time every second and
  2383. we write state to the disk less frequently */
  2384. rc = csngen_get_state((CSNGen *)object_get_data(r->repl_csngen), &smod);
  2385. if (rc != 0) {
  2386. replica_unlock(r->repl_lock);
  2387. return;
  2388. }
  2389. r->state_update_inprogress = PR_TRUE;
  2390. r->repl_csn_assigned = PR_FALSE;
  2391. dn = _replica_get_config_dn(r->repl_root);
  2392. if (NULL == dn) {
  2393. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name,
  2394. "replica_update_state - Failed to get the config dn for %s\n",
  2395. slapi_sdn_get_dn(r->repl_root));
  2396. replica_unlock(r->repl_lock);
  2397. return;
  2398. }
  2399. pb = slapi_pblock_new();
  2400. mods[0] = (LDAPMod *)slapi_mod_get_ldapmod_byref(&smod);
  2401. /* we don't want to held lock during operations since it causes lock contention
  2402. and sometimes deadlock. So releasing lock here */
  2403. replica_unlock(r->repl_lock);
  2404. /* replica repl_name and new_name attributes do not get changed once
  2405. the replica is configured - so it is ok that they are outside replica lock */
  2406. /* write replica name if it has not been written before */
  2407. if (r->new_name) {
  2408. mods[1] = &mod;
  2409. mod.mod_op = LDAP_MOD_REPLACE | LDAP_MOD_BVALUES;
  2410. mod.mod_type = (char *)attr_replicaName;
  2411. mod.mod_bvalues = vals;
  2412. vals[0] = &val;
  2413. vals[1] = NULL;
  2414. val.bv_val = r->repl_name;
  2415. val.bv_len = strlen(val.bv_val);
  2416. mods[2] = NULL;
  2417. } else {
  2418. mods[1] = NULL;
  2419. }
  2420. slapi_modify_internal_set_pb(pb, dn, mods, NULL, NULL,
  2421. repl_get_plugin_identity(PLUGIN_MULTIMASTER_REPLICATION), 0);
  2422. slapi_modify_internal_pb(pb);
  2423. slapi_pblock_get(pb, SLAPI_PLUGIN_INTOP_RESULT, &rc);
  2424. if (rc != LDAP_SUCCESS) {
  2425. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name, "replica_update_state - "
  2426. "Failed to update state of csn generator for replica %s: LDAP "
  2427. "error - %d\n",
  2428. slapi_sdn_get_dn(r->repl_root), rc);
  2429. } else {
  2430. r->new_name = PR_FALSE;
  2431. }
  2432. /* update RUV - performs its own locking */
  2433. if (replica_write_ruv(r)) {
  2434. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name,
  2435. "replica_update_state - Failed write RUV for %s\n",
  2436. slapi_sdn_get_dn(r->repl_root));
  2437. }
  2438. /* since this is the only place this value is changed and we are
  2439. guaranteed that only one thread enters the function, its ok
  2440. to change it outside replica lock */
  2441. r->state_update_inprogress = PR_FALSE;
  2442. slapi_ch_free((void **)&dn);
  2443. slapi_pblock_destroy(pb);
  2444. slapi_mod_done(&smod);
  2445. }
  2446. int
  2447. replica_write_ruv(Replica *r)
  2448. {
  2449. int rc = LDAP_SUCCESS;
  2450. Slapi_Mod smod, rmod;
  2451. Slapi_Mod smod_last_modified;
  2452. LDAPMod *mods[4];
  2453. Slapi_PBlock *pb;
  2454. PR_ASSERT(r);
  2455. replica_lock(r->repl_lock);
  2456. PR_ASSERT(r->repl_ruv);
  2457. ruv_to_smod((RUV *)object_get_data(r->repl_ruv), &smod);
  2458. ruv_last_modified_to_smod((RUV *)object_get_data(r->repl_ruv), &smod_last_modified);
  2459. replica_unlock(r->repl_lock);
  2460. mods[0] = (LDAPMod *)slapi_mod_get_ldapmod_byref(&smod);
  2461. mods[1] = (LDAPMod *)slapi_mod_get_ldapmod_byref(&smod_last_modified);
  2462. if (agmt_maxcsn_to_smod(r, &rmod) == LDAP_SUCCESS) {
  2463. mods[2] = (LDAPMod *)slapi_mod_get_ldapmod_byref(&rmod);
  2464. } else {
  2465. mods[2] = NULL;
  2466. }
  2467. mods[3] = NULL;
  2468. pb = slapi_pblock_new();
  2469. /* replica name never changes so it is ok to reference it outside the lock */
  2470. slapi_modify_internal_set_pb_ext(
  2471. pb,
  2472. r->repl_root, /* only used to select be */
  2473. mods,
  2474. NULL, /* controls */
  2475. RUV_STORAGE_ENTRY_UNIQUEID,
  2476. repl_get_plugin_identity(PLUGIN_MULTIMASTER_REPLICATION),
  2477. /* Add OP_FLAG_TOMBSTONE_ENTRY so that this doesn't get logged in the Retro ChangeLog */
  2478. OP_FLAG_REPLICATED | OP_FLAG_REPL_FIXUP | OP_FLAG_TOMBSTONE_ENTRY |
  2479. OP_FLAG_REPL_RUV);
  2480. slapi_modify_internal_pb(pb);
  2481. slapi_pblock_get(pb, SLAPI_PLUGIN_INTOP_RESULT, &rc);
  2482. /* ruv does not exist - create one */
  2483. replica_lock(r->repl_lock);
  2484. if (rc == LDAP_NO_SUCH_OBJECT) {
  2485. /* this includes an internal operation - but since this only happens
  2486. during server startup - its ok that we have lock around it */
  2487. rc = _replica_configure_ruv(r, PR_TRUE);
  2488. } else if (rc != LDAP_SUCCESS) { /* error */
  2489. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name,
  2490. "replica_write_ruv - Failed to update RUV tombstone for %s; "
  2491. "LDAP error - %d\n",
  2492. slapi_sdn_get_dn(r->repl_root), rc);
  2493. }
  2494. replica_unlock(r->repl_lock);
  2495. slapi_mod_done(&smod);
  2496. slapi_mod_done(&rmod);
  2497. slapi_mod_done(&smod_last_modified);
  2498. slapi_pblock_destroy(pb);
  2499. return rc;
  2500. }
  2501. /* This routine figures out if an operation is for a replicated area and if so,
  2502. * pulls out the operation CSN and returns it through the smods parameter.
  2503. * It also informs the caller of the RUV entry's unique ID, since the caller
  2504. * may not have access to the macro in repl5.h. */
  2505. int
  2506. replica_ruv_smods_for_op(Slapi_PBlock *pb, char **uniqueid, Slapi_Mods **smods)
  2507. {
  2508. Object *ruv_obj;
  2509. Replica *replica;
  2510. RUV *ruv;
  2511. RUV *ruv_copy;
  2512. CSN *opcsn = NULL;
  2513. Slapi_Mod smod;
  2514. Slapi_Mod smod_last_modified;
  2515. Slapi_Operation *op;
  2516. Slapi_Entry *target_entry = NULL;
  2517. int rc = 0;
  2518. slapi_pblock_get(pb, SLAPI_ENTRY_PRE_OP, &target_entry);
  2519. if (target_entry && is_ruv_tombstone_entry(target_entry)) {
  2520. /* disallow direct modification of the RUV tombstone entry
  2521. must use the CLEANRUV task instead */
  2522. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name,
  2523. "replica_ruv_smods_for_op - Attempted to directly modify the tombstone RUV "
  2524. "entry [%s] - use the CLEANALLRUV task instead\n",
  2525. slapi_entry_get_dn_const(target_entry));
  2526. return (-1);
  2527. }
  2528. replica = replica_get_replica_for_op(pb);
  2529. slapi_pblock_get(pb, SLAPI_OPERATION, &op);
  2530. if (NULL != replica && NULL != op) {
  2531. opcsn = operation_get_csn(op);
  2532. }
  2533. /* If the op has no CSN then it's not in a replicated area, so we're done */
  2534. if (NULL == opcsn) {
  2535. return (0);
  2536. }
  2537. ruv_obj = replica_get_ruv(replica);
  2538. PR_ASSERT(ruv_obj);
  2539. ruv = (RUV *)object_get_data(ruv_obj);
  2540. PR_ASSERT(ruv);
  2541. ruv_copy = ruv_dup(ruv);
  2542. object_release(ruv_obj);
  2543. rc = ruv_set_max_csn_ext(ruv_copy, opcsn, NULL, PR_TRUE);
  2544. if (rc == RUV_COVERS_CSN) { /* change would "revert" RUV - ignored */
  2545. rc = 0; /* tell caller to ignore */
  2546. } else if (rc == RUV_SUCCESS) {
  2547. rc = 1; /* tell caller success */
  2548. } else { /* error */
  2549. rc = -1; /* tell caller error */
  2550. }
  2551. if (rc == 1) {
  2552. ruv_to_smod(ruv_copy, &smod);
  2553. ruv_last_modified_to_smod(ruv_copy, &smod_last_modified);
  2554. }
  2555. ruv_destroy(&ruv_copy);
  2556. if (rc == 1) {
  2557. *smods = slapi_mods_new();
  2558. slapi_mods_add_smod(*smods, &smod);
  2559. slapi_mods_add_smod(*smods, &smod_last_modified);
  2560. *uniqueid = slapi_ch_strdup(RUV_STORAGE_ENTRY_UNIQUEID);
  2561. } else {
  2562. *smods = NULL;
  2563. *uniqueid = NULL;
  2564. }
  2565. return rc;
  2566. }
  2567. const CSN *
  2568. entry_get_deletion_csn(Slapi_Entry *e)
  2569. {
  2570. const CSN *deletion_csn = NULL;
  2571. PR_ASSERT(NULL != e);
  2572. if (NULL != e) {
  2573. Slapi_Attr *oc_attr = NULL;
  2574. if (entry_attr_find_wsi(e, SLAPI_ATTR_OBJECTCLASS, &oc_attr) == ATTRIBUTE_PRESENT) {
  2575. Slapi_Value *tombstone_value = NULL;
  2576. struct berval v;
  2577. v.bv_val = SLAPI_ATTR_VALUE_TOMBSTONE;
  2578. v.bv_len = strlen(SLAPI_ATTR_VALUE_TOMBSTONE);
  2579. if (attr_value_find_wsi(oc_attr, &v, &tombstone_value) == VALUE_PRESENT) {
  2580. deletion_csn = value_get_csn(tombstone_value, CSN_TYPE_VALUE_UPDATED);
  2581. }
  2582. }
  2583. }
  2584. return deletion_csn;
  2585. }
  2586. static void
  2587. _delete_tombstone(const char *tombstone_dn, const char *uniqueid, int ext_op_flags)
  2588. {
  2589. PR_ASSERT(NULL != tombstone_dn && NULL != uniqueid);
  2590. if (NULL == tombstone_dn || NULL == uniqueid) {
  2591. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name, "_delete_tombstone - "
  2592. "NULL tombstone_dn or uniqueid provided.\n");
  2593. } else {
  2594. int ldaprc;
  2595. Slapi_PBlock *pb = slapi_pblock_new();
  2596. slapi_delete_internal_set_pb(pb, tombstone_dn, NULL, /* controls */
  2597. uniqueid, repl_get_plugin_identity(PLUGIN_MULTIMASTER_REPLICATION),
  2598. OP_FLAG_TOMBSTONE_ENTRY | ext_op_flags);
  2599. slapi_delete_internal_pb(pb);
  2600. slapi_pblock_get(pb, SLAPI_PLUGIN_INTOP_RESULT, &ldaprc);
  2601. if (LDAP_SUCCESS != ldaprc) {
  2602. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name,
  2603. "_delete_tombstone - Unable to delete tombstone %s, "
  2604. "uniqueid %s: %s.\n",
  2605. tombstone_dn, uniqueid,
  2606. ldap_err2string(ldaprc));
  2607. }
  2608. slapi_pblock_destroy(pb);
  2609. }
  2610. }
  2611. static void
  2612. get_reap_result(int rc, void *cb_data)
  2613. {
  2614. PR_ASSERT(cb_data);
  2615. ((reap_callback_data *)cb_data)->rc = rc;
  2616. }
  2617. static int
  2618. process_reap_entry(Slapi_Entry *entry, void *cb_data)
  2619. {
  2620. char deletion_csn_str[CSN_STRSIZE];
  2621. char purge_csn_str[CSN_STRSIZE];
  2622. uint64_t *num_entriesp = &((reap_callback_data *)cb_data)->num_entries;
  2623. uint64_t *num_purged_entriesp = &((reap_callback_data *)cb_data)->num_purged_entries;
  2624. CSN *purge_csn = ((reap_callback_data *)cb_data)->purge_csn;
  2625. /* this is a pointer into the actual value in the Replica object - so that
  2626. if the value is set in the replica, we will know about it immediately */
  2627. PRBool *tombstone_reap_stop = ((reap_callback_data *)cb_data)->tombstone_reap_stop;
  2628. const CSN *deletion_csn = NULL;
  2629. CSN *tombstone_csn = NULL;
  2630. int rc = -1;
  2631. /* abort reaping if we've been told to stop or we're shutting down */
  2632. if (*tombstone_reap_stop || slapi_is_shutting_down()) {
  2633. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name,
  2634. "process_reap_entry - The tombstone reap process "
  2635. " has been stopped\n");
  2636. return rc;
  2637. }
  2638. /* we only ask for the objectclass in the search - the deletion csn is in the
  2639. objectclass attribute values - if we need more attributes returned by the
  2640. search in the future, see _replica_reap_tombstones below and add more to the
  2641. attrs array */
  2642. deletion_csn = entry_get_deletion_csn(entry);
  2643. if (deletion_csn == NULL) {
  2644. /* this might be a tombstone which was directly added, eg a cenotaph
  2645. * check if a tombstonecsn exist and use it
  2646. */
  2647. char *tombstonecsn_str = (char*)slapi_entry_attr_get_ref(entry, SLAPI_ATTR_TOMBSTONE_CSN);
  2648. if (tombstonecsn_str) {
  2649. tombstone_csn = csn_new_by_string(tombstonecsn_str);
  2650. deletion_csn = tombstone_csn;
  2651. }
  2652. }
  2653. if ((NULL == deletion_csn || csn_compare(deletion_csn, purge_csn) < 0) &&
  2654. (!is_ruv_tombstone_entry(entry))) {
  2655. if (slapi_is_loglevel_set(SLAPI_LOG_REPL)) {
  2656. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name,
  2657. "process_reap_entry - Removing tombstone %s "
  2658. "because its deletion csn (%s) is less than the "
  2659. "purge csn (%s).\n",
  2660. slapi_entry_get_dn(entry),
  2661. csn_as_string(deletion_csn, PR_FALSE, deletion_csn_str),
  2662. csn_as_string(purge_csn, PR_FALSE, purge_csn_str));
  2663. }
  2664. if (slapi_entry_attr_get_ulong(entry, "tombstonenumsubordinates") < 1) {
  2665. _delete_tombstone(slapi_entry_get_dn(entry),
  2666. slapi_entry_get_uniqueid(entry), 0);
  2667. (*num_purged_entriesp)++;
  2668. }
  2669. } else {
  2670. if (slapi_is_loglevel_set(SLAPI_LOG_REPL)) {
  2671. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name,
  2672. "process_reap_entry - NOT removing tombstone "
  2673. "%s\n",
  2674. slapi_entry_get_dn(entry));
  2675. }
  2676. }
  2677. if (!is_ruv_tombstone_entry(entry)) {
  2678. /* Don't update the count for the database tombstone entry */
  2679. (*num_entriesp)++;
  2680. }
  2681. if (tombstone_csn) {
  2682. csn_free(&tombstone_csn);
  2683. }
  2684. return 0;
  2685. }
  2686. /* This does the actual work of searching for tombstones and deleting them.
  2687. This must be called in a separate thread because it may take a long time.
  2688. */
  2689. static void
  2690. _replica_reap_tombstones(void *arg)
  2691. {
  2692. const char *replica_name = (const char *)arg;
  2693. Slapi_PBlock *pb = NULL;
  2694. Replica *replica = NULL;
  2695. CSN *purge_csn = NULL;
  2696. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name,
  2697. "_replica_reap_tombstones - Beginning tombstone reap for replica %s.\n",
  2698. replica_name ? replica_name : "(null)");
  2699. if (NULL == replica_name) {
  2700. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name,
  2701. "_replica_reap_tombstones - Replica name is null in tombstone reap\n");
  2702. goto done;
  2703. }
  2704. replica = replica_get_by_name(replica_name);
  2705. if (NULL == replica) {
  2706. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name,
  2707. "_replica_reap_tombstones - Replica object %s is null in tombstone reap\n", replica_name);
  2708. goto done;
  2709. }
  2710. if (replica->tombstone_reap_stop) {
  2711. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name,
  2712. "_replica_reap_tombstones - Replica %s reap stop flag is set for tombstone reap\n", replica_name);
  2713. goto done;
  2714. }
  2715. purge_csn = replica_get_purge_csn(replica);
  2716. if (NULL != purge_csn) {
  2717. LDAPControl **ctrls;
  2718. reap_callback_data cb_data;
  2719. char deletion_csn_str[CSN_STRSIZE];
  2720. char tombstone_filter[128];
  2721. char **attrs = NULL;
  2722. int oprc;
  2723. if (replica_get_precise_purging(replica)) {
  2724. /*
  2725. * Using precise tombstone purging. Create filter to lookup the exact
  2726. * entries that need to be purged by using a range search on the new
  2727. * tombstone csn index.
  2728. */
  2729. csn_as_string(purge_csn, PR_FALSE, deletion_csn_str);
  2730. PR_snprintf(tombstone_filter, 128,
  2731. "(&(%s<=%s)(objectclass=nsTombstone)(|(objectclass=*)(objectclass=ldapsubentry)))", SLAPI_ATTR_TOMBSTONE_CSN,
  2732. csn_as_string(purge_csn, PR_FALSE, deletion_csn_str));
  2733. } else {
  2734. /* Use the old inefficient filter */
  2735. PR_snprintf(tombstone_filter, 128, "(&(objectclass=nsTombstone)(|(objectclass=*)(objectclass=ldapsubentry)))");
  2736. }
  2737. /* we just need the objectclass - for the deletion csn
  2738. and the dn and nsuniqueid - for possible deletion
  2739. and tombstonenumsubordinates to check if it has numsubordinates
  2740. saves time to return only 3 attrs
  2741. */
  2742. charray_add(&attrs, slapi_ch_strdup("objectclass"));
  2743. charray_add(&attrs, slapi_ch_strdup("nsuniqueid"));
  2744. charray_add(&attrs, slapi_ch_strdup("tombstonenumsubordinates"));
  2745. charray_add(&attrs, slapi_ch_strdup(SLAPI_ATTR_TOMBSTONE_CSN));
  2746. ctrls = (LDAPControl **)slapi_ch_calloc(3, sizeof(LDAPControl *));
  2747. ctrls[0] = create_managedsait_control();
  2748. ctrls[1] = create_backend_control(replica->repl_root);
  2749. ctrls[2] = NULL;
  2750. pb = slapi_pblock_new();
  2751. slapi_search_internal_set_pb(pb, slapi_sdn_get_dn(replica->repl_root),
  2752. LDAP_SCOPE_SUBTREE, tombstone_filter,
  2753. attrs, 0, ctrls, NULL,
  2754. repl_get_plugin_identity(PLUGIN_MULTIMASTER_REPLICATION),
  2755. OP_FLAG_REVERSE_CANDIDATE_ORDER);
  2756. cb_data.rc = 0;
  2757. cb_data.num_entries = 0UL;
  2758. cb_data.num_purged_entries = 0UL;
  2759. cb_data.purge_csn = purge_csn;
  2760. /* set the cb data pointer to point to the actual memory address in
  2761. the actual Replica object - so that when the value in the Replica
  2762. is set, the reap process will know about it immediately */
  2763. cb_data.tombstone_reap_stop = &(replica->tombstone_reap_stop);
  2764. slapi_search_internal_callback_pb(pb, &cb_data /* callback data */,
  2765. get_reap_result /* result callback */,
  2766. process_reap_entry /* entry callback */,
  2767. NULL /* referral callback*/);
  2768. charray_free(attrs);
  2769. oprc = cb_data.rc;
  2770. if (LDAP_SUCCESS != oprc) {
  2771. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name,
  2772. "_replica_reap_tombstones - Failed when searching for "
  2773. "tombstones in replica %s: %s. Will try again in %" PRId64 " "
  2774. "seconds.\n",
  2775. slapi_sdn_get_dn(replica->repl_root),
  2776. ldap_err2string(oprc), replica->tombstone_reap_interval);
  2777. } else {
  2778. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name,
  2779. "_replica_reap_tombstones - Purged %" PRIu64 " of %" PRIu64 " tombstones "
  2780. "in replica %s. Will try again in %" PRId64 " "
  2781. "seconds.\n",
  2782. cb_data.num_purged_entries, cb_data.num_entries,
  2783. slapi_sdn_get_dn(replica->repl_root),
  2784. replica->tombstone_reap_interval);
  2785. }
  2786. } else {
  2787. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name,
  2788. "_replica_reap_tombstones - No purge CSN for tombstone reap for replica %s.\n",
  2789. replica_name);
  2790. }
  2791. done:
  2792. if (replica) {
  2793. replica_lock(replica->repl_lock);
  2794. replica->tombstone_reap_active = PR_FALSE;
  2795. replica_unlock(replica->repl_lock);
  2796. }
  2797. if (NULL != purge_csn) {
  2798. csn_free(&purge_csn);
  2799. }
  2800. if (NULL != pb) {
  2801. slapi_free_search_results_internal(pb);
  2802. slapi_pblock_destroy(pb);
  2803. }
  2804. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name,
  2805. "_replica_reap_tombstones - Finished tombstone reap for replica %s.\n",
  2806. replica_name ? replica_name : "(null)");
  2807. }
  2808. /*
  2809. We don't want to run the reaper function directly from the event
  2810. queue since it may hog the event queue, starving other events.
  2811. See bug 604441
  2812. The function eq_cb_reap_tombstones will fire off the actual thread
  2813. that does the real work.
  2814. */
  2815. static void
  2816. eq_cb_reap_tombstones(time_t when __attribute__((unused)), void *arg)
  2817. {
  2818. const char *replica_name = (const char *)arg;
  2819. Replica *replica = NULL;
  2820. if (NULL != replica_name) {
  2821. replica = replica_get_by_name(replica_name);
  2822. if (replica) {
  2823. replica_lock(replica->repl_lock);
  2824. /* No action if purge is disabled or the previous purge is not done yet */
  2825. if (replica->tombstone_reap_interval != 0 &&
  2826. replica->tombstone_reap_active == PR_FALSE) {
  2827. /* set the flag here to minimize race conditions */
  2828. replica->tombstone_reap_active = PR_TRUE;
  2829. if (PR_CreateThread(PR_USER_THREAD,
  2830. _replica_reap_tombstones, (void *)replica_name,
  2831. PR_PRIORITY_NORMAL, PR_GLOBAL_THREAD, PR_UNJOINABLE_THREAD,
  2832. SLAPD_DEFAULT_THREAD_STACKSIZE) == NULL) {
  2833. replica->tombstone_reap_active = PR_FALSE;
  2834. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name,
  2835. "eq_cb_reap_tombstones - Unable to create the tombstone reap thread for replica %s. "
  2836. "Possible system resources problem\n",
  2837. replica_name);
  2838. }
  2839. }
  2840. /* reap thread will wait until this lock is released */
  2841. replica_unlock(replica->repl_lock);
  2842. }
  2843. replica = NULL;
  2844. }
  2845. }
  2846. static char *
  2847. _replica_type_as_string(const Replica *r)
  2848. {
  2849. switch (r->repl_type) {
  2850. case REPLICA_TYPE_PRIMARY:
  2851. return "primary";
  2852. case REPLICA_TYPE_READONLY:
  2853. return "read-only";
  2854. case REPLICA_TYPE_UPDATABLE:
  2855. return "updatable";
  2856. default:
  2857. return "unknown";
  2858. }
  2859. }
  2860. static const char *root_glue =
  2861. "dn: %s\n"
  2862. "objectclass: top\n"
  2863. "objectclass: nsTombstone\n"
  2864. "objectclass: extensibleobject\n"
  2865. "nsuniqueid: %s\n";
  2866. static int
  2867. replica_create_ruv_tombstone(Replica *r)
  2868. {
  2869. int return_value = LDAP_LOCAL_ERROR;
  2870. char *root_entry_str;
  2871. Slapi_Entry *e = NULL;
  2872. const char *purl = NULL;
  2873. RUV *ruv;
  2874. struct berval **bvals = NULL;
  2875. Slapi_PBlock *pb = NULL;
  2876. int rc;
  2877. PR_ASSERT(NULL != r && NULL != r->repl_root);
  2878. root_entry_str = slapi_ch_smprintf(root_glue, slapi_sdn_get_ndn(r->repl_root), RUV_STORAGE_ENTRY_UNIQUEID);
  2879. e = slapi_str2entry(root_entry_str, SLAPI_STR2ENTRY_TOMBSTONE_CHECK);
  2880. if (e == NULL)
  2881. goto done;
  2882. /* Add ruv */
  2883. if (r->repl_ruv == NULL) {
  2884. CSNGen *gen;
  2885. CSN *csn;
  2886. char csnstr[CSN_STRSIZE];
  2887. /* first attempt to write RUV tombstone - need to create RUV */
  2888. gen = (CSNGen *)object_get_data(r->repl_csngen);
  2889. PR_ASSERT(gen);
  2890. if (csngen_new_csn(gen, &csn, PR_FALSE /* notify */) == CSN_SUCCESS) {
  2891. (void)csn_as_string(csn, PR_FALSE, csnstr);
  2892. csn_free(&csn);
  2893. /*
  2894. * if this is an updateable replica - add its own
  2895. * element to the RUV so that referrals work correctly
  2896. */
  2897. if (r->repl_type == REPLICA_TYPE_UPDATABLE)
  2898. purl = multimaster_get_local_purl();
  2899. if (ruv_init_new(csnstr, r->repl_rid, purl, &ruv) == RUV_SUCCESS) {
  2900. r->repl_ruv = object_new((void *)ruv, (FNFree)ruv_destroy);
  2901. return_value = LDAP_SUCCESS;
  2902. } else {
  2903. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name, "replica_create_ruv_tombstone - "
  2904. "Cannot create new replica update vector for %s\n",
  2905. slapi_sdn_get_dn(r->repl_root));
  2906. ruv_destroy(&ruv);
  2907. goto done;
  2908. }
  2909. } else {
  2910. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name, "replica_create_ruv_tombstone - "
  2911. "Cannot obtain CSN for new replica update vector for %s\n",
  2912. slapi_sdn_get_dn(r->repl_root));
  2913. csn_free(&csn);
  2914. goto done;
  2915. }
  2916. } else { /* failed to write the entry because DB was not initialized - retry */
  2917. ruv = (RUV *)object_get_data(r->repl_ruv);
  2918. PR_ASSERT(ruv);
  2919. }
  2920. PR_ASSERT(r->repl_ruv);
  2921. rc = ruv_to_bervals(ruv, &bvals);
  2922. if (rc != RUV_SUCCESS) {
  2923. goto done;
  2924. }
  2925. /* ONREPL this is depricated function but there is currently no better API to use */
  2926. rc = slapi_entry_add_values(e, type_ruvElement, bvals);
  2927. if (rc != 0) {
  2928. goto done;
  2929. }
  2930. pb = slapi_pblock_new();
  2931. slapi_add_entry_internal_set_pb(pb, e, NULL /* controls */, repl_get_plugin_identity(PLUGIN_MULTIMASTER_REPLICATION),
  2932. OP_FLAG_TOMBSTONE_ENTRY | OP_FLAG_REPLICATED | OP_FLAG_REPL_FIXUP | OP_FLAG_REPL_RUV);
  2933. slapi_add_internal_pb(pb);
  2934. e = NULL; /* add consumes e, upon success or failure */
  2935. slapi_pblock_get(pb, SLAPI_PLUGIN_INTOP_RESULT, &return_value);
  2936. done:
  2937. slapi_entry_free(e);
  2938. if (bvals)
  2939. ber_bvecfree(bvals);
  2940. if (pb)
  2941. slapi_pblock_destroy(pb);
  2942. slapi_ch_free_string(&root_entry_str);
  2943. return return_value;
  2944. }
  2945. static void
  2946. assign_csn_callback(const CSN *csn, void *data)
  2947. {
  2948. Replica *r = (Replica *)data;
  2949. Object *ruv_obj;
  2950. RUV *ruv;
  2951. PR_ASSERT(NULL != csn);
  2952. PR_ASSERT(NULL != r);
  2953. ruv_obj = replica_get_ruv(r);
  2954. PR_ASSERT(ruv_obj);
  2955. ruv = (RUV *)object_get_data(ruv_obj);
  2956. PR_ASSERT(ruv);
  2957. replica_lock(r->repl_lock);
  2958. r->repl_csn_assigned = PR_TRUE;
  2959. if (NULL != r->min_csn_pl) {
  2960. if (csnplInsert(r->min_csn_pl, csn, NULL) != 0) {
  2961. char csn_str[CSN_STRSIZE]; /* For logging only */
  2962. /* Ack, we can't keep track of min csn. Punt. */
  2963. if (slapi_is_loglevel_set(SLAPI_LOG_REPL)) {
  2964. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name, "assign_csn_callback - "
  2965. "Failed to insert csn %s for replica %s\n",
  2966. csn_as_string(csn, PR_FALSE, csn_str),
  2967. slapi_sdn_get_dn(r->repl_root));
  2968. }
  2969. csnplFree(&r->min_csn_pl);
  2970. }
  2971. }
  2972. ruv_add_csn_inprogress(r, ruv, csn);
  2973. replica_unlock(r->repl_lock);
  2974. object_release(ruv_obj);
  2975. }
  2976. static void
  2977. abort_csn_callback(const CSN *csn, void *data)
  2978. {
  2979. Replica *r = (Replica *)data;
  2980. Object *ruv_obj;
  2981. RUV *ruv;
  2982. PR_ASSERT(NULL != csn);
  2983. PR_ASSERT(NULL != data);
  2984. ruv_obj = replica_get_ruv(r);
  2985. PR_ASSERT(ruv_obj);
  2986. ruv = (RUV *)object_get_data(ruv_obj);
  2987. PR_ASSERT(ruv);
  2988. replica_lock(r->repl_lock);
  2989. if (NULL != r->min_csn_pl) {
  2990. int rc = csnplRemove(r->min_csn_pl, csn);
  2991. if (rc) {
  2992. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name, "abort_csn_callback - csnplRemove failed\n");
  2993. replica_unlock(r->repl_lock);
  2994. return;
  2995. }
  2996. }
  2997. ruv_cancel_csn_inprogress(r, ruv, csn, replica_get_rid(r));
  2998. replica_unlock(r->repl_lock);
  2999. object_release(ruv_obj);
  3000. }
  3001. static CSN *
  3002. _replica_get_purge_csn_nolock(const Replica *r)
  3003. {
  3004. CSN *purge_csn = NULL;
  3005. CSN **csns = NULL;
  3006. RUV *ruv;
  3007. int i;
  3008. if (r->repl_purge_delay > 0) {
  3009. /* get a sorted list of all maxcsns in ruv in ascend order */
  3010. object_acquire(r->repl_ruv);
  3011. ruv = object_get_data(r->repl_ruv);
  3012. csns = cl5BuildCSNList(ruv, NULL);
  3013. object_release(r->repl_ruv);
  3014. if (csns == NULL)
  3015. return NULL;
  3016. /* locate the most recent maxcsn in the csn list */
  3017. for (i = 0; csns[i]; i++)
  3018. ;
  3019. purge_csn = csn_dup(csns[i - 1]);
  3020. /* set purge_csn to the most recent maxcsn - purge_delay */
  3021. if ((csn_get_time(purge_csn) - r->repl_purge_delay) > 0)
  3022. csn_set_time(purge_csn, csn_get_time(purge_csn) - r->repl_purge_delay);
  3023. }
  3024. if (csns)
  3025. cl5DestroyCSNList(&csns);
  3026. return purge_csn;
  3027. }
  3028. static void
  3029. replica_get_referrals_nolock(const Replica *r, char ***referrals)
  3030. {
  3031. if (referrals != NULL) {
  3032. int hint;
  3033. int i = 0;
  3034. Slapi_Value *v = NULL;
  3035. if (NULL == r->repl_referral) {
  3036. *referrals = NULL;
  3037. } else {
  3038. /* richm: +1 for trailing NULL */
  3039. *referrals = (char **)slapi_ch_calloc(sizeof(char *), 1 + slapi_valueset_count(r->repl_referral));
  3040. hint = slapi_valueset_first_value(r->repl_referral, &v);
  3041. while (v != NULL) {
  3042. const char *s = slapi_value_get_string(v);
  3043. if (s != NULL && s[0] != '\0') {
  3044. (*referrals)[i] = slapi_ch_strdup(s);
  3045. i++;
  3046. }
  3047. hint = slapi_valueset_next_value(r->repl_referral, hint, &v);
  3048. }
  3049. (*referrals)[i] = NULL;
  3050. }
  3051. }
  3052. }
  3053. static int
  3054. replica_log_start_iteration(const ruv_enum_data *rid_data, void *data)
  3055. {
  3056. int rc = 0;
  3057. slapi_operation_parameters op_params;
  3058. Replica *replica = (Replica *)data;
  3059. cldb_Handle *cldb = NULL;
  3060. if (rid_data->csn == NULL)
  3061. return 0;
  3062. memset(&op_params, 0, sizeof(op_params));
  3063. op_params.operation_type = SLAPI_OPERATION_DELETE;
  3064. op_params.target_address.sdn = slapi_sdn_new_ndn_byval(START_ITERATION_ENTRY_DN);
  3065. op_params.target_address.uniqueid = START_ITERATION_ENTRY_UNIQUEID;
  3066. op_params.csn = csn_dup(rid_data->csn);
  3067. cldb = replica_get_file_info(replica);
  3068. rc = cl5WriteOperation(cldb, &op_params);
  3069. if (rc == CL5_SUCCESS)
  3070. rc = 0;
  3071. else
  3072. rc = -1;
  3073. slapi_sdn_free(&op_params.target_address.sdn);
  3074. csn_free(&op_params.csn);
  3075. return rc;
  3076. }
  3077. static int
  3078. replica_log_ruv_elements_nolock(const Replica *r)
  3079. {
  3080. int rc = 0;
  3081. RUV *ruv;
  3082. ruv = (RUV *)object_get_data(r->repl_ruv);
  3083. PR_ASSERT(ruv);
  3084. /* we log it as a delete operation to have the least number of fields
  3085. to set. the entry can be identified by a special target uniqueid and
  3086. special target dn */
  3087. rc = ruv_enumerate_elements(ruv, replica_log_start_iteration, (void *)r);
  3088. return rc;
  3089. }
  3090. void
  3091. replica_set_purge_delay(Replica *r, uint32_t purge_delay)
  3092. {
  3093. PR_ASSERT(r);
  3094. replica_lock(r->repl_lock);
  3095. r->repl_purge_delay = purge_delay;
  3096. replica_unlock(r->repl_lock);
  3097. }
  3098. void
  3099. replica_set_tombstone_reap_interval(Replica *r, long interval)
  3100. {
  3101. replica_lock(r->repl_lock);
  3102. /*
  3103. * Leave the event there to purge the existing tombstones
  3104. * if we are about to turn off tombstone creation
  3105. */
  3106. if (interval > 0 && r->repl_eqcxt_tr && r->tombstone_reap_interval != interval) {
  3107. int found;
  3108. found = slapi_eq_cancel(r->repl_eqcxt_tr);
  3109. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name,
  3110. "replica_set_tombstone_reap_interval - tombstone_reap event (interval=%" PRId64 ") was %s\n",
  3111. r->tombstone_reap_interval, (found ? "cancelled" : "not found"));
  3112. r->repl_eqcxt_tr = NULL;
  3113. }
  3114. r->tombstone_reap_interval = interval;
  3115. if (interval > 0 && r->repl_eqcxt_tr == NULL) {
  3116. r->repl_eqcxt_tr = slapi_eq_repeat(eq_cb_reap_tombstones, r->repl_name,
  3117. slapi_current_utc_time() + r->tombstone_reap_interval,
  3118. 1000 * r->tombstone_reap_interval);
  3119. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name,
  3120. "replica_set_tombstone_reap_interval - tombstone_reap event (interval=%" PRId64 ") was %s\n",
  3121. r->tombstone_reap_interval, (r->repl_eqcxt_tr ? "scheduled" : "not scheduled successfully"));
  3122. }
  3123. replica_unlock(r->repl_lock);
  3124. }
  3125. static void
  3126. replica_strip_cleaned_rids(Replica *r)
  3127. {
  3128. Object *RUVObj;
  3129. RUV *ruv = NULL;
  3130. ReplicaId rid[32] = {0};
  3131. int i = 0;
  3132. RUVObj = replica_get_ruv(r);
  3133. ruv = (RUV *)object_get_data(RUVObj);
  3134. ruv_get_cleaned_rids(ruv, rid);
  3135. while (rid[i] != 0) {
  3136. ruv_delete_replica(ruv, rid[i]);
  3137. if (replica_write_ruv(r)) {
  3138. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name,
  3139. "replica_strip_cleaned_rids - Failed to write RUV\n");
  3140. }
  3141. i++;
  3142. }
  3143. object_release(RUVObj);
  3144. }
  3145. /* Update the tombstone entry to reflect the content of the ruv */
  3146. static void
  3147. replica_replace_ruv_tombstone(Replica *r)
  3148. {
  3149. Slapi_PBlock *pb = NULL;
  3150. Slapi_Mod smod;
  3151. Slapi_Mod smod_last_modified;
  3152. LDAPMod *mods[3];
  3153. char *dn;
  3154. int rc;
  3155. PR_ASSERT(NULL != r && NULL != r->repl_root);
  3156. replica_strip_cleaned_rids(r);
  3157. replica_lock(r->repl_lock);
  3158. PR_ASSERT(r->repl_ruv);
  3159. ruv_to_smod((RUV *)object_get_data(r->repl_ruv), &smod);
  3160. ruv_last_modified_to_smod((RUV *)object_get_data(r->repl_ruv), &smod_last_modified);
  3161. dn = _replica_get_config_dn(r->repl_root);
  3162. if (NULL == dn) {
  3163. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name,
  3164. "replica_replace_ruv_tombstone - "
  3165. "Failed to get the config dn for %s\n",
  3166. slapi_sdn_get_dn(r->repl_root));
  3167. replica_unlock(r->repl_lock);
  3168. goto bail;
  3169. }
  3170. mods[0] = (LDAPMod *)slapi_mod_get_ldapmod_byref(&smod);
  3171. mods[1] = (LDAPMod *)slapi_mod_get_ldapmod_byref(&smod_last_modified);
  3172. replica_unlock(r->repl_lock);
  3173. mods[2] = NULL;
  3174. pb = slapi_pblock_new();
  3175. slapi_modify_internal_set_pb_ext(
  3176. pb,
  3177. r->repl_root, /* only used to select be */
  3178. mods,
  3179. NULL, /* controls */
  3180. RUV_STORAGE_ENTRY_UNIQUEID,
  3181. repl_get_plugin_identity(PLUGIN_MULTIMASTER_REPLICATION),
  3182. OP_FLAG_REPLICATED | OP_FLAG_REPL_FIXUP | OP_FLAG_REPL_RUV | OP_FLAG_TOMBSTONE_ENTRY);
  3183. slapi_modify_internal_pb(pb);
  3184. slapi_pblock_get(pb, SLAPI_PLUGIN_INTOP_RESULT, &rc);
  3185. if (rc != LDAP_SUCCESS) {
  3186. if ((rc != LDAP_NO_SUCH_OBJECT && rc != LDAP_TYPE_OR_VALUE_EXISTS) || !replica_is_state_flag_set(r, REPLICA_IN_USE)) {
  3187. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name, "replica_replace_ruv_tombstone - "
  3188. "Failed to update replication update vector for replica %s: LDAP "
  3189. "error - %d\n",
  3190. (char *)slapi_sdn_get_dn(r->repl_root), rc);
  3191. }
  3192. }
  3193. slapi_ch_free((void **)&dn);
  3194. slapi_pblock_destroy(pb);
  3195. bail:
  3196. slapi_mod_done(&smod);
  3197. slapi_mod_done(&smod_last_modified);
  3198. }
  3199. void
  3200. replica_update_ruv_consumer(Replica *r, RUV *supplier_ruv)
  3201. {
  3202. ReplicaId supplier_id = 0;
  3203. char *supplier_purl = NULL;
  3204. if (ruv_get_first_id_and_purl(supplier_ruv, &supplier_id, &supplier_purl) == RUV_SUCCESS) {
  3205. RUV *local_ruv = NULL;
  3206. replica_lock(r->repl_lock);
  3207. local_ruv = (RUV *)object_get_data(r->repl_ruv);
  3208. if (is_cleaned_rid(supplier_id) || local_ruv == NULL) {
  3209. replica_unlock(r->repl_lock);
  3210. return;
  3211. }
  3212. if (ruv_local_contains_supplier(local_ruv, supplier_id) == 0) {
  3213. if (r->repl_type == REPLICA_TYPE_UPDATABLE) {
  3214. /* Add the new ruv right after the consumer own purl */
  3215. ruv_add_index_replica(local_ruv, supplier_id, supplier_purl, 2);
  3216. } else {
  3217. /* This is a consumer only, add it first */
  3218. ruv_add_index_replica(local_ruv, supplier_id, supplier_purl, 1);
  3219. }
  3220. } else {
  3221. /* Replace it */
  3222. ruv_replace_replica_purl(local_ruv, supplier_id, supplier_purl);
  3223. }
  3224. replica_unlock(r->repl_lock);
  3225. /* Update also the directory entry */
  3226. replica_replace_ruv_tombstone(r);
  3227. }
  3228. }
  3229. PRBool
  3230. replica_is_state_flag_set(Replica *r, int32_t flag)
  3231. {
  3232. PR_ASSERT(r);
  3233. if (r)
  3234. return (r->repl_state_flags & flag);
  3235. else
  3236. return PR_FALSE;
  3237. }
  3238. void
  3239. replica_set_state_flag(Replica *r, uint32_t flag, PRBool clear)
  3240. {
  3241. if (r == NULL)
  3242. return;
  3243. replica_lock(r->repl_lock);
  3244. if (clear) {
  3245. r->repl_state_flags &= ~flag;
  3246. } else {
  3247. r->repl_state_flags |= flag;
  3248. }
  3249. replica_unlock(r->repl_lock);
  3250. }
  3251. /**
  3252. * Use this to tell the tombstone reap process to stop. This will
  3253. * typically be used when we (consumer) get a request to do a
  3254. * total update.
  3255. */
  3256. void
  3257. replica_set_tombstone_reap_stop(Replica *r, PRBool val)
  3258. {
  3259. if (r == NULL)
  3260. return;
  3261. replica_lock(r->repl_lock);
  3262. r->tombstone_reap_stop = val;
  3263. replica_unlock(r->repl_lock);
  3264. }
  3265. /* replica just came back online, probably after data was reloaded */
  3266. void
  3267. replica_enable_replication(Replica *r)
  3268. {
  3269. int rc;
  3270. PR_ASSERT(r);
  3271. /* prevent creation of new agreements until the replica is enabled */
  3272. PR_Lock(r->agmt_lock);
  3273. if (r->repl_flags & REPLICA_LOG_CHANGES) {
  3274. cldb_SetReplicaDB(r, NULL);
  3275. }
  3276. /* retrieve new ruv */
  3277. rc = replica_reload_ruv(r);
  3278. if (rc) {
  3279. slapi_log_err(SLAPI_LOG_WARNING, repl_plugin_name, "replica_enable_replication - "
  3280. "Reloading ruv failed\n");
  3281. /* What to do ? */
  3282. }
  3283. /* Replica came back online, Check if the total update was terminated.
  3284. If flag is still set, it was not terminated, therefore the data is
  3285. very likely to be incorrect, and we should not restart Replication threads...
  3286. */
  3287. if (!replica_is_state_flag_set(r, REPLICA_TOTAL_IN_PROGRESS)) {
  3288. /* restart outbound replication */
  3289. start_agreements_for_replica(r, PR_TRUE);
  3290. /* enable ruv state update */
  3291. replica_set_enabled(r, PR_TRUE);
  3292. }
  3293. /* mark the replica as being available for updates */
  3294. replica_relinquish_exclusive_access(r, 0, 0);
  3295. replica_set_state_flag(r, REPLICA_AGREEMENTS_DISABLED, PR_TRUE /* clear */);
  3296. PR_Unlock(r->agmt_lock);
  3297. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name, "replica_enable_replication - "
  3298. "Replica %s is relinquished\n",
  3299. slapi_sdn_get_ndn(replica_get_root(r)));
  3300. }
  3301. /* replica is about to be taken offline */
  3302. void
  3303. replica_disable_replication(Replica *r)
  3304. {
  3305. char *current_purl = NULL;
  3306. char *p_locking_purl = NULL;
  3307. char *locking_purl = NULL;
  3308. ReplicaId junkrid;
  3309. PRBool isInc = PR_FALSE; /* get exclusive access, but not for inc update */
  3310. RUV *repl_ruv = NULL;
  3311. /* prevent creation of new agreements until the replica is disabled */
  3312. PR_Lock(r->agmt_lock);
  3313. /* stop ruv update */
  3314. replica_set_enabled(r, PR_FALSE);
  3315. /* disable outbound replication */
  3316. start_agreements_for_replica(r, PR_FALSE);
  3317. /* close the corresponding changelog file */
  3318. /* close_changelog_for_replica (r_obj); */
  3319. /* mark the replica as being unavailable for updates */
  3320. /* If an incremental update is in progress, we want to wait until it is
  3321. finished until we get exclusive access to the replica, because we have
  3322. to make sure no operations are in progress - it messes up replication
  3323. when a restore is in progress but we are still adding replicated entries
  3324. from a supplier
  3325. */
  3326. repl_ruv = (RUV *)object_get_data(r->repl_ruv);
  3327. ruv_get_first_id_and_purl(repl_ruv, &junkrid, &p_locking_purl);
  3328. locking_purl = slapi_ch_strdup(p_locking_purl);
  3329. p_locking_purl = NULL;
  3330. repl_ruv = NULL;
  3331. while (!replica_get_exclusive_access(r, &isInc, 0, 0, "replica_disable_replication",
  3332. &current_purl)) {
  3333. if (!isInc) /* already locked, but not by inc update - break */
  3334. break;
  3335. isInc = PR_FALSE;
  3336. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name,
  3337. "replica_disable_replication - "
  3338. "replica %s is already locked by (%s) for incoming "
  3339. "incremental update; sleeping 100ms\n",
  3340. slapi_sdn_get_ndn(replica_get_root(r)),
  3341. current_purl ? current_purl : "unknown");
  3342. slapi_ch_free_string(&current_purl);
  3343. DS_Sleep(PR_MillisecondsToInterval(100));
  3344. }
  3345. slapi_ch_free_string(&current_purl);
  3346. slapi_ch_free_string(&locking_purl);
  3347. replica_set_state_flag(r, REPLICA_AGREEMENTS_DISABLED, PR_FALSE);
  3348. PR_Unlock(r->agmt_lock);
  3349. /* no thread will access the changelog for this replica
  3350. * remove reference from replica object
  3351. */
  3352. if (r->repl_flags & REPLICA_LOG_CHANGES) {
  3353. cldb_UnSetReplicaDB(r, NULL);
  3354. }
  3355. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name, "replica_disable_replication - "
  3356. "replica %s is acquired\n",
  3357. slapi_sdn_get_ndn(replica_get_root(r)));
  3358. }
  3359. static void
  3360. start_agreements_for_replica(Replica *r, PRBool start)
  3361. {
  3362. Object *agmt_obj;
  3363. Repl_Agmt *agmt;
  3364. agmt_obj = agmtlist_get_first_agreement_for_replica(r);
  3365. while (agmt_obj) {
  3366. agmt = (Repl_Agmt *)object_get_data(agmt_obj);
  3367. PR_ASSERT(agmt);
  3368. if (agmt_is_enabled(agmt)) {
  3369. if (start)
  3370. agmt_start(agmt);
  3371. else /* stop */
  3372. agmt_stop(agmt);
  3373. }
  3374. agmt_obj = agmtlist_get_next_agreement_for_replica(r, agmt_obj);
  3375. }
  3376. }
  3377. int
  3378. replica_start_agreement(Replica *r, Repl_Agmt *ra)
  3379. {
  3380. int ret = 0;
  3381. if (r == NULL)
  3382. return -1;
  3383. PR_Lock(r->agmt_lock);
  3384. if (!replica_is_state_flag_set(r, REPLICA_AGREEMENTS_DISABLED) && agmt_is_enabled(ra)) {
  3385. ret = agmt_start(ra); /* Start the replication agreement */
  3386. }
  3387. PR_Unlock(r->agmt_lock);
  3388. return ret;
  3389. }
  3390. int
  3391. windows_replica_start_agreement(Replica *r, Repl_Agmt *ra)
  3392. {
  3393. int ret = 0;
  3394. if (r == NULL)
  3395. return -1;
  3396. PR_Lock(r->agmt_lock);
  3397. if (!replica_is_state_flag_set(r, REPLICA_AGREEMENTS_DISABLED)) {
  3398. ret = windows_agmt_start(ra); /* Start the replication agreement */
  3399. /* ret = windows_agmt_start(ra); Start the replication agreement */
  3400. }
  3401. PR_Unlock(r->agmt_lock);
  3402. return ret;
  3403. }
  3404. /*
  3405. * A callback function registered as op->o_csngen_handler and
  3406. * called by backend ops to generate opcsn.
  3407. */
  3408. int32_t
  3409. replica_generate_next_csn(Slapi_PBlock *pb, const CSN *basecsn, CSN **opcsn)
  3410. {
  3411. Replica *replica = replica_get_replica_for_op(pb);
  3412. if (NULL != replica) {
  3413. Slapi_Operation *op;
  3414. slapi_pblock_get(pb, SLAPI_OPERATION, &op);
  3415. if (replica->repl_type != REPLICA_TYPE_READONLY) {
  3416. Object *gen_obj = replica_get_csngen(replica);
  3417. if (NULL != gen_obj) {
  3418. CSNGen *gen = (CSNGen *)object_get_data(gen_obj);
  3419. if (NULL != gen) {
  3420. /* The new CSN should be greater than the base CSN */
  3421. if (csngen_new_csn(gen, opcsn, PR_FALSE /* don't notify */) != CSN_SUCCESS) {
  3422. /* Failed to generate CSN we must abort */
  3423. object_release(gen_obj);
  3424. return -1;
  3425. }
  3426. if (csn_compare(*opcsn, basecsn) <= 0) {
  3427. char opcsnstr[CSN_STRSIZE];
  3428. char basecsnstr[CSN_STRSIZE];
  3429. char opcsn2str[CSN_STRSIZE];
  3430. csn_as_string(*opcsn, PR_FALSE, opcsnstr);
  3431. csn_as_string(basecsn, PR_FALSE, basecsnstr);
  3432. csn_free(opcsn);
  3433. csngen_adjust_time(gen, basecsn);
  3434. if (csngen_new_csn(gen, opcsn, PR_FALSE) != CSN_SUCCESS) {
  3435. /* Failed to generate CSN we must abort */
  3436. object_release(gen_obj);
  3437. return -1;
  3438. }
  3439. csn_as_string(*opcsn, PR_FALSE, opcsn2str);
  3440. slapi_log_err(SLAPI_LOG_WARNING, repl_plugin_name,
  3441. "replica_generate_next_csn - "
  3442. "opcsn=%s <= basecsn=%s, adjusted opcsn=%s\n",
  3443. opcsnstr, basecsnstr, opcsn2str);
  3444. }
  3445. /*
  3446. * Insert opcsn into the csn pending list.
  3447. * This is the notify effect in csngen_new_csn().
  3448. */
  3449. assign_csn_callback(*opcsn, (void *)replica);
  3450. }
  3451. object_release(gen_obj);
  3452. }
  3453. }
  3454. }
  3455. return 0;
  3456. }
  3457. /*
  3458. * A callback function registed as op->o_replica_attr_handler and
  3459. * called by backend ops to get replica attributes.
  3460. */
  3461. int
  3462. replica_get_attr(Slapi_PBlock *pb, const char *type, void *value)
  3463. {
  3464. int rc = -1;
  3465. Replica *replica = replica_get_replica_for_op(pb);
  3466. if (NULL != replica) {
  3467. if (strcasecmp(type, type_replicaTombstonePurgeInterval) == 0) {
  3468. *((int *)value) = replica->tombstone_reap_interval;
  3469. rc = 0;
  3470. } else if (strcasecmp(type, type_replicaPurgeDelay) == 0) {
  3471. *((int *)value) = replica->repl_purge_delay;
  3472. rc = 0;
  3473. }
  3474. }
  3475. return rc;
  3476. }
  3477. uint64_t
  3478. replica_get_backoff_min(Replica *r)
  3479. {
  3480. if (r) {
  3481. return slapi_counter_get_value(r->backoff_min);
  3482. } else {
  3483. return PROTOCOL_BACKOFF_MINIMUM;
  3484. }
  3485. }
  3486. uint64_t
  3487. replica_get_backoff_max(Replica *r)
  3488. {
  3489. if (r) {
  3490. return slapi_counter_get_value(r->backoff_max);
  3491. } else {
  3492. return PROTOCOL_BACKOFF_MAXIMUM;
  3493. }
  3494. }
  3495. void
  3496. replica_set_backoff_min(Replica *r, uint64_t min)
  3497. {
  3498. if (r) {
  3499. slapi_counter_set_value(r->backoff_min, min);
  3500. }
  3501. }
  3502. void
  3503. replica_set_backoff_max(Replica *r, uint64_t max)
  3504. {
  3505. if (r) {
  3506. slapi_counter_set_value(r->backoff_max, max);
  3507. }
  3508. }
  3509. void
  3510. replica_set_precise_purging(Replica *r, uint64_t on_off)
  3511. {
  3512. if (r) {
  3513. slapi_counter_set_value(r->precise_purging, on_off);
  3514. }
  3515. }
  3516. uint64_t
  3517. replica_get_precise_purging(Replica *r)
  3518. {
  3519. if (r) {
  3520. return slapi_counter_get_value(r->precise_purging);
  3521. } else {
  3522. return 0;
  3523. }
  3524. }
  3525. int
  3526. replica_get_agmt_count(Replica *r)
  3527. {
  3528. return r->agmt_count;
  3529. }
  3530. void
  3531. replica_incr_agmt_count(Replica *r)
  3532. {
  3533. if (r) {
  3534. r->agmt_count++;
  3535. }
  3536. }
  3537. void
  3538. replica_decr_agmt_count(Replica *r)
  3539. {
  3540. if (r) {
  3541. if (r->agmt_count > 0) {
  3542. r->agmt_count--;
  3543. }
  3544. }
  3545. }
  3546. /*
  3547. * Add the "Abort Replication Session" control to the pblock
  3548. */
  3549. static void
  3550. replica_add_session_abort_control(Slapi_PBlock *pb)
  3551. {
  3552. LDAPControl ctrl = {0};
  3553. BerElement *ber;
  3554. struct berval *bvp;
  3555. int rc;
  3556. /* Build the BER payload */
  3557. if ((ber = der_alloc()) == NULL) {
  3558. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name,
  3559. "add_session_abort_control - Failed to create ber\n");
  3560. return;
  3561. }
  3562. rc = ber_printf(ber, "{}");
  3563. if (rc != -1) {
  3564. rc = ber_flatten(ber, &bvp);
  3565. }
  3566. ber_free(ber, 1);
  3567. if (rc == -1) {
  3568. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name,
  3569. "add_session_abort_control - Failed to flatten ber\n");
  3570. return;
  3571. }
  3572. ctrl.ldctl_oid = slapi_ch_strdup(REPL_ABORT_SESSION_OID);
  3573. ctrl.ldctl_value = *bvp;
  3574. bvp->bv_val = NULL;
  3575. ber_bvfree(bvp);
  3576. slapi_pblock_set(pb, SLAPI_ADD_RESCONTROL, &ctrl);
  3577. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name,
  3578. "add_session_abort_control - abort control successfully added to result\n");
  3579. }
  3580. /*
  3581. * Check if we have exceeded the failed replica acquire limit,
  3582. * if so, end the replication session.
  3583. */
  3584. void
  3585. replica_check_release_timeout(Replica *r, Slapi_PBlock *pb)
  3586. {
  3587. replica_lock(r->repl_lock);
  3588. if (r->abort_session == ABORT_SESSION) {
  3589. /* Need to abort this session (just send the control once) */
  3590. replica_add_session_abort_control(pb);
  3591. r->abort_session = SESSION_ABORTED;
  3592. }
  3593. replica_unlock(r->repl_lock);
  3594. }
  3595. void
  3596. replica_lock_replica(Replica *r)
  3597. {
  3598. replica_lock(r->repl_lock);
  3599. }
  3600. void
  3601. replica_unlock_replica(Replica *r)
  3602. {
  3603. replica_unlock(r->repl_lock);
  3604. }
  3605. void* replica_get_file_info(Replica *r)
  3606. {
  3607. return r->cldb;
  3608. }
  3609. int replica_set_file_info(Replica *r, void *cl)
  3610. {
  3611. r->cldb = (cldb_Handle *)cl;
  3612. return 0;
  3613. }