Browse Source

Ticket 47957 - Add replication test suite for a wait async feature

Description: Test new attribute "nsDS5ReplicaWaitForAsyncResults".
             After setting the attribute, supplier will sleep established
             amount of millisecond if it finds the response from consumer
             is not ready.

Tests:
- not integer value;
- multi value;
- check that value has been set correctly [None, 2000, 0, -5];
- replication behavior with valid values [None, 2000, 0, -5].

https://fedorahosted.org/389/ticket/47957

Reviewed by: [email protected] (Thanks, Noriko!)
Simon Pichugin 10 years ago
parent
commit
2c702c4f44
1 changed files with 280 additions and 0 deletions
  1. 280 0
      dirsrvtests/suites/replication/wait_for_async_feature_test.py

+ 280 - 0
dirsrvtests/suites/replication/wait_for_async_feature_test.py

@@ -0,0 +1,280 @@
+import os
+import sys
+import time
+import ldap
+import logging
+import pytest
+from lib389 import DirSrv, Entry, tools, tasks
+from lib389.tools import DirSrvTools
+from lib389._constants import *
+from lib389.properties import *
+from lib389.tasks import *
+from lib389.utils import *
+from collections import Counter
+
+logging.getLogger(__name__).setLevel(logging.DEBUG)
+log = logging.getLogger(__name__)
+
+installation1_prefix = None
+
+WAITFOR_ASYNC_ATTR = "nsDS5ReplicaWaitForAsyncResults"
+
+class TopologyReplication(object):
+    def __init__(self, master1, master2, m1_m2_agmt, m2_m1_agmt):
+        master1.open()
+        master2.open()
+        self.masters = ((master1, m1_m2_agmt),
+                        (master2, m2_m1_agmt))
+
+
[email protected](scope="module")
+def topology(request):
+    global installation1_prefix
+    if installation1_prefix:
+        args_instance[SER_DEPLOYED_DIR] = installation1_prefix
+
+    # Creating master 1...
+    master1 = DirSrv(verbose=False)
+    args_instance[SER_HOST] = HOST_MASTER_1
+    args_instance[SER_PORT] = PORT_MASTER_1
+    args_instance[SER_SERVERID_PROP] = SERVERID_MASTER_1
+    args_instance[SER_CREATION_SUFFIX] = DEFAULT_SUFFIX
+    args_master = args_instance.copy()
+    master1.allocate(args_master)
+    instance_master1 = master1.exists()
+    if instance_master1:
+        master1.delete()
+    master1.create()
+    master1.open()
+    master1.replica.enableReplication(suffix=SUFFIX, role=REPLICAROLE_MASTER, replicaId=REPLICAID_MASTER_1)
+
+    # Creating master 2...
+    master2 = DirSrv(verbose=False)
+    args_instance[SER_HOST] = HOST_MASTER_2
+    args_instance[SER_PORT] = PORT_MASTER_2
+    args_instance[SER_SERVERID_PROP] = SERVERID_MASTER_2
+    args_instance[SER_CREATION_SUFFIX] = DEFAULT_SUFFIX
+    args_master = args_instance.copy()
+    master2.allocate(args_master)
+    instance_master2 = master2.exists()
+    if instance_master2:
+        master2.delete()
+    master2.create()
+    master2.open()
+    master2.replica.enableReplication(suffix=SUFFIX, role=REPLICAROLE_MASTER, replicaId=REPLICAID_MASTER_2)
+
+    #
+    # Create all the agreements
+    #
+    # Creating agreement from master 1 to master 2
+    properties = {RA_NAME:      r'meTo_$host:$port',
+                  RA_BINDDN:    defaultProperties[REPLICATION_BIND_DN],
+                  RA_BINDPW:    defaultProperties[REPLICATION_BIND_PW],
+                  RA_METHOD:    defaultProperties[REPLICATION_BIND_METHOD],
+                  RA_TRANSPORT_PROT: defaultProperties[REPLICATION_TRANSPORT]}
+    m1_m2_agmt = master1.agreement.create(suffix=SUFFIX, host=master2.host, port=master2.port, properties=properties)
+    if not m1_m2_agmt:
+        log.fatal("Fail to create a master -> master replica agreement")
+        sys.exit(1)
+    log.debug("%s created" % m1_m2_agmt)
+
+    # Creating agreement from master 2 to master 1
+    properties = {RA_NAME:      r'meTo_$host:$port',
+                  RA_BINDDN:    defaultProperties[REPLICATION_BIND_DN],
+                  RA_BINDPW:    defaultProperties[REPLICATION_BIND_PW],
+                  RA_METHOD:    defaultProperties[REPLICATION_BIND_METHOD],
+                  RA_TRANSPORT_PROT: defaultProperties[REPLICATION_TRANSPORT]}
+    m2_m1_agmt = master2.agreement.create(suffix=SUFFIX, host=master1.host, port=master1.port, properties=properties)
+    if not m2_m1_agmt:
+        log.fatal("Fail to create a master -> master replica agreement")
+        sys.exit(1)
+    log.debug("%s created" % m2_m1_agmt)
+
+    # Allow the replicas to get situated with the new agreements...
+    time.sleep(5)
+
+    #
+    # Initialize all the agreements
+    #
+    master1.agreement.init(SUFFIX, HOST_MASTER_2, PORT_MASTER_2)
+    master1.waitForReplInit(m1_m2_agmt)
+    master2.agreement.init(SUFFIX, HOST_MASTER_1, PORT_MASTER_1)
+    master2.waitForReplInit(m2_m1_agmt)
+
+    # Check replication is working...
+    if master1.testReplication(DEFAULT_SUFFIX, master2):
+        log.info('Replication is working.')
+    else:
+        log.fatal('Replication is not working.')
+        assert False
+
+    log.info("Set Replication Debugging loglevel for the errorlog")
+    master1.setLogLevel(lib389.LOG_REPLICA)
+    master2.setLogLevel(lib389.LOG_REPLICA)
+
+    # Delete each instance in the end
+    def fin():
+        master1.delete()
+        master2.delete()
+    request.addfinalizer(fin)
+
+    # Clear out the tmp dir
+    master1.clearTmpDir(__file__)
+
+    return TopologyReplication(master1, master2, m1_m2_agmt, m2_m1_agmt)
+
+
[email protected](params=[(None, (4, 10)),
+                        ('2000', (0, 1)),
+                        ('0', (4, 10)),
+                        ('-5', (4, 10))])
+def waitfor_async_attr(topology, request):
+    """Sets attribute on all replicas"""
+
+    attr_value = request.param[0]
+    expected_result = request.param[1]
+
+    # Run through all masters
+    for master in topology.masters:
+        agmt = master[1]
+        try:
+            if attr_value:
+                log.info("Set %s: %s on %s" % (
+                        WAITFOR_ASYNC_ATTR, attr_value, master[0].serverid))
+                mod = [(ldap.MOD_REPLACE, WAITFOR_ASYNC_ATTR, attr_value)]
+            else:
+                log.info("Delete %s from %s" % (
+                        WAITFOR_ASYNC_ATTR, master[0].serverid))
+                mod = [(ldap.MOD_DELETE, WAITFOR_ASYNC_ATTR, None)]
+            master[0].modify_s(agmt, mod)
+        except ldap.LDAPError as e:
+            log.error('Failed to set or delete %s attribute: (%s)' % (
+                   WAITFOR_ASYNC_ATTR, e.message['desc']))
+
+    return (attr_value, expected_result)
+
+
[email protected]
+def entries(topology, request):
+    """Adds entries to the master1"""
+
+    master1 = topology.masters[0][0]
+
+    TEST_OU = "test"
+    test_dn = SUFFIX
+    test_list = []
+
+    log.info("Add 100 nested entries under replicated suffix on %s" % master1.serverid)
+    for i in xrange(100):
+        test_dn = 'ou=%s%s,%s' % (TEST_OU, i, test_dn)
+        test_list.insert(0, test_dn)
+        try:
+            master1.add_s(Entry((test_dn,
+                                 {'objectclass': 'top',
+                                  'objectclass': 'organizationalUnit',
+                                  'ou': TEST_OU})))
+        except ldap.LDAPError as e:
+            log.error('Failed to add entry (%s): error (%s)' % (test_dn,
+                                                               e.message['desc']))
+            assert False
+
+    log.info("Delete created entries")
+    for test_dn in test_list:
+        try:
+            master1.delete_s(test_dn)
+        except ldap.LDAPError, e:
+            log.error('Failed to delete entry (%s): error (%s)' % (test_dn,
+                                                                   e.message['desc']))
+            assert False
+
+    def fin():
+        log.info("Clear the errors log in the end of the test case")
+        with open(master1.errlog, 'w') as errlog:
+            errlog.writelines("")
+    request.addfinalizer(fin)
+
+
+def test_not_int_value(topology):
+    """Tests not integer value"""
+
+    master1 = topology.masters[0][0]
+    agmt = topology.masters[0][1]
+
+    log.info("Try to set %s: wv1" % WAITFOR_ASYNC_ATTR)
+    try:
+        mod = [(ldap.MOD_REPLACE, WAITFOR_ASYNC_ATTR, "wv1")]
+        master1.modify_s(agmt, mod)
+    except ldap.LDAPError as e:
+        assert e.message['desc'] == 'Invalid syntax'
+
+
+def test_multi_value(topology):
+    """Tests multi value"""
+
+    master1 = topology.masters[0][0]
+    agmt = topology.masters[0][1]
+    log.info("agmt: %s" % agmt)
+
+    log.info("Try to set %s: 100 and 101 in the same time (multi value test)" % (
+            WAITFOR_ASYNC_ATTR))
+    try:
+        mod = [(ldap.MOD_ADD, WAITFOR_ASYNC_ATTR, "100")]
+        master1.modify_s(agmt, mod)
+        mod = [(ldap.MOD_ADD, WAITFOR_ASYNC_ATTR, "101")]
+        master1.modify_s(agmt, mod)
+    except ldap.LDAPError as e:
+        assert e.message['desc'] == 'Object class violation'
+
+
+def test_value_check(topology, waitfor_async_attr):
+    """Checks that value has been set correctly"""
+
+    attr_value = waitfor_async_attr[0]
+
+    for master in topology.masters:
+        agmt = master[1]
+
+        log.info("Check attr %s on %s" % (WAITFOR_ASYNC_ATTR, master[0].serverid))
+        try:
+            if attr_value:
+                entry = master[0].search_s(agmt, ldap.SCOPE_BASE, "%s=%s" % (
+                                          WAITFOR_ASYNC_ATTR, attr_value))
+                assert entry
+            else:
+                entry = master[0].search_s(agmt, ldap.SCOPE_BASE, "%s=*" % WAITFOR_ASYNC_ATTR)
+                assert not entry
+        except ldap.LDAPError as e:
+            log.fatal('Search failed, error: ' + e.message['desc'])
+            assert False
+
+
+def test_behavior_with_value(topology, waitfor_async_attr, entries):
+    """Tests replication behavior with valid
+    nsDS5ReplicaWaitForAsyncResults attribute values
+    """
+
+    master1 = topology.masters[0][0]
+    sync_dict = Counter()
+    min_ap = waitfor_async_attr[1][0]
+    max_ap = waitfor_async_attr[1][1]
+
+    log.info("Gather all sync attempts within Counter dict, group by timestamp")
+    with open(master1.errlog, 'r') as errlog:
+        errlog_filtered = filter(lambda x: "waitfor_async_results" in x, errlog)
+        for line in errlog_filtered:
+            # Watch only over unsuccessful sync attempts
+            if line.split()[4] != line.split()[5]:
+                timestamp = line.split(']')[0]
+                sync_dict[timestamp] += 1
+
+    log.info("Take the most common timestamp and assert it has appeared " \
+             "in the range from %s to %s times" % (min_ap, max_ap))
+    most_common_val = sync_dict.most_common(1)[0][1]
+    assert min_ap <= most_common_val <= max_ap
+
+
+if __name__ == '__main__':
+    # Run isolated
+    # -s for DEBUG mode
+    CURRENT_FILE = os.path.realpath(__file__)
+    pytest.main("-s %s" % CURRENT_FILE)