瀏覽代碼

Ticket #47657 add schema test suite and tests for Ticket #47634

https://fedorahosted.org/389/ticket/47657
Reviewed by: tbordaz (Thanks!)
Branch: master
Fix Description: Add tests for schema ticket #47634.  This also adds a suites
subdirectory to dirsrvtests and a schema subdirectory of suites.
Platforms tested: RHEL6 x86_64
Flag Day: no
Doc impact: no
Rich Megginson 12 年之前
父節點
當前提交
3b049f6623
共有 3 個文件被更改,包括 344 次插入0 次删除
  1. 33 0
      dirsrvtests/suites/schema/constants.py
  2. 51 0
      dirsrvtests/suites/schema/finalizer.py
  3. 260 0
      dirsrvtests/suites/schema/test_schema.py

+ 33 - 0
dirsrvtests/suites/schema/constants.py

@@ -0,0 +1,33 @@
+'''
+Created on Dec 18, 2013
+
+@author: rmeggins
+'''
+import os
+from lib389 import DN_DM
+from lib389._constants import *
+from lib389.properties import *
+
+SUFFIX    = 'dc=example,dc=com'
+PASSWORD  = 'password'
+
+# Used for standalone topology
+HOST_STANDALONE = LOCALHOST
+PORT_STANDALONE = 33389
+SERVERID_STANDALONE = 'schematest'
+
+# Each defined instance above must be added in that list 
+ALL_INSTANCES = [ {SER_HOST: HOST_STANDALONE, SER_PORT: PORT_STANDALONE, SER_SERVERID_PROP: SERVERID_STANDALONE},
+                  ]
+# This is a template
+args_instance = {
+                   SER_DEPLOYED_DIR: os.environ.get('PREFIX', None),
+                   SER_BACKUP_INST_DIR: os.environ.get('BACKUPDIR', DEFAULT_BACKUPDIR),
+                   SER_ROOT_DN: DN_DM,
+                   SER_ROOT_PW: PASSWORD,
+                   SER_HOST: LOCALHOST,
+                   SER_PORT: DEFAULT_PORT,
+                   SER_SERVERID_PROP: "template",
+                   SER_CREATION_SUFFIX: DEFAULT_SUFFIX}
+
+

+ 51 - 0
dirsrvtests/suites/schema/finalizer.py

@@ -0,0 +1,51 @@
+'''
+Created on Nov 5, 2013
+
+@author: tbordaz
+'''
+import os
+import sys
+import time
+import ldap
+import logging
+import socket
+import time
+import logging
+import pytest
+from lib389 import DirSrv, Entry, tools
+from lib389.tools import DirSrvTools
+from lib389._constants import DN_DM
+from lib389.properties import *
+from constants import *
+
+log = logging.getLogger(__name__)
+
+global installation_prefix
+installation_prefix=os.getenv('PREFIX')
+
+def test_finalizer():
+    global installation_prefix
+    
+    # for each defined instance, remove it
+    for args_instance in ALL_INSTANCES:
+        if installation_prefix:
+            # overwrite the environment setting
+            args_instance[SER_DEPLOYED_DIR] = installation_prefix
+            
+        instance = DirSrv(verbose=True)
+        instance.allocate(args_instance)
+        if instance.exists():
+            instance.delete()
+        
+def run_isolated():
+    '''
+        run_isolated is used to run these test cases independently of a test scheduler (xunit, py.test..)
+        To run isolated without py.test, you need to 
+            - set the installation prefix
+            - run this program
+    '''        
+    test_finalizer()
+
+if __name__ == '__main__':
+    run_isolated()
+

+ 260 - 0
dirsrvtests/suites/schema/test_schema.py

