|
|
@@ -10,15 +10,30 @@ import ldap
|
|
|
import os
|
|
|
import decimal
|
|
|
import time
|
|
|
+import logging
|
|
|
+import uuid
|
|
|
+
|
|
|
+from itertools import permutations
|
|
|
from lib389._constants import *
|
|
|
from lib389.properties import *
|
|
|
-from lib389.utils import normalizeDN, escapeDNValue, ensure_bytes
|
|
|
+from lib389.utils import normalizeDN, escapeDNValue, ensure_bytes, ensure_str, ensure_list_str, ds_is_older
|
|
|
from lib389._replication import RUV
|
|
|
from lib389.repltools import ReplTools
|
|
|
from lib389 import DirSrv, Entry, NoSuchEntryError, InvalidArgumentError
|
|
|
from lib389._mapped_object import DSLdapObjects, DSLdapObject
|
|
|
+from lib389.passwd import password_generate
|
|
|
+from lib389.mappingTree import MappingTrees
|
|
|
+from lib389.agreement import Agreements
|
|
|
+from lib389.changelog import Changelog5
|
|
|
+
|
|
|
from lib389.idm.domain import Domain
|
|
|
|
|
|
+from lib389.idm.group import Groups
|
|
|
+from lib389.idm.services import ServiceAccounts
|
|
|
+from lib389.idm.organisationalunit import OrganisationalUnits
|
|
|
+
|
|
|
+from lib389.agreement import Agreements
|
|
|
+
|
|
|
|
|
|
class ReplicaLegacy(object):
|
|
|
proxied_methods = 'search_s getEntry'.split()
|
|
|
@@ -791,6 +806,81 @@ class ReplicaLegacy(object):
|
|
|
raise ValueError('Failed to update replica: ' + str(e))
|
|
|
|
|
|
|
|
|
+class RUV(object):
|
|
|
+ """Represents the server in memory RUV object. The RUV contains each
|
|
|
+ update vector the server knows of, along with knowledge of CSN state of the
|
|
|
+ replica we have sent data to.
|
|
|
+
|
|
|
+ :param ruvs: A list of nsds50ruv values.
|
|
|
+ :type ruvs: list[str]
|
|
|
+ :param logger: A logging interface.
|
|
|
+ :type logger: logging object
|
|
|
+ """
|
|
|
+
|
|
|
+ def __init__(self, ruvs, logger=None):
|
|
|
+ if logger is not None:
|
|
|
+ self._log = logger
|
|
|
+ else:
|
|
|
+ self._log = logging.getLogger(__name__)
|
|
|
+ self._rids = []
|
|
|
+ self._rid_csn = {}
|
|
|
+ self._rid_url = {}
|
|
|
+ self._data_generation = None
|
|
|
+ # Process the array of data
|
|
|
+ for r in ruvs:
|
|
|
+ pr = r.replace('{', '').replace('}', '').split(' ')
|
|
|
+ if pr[0] == 'replicageneration':
|
|
|
+ # replicageneration 5a2ffd0f000000010000
|
|
|
+ self._data_generation = pr[1]
|
|
|
+ elif pr[0] == 'replica':
|
|
|
+ # replica 1 ldap://ldapkdc.example.com:39001 5a2ffd0f000100010000 5a2ffd0f000200010000
|
|
|
+ # Don't add rids if they have no csn (no writes) yet.
|
|
|
+ rid = pr[1]
|
|
|
+ self._rids.append(rid)
|
|
|
+ self._rid_url[rid] = pr[2]
|
|
|
+ try:
|
|
|
+ self._rid_csn[rid] = pr[4]
|
|
|
+ except IndexError:
|
|
|
+ self._rid_csn[rid] = '00000000000000000000'
|
|
|
+
|
|
|
+ def alloc_rid(self):
|
|
|
+ """Based on the RUV, determine an available RID for the replication
|
|
|
+ topology that is unique.
|
|
|
+
|
|
|
+ :returns: str
|
|
|
+ """
|
|
|
+ self._log.debug("Allocated rids: %s" % self._rids)
|
|
|
+ for i in range(1, 65534):
|
|
|
+ self._log.debug("Testing ... %s" % i)
|
|
|
+ if str(i) not in self._rids:
|
|
|
+ return str(i)
|
|
|
+ raise Exception("Unable to alloc rid!")
|
|
|
+
|
|
|
+ def is_synced(self, other_ruv):
|
|
|
+ """Compare two server ruvs to determine if they are synced. This does not
|
|
|
+ mean that replication is in sync (due to things like fractional repl), but
|
|
|
+ in some cases can show that "at least some known point" has been achieved in
|
|
|
+ the replication process.
|
|
|
+
|
|
|
+ :param other_ruv: The other ruv object
|
|
|
+ :type other_ruv: RUV object
|
|
|
+ :returns: bool
|
|
|
+ """
|
|
|
+ self._log.debug("RUV: Comparing dg %s %s" % (self._data_generation, other_ruv._data_generation))
|
|
|
+ if self._data_generation != other_ruv._data_generation:
|
|
|
+ self._log.debug("RUV: Incorrect datageneration")
|
|
|
+ return False
|
|
|
+ if set(self._rids) != set(other_ruv._rids):
|
|
|
+ self._log.debug("RUV: Incorrect rid lists, is sync working?")
|
|
|
+ return False
|
|
|
+ for rid in self._rids:
|
|
|
+ my_csn = self._rid_csn.get(rid, '00000000000000000000')
|
|
|
+ other_csn = other_ruv._rid_csn.get(rid, '00000000000000000000')
|
|
|
+ self._log.debug("RUV: Comparing csn %s %s %s" % (rid, my_csn, other_csn))
|
|
|
+ if my_csn < other_csn:
|
|
|
+ return False
|
|
|
+ return True
|
|
|
+
|
|
|
class Replica(DSLdapObject):
|
|
|
"""Replica DSLdapObject with:
|
|
|
- must attributes = ['cn', 'nsDS5ReplicaType', 'nsDS5ReplicaRoot',
|
|
|
@@ -807,15 +897,37 @@ class Replica(DSLdapObject):
|
|
|
def __init__(self, instance, dn=None):
|
|
|
super(Replica, self).__init__(instance, dn)
|
|
|
self._rdn_attribute = 'cn'
|
|
|
- self._must_attributes = ['cn', REPL_TYPE,
|
|
|
- REPL_ROOT, REPL_BINDDN, REPL_ID]
|
|
|
-
|
|
|
- self._create_objectclasses = ['top', 'extensibleObject',
|
|
|
- REPLICA_OBJECTCLASS_VALUE]
|
|
|
+ self._must_attributes = [
|
|
|
+ 'cn',
|
|
|
+ 'nsDS5ReplicaType',
|
|
|
+ 'nsDS5ReplicaRoot',
|
|
|
+ 'nsDS5ReplicaId',
|
|
|
+ ]
|
|
|
+
|
|
|
+ self._create_objectclasses = [
|
|
|
+ 'top',
|
|
|
+ 'nsds5Replica'
|
|
|
+ ]
|
|
|
+ if ds_is_older('1.4.0'):
|
|
|
+ self._create_objectclasses.append('extensibleobject')
|
|
|
self._protected = False
|
|
|
self._suffix = None
|
|
|
|
|
|
- @staticmethod
|
|
|
+ def _validate(self, rdn, properties, basedn):
|
|
|
+ (tdn, str_props) = super(Replica, self)._validate(rdn, properties, basedn)
|
|
|
+ # We override the tdn here. We use the MT for the suffix.
|
|
|
+ mts = MappingTrees(self._instance)
|
|
|
+ s_suffix = ensure_str(str_props['nsDS5ReplicaRoot'][0])
|
|
|
+ mt = mts.get(s_suffix)
|
|
|
+ tdn = 'cn=replica,%s' % mt.dn
|
|
|
+ return (tdn, str_props)
|
|
|
+
|
|
|
+ def _populate_suffix(self):
|
|
|
+ """Some internal tasks need this populated.
|
|
|
+ """
|
|
|
+ if self._suffix is None:
|
|
|
+ self._suffix = self.get_attr_val_utf8('nsDS5ReplicaRoot')
|
|
|
+
|
|
|
def _valid_role(role):
|
|
|
"""Return True if role is valid
|
|
|
|
|
|
@@ -831,7 +943,6 @@ class Replica(DSLdapObject):
|
|
|
else:
|
|
|
return True
|
|
|
|
|
|
- @staticmethod
|
|
|
def _valid_rid(role, rid=None):
|
|
|
"""Return True if rid is valid for the replica role
|
|
|
|
|
|
@@ -866,49 +977,21 @@ class Replica(DSLdapObject):
|
|
|
:raises: - InvalidArgumentError - if suffix is missing
|
|
|
- ldap.LDAPError - for all other update failures
|
|
|
"""
|
|
|
-
|
|
|
- # Get the suffix
|
|
|
- suffix = self.get_attr_val(REPL_ROOT)
|
|
|
- if not suffix:
|
|
|
- self.log.fatal("disableReplication: suffix is not defined")
|
|
|
- raise InvalidArgumentError("suffix missing")
|
|
|
-
|
|
|
# Delete the agreements
|
|
|
- try:
|
|
|
- self.deleteAgreements()
|
|
|
- except ldap.LDAPError as e:
|
|
|
- self.log.fatal('Failed to delete replica agreements!')
|
|
|
- raise e
|
|
|
-
|
|
|
+ self._delete_agreements()
|
|
|
# Delete the replica
|
|
|
- try:
|
|
|
- super(Replica, self).delete()
|
|
|
- except ldap.LDAPError as e:
|
|
|
- self.log.fatal('Failed to delete replica configuration ' +
|
|
|
- '(%s), error: %s' % (self._dn, str(e)))
|
|
|
- raise e
|
|
|
+ return super(Replica, self).delete()
|
|
|
|
|
|
- def deleteAgreements(self):
|
|
|
+ def _delete_agreements(self):
|
|
|
"""Delete all the agreements for the suffix
|
|
|
|
|
|
:raises: LDAPError - If failing to delete or search for agreeme :type binddn: strnts
|
|
|
"""
|
|
|
-
|
|
|
- # Delete the agreements
|
|
|
- try:
|
|
|
- suffix = self.get_attr_val(REPL_ROOT)
|
|
|
- agmts = self._instance.agreement.list(suffix=suffix)
|
|
|
- for agmt in agmts:
|
|
|
- try:
|
|
|
- self._instance.delete_s(agmt.dn)
|
|
|
- except ldap.LDAPError as e:
|
|
|
- self.log.fatal('Failed to delete replica agreement (%s),' +
|
|
|
- ' error: %s' % (agmt.dn, str(e)))
|
|
|
- raise e
|
|
|
- except ldap.LDAPError as e:
|
|
|
- self.log.fatal('Failed to search for replication agreements ' +
|
|
|
- 'under (%s), error: %s' % (self._dn, str(e)))
|
|
|
- raise e
|
|
|
+ # Get the suffix
|
|
|
+ self._populate_suffix()
|
|
|
+ agmts = self.get_agreements()
|
|
|
+ for agmt in agmts.list():
|
|
|
+ agmt.delete()
|
|
|
|
|
|
def promote(self, newrole, binddn=None, rid=None):
|
|
|
"""Promote the replica to hub or master
|
|
|
@@ -1038,127 +1121,7 @@ class Replica(DSLdapObject):
|
|
|
|
|
|
return replicarole
|
|
|
|
|
|
- def check_init(self, agmtdn):
|
|
|
- """Check that a total update has completed
|
|
|
-
|
|
|
- :param agmtdn: The agreement DN
|
|
|
- :type agmtdn: str
|
|
|
-
|
|
|
- :returns: A tuple - first element is done/not done, 2nd is no error/has error
|
|
|
-
|
|
|
- THIS SHOULD BE IN THE NEW AGREEMENT CLASS
|
|
|
- """
|
|
|
-
|
|
|
- done, hasError = False, 0
|
|
|
- attrlist = ['cn',
|
|
|
- 'nsds5BeginReplicaRefresh',
|
|
|
- 'nsds5replicaUpdateInProgress',
|
|
|
- 'nsds5ReplicaLastInitStatus',
|
|
|
- 'nsds5ReplicaLastInitStart',
|
|
|
- 'nsds5ReplicaLastInitEnd']
|
|
|
- try:
|
|
|
- entry = self._instance.getEntry(
|
|
|
- agmtdn, ldap.SCOPE_BASE, "(objectclass=*)", attrlist)
|
|
|
- except NoSuchEntryError:
|
|
|
- self._log.exception("Error reading status from agreement {}".format(agmtdn))
|
|
|
- hasError = 1
|
|
|
- else:
|
|
|
- refresh = entry.nsds5BeginReplicaRefresh
|
|
|
- inprogress = entry.nsds5replicaUpdateInProgress
|
|
|
- status = entry.nsds5ReplicaLastInitStatus
|
|
|
- if not refresh: # done - check status
|
|
|
- if not status:
|
|
|
- print("No status yet")
|
|
|
- elif status.find(b"replica busy") > -1:
|
|
|
- print("Update failed - replica busy - status", status)
|
|
|
- done = True
|
|
|
- hasError = 2
|
|
|
- elif status.find(b"Total update succeeded") > -1:
|
|
|
- print("Update succeeded: status ", status)
|
|
|
- done = True
|
|
|
- elif inprogress.lower() == 'true':
|
|
|
- print("Update in progress yet not in progress: status ",
|
|
|
- status)
|
|
|
- else:
|
|
|
- print("Update failed: status", status)
|
|
|
- hasError = 1
|
|
|
- done = True
|
|
|
- elif self.verbose:
|
|
|
- print("Update in progress: status", status)
|
|
|
-
|
|
|
- return done, hasError
|
|
|
-
|
|
|
- def wait_init(self, agmtdn):
|
|
|
- """Initialize replication and wait for completion.
|
|
|
-
|
|
|
- :param agmtdn: The agreement DN
|
|
|
- :type agmtdn: str
|
|
|
-
|
|
|
- :returns: 0 if the initialization is complete
|
|
|
-
|
|
|
- THIS SHOULD BE IN THE NEW AGREEMENT CLASS
|
|
|
- """
|
|
|
-
|
|
|
- done = False
|
|
|
- haserror = 0
|
|
|
- while not done and not haserror:
|
|
|
- time.sleep(1) # give it a few seconds to get going
|
|
|
- done, haserror = self.check_init(agmtdn)
|
|
|
- return haserror
|
|
|
-
|
|
|
- def start_and_wait(self, agmtdn):
|
|
|
- """Initialize an agreement and wait for it to complete
|
|
|
-
|
|
|
- :param agmtdn: The agreement DN
|
|
|
- :type agmtdn: str
|
|
|
-
|
|
|
- :returns: 0 if the initialization is complete
|
|
|
-
|
|
|
- THIS SHOULD BE IN THE NEW AGREEMENT CLASS
|
|
|
- """
|
|
|
-
|
|
|
- rc = self.start_async(agmtdn)
|
|
|
- if not rc:
|
|
|
- rc = self.wait_init(agmtdn)
|
|
|
- if rc == 2: # replica busy - retry
|
|
|
- rc = self.start_and_wait(agmtdn)
|
|
|
- return rc
|
|
|
-
|
|
|
- def start_async(self, agmtdn):
|
|
|
- """Initialize replication without waiting
|
|
|
-
|
|
|
- :param agmtdn: The agreement DN
|
|
|
- :type agmtdn: str
|
|
|
-
|
|
|
- :returns: None
|
|
|
-
|
|
|
- THIS SHOULD BE IN THE NEW AGREEMENT CLASS
|
|
|
- """
|
|
|
-
|
|
|
- self._log.info("Starting async replication %s" % agmtdn)
|
|
|
- mod = [(ldap.MOD_ADD, 'nsds5BeginReplicaRefresh', b'start')]
|
|
|
- self._instance.modify_s(agmtdn, mod)
|
|
|
-
|
|
|
- def get_ruv_entry(self):
|
|
|
- """Return the database RUV entry
|
|
|
-
|
|
|
- :returns: The database RUV entry
|
|
|
- :raises: ValeuError - If suffix is not setup for replication
|
|
|
- LDAPError - If there is a problem trying to search for the RUV
|
|
|
- """
|
|
|
-
|
|
|
- try:
|
|
|
- entry = self._instance.search_s(self._suffix,
|
|
|
- ldap.SCOPE_SUBTREE,
|
|
|
- REPLICA_RUV_FILTER)
|
|
|
- if entry:
|
|
|
- return entry[0]
|
|
|
- else:
|
|
|
- raise ValueError('Suffix (%s) is not setup for replication' % self._suffix)
|
|
|
- except ldap.LDAPError as e:
|
|
|
- raise e
|
|
|
-
|
|
|
- def test(self, *replica_dirsrvs):
|
|
|
+ def test_replication(self, replica_dirsrvs):
|
|
|
"""Make a "dummy" update on the the replicated suffix, and check
|
|
|
all the provided replicas to see if they received the update.
|
|
|
|
|
|
@@ -1204,6 +1167,42 @@ class Replica(DSLdapObject):
|
|
|
|
|
|
return True
|
|
|
|
|
|
+ def get_agreements(self):
|
|
|
+ """Return the set of agreements related to this suffix replica
|
|
|
+
|
|
|
+ :returns: Agreements object
|
|
|
+ """
|
|
|
+ return Agreements(self._instance, self.dn)
|
|
|
+
|
|
|
+ def get_rid(self):
|
|
|
+ """Return the current replicas RID for this suffix
|
|
|
+
|
|
|
+ :returns: str
|
|
|
+ """
|
|
|
+ return self.get_attr_val_utf8('nsDS5ReplicaId')
|
|
|
+
|
|
|
+ def get_ruv(self):
|
|
|
+ """Return the in memory ruv of this replica suffix.
|
|
|
+
|
|
|
+ :returns: RUV object
|
|
|
+ """
|
|
|
+ self._populate_suffix()
|
|
|
+
|
|
|
+ ent = self._instance.search_ext_s(
|
|
|
+ base=self._suffix,
|
|
|
+ scope=ldap.SCOPE_SUBTREE,
|
|
|
+ filterstr='(&(nsuniqueid=ffffffff-ffffffff-ffffffff-ffffffff)(objectclass=nstombstone))',
|
|
|
+ attrlist=['nsds50ruv'],
|
|
|
+ serverctrls=self._server_controls, clientctrls=self._client_controls)[0]
|
|
|
+
|
|
|
+ data = ensure_list_str(ent.getValues('nsds50ruv'))
|
|
|
+
|
|
|
+ return RUV(data)
|
|
|
+
|
|
|
+ def begin_task_cl2ldif(self):
|
|
|
+ """Begin the changelog to ldif task
|
|
|
+ """
|
|
|
+ self.replace('nsds5task', 'cl2ldif')
|
|
|
|
|
|
class Replicas(DSLdapObjects):
|
|
|
"""Replica DSLdapObjects for all replicas
|
|
|
@@ -1235,217 +1234,628 @@ class Replicas(DSLdapObjects):
|
|
|
replica = super(Replicas, self).get(selector, dn)
|
|
|
if replica:
|
|
|
# Get and set the replica's suffix
|
|
|
- replica._suffix = replica.get_attr_val(REPL_ROOT)
|
|
|
+ replica._populate_suffix()
|
|
|
return replica
|
|
|
|
|
|
- def enable(self, suffix, role, replicaID=None, args=None):
|
|
|
- """Enable replication for this suffix
|
|
|
+class BootstrapReplicationManager(DSLdapObject):
|
|
|
+ """A Replication Manager credential for bootstrapping the repl process.
|
|
|
+ This is used by the replication manager object to coordinate the initial
|
|
|
+ init so that server creds are available.
|
|
|
|
|
|
- :param suffix: The suffix to enable replication for
|
|
|
- :type suffix: str
|
|
|
- :param role: MASTER, HUB and CONSUMER
|
|
|
- :type role: ReplicaRole
|
|
|
- :param replicaID: number that identify the supplier replica
|
|
|
- (role=ReplicaRole.MASTER) in the topology.
|
|
|
- For hub/consumer (role=ReplicaRole.HUB or
|
|
|
- ReplicaRole.CONSUMER), rid value is not used.
|
|
|
- This parameter is mandatory for supplier.
|
|
|
- :type replicaID: int
|
|
|
- :param args: A dictionary of additional replica properties
|
|
|
- :type args: dict
|
|
|
-
|
|
|
- :returns: Replica DSLdapObject
|
|
|
- :raises: - InvalidArgumentError - if missing mandatory arguments
|
|
|
- - ValueError - argument with invalid value
|
|
|
- - LDAPError - failed to add replica entry
|
|
|
- """
|
|
|
+ :param instance: An instance
|
|
|
+ :type instance: lib389.DirSrv
|
|
|
+ :param dn: The dn to create
|
|
|
+ :type dn: str
|
|
|
+ """
|
|
|
+ def __init__(self, instance, dn='cn=replication manager,cn=config'):
|
|
|
+ super(BootstrapReplicationManager, self).__init__(instance, dn)
|
|
|
+ self._rdn_attribute = 'cn'
|
|
|
+ self._must_attributes = ['cn', 'userPassword']
|
|
|
+ self._create_objectclasses = [
|
|
|
+ 'top',
|
|
|
+ 'netscapeServer'
|
|
|
+ ]
|
|
|
+ self._protected = False
|
|
|
+ self.common_name = 'replication manager'
|
|
|
|
|
|
- # Normalize the suffix
|
|
|
- suffix = normalizeDN(suffix)
|
|
|
|
|
|
- # Check validity of role
|
|
|
- if not role:
|
|
|
- self._log.fatal("Replica.create: replica role is not specified (ReplicaRole.*)")
|
|
|
- raise InvalidArgumentError("role missing")
|
|
|
+class ReplicationManager(object):
|
|
|
+ """The lib389 replication manager. This is used to coordinate
|
|
|
+ replicas and agreements between servers.
|
|
|
|
|
|
- if not Replica._valid_role(role):
|
|
|
- self._log.fatal("enableReplication: replica role invalid (%s) " % role)
|
|
|
- raise ValueError("invalid role: %s" % role)
|
|
|
+ Unlike the raw replicas / agreement types that manipulate the
|
|
|
+ servers configuration, this is a "high level" coordination type.
|
|
|
+ It's capable of taking multiple instances and joining them. It
|
|
|
+ consumes many lib389 types like Replicas, Agreements and more.
|
|
|
|
|
|
- # role is fine, set the replica type
|
|
|
- if role == ReplicaRole.MASTER:
|
|
|
- rtype = REPLICA_RDWR_TYPE
|
|
|
- # check the validity of 'rid'
|
|
|
- if not Replica._valid_rid(role, rid=replicaID):
|
|
|
- self._log.fatal("Replica.create: replica role is master but " +
|
|
|
- "'rid' is missing or invalid value")
|
|
|
- raise InvalidArgumentError("rid missing or invalid value")
|
|
|
- else:
|
|
|
- rtype = REPLICA_RDONLY_TYPE
|
|
|
+ It is capable of creating the first master in a topoolgy, joining
|
|
|
+ masters and consumers to that topology, populating per-server
|
|
|
+ replication credentials, dynamic rid allocation, and more.
|
|
|
|
|
|
- # Set the properties provided as mandatory parameter
|
|
|
- properties = {'cn': 'replica',
|
|
|
- REPL_ROOT: suffix,
|
|
|
- REPL_ID: str(replicaID),
|
|
|
- REPL_TYPE: str(rtype)}
|
|
|
+ Unlike hand management of agreements, this is able to take simpler
|
|
|
+ steps to agreement creation. For example:
|
|
|
|
|
|
- # If the properties in args are valid add them to 'properties'
|
|
|
- if args:
|
|
|
- for prop in args:
|
|
|
- if not inProperties(prop, REPLICA_PROPNAME_TO_ATTRNAME):
|
|
|
- raise ValueError("unknown property: %s" % prop)
|
|
|
- properties[prop] = args[prop]
|
|
|
+ repl = ReplicationManager(<suffix>)
|
|
|
+ repl.create_first_master(master1)
|
|
|
+ repl.join_master(master1, master2)
|
|
|
|
|
|
- # Set flags explicitly, so it will be more readable
|
|
|
- if role == ReplicaRole.CONSUMER:
|
|
|
- properties[REPL_FLAGS] = str(REPLICA_FLAGS_RDONLY)
|
|
|
+ Contrast to previous implementations of replication which required
|
|
|
+ much more knowledge and parameters, this is able to securely add
|
|
|
+ masters.
|
|
|
+
|
|
|
+ :param suffix: The suffix to replicate.
|
|
|
+ :type suffix: str
|
|
|
+ :param logger: A logging interface
|
|
|
+ :type logger: python logging
|
|
|
+
|
|
|
+ """
|
|
|
+ def __init__(self, suffix, logger=None):
|
|
|
+ self._suffix = suffix
|
|
|
+ if logger is not None:
|
|
|
+ self._log = logger
|
|
|
else:
|
|
|
- properties[REPL_FLAGS] = str(REPLICA_FLAGS_WRITE)
|
|
|
+ self._log = logging.getLogger(__name__)
|
|
|
+ self._alloc_rids = []
|
|
|
|
|
|
- # Check if replica entry is already in the mapping-tree
|
|
|
+ def _ensure_changelog(self, instance):
|
|
|
+ """Internally guarantee a changelog exists for
|
|
|
+ an instance. Internal only.
|
|
|
+ """
|
|
|
+ cl = Changelog5(instance)
|
|
|
try:
|
|
|
- replica = self.get(suffix)
|
|
|
- # Should we return an error, or just return the existing relica?
|
|
|
- self._log.warn("Already setup replica for suffix %s" % suffix)
|
|
|
- return replica
|
|
|
+ cl.create(properties={
|
|
|
+ 'cn': 'changelog5',
|
|
|
+ 'nsslapd-changelogdir': instance.get_changelog_dir()
|
|
|
+ })
|
|
|
+ except ldap.ALREADY_EXISTS:
|
|
|
+ pass
|
|
|
+
|
|
|
+ def _inst_to_agreement_name(self, to_instance):
|
|
|
+ """From an instance, determine the agreement name that we
|
|
|
+ would use for it. Internal only.
|
|
|
+ """
|
|
|
+ to_replicas = Replicas(to_instance)
|
|
|
+ to_r = to_replicas.get(self._suffix)
|
|
|
+ return to_r.get_rid()
|
|
|
+
|
|
|
+ def create_first_master(self, instance):
|
|
|
+ """In a topology, this creates the "first" master that has the
|
|
|
+ database and content. A number of bootstrap tasks are performed
|
|
|
+ on this master, as well as creating it's replica type.
|
|
|
+
|
|
|
+ Once the first master is created, all other masters can be joined to
|
|
|
+ it via "join_master".
|
|
|
+
|
|
|
+ :param instance: An instance
|
|
|
+ :type instance: lib389.DirSrv
|
|
|
+ """
|
|
|
+ # This is a special wrapper to create. We know it's a master,
|
|
|
+ # and this is the "first" of the topology.
|
|
|
+ # So this can wrap it and make it easy.
|
|
|
+ self._log.debug("Creating first master on %s" % instance.ldapuri)
|
|
|
+
|
|
|
+ self._ensure_changelog(instance)
|
|
|
+
|
|
|
+ rgroup_dn = self._create_service_account(instance, instance)
|
|
|
+
|
|
|
+ # Allocate the first rid, 1.
|
|
|
+ replicas = Replicas(instance)
|
|
|
+ replicas.create(properties={
|
|
|
+ 'cn': 'replica',
|
|
|
+ 'nsDS5ReplicaRoot': self._suffix,
|
|
|
+ 'nsDS5ReplicaId': '1',
|
|
|
+ 'nsDS5Flags': '1',
|
|
|
+ 'nsDS5ReplicaType': '3',
|
|
|
+ 'nsDS5ReplicaBindDNGroup': rgroup_dn,
|
|
|
+ 'nsds5replicabinddngroupcheckinterval': '0'
|
|
|
+ })
|
|
|
+ self._log.debug("SUCCESS: Created first master on %s" % instance.ldapuri)
|
|
|
+
|
|
|
+ def _create_service_group(self, from_instance):
|
|
|
+ """Internally create the service group that contains replication managers.
|
|
|
+ This may become part of the default objects in the future. Internal only.
|
|
|
+ """
|
|
|
+ groups = Groups(from_instance, basedn=self._suffix, rdn=None)
|
|
|
+ repl_group = groups.ensure_state(properties={
|
|
|
+ 'cn': 'replication_managers',
|
|
|
+ })
|
|
|
+ return repl_group
|
|
|
+
|
|
|
+ def _create_service_account(self, from_instance, to_instance):
|
|
|
+ """Create the server replication service account, and
|
|
|
+ make it a member of the service group. Internal Only.
|
|
|
+ """
|
|
|
+ repl_group = self._create_service_group(from_instance)
|
|
|
+ # Create our service account.
|
|
|
+ ous = OrganisationalUnits(from_instance, self._suffix)
|
|
|
+ ous.ensure_state(properties={
|
|
|
+ 'ou': 'Services'
|
|
|
+ })
|
|
|
+
|
|
|
+ # Do we have TLS?
|
|
|
+ port = to_instance.sslport
|
|
|
+
|
|
|
+ services = ServiceAccounts(from_instance, self._suffix)
|
|
|
+ # We don't have an agreement yet, so don't bother with the
|
|
|
+ # password yet ...
|
|
|
+ repl_service = services.ensure_state(properties={
|
|
|
+ 'cn': '%s:%s' % (to_instance.host, port),
|
|
|
+ })
|
|
|
+
|
|
|
+ repl_group.ensure_member(repl_service.dn)
|
|
|
+
|
|
|
+ return repl_group.dn
|
|
|
+
|
|
|
+ def _bootstrap_replica(self, from_replica, to_replica, to_instance):
|
|
|
+ """In the master join process a chicken-egg issues arises
|
|
|
+ that we require the service account on the target master for
|
|
|
+ our agreement to be valid, but be can't send it that data without
|
|
|
+ our service account.
|
|
|
+
|
|
|
+ Resolve that issue by "bootstrapping" the database. This creates a
|
|
|
+ bootstrap replication manager and conducts a one-way total init.
|
|
|
+ Once complete the bootstrap agreement is removed, and the service
|
|
|
+ accounts now exist on both ends allowing the join process to continue.
|
|
|
+
|
|
|
+ Internal Only.
|
|
|
+ """
|
|
|
+ repl_manager_password = password_generate()
|
|
|
+ # Create a repl manager on the replica
|
|
|
+ brm = BootstrapReplicationManager(to_instance)
|
|
|
+ brm.create(properties={
|
|
|
+ 'cn': brm.common_name,
|
|
|
+ 'userPassword': repl_manager_password
|
|
|
+ })
|
|
|
+
|
|
|
+ to_replica.set('nsDS5ReplicaBindDN', brm.dn)
|
|
|
+
|
|
|
+ agmt_name = self._inst_to_agreement_name(to_instance)
|
|
|
+
|
|
|
+ # add a temp agreement from A -> B
|
|
|
+ from_agreements = from_replica.get_agreements()
|
|
|
+ temp_agmt = from_agreements.create(properties={
|
|
|
+ 'cn': "temp_%s" % agmt_name,
|
|
|
+ 'nsDS5ReplicaRoot': self._suffix,
|
|
|
+ 'nsDS5ReplicaBindDN': brm.dn,
|
|
|
+ 'nsDS5ReplicaBindMethod': 'simple' ,
|
|
|
+ 'nsDS5ReplicaTransportInfo': 'LDAP',
|
|
|
+ 'nsds5replicaTimeout': '5',
|
|
|
+ 'description': "temp_%s" % agmt_name,
|
|
|
+ 'nsDS5ReplicaHost': to_instance.host,
|
|
|
+ 'nsDS5ReplicaPort': str(to_instance.port),
|
|
|
+ 'nsDS5ReplicaCredentials': repl_manager_password,
|
|
|
+ })
|
|
|
+ # Do a replica refresh.
|
|
|
+ temp_agmt.begin_reinit()
|
|
|
+ (done, error) = temp_agmt.wait_reinit()
|
|
|
+ assert done is True
|
|
|
+ assert error is False
|
|
|
+ # Now remove the temp agmt between A -> B
|
|
|
+ temp_agmt.delete()
|
|
|
+ # Rm the binddn.
|
|
|
+ to_replica.remove_all('nsDS5ReplicaBindDN')
|
|
|
+ # Remove the repl manager.
|
|
|
+ brm.delete()
|
|
|
+ self._log.info("SUCCESS: bootstrap to %s completed" % to_instance.ldapuri)
|
|
|
+
|
|
|
+ def join_master(self, from_instance, to_instance):
|
|
|
+ """Join a new master in MMR to this instance. This will complete
|
|
|
+ a total init of the data "from instance" to "to instance".
|
|
|
+
|
|
|
+ This can be conducted from any master in the topology as "from" master.
|
|
|
+
|
|
|
+ :param from_instance: An instance already in the topology.
|
|
|
+ :type from_instance: lib389.DirSrv
|
|
|
+ :param to_instance: An instance to join to the topology.
|
|
|
+ :type to_instance: lib389.DirSrv
|
|
|
+ """
|
|
|
+ # Is the to_instance already a replica of the suffix?
|
|
|
+ to_replicas = Replicas(to_instance)
|
|
|
+ try:
|
|
|
+ to_r = to_replicas.get(self._suffix)
|
|
|
+ self._log("WARNING: to_instance is already a replica for this suffix")
|
|
|
+ return
|
|
|
except ldap.NO_SUCH_OBJECT:
|
|
|
pass
|
|
|
|
|
|
- # Create changelog
|
|
|
- if (role == ReplicaRole.MASTER) or (role == ReplicaRole.HUB):
|
|
|
- self._instance.changelog.create()
|
|
|
+ # Make sure we replicate this suffix too ...
|
|
|
+ fr_replicas = Replicas(from_instance)
|
|
|
+ fr_r = fr_replicas.get(self._suffix)
|
|
|
|
|
|
- # Create the default replica manager entry if it does not exist
|
|
|
- if REPL_BINDDN not in properties:
|
|
|
- properties[REPL_BINDDN] = defaultProperties[REPLICATION_BIND_DN]
|
|
|
- if REPLICATION_BIND_PW not in properties:
|
|
|
- repl_pw = defaultProperties[REPLICATION_BIND_PW]
|
|
|
- else:
|
|
|
- repl_pw = properties[REPLICATION_BIND_PW]
|
|
|
- # Remove this property so we don't add it to the replica entry
|
|
|
- del properties[REPLICATION_BIND_PW]
|
|
|
+ # Ensure we have a cl
|
|
|
+ self._ensure_changelog(to_instance)
|
|
|
|
|
|
- ReplTools.createReplManager(self._instance,
|
|
|
- repl_manager_dn=properties[REPL_BINDDN],
|
|
|
- repl_manager_pw=repl_pw)
|
|
|
+ # Create our credentials
|
|
|
+ repl_dn = self._create_service_account(from_instance, to_instance)
|
|
|
|
|
|
- # Now create the replica entry
|
|
|
- mtents = self._instance.mappingtree.list(suffix=suffix)
|
|
|
- self._basedn = mtents[0].dn
|
|
|
- replica = self.create(RDN_REPLICA, properties)
|
|
|
- replica._suffix = suffix
|
|
|
+ # Find the ruv on from_instance
|
|
|
+ ruv = fr_r.get_ruv()
|
|
|
|
|
|
- return replica
|
|
|
+ # Get a free rid
|
|
|
+ rid = ruv.alloc_rid()
|
|
|
+ assert rid not in self._alloc_rids
|
|
|
+ self._alloc_rids.append(rid)
|
|
|
|
|
|
- def disable(self, suffix):
|
|
|
- """Disable replication on the suffix specified
|
|
|
+ self._log.debug("Allocating rid %s" % rid)
|
|
|
+ # Create replica on to_instance, with bootstrap details.
|
|
|
+ to_r = to_replicas.create(properties={
|
|
|
+ 'cn': 'replica',
|
|
|
+ 'nsDS5ReplicaRoot': self._suffix,
|
|
|
+ 'nsDS5ReplicaId': rid,
|
|
|
+ 'nsDS5Flags': '1',
|
|
|
+ 'nsDS5ReplicaType': '3',
|
|
|
+ 'nsds5replicabinddngroupcheckinterval': '0'
|
|
|
+ })
|
|
|
|
|
|
- :param suffix: Replicated suffix to disable
|
|
|
- :type suffix: str
|
|
|
+ # WARNING: You need to create passwords and agmts BEFORE you tot_init!
|
|
|
|
|
|
- :returns: None
|
|
|
- :raises: ValueError is suffix is not being replicated
|
|
|
+ # Now put in an agreement from to -> from
|
|
|
+ # both ends.
|
|
|
+ self.ensure_agreement(from_instance, to_instance)
|
|
|
+ self.ensure_agreement(to_instance, from_instance, init=True)
|
|
|
+
|
|
|
+ # perform the _bootstrap. This creates a temporare repl manager
|
|
|
+ # to allow the tot_init to occur.
|
|
|
+ self._bootstrap_replica(fr_r, to_r, to_instance)
|
|
|
+
|
|
|
+ # Now fix our replica credentials from -> to
|
|
|
+ to_r.set('nsDS5ReplicaBindDNGroup', repl_dn)
|
|
|
+
|
|
|
+ # Now finally test it ...
|
|
|
+ self.test_replication(from_instance, to_instance)
|
|
|
+ self.test_replication(to_instance, from_instance)
|
|
|
+ # Done!
|
|
|
+ self._log.info("SUCCESS: joined master from %s to %s" % (from_instance.ldapuri, to_instance.ldapuri))
|
|
|
+
|
|
|
+ def join_hub(self, from_instance, to_instance):
|
|
|
+ """Join a new hub to this instance. This will complete
|
|
|
+ a total init of the data "from instance" to "to instance".
|
|
|
+
|
|
|
+ This can be conducted from any master or hub in the topology as "from" master.
|
|
|
+
|
|
|
+ Not implement yet.
|
|
|
+
|
|
|
+ :param from_instance: An instance already in the topology.
|
|
|
+ :type from_instance: lib389.DirSrv
|
|
|
+ :param to_instance: An instance to join to the topology.
|
|
|
+ :type to_instance: lib389.DirSrv
|
|
|
"""
|
|
|
+ # Ensure we have a cl
|
|
|
+ self._ensure_changelog(to_instance)
|
|
|
+ raise Exception
|
|
|
+
|
|
|
+ def join_consumer(self, from_instance, to_instance):
|
|
|
+ """Join a new consumer to this instance. This will complete
|
|
|
+ a total init of the data "from instance" to "to instance".
|
|
|
+
|
|
|
+ This can be conducted from any master or hub in the topology as "from" master.
|
|
|
|
|
|
+
|
|
|
+ :param from_instance: An instance already in the topology.
|
|
|
+ :type from_instance: lib389.DirSrv
|
|
|
+ :param to_instance: An instance to join to the topology.
|
|
|
+ :type to_instance: lib389.DirSrv
|
|
|
+ """
|
|
|
+ to_replicas = Replicas(to_instance)
|
|
|
try:
|
|
|
- replica = self.get(suffix)
|
|
|
+ to_r = to_replicas.get(self._suffix)
|
|
|
+ self._log("WARNING: to_instance is already a replica for this suffix")
|
|
|
+ return
|
|
|
except ldap.NO_SUCH_OBJECT:
|
|
|
- raise ValueError('Suffix (%s) is not setup for replication' % suffix)
|
|
|
+ pass
|
|
|
|
|
|
- role = replica.get_role()
|
|
|
- if role in (ReplicaRole.MASTER, ReplicaRole.HUB):
|
|
|
- self._instance.changelog.delete()
|
|
|
+ # Make sure we replicate this suffix too ...
|
|
|
+ fr_replicas = Replicas(from_instance)
|
|
|
+ fr_r = fr_replicas.get(self._suffix)
|
|
|
+
|
|
|
+ # Create replica on to_instance, with bootstrap details.
|
|
|
+ to_r = to_replicas.create(properties={
|
|
|
+ 'cn': 'replica',
|
|
|
+ 'nsDS5ReplicaRoot': self._suffix,
|
|
|
+ 'nsDS5ReplicaId': '65535',
|
|
|
+ 'nsDS5Flags': '0',
|
|
|
+ 'nsDS5ReplicaType': '2',
|
|
|
+ 'nsds5replicabinddngroupcheckinterval': '0'
|
|
|
+ })
|
|
|
+
|
|
|
+ # WARNING: You need to create passwords and agmts BEFORE you tot_init!
|
|
|
+ repl_group = self._create_service_group(from_instance)
|
|
|
+
|
|
|
+ # Now put in an agreement from to -> from
|
|
|
+ # both ends.
|
|
|
+ self.ensure_agreement(from_instance, to_instance)
|
|
|
+
|
|
|
+ # perform the _bootstrap. This creates a temporare repl manager
|
|
|
+ # to allow the tot_init to occur.
|
|
|
+ self._bootstrap_replica(fr_r, to_r, to_instance)
|
|
|
+
|
|
|
+ # Now fix our replica credentials from -> to
|
|
|
+ to_r.set('nsDS5ReplicaBindDNGroup', repl_group.dn)
|
|
|
+
|
|
|
+ # Now finally test it ...
|
|
|
+ self.test_replication(from_instance, to_instance)
|
|
|
+ # Done!
|
|
|
+ self._log.info("SUCCESS: joined consumer from %s to %s" % (from_instance.ldapuri, to_instance.ldapuri))
|
|
|
+
|
|
|
+ def _get_replica_creds(self, from_instance, write_instance):
|
|
|
+ """For the master "from_instance" create or derive the credentials
|
|
|
+ needed for it's replication service account. In some cases the
|
|
|
+ credentials are created, write them to "write instance" as a new
|
|
|
+ service account userPassword.
|
|
|
+
|
|
|
+ This function signature exists for bootstrapping: We need to
|
|
|
+ link master A and B, but they have not yet replicated. So we generate
|
|
|
+ credentials for B, and write them to A's instance, where they will
|
|
|
+ then be replicated back to B. If this wasn't the case, we would generate
|
|
|
+ the credentials on B, write them to B, but B has no way to authenticate
|
|
|
+ to A because the service account doesn't have credentials there yet.
|
|
|
+
|
|
|
+ Internal Only.
|
|
|
+ """
|
|
|
+ # We write all our changes to "write_instance", but we read data
|
|
|
+ # from the "from" instance.
|
|
|
+
|
|
|
+ dn = None
|
|
|
+ creds = None
|
|
|
+
|
|
|
+ fr_replicas = Replicas(from_instance)
|
|
|
+ fr_r = fr_replicas.get(self._suffix)
|
|
|
+ from_agmts = fr_r.get_agreements()
|
|
|
+ # see if any exist already ....
|
|
|
+ agmts = from_agmts.list()
|
|
|
+ if len(agmts) > 0:
|
|
|
+ # okay, re-use the creds
|
|
|
+ agmt = agmts[0]
|
|
|
+ dn = agmt.get_attr_val_utf8('nsDS5ReplicaBindDN')
|
|
|
+ creds = agmt.get_attr_val_utf8('nsDS5ReplicaCredentials')
|
|
|
+ else:
|
|
|
+ # Create them ...
|
|
|
+ # Get the service account.
|
|
|
+ services = ServiceAccounts(write_instance, self._suffix)
|
|
|
+ sa = services.get('%s:%s' % (from_instance.host, from_instance.sslport))
|
|
|
+ creds = password_generate()
|
|
|
+ # Gen a password
|
|
|
+ sa.set('userPassword', creds)
|
|
|
+ dn = sa.dn
|
|
|
+
|
|
|
+ return (dn, creds)
|
|
|
+
|
|
|
+ def ensure_agreement(self, from_instance, to_instance, init=False):
|
|
|
+ """Guarantee that a replication agreement exists 'from_instance' send
|
|
|
+ data 'to_instance'. This can be for *any* instance, master, hub, or
|
|
|
+ consumer.
|
|
|
+
|
|
|
+ Both instances must have been added to the topology with
|
|
|
+ create first master, join_master or join_consumer.
|
|
|
+
|
|
|
+ :param from_instance: An instance already in the topology.
|
|
|
+ :type from_instance: lib389.DirSrv
|
|
|
+ :param to_instance: An instance to replicate to.
|
|
|
+ :type to_instance: lib389.DirSrv
|
|
|
+ """
|
|
|
+ # Make sure that an agreement from -> to exists.
|
|
|
+ # At the moment we assert this by checking host and port
|
|
|
+ # details.
|
|
|
|
|
|
- try:
|
|
|
- replica.delete()
|
|
|
- except ldap.LDAPError as e:
|
|
|
- raise ValueError('Failed to disable replication for suffix ' +
|
|
|
- '(%s) LDAP error (%s)' % (suffix, str(e)))
|
|
|
+ # init = True means to create credentials on the "to" master, because
|
|
|
+ # we are initialising in reverse.
|
|
|
|
|
|
- def promote(self, suffix, newrole, binddn=None, rid=None):
|
|
|
- """Promote the replica to hub or master
|
|
|
+ # init = False (default) means creds *might* exist, and we create them
|
|
|
+ # on the "from" master.
|
|
|
|
|
|
- :param newrole: The new replication role for the replica: MASTER and HUB
|
|
|
- :type newrole: ReplicaRole
|
|
|
- :param binddn: The replication bind dn - only applied to master
|
|
|
- :type binddn: str
|
|
|
- :param rid: The replication ID, applies only to promotions to "master"
|
|
|
- :type rid: int
|
|
|
+ fr_replicas = Replicas(from_instance)
|
|
|
+ fr_r = fr_replicas.get(self._suffix)
|
|
|
|
|
|
- :returns: None
|
|
|
- :raises: ValueError - If replica is not promoted
|
|
|
- """
|
|
|
+ from_agmts = fr_r.get_agreements()
|
|
|
+
|
|
|
+ agmt_name = self._inst_to_agreement_name(to_instance)
|
|
|
|
|
|
- replica = self.get(suffix)
|
|
|
try:
|
|
|
- replica = self.get(suffix)
|
|
|
+ agmt = from_agmts.get(agmt_name)
|
|
|
+ self._log.info("SUCCESS: Agreement from %s to %s already exists" % (from_instance.ldapuri, to_instance.ldapuri))
|
|
|
+ return
|
|
|
except ldap.NO_SUCH_OBJECT:
|
|
|
- raise ValueError('Suffix (%s) is not setup for replication' % suffix)
|
|
|
- replica.promote(newrole, binddn, rid)
|
|
|
+ # Okay, it doesn't exist, lets go ahead!
|
|
|
+ pass
|
|
|
|
|
|
- def demote(self, suffix, newrole):
|
|
|
- """Demote a replica to a hub or consumer
|
|
|
+ if init is True:
|
|
|
+ (dn, creds) = self._get_replica_creds(from_instance, to_instance)
|
|
|
+ else:
|
|
|
+ (dn, creds) = self._get_replica_creds(from_instance, from_instance)
|
|
|
+
|
|
|
+ assert dn is not None
|
|
|
+ assert creds is not None
|
|
|
+
|
|
|
+ agmt = from_agmts.create(properties={
|
|
|
+ 'cn': agmt_name,
|
|
|
+ 'nsDS5ReplicaRoot': self._suffix,
|
|
|
+ 'nsDS5ReplicaBindDN': dn,
|
|
|
+ 'nsDS5ReplicaBindMethod': 'simple' ,
|
|
|
+ 'nsDS5ReplicaTransportInfo': 'LDAP',
|
|
|
+ 'nsds5replicaTimeout': '5',
|
|
|
+ 'description': agmt_name,
|
|
|
+ 'nsDS5ReplicaHost': to_instance.host,
|
|
|
+ 'nsDS5ReplicaPort': str(to_instance.port),
|
|
|
+ 'nsDS5ReplicaCredentials': creds,
|
|
|
+ })
|
|
|
+ # Done!
|
|
|
+ self._log.info("SUCCESS: Agreement from %s to %s is was created" % (from_instance.ldapuri, to_instance.ldapuri))
|
|
|
+ return agmt
|
|
|
+
|
|
|
+ def remove_master(self, instance, remaining_instances=[], purge_sa=True):
|
|
|
+ """Remove an instance from the replication topology.
|
|
|
+
|
|
|
+ If purge service accounts is true, remove the instances service account.
|
|
|
+
|
|
|
+ The purge_sa *must* be conducted on a remaining master to guarantee
|
|
|
+ the result.
|
|
|
+
|
|
|
+ We recommend remaining instances contains *all* masters that have an
|
|
|
+ agreement to instance, to ensure no dangling agreements exist. Masters
|
|
|
+ with no agreement are skipped.
|
|
|
+
|
|
|
+ :param instance: An instance to remove from the topology.
|
|
|
+ :type from_instance: lib389.DirSrv
|
|
|
+ :param remaining_instances: The remaining masters of the topology.
|
|
|
+ :type remaining_instances: list[lib389.DirSrv]
|
|
|
+ :param purge_sa: Purge the service account for instance
|
|
|
+ :type purge_sa: bool
|
|
|
+ """
|
|
|
+ if purge_sa and len(remaining_instances) > 0:
|
|
|
+ services = ServiceAccounts(remaining_instances[0], self._suffix)
|
|
|
+ try:
|
|
|
+ sa = services.get('%s:%s' % (instance.host, instance.sslport))
|
|
|
+ sa.delete()
|
|
|
+ except ldap.NO_SUCH_OBJECT:
|
|
|
+ # It's already gone ...
|
|
|
+ pass
|
|
|
|
|
|
- :param newrole: The new replication role for the replica: CONSUMER and HUB
|
|
|
- :type newrole: ReplicaRole
|
|
|
+ agmt_name = self._inst_to_agreement_name(instance)
|
|
|
+ for r_inst in remaining_instances:
|
|
|
+ agmts = Agreements(r_inst)
|
|
|
+ try:
|
|
|
+ agmt = agmts.get(agmt_name)
|
|
|
+ agmt.delete()
|
|
|
+ except ldap.NO_SUCH_OBJECT:
|
|
|
+ # No agreement, that's good!
|
|
|
+ pass
|
|
|
|
|
|
- :returns: None
|
|
|
- :raises: ValueError - If replica is not demoted
|
|
|
+ fr_replicas = Replicas(instance)
|
|
|
+ fr_r = fr_replicas.get(self._suffix)
|
|
|
+ # This should delete the agreements ....
|
|
|
+ fr_r.delete()
|
|
|
+
|
|
|
+ def disable_to_master(self, to_instance, from_instances=[]):
|
|
|
+ """For all masters "from" disable all agreements "to" instance.
|
|
|
+
|
|
|
+ :param to_instance: The instance to stop recieving data.
|
|
|
+ :type to_instance: lib389.DirSrv
|
|
|
+ :param from_instances: The instances to stop sending data.
|
|
|
+ :type from_instances: list[lib389.DirSrv]
|
|
|
+ """
|
|
|
+ agmt_name = self._inst_to_agreement_name(to_instance)
|
|
|
+ for r_inst in from_instances:
|
|
|
+ agmts = Agreements(r_inst)
|
|
|
+ agmt = agmts.get(agmt_name)
|
|
|
+ agmt.pause()
|
|
|
+
|
|
|
+ def enable_to_master(self, to_instance, from_instances=[]):
|
|
|
+ """For all masters "from" enable all agreements "to" instance.
|
|
|
+
|
|
|
+ :param to_instance: The instance to start recieving data.
|
|
|
+ :type to_instance: lib389.DirSrv
|
|
|
+ :param from_instances: The instances to start sending data.
|
|
|
+ :type from_instances: list[lib389.DirSrv]
|
|
|
"""
|
|
|
+ agmt_name = self._inst_to_agreement_name(to_instance)
|
|
|
+ for r_inst in from_instances:
|
|
|
+ agmts = Agreements(r_inst)
|
|
|
+ agmt = agmts.get(agmt_name)
|
|
|
+ agmt.resume()
|
|
|
+
|
|
|
+ def wait_for_ruv(self, from_instance, to_instance, timeout=20):
|
|
|
+ """Wait for the in-memory ruv 'from_instance' to be advanced past on
|
|
|
+ 'to_instance'. Note this does not mean the ruvs are "exact matches"
|
|
|
+ only that some set of CSN states has been advanced past. Topics like
|
|
|
+ fractional replication may or may not interfer in this process.
|
|
|
+
|
|
|
+ In essence this is a rough check that to_instance is at least
|
|
|
+ at the replication state of from_instance. You should consider using
|
|
|
+ wait_for_replication instead for a guarantee.
|
|
|
+
|
|
|
+ :param from_instance: The instance whos state we we want to check from
|
|
|
+ :type from_instance: lib389.DirSrv
|
|
|
+ :param to_instance: The instance whos state we want to check matches from.
|
|
|
+ :type to_instance: lib389.DirSrv
|
|
|
|
|
|
- replica = self.get(suffix)
|
|
|
- try:
|
|
|
- replica = self.get(suffix)
|
|
|
- except ldap.NO_SUCH_OBJECT:
|
|
|
- raise ValueError('Suffix (%s) is not setup for replication' % suffix)
|
|
|
- replica.demote(newrole)
|
|
|
+ """
|
|
|
+ fr_replicas = Replicas(from_instance)
|
|
|
+ fr_r = fr_replicas.get(self._suffix)
|
|
|
+
|
|
|
+ to_replicas = Replicas(to_instance)
|
|
|
+ to_r = to_replicas.get(self._suffix)
|
|
|
+
|
|
|
+ from_ruv = fr_r.get_ruv()
|
|
|
+
|
|
|
+ for i in range(0, timeout):
|
|
|
+ to_ruv = to_r.get_ruv()
|
|
|
+ if to_ruv.is_synced(from_ruv):
|
|
|
+ self._log.info("SUCCESS: RUV from %s to %s is in sync" % (from_instance.ldapuri, to_instance.ldapuri))
|
|
|
+ return True
|
|
|
+ time.sleep(1)
|
|
|
+ raise Exception("RUV did not sync in time!")
|
|
|
|
|
|
- def get_dn(self, suffix):
|
|
|
- """Return the DN of the replica from cn=config, this is also
|
|
|
- known as the mapping tree entry
|
|
|
+ def wait_for_replication(self, from_instance, to_instance, timeout=20):
|
|
|
+ """Wait for a replication event to occur from instance to instance. This
|
|
|
+ shows some point of synchronisation has occured.
|
|
|
|
|
|
- :param suffix: The replication suffix to get the mapping tree DN
|
|
|
- :type suffix: str
|
|
|
+ :param from_instance: The instance whos state we we want to check from
|
|
|
+ :type from_instance: lib389.DirSrv
|
|
|
+ :param to_instance: The instance whos state we want to check matches from.
|
|
|
+ :type to_instance: lib389.DirSrv
|
|
|
+ :param timeout: Fail after timeout seconds.
|
|
|
+ :type timeout: int
|
|
|
|
|
|
- :returns: The DN of the replication entry from cn=config
|
|
|
"""
|
|
|
+ # Touch something then wait_for_replication.
|
|
|
+ from_groups = Groups(from_instance, basedn=self._suffix, rdn=None)
|
|
|
+ to_groups = Groups(to_instance, basedn=self._suffix, rdn=None)
|
|
|
+ from_group = from_groups.get('replication_managers')
|
|
|
+ to_group = to_groups.get('replication_managers')
|
|
|
|
|
|
- try:
|
|
|
- replica = self.get(suffix)
|
|
|
- except ldap.NO_SUCH_OBJECT:
|
|
|
- raise ValueError('Suffix (%s) is not setup for replication' % suffix)
|
|
|
- return replica._dn
|
|
|
+ change = str(uuid.uuid4())
|
|
|
+
|
|
|
+ from_group.replace('description', change)
|
|
|
+
|
|
|
+ for i in range(0, timeout):
|
|
|
+ desc = to_group.get_attr_val_utf8('description')
|
|
|
+ if change == desc:
|
|
|
+ self._log.info("SUCCESS: Replication from %s to %s is working" % (from_instance.ldapuri, to_instance.ldapuri))
|
|
|
+ return True
|
|
|
+ time.sleep(1)
|
|
|
+ raise Exception("Replication did not sync in time!")
|
|
|
|
|
|
- def get_ruv_entry(self, suffix):
|
|
|
- """Return the database RUV entry for the provided suffix
|
|
|
+ self.wait_for_replication(from_instance, to_instance)
|
|
|
+
|
|
|
+
|
|
|
+ def test_replication(self, from_instance, to_instance, timeout=20):
|
|
|
+ """Wait for a replication event to occur from instance to instance. This
|
|
|
+ shows some point of synchronisation has occured.
|
|
|
+
|
|
|
+ :param from_instance: The instance whos state we we want to check from
|
|
|
+ :type from_instance: lib389.DirSrv
|
|
|
+ :param to_instance: The instance whos state we want to check matches from.
|
|
|
+ :type to_instance: lib389.DirSrv
|
|
|
+ :param timeout: Fail after timeout seconds.
|
|
|
+ :type timeout: int
|
|
|
|
|
|
- :returns: The database RUV entry
|
|
|
- :raises: - ValeuError - If suffix is not setup for replication
|
|
|
- - LDAPError - If there is a problem trying to search for the RUV
|
|
|
"""
|
|
|
+ # It's the same ....
|
|
|
+ self.wait_for_replication(from_instance, to_instance, timeout)
|
|
|
|
|
|
- try:
|
|
|
- replica = self.get(suffix)
|
|
|
- except ldap.NO_SUCH_OBJECT:
|
|
|
- raise ValueError('Suffix (%s) is not setup for replication' % suffix)
|
|
|
- return replica.get_ruv_entry()
|
|
|
+ def test_replication_topology(self, instances, timeout=20):
|
|
|
+ """Confirm replication works between all permutations of masters
|
|
|
+ in the topology.
|
|
|
|
|
|
- def test(self, suffix, *replica_dirsrvs):
|
|
|
- """Make a "dummy" update on the the replicated suffix, and check
|
|
|
- all the provided replicas to see if they received the update.
|
|
|
+ :param instances: The masters.
|
|
|
+ :type instances: list[lib389.DirSrv]
|
|
|
+ :param timeout: Fail after timeout seconds.
|
|
|
+ :type timeout: int
|
|
|
|
|
|
- :param suffix: The replicated suffix we want to check
|
|
|
- :type suffix: str
|
|
|
- :param *replica_dirsrvs: DirSrv instance, DirSrv instance, ...
|
|
|
- :type *replica_dirsrvs: list of DirSrv
|
|
|
+ """
|
|
|
+ for p in permutations(instances, 2):
|
|
|
+ a, b = p
|
|
|
+ self.test_replication(a, b, timeout)
|
|
|
|
|
|
- :returns: True - if all servers have received the update by this
|
|
|
- replica, otherwise return False
|
|
|
- :raises: LDAPError - when failing to update/search database
|
|
|
+ def get_rid(self, instance):
|
|
|
+ """For a given master, retrieve it's RID for this suffix.
|
|
|
+
|
|
|
+ :param instance: The instance
|
|
|
+ :type instance: lib389.DirSrv
|
|
|
+ :returns: str
|
|
|
"""
|
|
|
+ replicas = Replicas(instance)
|
|
|
+ replica = replicas.get(self._suffix)
|
|
|
+ return replica.get_rid()
|
|
|
+
|
|
|
|
|
|
- try:
|
|
|
- replica = self.get(suffix)
|
|
|
- except ldap.NO_SUCH_OBJECT:
|
|
|
- raise ValueError('Suffix (%s) is not setup for replication' % suffix)
|
|
|
- return replica.test(*replica_dirsrvs)
|