test_schema.py 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215
  1. '''
  2. Created on Dec 18, 2013
  3. @author: rmeggins
  4. '''
  5. import os
  6. import sys
  7. import time
  8. import ldap
  9. from ldap.cidict import cidict
  10. from ldap.schema import SubSchema
  11. import logging
  12. import pytest
  13. from lib389 import DirSrv, Entry, tools
  14. from lib389.tools import DirSrvTools
  15. from lib389._constants import *
  16. from lib389.properties import *
  17. logging.getLogger(__name__).setLevel(logging.DEBUG)
  18. log = logging.getLogger(__name__)
  19. installation_prefix = None
  20. class TopologyStandalone(object):
  21. def __init__(self, standalone):
  22. standalone.open()
  23. self.standalone = standalone
  24. @pytest.fixture(scope="module")
  25. def topology(request):
  26. '''
  27. This fixture is used to create a DirSrv instance for the 'module'.
  28. '''
  29. global installation_prefix
  30. if installation_prefix:
  31. args_instance[SER_DEPLOYED_DIR] = installation_prefix
  32. schemainst = DirSrv(verbose=False)
  33. # Args for the master instance
  34. args_instance[SER_HOST] = HOST_STANDALONE
  35. args_instance[SER_PORT] = PORT_STANDALONE
  36. args_instance[SER_SERVERID_PROP] = SERVERID_STANDALONE
  37. schemainst.allocate(args_instance)
  38. # Remove all the instance
  39. if schemainst.exists():
  40. schemainst.delete()
  41. # Create the instance
  42. schemainst.create()
  43. schemainst.open()
  44. return TopologyStandalone(schemainst)
  45. attrclass = ldap.schema.models.AttributeType
  46. occlass = ldap.schema.models.ObjectClass
  47. def ochasattr(subschema, oc, mustormay, attr, key):
  48. """See if the oc and any of its parents and ancestors have the
  49. given attr"""
  50. rc = False
  51. if not key in oc.__dict__:
  52. dd = cidict()
  53. for ii in oc.__dict__[mustormay]:
  54. dd[ii] = ii
  55. oc.__dict__[key] = dd
  56. if attr in oc.__dict__[key]:
  57. rc = True
  58. else:
  59. # look in parents
  60. for noroid in oc.sup:
  61. ocpar = subschema.get_obj(occlass, noroid)
  62. assert(ocpar)
  63. rc = ochasattr(subschema, ocpar, mustormay, attr, key)
  64. if rc:
  65. break
  66. return rc
  67. def ochasattrs(subschema, oc, mustormay, attrs):
  68. key = mustormay + "dict"
  69. ret = []
  70. for attr in attrs:
  71. if not ochasattr(subschema, oc, mustormay, attr, key):
  72. ret.append(attr)
  73. return ret
  74. def mycmp(v1, v2):
  75. v1ary, v2ary = [v1], [v2]
  76. if isinstance(v1, list) or isinstance(v1, tuple):
  77. v1ary, v2ary = list(set([x.lower() for x in v1])), list(set([x.lower() for x in v2]))
  78. if not len(v1ary) == len(v2ary):
  79. return False
  80. for v1, v2 in zip(v1ary, v2ary):
  81. if isinstance(v1, basestring):
  82. if not len(v1) == len(v2):
  83. return False
  84. if not v1 == v2:
  85. return False
  86. return True
  87. def ocgetdiffs(ldschema, oc1, oc2):
  88. fields = ['obsolete', 'names', 'desc', 'must', 'may', 'kind', 'sup']
  89. ret = ''
  90. for field in fields:
  91. v1, v2 = oc1.__dict__[field], oc2.__dict__[field]
  92. if field == 'may' or field == 'must':
  93. missing = ochasattrs(ldschema, oc1, field, oc2.__dict__[field])
  94. if missing:
  95. ret = ret + '\t%s is missing %s\n' % (field, missing)
  96. missing = ochasattrs(ldschema, oc2, field, oc1.__dict__[field])
  97. if missing:
  98. ret = ret + '\t%s is missing %s\n' % (field, missing)
  99. elif not mycmp(v1, v2):
  100. ret = ret + '\t%s differs: [%s] vs. [%s]\n' % (field, oc1.__dict__[field], oc2.__dict__[field])
  101. return ret
  102. def atgetparfield(subschema, at, field):
  103. v = None
  104. for nameoroid in at.sup:
  105. atpar = subschema.get_obj(attrclass, nameoroid)
  106. assert(atpar)
  107. v = atpar.__dict__.get(field, atgetparfield(subschema, atpar, field))
  108. if v is not None:
  109. break
  110. return v
  111. syntax_len_supported = False
  112. def atgetdiffs(ldschema, at1, at2):
  113. fields = ['names', 'desc', 'obsolete', 'sup', 'equality', 'ordering', 'substr', 'syntax',
  114. 'single_value', 'collective', 'no_user_mod', 'usage']
  115. if syntax_len_supported:
  116. fields.append('syntax_len')
  117. ret = ''
  118. for field in fields:
  119. v1 = at1.__dict__.get(field) or atgetparfield(ldschema, at1, field)
  120. v2 = at2.__dict__.get(field) or atgetparfield(ldschema, at2, field)
  121. if not mycmp(v1, v2):
  122. ret = ret + '\t%s differs: [%s] vs. [%s]\n' % (field, at1.__dict__[field], at2.__dict__[field])
  123. return ret
  124. def test_schema_comparewithfiles(topology):
  125. '''Compare the schema from ldap cn=schema with the schema files'''
  126. retval = True
  127. schemainst = topology.standalone
  128. ldschema = schemainst.schema.get_subschema()
  129. assert ldschema
  130. for fn in schemainst.schema.list_files():
  131. fschema = schemainst.schema.file_to_subschema(fn)
  132. if not fschema:
  133. log.warn("Unable to parse %s as a schema file - skipping" % fn)
  134. continue
  135. assert fschema
  136. for oid in fschema.listall(occlass):
  137. se = fschema.get_obj(occlass, oid)
  138. assert se
  139. ldse = ldschema.get_obj(occlass, oid)
  140. if not ldse:
  141. log.error("objectclass in %s but not in %s: %s" % (fn, DN_SCHEMA, se))
  142. retval = False
  143. continue
  144. ret = ocgetdiffs(ldschema, ldse, se)
  145. if ret:
  146. log.error("name %s oid %s\n%s" % (se.names[0], oid, ret))
  147. retval = False
  148. for oid in fschema.listall(attrclass):
  149. se = fschema.get_obj(attrclass, oid)
  150. assert se
  151. ldse = ldschema.get_obj(attrclass, oid)
  152. if not ldse:
  153. log.error("attributetype in %s but not in %s: %s" % (fn, DN_SCHEMA, se))
  154. retval = False
  155. continue
  156. ret = atgetdiffs(ldschema, ldse, se)
  157. if ret:
  158. log.error("name %s oid %s\n%s" % (se.names[0], oid, ret))
  159. retval = False
  160. assert retval
  161. def test_schema_final(topology):
  162. topology.standalone.delete()
  163. def run_isolated():
  164. '''
  165. run_isolated is used to run these test cases independently of a test scheduler (xunit, py.test..)
  166. To run isolated without py.test, you need to
  167. - edit this file and comment '@pytest.fixture' line before 'topology' function.
  168. - set the installation prefix
  169. - run this program
  170. '''
  171. global installation_prefix
  172. installation_prefix = os.environ.get('PREFIX')
  173. topo = topology(True)
  174. test_schema_comparewithfiles(topo)
  175. test_schema_final(topo)
  176. if __name__ == '__main__':
  177. run_isolated()