@@ -0,0 +1,260 @@
+'''
+Created on Dec 18, 2013
+
+@author: rmeggins
+'''
+import os
+import sys
+import time
+import ldap
+from ldap.cidict import cidict
+from ldap.schema import SubSchema
+import logging
+import socket
+import time
+import logging
+import pytest
+import re
+from lib389 import DirSrv, Entry, tools
+from lib389.tools import DirSrvTools
+from lib389._constants import *
+from lib389.properties import *
+from constants import *
+
+logging.getLogger(__name__).setLevel(logging.DEBUG)
+log = logging.getLogger(__name__)
+
+installation_prefix = None
+
+class TopologyStandalone(object):
+    def __init__(self, standalone):
+        standalone.open()
+        self.standalone = standalone
+
[email protected](scope="module")
+def topology(request):
+    '''
+        This fixture is used to create a DirSrv instance for the 'module'.
+        At the beginning, there may already be an instance.
+        There may also be a backup for the instance.
+    
+        Principle:
+            If instance exists:
+                restart it
+            If backup exists:
+                create or rebind to instance
+                restore instance from backup
+            else:
+                Cleanup everything
+                    remove instance
+                    remove backup
+                Create instance
+                Create backup
+    '''
+    global installation_prefix
+
+    if installation_prefix:
+        args_instance[SER_DEPLOYED_DIR] = installation_prefix
+    schemainst   = DirSrv(verbose=False)
+    
+    # Args for the master instance
+    args_instance[SER_HOST] = HOST_STANDALONE
+    args_instance[SER_PORT] = PORT_STANDALONE
+    args_instance[SER_SERVERID_PROP] = SERVERID_STANDALONE
+    schemainst.allocate(args_instance)
+    
+    # Get the status of the backups
+    backup   = schemainst.checkBackupFS()
+    
+    # Get the status of the instance and restart it if it exists
+    if schemainst.exists():
+        schemainst.stop(timeout=10)
+        schemainst.start(timeout=10)
+        
+    if backup:
+        # The backup exists, assuming it is correct 
+        # we just re-init the instance with it
+        if not schemainst.exists():
+            schemainst.create()
+            # Used to retrieve configuration information (dbdir, confdir...)
+            schemainst.open()
+        
+        # restore from backup
+        schemainst.stop(timeout=10)
+        schemainst.restoreFS(backup)
+        schemainst.start(timeout=10)
+    else:
+        # We should be here only in two conditions
+        #      - This is the first test
+        #        so we need to create everything
+        #      - Something weird happened (instance/backup destroyed)
+        #        so we discard everything and recreate all
+        
+        # Remove the backup. So even if we have a specific backup file
+        # (e.g backup) we clear all backups that an instance may have created
+        if backup:
+            schemainst.clearBackupFS()
+        
+        # Remove all the instances
+        if schemainst.exists():
+            schemainst.delete()
+                        
+        # Create the instances
+        schemainst.create()
+        schemainst.open()    
+                
+        # Time to create the backup
+        schemainst.stop(timeout=10)
+        schemainst.backupfile = schemainst.backupFS()
+        schemainst.start(timeout=10)    
+    # 
+    return TopologyStandalone(schemainst)
+
+attrclass = ldap.schema.models.AttributeType
+occlass = ldap.schema.models.ObjectClass
+
+def ochasattr(subschema, oc, mustormay, attr, key):
+    """See if the oc and any of its parents and ancestors have the
+    given attr"""
+    rc = False
+    if not key in oc.__dict__:
+        dd = cidict()
+        for ii in oc.__dict__[mustormay]:
+            dd[ii] = ii
+        oc.__dict__[key] = dd
+    if attr in oc.__dict__[key]:
+        rc = True
+    else:
+        # look in parents
+        for noroid in oc.sup:
+            ocpar = subschema.get_obj(occlass, noroid)
+            assert(ocpar)
+            rc = ochasattr(subschema, ocpar, mustormay, attr, key)
+            if rc:
+                break
+    return rc
+
+def ochasattrs(subschema, oc, mustormay, attrs):
+    key = mustormay + "dict"
+    ret = []
+    for attr in attrs:
+        if not ochasattr(subschema, oc, mustormay, attr, key):
+            ret.append(attr)
+    return ret
+
+def mycmp(v1, v2):
+    v1ary, v2ary = [v1], [v2]
+    if isinstance(v1, list) or isinstance(v1, tuple):
+        v1ary, v2ary = list(set([x.lower() for x in v1])), list(set([x.lower() for x in v2]))
+    if not len(v1ary) == len(v2ary):
+        return False
+    for v1, v2 in zip(v1ary, v2ary):
+        if isinstance(v1, basestring):
+            if not len(v1) == len(v2):
+                return False
+        if not v1 == v2:
+            return False
+    return True
+
+def ocgetdiffs(ldschema, oc1, oc2):
+    fields = ['obsolete', 'names', 'desc', 'must', 'may', 'kind', 'sup']
+    ret = ''
+    for field in fields:
+        v1, v2 = oc1.__dict__[field], oc2.__dict__[field]
+        if field == 'may' or field == 'must':
+            missing = ochasattrs(ldschema, oc1, field, oc2.__dict__[field])
+            if missing:
+                ret = ret + '\t%s is missing %s\n' % (field, missing)
+            missing = ochasattrs(ldschema, oc2, field, oc1.__dict__[field])
+            if missing:
+                ret = ret + '\t%s is missing %s\n' % (field, missing)
+        elif not mycmp(v1, v2):
+            ret = ret + '\t%s differs: [%s] vs. [%s]\n' % (field, oc1.__dict__[field], oc2.__dict__[field])
+    return ret
+
+def atgetparfield(subschema, at, field):
+    v = None
+    for nameoroid in at.sup:
+        atpar = subschema.get_obj(attrclass, nameoroid)
+        assert(atpar)
+        v = atpar.__dict__.get(field, atgetparfield(subschema, atpar, field))
+        if v is not None:
+            break
+    return v
+
+syntax_len_supported = False
+
+def atgetdiffs(ldschema, at1, at2):
+    fields = ['names', 'desc', 'obsolete', 'sup', 'equality', 'ordering', 'substr', 'syntax',
+              'single_value', 'collective', 'no_user_mod', 'usage']
+    if syntax_len_supported:
+        fields.append('syntax_len')
+    ret = ''
+    for field in fields:
+        v1 = at1.__dict__.get(field) or atgetparfield(ldschema, at1, field)
+        v2 = at2.__dict__.get(field) or atgetparfield(ldschema, at2, field)
+        if not mycmp(v1, v2):
+            ret = ret + '\t%s differs: [%s] vs. [%s]\n' % (field, at1.__dict__[field], at2.__dict__[field])
+    return ret
+
+def test_schema_comparewithfiles(topology):
+    '''Compare the schema from ldap cn=schema with the schema files'''
+    retval = True
+    schemainst = topology.standalone
+    ldschema = schemainst.schema.get_subschema()
+    assert ldschema
+    for fn in schemainst.schema.list_files():
+        fschema = schemainst.schema.file_to_subschema(fn)
+        if not fschema:
+            log.warn("Unable to parse %s as a schema file - skipping" % fn)
+            continue
+        assert fschema
+        for oid in fschema.listall(occlass):
+            se = fschema.get_obj(occlass, oid)
+            assert se
+            ldse = ldschema.get_obj(occlass, oid)
+            if not ldse:
+                log.error("objectclass in %s but not in %s: %s" % (fn, DN_SCHEMA, se))
+                retval = False
+                continue
+            ret = ocgetdiffs(ldschema, ldse, se)
+            if ret:
+                log.error("name %s oid %s\n%s" % (se.names[0], oid, ret))
+                retval = False
+        for oid in fschema.listall(attrclass):
+            se = fschema.get_obj(attrclass, oid)
+            assert se
+            ldse = ldschema.get_obj(attrclass, oid)
+            if not ldse:
+                log.error("attributetype in %s but not in %s: %s" % (fn, DN_SCHEMA, se))
+                retval = False
+                continue
+            ret = atgetdiffs(ldschema, ldse, se)
+            if ret:
+                log.error("name %s oid %s\n%s" % (se.names[0], oid, ret))
+                retval = False
+    assert retval
+
+def test_schema_final(topology):
+    topology.standalone.stop(timeout=10)
+
+def run_isolated():
+    '''
+        run_isolated is used to run these test cases independently of a test scheduler (xunit, py.test..)
+        To run isolated without py.test, you need to 
+            - edit this file and comment '@pytest.fixture' line before 'topology' function.
+            - set the installation prefix
+            - run this program
+    '''
+    global installation_prefix
+    installation_prefix =  os.environ.get('PREFIX')
+        
+    topo = topology(True)
+
+    test_schema_comparewithfiles(topo)
+    
+    test_schema_final(topo)
+
+if __name__ == '__main__':
+    run_isolated()
+