test_schema.py 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228
  1. # --- BEGIN COPYRIGHT BLOCK ---
  2. # Copyright (C) 2015 Red Hat, Inc.
  3. # All rights reserved.
  4. #
  5. # License: GPL (version 3 or any later version).
  6. # See LICENSE for details.
  7. # --- END COPYRIGHT BLOCK ---
  8. #
  9. '''
  10. Created on Dec 18, 2013
  11. @author: rmeggins
  12. '''
  13. import os
  14. import sys
  15. import time
  16. import ldap
  17. import six
  18. from ldap.cidict import cidict
  19. from ldap.schema import SubSchema
  20. import logging
  21. import pytest
  22. from lib389 import DirSrv, Entry, tools
  23. from lib389.tools import DirSrvTools
  24. from lib389._constants import *
  25. from lib389.properties import *
  26. logging.getLogger(__name__).setLevel(logging.DEBUG)
  27. log = logging.getLogger(__name__)
  28. installation_prefix = None
  29. attrclass = ldap.schema.models.AttributeType
  30. occlass = ldap.schema.models.ObjectClass
  31. syntax_len_supported = False
  32. class TopologyStandalone(object):
  33. def __init__(self, standalone):
  34. standalone.open()
  35. self.standalone = standalone
  36. @pytest.fixture(scope="module")
  37. def topology(request):
  38. '''
  39. This fixture is used to create a DirSrv instance for the 'module'.
  40. '''
  41. global installation_prefix
  42. if installation_prefix:
  43. args_instance[SER_DEPLOYED_DIR] = installation_prefix
  44. schemainst = DirSrv(verbose=False)
  45. # Args for the master instance
  46. args_instance[SER_HOST] = HOST_STANDALONE
  47. args_instance[SER_PORT] = PORT_STANDALONE
  48. args_instance[SER_SERVERID_PROP] = SERVERID_STANDALONE
  49. schemainst.allocate(args_instance)
  50. # Remove all the instance
  51. if schemainst.exists():
  52. schemainst.delete()
  53. # Create the instance
  54. schemainst.create()
  55. schemainst.open()
  56. return TopologyStandalone(schemainst)
  57. def ochasattr(subschema, oc, mustormay, attr, key):
  58. """See if the oc and any of its parents and ancestors have the
  59. given attr"""
  60. rc = False
  61. if not key in oc.__dict__:
  62. dd = cidict()
  63. for ii in oc.__dict__[mustormay]:
  64. dd[ii] = ii
  65. oc.__dict__[key] = dd
  66. if attr in oc.__dict__[key]:
  67. rc = True
  68. else:
  69. # look in parents
  70. for noroid in oc.sup:
  71. ocpar = subschema.get_obj(occlass, noroid)
  72. assert(ocpar)
  73. rc = ochasattr(subschema, ocpar, mustormay, attr, key)
  74. if rc:
  75. break
  76. return rc
  77. def ochasattrs(subschema, oc, mustormay, attrs):
  78. key = mustormay + "dict"
  79. ret = []
  80. for attr in attrs:
  81. if not ochasattr(subschema, oc, mustormay, attr, key):
  82. ret.append(attr)
  83. return ret
  84. def mycmp(v1, v2):
  85. v1ary, v2ary = [v1], [v2]
  86. if isinstance(v1, list) or isinstance(v1, tuple):
  87. v1ary, v2ary = list(set([x.lower() for x in v1])), list(set([x.lower() for x in v2]))
  88. if not len(v1ary) == len(v2ary):
  89. return False
  90. for v1, v2 in zip(v1ary, v2ary):
  91. if isinstance(v1, six.string_types):
  92. if not len(v1) == len(v2):
  93. return False
  94. if not v1 == v2:
  95. return False
  96. return True
  97. def ocgetdiffs(ldschema, oc1, oc2):
  98. fields = ['obsolete', 'names', 'desc', 'must', 'may', 'kind', 'sup']
  99. ret = ''
  100. for field in fields:
  101. v1, v2 = oc1.__dict__[field], oc2.__dict__[field]
  102. if field == 'may' or field == 'must':
  103. missing = ochasattrs(ldschema, oc1, field, oc2.__dict__[field])
  104. if missing:
  105. ret = ret + '\t%s is missing %s\n' % (field, missing)
  106. missing = ochasattrs(ldschema, oc2, field, oc1.__dict__[field])
  107. if missing:
  108. ret = ret + '\t%s is missing %s\n' % (field, missing)
  109. elif not mycmp(v1, v2):
  110. ret = ret + '\t%s differs: [%s] vs. [%s]\n' % (field, oc1.__dict__[field], oc2.__dict__[field])
  111. return ret
  112. def atgetparfield(subschema, at, field):
  113. v = None
  114. for nameoroid in at.sup:
  115. atpar = subschema.get_obj(attrclass, nameoroid)
  116. assert(atpar)
  117. v = atpar.__dict__.get(field, atgetparfield(subschema, atpar, field))
  118. if v is not None:
  119. break
  120. return v
  121. def atgetdiffs(ldschema, at1, at2):
  122. fields = ['names', 'desc', 'obsolete', 'sup', 'equality', 'ordering', 'substr', 'syntax',
  123. 'single_value', 'collective', 'no_user_mod', 'usage']
  124. if syntax_len_supported:
  125. fields.append('syntax_len')
  126. ret = ''
  127. for field in fields:
  128. v1 = at1.__dict__.get(field) or atgetparfield(ldschema, at1, field)
  129. v2 = at2.__dict__.get(field) or atgetparfield(ldschema, at2, field)
  130. if not mycmp(v1, v2):
  131. ret = ret + '\t%s differs: [%s] vs. [%s]\n' % (field, at1.__dict__[field], at2.__dict__[field])
  132. return ret
  133. def test_schema_comparewithfiles(topology):
  134. '''Compare the schema from ldap cn=schema with the schema files'''
  135. log.info('Running test_schema_comparewithfiles...')
  136. retval = True
  137. schemainst = topology.standalone
  138. ldschema = schemainst.schema.get_subschema()
  139. assert ldschema
  140. for fn in schemainst.schema.list_files():
  141. fschema = schemainst.schema.file_to_subschema(fn)
  142. if not fschema:
  143. log.warn("Unable to parse %s as a schema file - skipping" % fn)
  144. continue
  145. assert fschema
  146. for oid in fschema.listall(occlass):
  147. se = fschema.get_obj(occlass, oid)
  148. assert se
  149. ldse = ldschema.get_obj(occlass, oid)
  150. if not ldse:
  151. log.error("objectclass in %s but not in %s: %s" % (fn, DN_SCHEMA, se))
  152. retval = False
  153. continue
  154. ret = ocgetdiffs(ldschema, ldse, se)
  155. if ret:
  156. log.error("name %s oid %s\n%s" % (se.names[0], oid, ret))
  157. retval = False
  158. for oid in fschema.listall(attrclass):
  159. se = fschema.get_obj(attrclass, oid)
  160. assert se
  161. ldse = ldschema.get_obj(attrclass, oid)
  162. if not ldse:
  163. log.error("attributetype in %s but not in %s: %s" % (fn, DN_SCHEMA, se))
  164. retval = False
  165. continue
  166. ret = atgetdiffs(ldschema, ldse, se)
  167. if ret:
  168. log.error("name %s oid %s\n%s" % (se.names[0], oid, ret))
  169. retval = False
  170. assert retval
  171. log.info('test_schema_comparewithfiles: PASSED')
  172. def test_schema_final(topology):
  173. topology.standalone.delete()
  174. def run_isolated():
  175. '''
  176. run_isolated is used to run these test cases independently of a test scheduler (xunit, py.test..)
  177. To run isolated without py.test, you need to
  178. - edit this file and comment '@pytest.fixture' line before 'topology' function.
  179. - set the installation prefix
  180. - run this program
  181. '''
  182. global installation_prefix
  183. installation_prefix = os.environ.get('PREFIX')
  184. topo = topology(True)
  185. test_schema_comparewithfiles(topo)
  186. test_schema_final(topo)
# Script entry point: when executed directly (not imported by a test
# runner), run the tests through run_isolated().
if __name__ == '__main__':
    run_isolated()