| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280 |
- import os
- import sys
- import time
- import ldap
- import logging
- import pytest
- from lib389 import DirSrv, Entry, tools, tasks
- from lib389.tools import DirSrvTools
- from lib389._constants import *
- from lib389.properties import *
- from lib389.tasks import *
- from lib389.utils import *
- from collections import Counter
- logging.getLogger(__name__).setLevel(logging.DEBUG)
- log = logging.getLogger(__name__)
- installation1_prefix = None
- WAITFOR_ASYNC_ATTR = "nsDS5ReplicaWaitForAsyncResults"
- class TopologyReplication(object):
- def __init__(self, master1, master2, m1_m2_agmt, m2_m1_agmt):
- master1.open()
- master2.open()
- self.masters = ((master1, m1_m2_agmt),
- (master2, m2_m1_agmt))
- @pytest.fixture(scope="module")
- def topology(request):
- global installation1_prefix
- if installation1_prefix:
- args_instance[SER_DEPLOYED_DIR] = installation1_prefix
- # Creating master 1...
- master1 = DirSrv(verbose=False)
- args_instance[SER_HOST] = HOST_MASTER_1
- args_instance[SER_PORT] = PORT_MASTER_1
- args_instance[SER_SERVERID_PROP] = SERVERID_MASTER_1
- args_instance[SER_CREATION_SUFFIX] = DEFAULT_SUFFIX
- args_master = args_instance.copy()
- master1.allocate(args_master)
- instance_master1 = master1.exists()
- if instance_master1:
- master1.delete()
- master1.create()
- master1.open()
- master1.replica.enableReplication(suffix=SUFFIX, role=REPLICAROLE_MASTER, replicaId=REPLICAID_MASTER_1)
- # Creating master 2...
- master2 = DirSrv(verbose=False)
- args_instance[SER_HOST] = HOST_MASTER_2
- args_instance[SER_PORT] = PORT_MASTER_2
- args_instance[SER_SERVERID_PROP] = SERVERID_MASTER_2
- args_instance[SER_CREATION_SUFFIX] = DEFAULT_SUFFIX
- args_master = args_instance.copy()
- master2.allocate(args_master)
- instance_master2 = master2.exists()
- if instance_master2:
- master2.delete()
- master2.create()
- master2.open()
- master2.replica.enableReplication(suffix=SUFFIX, role=REPLICAROLE_MASTER, replicaId=REPLICAID_MASTER_2)
- #
- # Create all the agreements
- #
- # Creating agreement from master 1 to master 2
- properties = {RA_NAME: r'meTo_$host:$port',
- RA_BINDDN: defaultProperties[REPLICATION_BIND_DN],
- RA_BINDPW: defaultProperties[REPLICATION_BIND_PW],
- RA_METHOD: defaultProperties[REPLICATION_BIND_METHOD],
- RA_TRANSPORT_PROT: defaultProperties[REPLICATION_TRANSPORT]}
- m1_m2_agmt = master1.agreement.create(suffix=SUFFIX, host=master2.host, port=master2.port, properties=properties)
- if not m1_m2_agmt:
- log.fatal("Fail to create a master -> master replica agreement")
- sys.exit(1)
- log.debug("%s created" % m1_m2_agmt)
- # Creating agreement from master 2 to master 1
- properties = {RA_NAME: r'meTo_$host:$port',
- RA_BINDDN: defaultProperties[REPLICATION_BIND_DN],
- RA_BINDPW: defaultProperties[REPLICATION_BIND_PW],
- RA_METHOD: defaultProperties[REPLICATION_BIND_METHOD],
- RA_TRANSPORT_PROT: defaultProperties[REPLICATION_TRANSPORT]}
- m2_m1_agmt = master2.agreement.create(suffix=SUFFIX, host=master1.host, port=master1.port, properties=properties)
- if not m2_m1_agmt:
- log.fatal("Fail to create a master -> master replica agreement")
- sys.exit(1)
- log.debug("%s created" % m2_m1_agmt)
- # Allow the replicas to get situated with the new agreements...
- time.sleep(5)
- #
- # Initialize all the agreements
- #
- master1.agreement.init(SUFFIX, HOST_MASTER_2, PORT_MASTER_2)
- master1.waitForReplInit(m1_m2_agmt)
- master2.agreement.init(SUFFIX, HOST_MASTER_1, PORT_MASTER_1)
- master2.waitForReplInit(m2_m1_agmt)
- # Check replication is working...
- if master1.testReplication(DEFAULT_SUFFIX, master2):
- log.info('Replication is working.')
- else:
- log.fatal('Replication is not working.')
- assert False
- log.info("Set Replication Debugging loglevel for the errorlog")
- master1.setLogLevel(lib389.LOG_REPLICA)
- master2.setLogLevel(lib389.LOG_REPLICA)
- # Delete each instance in the end
- def fin():
- master1.delete()
- master2.delete()
- request.addfinalizer(fin)
- # Clear out the tmp dir
- master1.clearTmpDir(__file__)
- return TopologyReplication(master1, master2, m1_m2_agmt, m2_m1_agmt)
- @pytest.fixture(params=[(None, (4, 10)),
- ('2000', (0, 1)),
- ('0', (4, 10)),
- ('-5', (4, 10))])
- def waitfor_async_attr(topology, request):
- """Sets attribute on all replicas"""
- attr_value = request.param[0]
- expected_result = request.param[1]
- # Run through all masters
- for master in topology.masters:
- agmt = master[1]
- try:
- if attr_value:
- log.info("Set %s: %s on %s" % (
- WAITFOR_ASYNC_ATTR, attr_value, master[0].serverid))
- mod = [(ldap.MOD_REPLACE, WAITFOR_ASYNC_ATTR, attr_value)]
- else:
- log.info("Delete %s from %s" % (
- WAITFOR_ASYNC_ATTR, master[0].serverid))
- mod = [(ldap.MOD_DELETE, WAITFOR_ASYNC_ATTR, None)]
- master[0].modify_s(agmt, mod)
- except ldap.LDAPError as e:
- log.error('Failed to set or delete %s attribute: (%s)' % (
- WAITFOR_ASYNC_ATTR, e.message['desc']))
- return (attr_value, expected_result)
- @pytest.fixture
- def entries(topology, request):
- """Adds entries to the master1"""
- master1 = topology.masters[0][0]
- TEST_OU = "test"
- test_dn = SUFFIX
- test_list = []
- log.info("Add 100 nested entries under replicated suffix on %s" % master1.serverid)
- for i in xrange(100):
- test_dn = 'ou=%s%s,%s' % (TEST_OU, i, test_dn)
- test_list.insert(0, test_dn)
- try:
- master1.add_s(Entry((test_dn,
- {'objectclass': 'top',
- 'objectclass': 'organizationalUnit',
- 'ou': TEST_OU})))
- except ldap.LDAPError as e:
- log.error('Failed to add entry (%s): error (%s)' % (test_dn,
- e.message['desc']))
- assert False
- log.info("Delete created entries")
- for test_dn in test_list:
- try:
- master1.delete_s(test_dn)
- except ldap.LDAPError, e:
- log.error('Failed to delete entry (%s): error (%s)' % (test_dn,
- e.message['desc']))
- assert False
- def fin():
- log.info("Clear the errors log in the end of the test case")
- with open(master1.errlog, 'w') as errlog:
- errlog.writelines("")
- request.addfinalizer(fin)
- def test_not_int_value(topology):
- """Tests not integer value"""
- master1 = topology.masters[0][0]
- agmt = topology.masters[0][1]
- log.info("Try to set %s: wv1" % WAITFOR_ASYNC_ATTR)
- try:
- mod = [(ldap.MOD_REPLACE, WAITFOR_ASYNC_ATTR, "wv1")]
- master1.modify_s(agmt, mod)
- except ldap.LDAPError as e:
- assert e.message['desc'] == 'Invalid syntax'
- def test_multi_value(topology):
- """Tests multi value"""
- master1 = topology.masters[0][0]
- agmt = topology.masters[0][1]
- log.info("agmt: %s" % agmt)
- log.info("Try to set %s: 100 and 101 in the same time (multi value test)" % (
- WAITFOR_ASYNC_ATTR))
- try:
- mod = [(ldap.MOD_ADD, WAITFOR_ASYNC_ATTR, "100")]
- master1.modify_s(agmt, mod)
- mod = [(ldap.MOD_ADD, WAITFOR_ASYNC_ATTR, "101")]
- master1.modify_s(agmt, mod)
- except ldap.LDAPError as e:
- assert e.message['desc'] == 'Object class violation'
- def test_value_check(topology, waitfor_async_attr):
- """Checks that value has been set correctly"""
- attr_value = waitfor_async_attr[0]
- for master in topology.masters:
- agmt = master[1]
- log.info("Check attr %s on %s" % (WAITFOR_ASYNC_ATTR, master[0].serverid))
- try:
- if attr_value:
- entry = master[0].search_s(agmt, ldap.SCOPE_BASE, "%s=%s" % (
- WAITFOR_ASYNC_ATTR, attr_value))
- assert entry
- else:
- entry = master[0].search_s(agmt, ldap.SCOPE_BASE, "%s=*" % WAITFOR_ASYNC_ATTR)
- assert not entry
- except ldap.LDAPError as e:
- log.fatal('Search failed, error: ' + e.message['desc'])
- assert False
- def test_behavior_with_value(topology, waitfor_async_attr, entries):
- """Tests replication behavior with valid
- nsDS5ReplicaWaitForAsyncResults attribute values
- """
- master1 = topology.masters[0][0]
- sync_dict = Counter()
- min_ap = waitfor_async_attr[1][0]
- max_ap = waitfor_async_attr[1][1]
- log.info("Gather all sync attempts within Counter dict, group by timestamp")
- with open(master1.errlog, 'r') as errlog:
- errlog_filtered = filter(lambda x: "waitfor_async_results" in x, errlog)
- for line in errlog_filtered:
- # Watch only over unsuccessful sync attempts
- if line.split()[4] != line.split()[5]:
- timestamp = line.split(']')[0]
- sync_dict[timestamp] += 1
- log.info("Take the most common timestamp and assert it has appeared " \
- "in the range from %s to %s times" % (min_ap, max_ap))
- most_common_val = sync_dict.most_common(1)[0][1]
- assert min_ap <= most_common_val <= max_ap
- if __name__ == '__main__':
- # Run isolated
- # -s for DEBUG mode
- CURRENT_FILE = os.path.realpath(__file__)
- pytest.main("-s %s" % CURRENT_FILE)
|