test_schema.py 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207
  1. # --- BEGIN COPYRIGHT BLOCK ---
  2. # Copyright (C) 2015 Red Hat, Inc.
  3. # All rights reserved.
  4. #
  5. # License: GPL (version 3 or any later version).
  6. # See LICENSE for details.
  7. # --- END COPYRIGHT BLOCK ---
  8. #
  9. '''
  10. Created on Dec 18, 2013
  11. @author: rmeggins
  12. '''
  13. import os
  14. import sys
  15. import time
  16. import ldap
  17. import six
  18. from ldap.cidict import cidict
  19. from ldap.schema import SubSchema
  20. import logging
  21. import pytest
  22. from lib389 import DirSrv, Entry, tools
  23. from lib389.tools import DirSrvTools
  24. from lib389._constants import *
  25. from lib389.properties import *
  26. logging.getLogger(__name__).setLevel(logging.DEBUG)
  27. log = logging.getLogger(__name__)
  28. attrclass = ldap.schema.models.AttributeType
  29. occlass = ldap.schema.models.ObjectClass
  30. syntax_len_supported = False
  31. class TopologyStandalone(object):
  32. def __init__(self, standalone):
  33. standalone.open()
  34. self.standalone = standalone
  35. @pytest.fixture(scope="module")
  36. def topology(request):
  37. '''
  38. This fixture is used to create a DirSrv instance for the 'module'.
  39. '''
  40. schemainst = DirSrv(verbose=False)
  41. # Args for the master instance
  42. args_instance[SER_HOST] = HOST_STANDALONE
  43. args_instance[SER_PORT] = PORT_STANDALONE
  44. args_instance[SER_SERVERID_PROP] = SERVERID_STANDALONE
  45. schemainst.allocate(args_instance)
  46. # Remove all the instance
  47. if schemainst.exists():
  48. schemainst.delete()
  49. # Create the instance
  50. schemainst.create()
  51. schemainst.open()
  52. def fin():
  53. schemainst.delete()
  54. request.addfinalizer(fin)
  55. return TopologyStandalone(schemainst)
  56. def ochasattr(subschema, oc, mustormay, attr, key):
  57. """See if the oc and any of its parents and ancestors have the
  58. given attr"""
  59. rc = False
  60. if not key in oc.__dict__:
  61. dd = cidict()
  62. for ii in oc.__dict__[mustormay]:
  63. dd[ii] = ii
  64. oc.__dict__[key] = dd
  65. if attr in oc.__dict__[key]:
  66. rc = True
  67. else:
  68. # look in parents
  69. for noroid in oc.sup:
  70. ocpar = subschema.get_obj(occlass, noroid)
  71. assert(ocpar)
  72. rc = ochasattr(subschema, ocpar, mustormay, attr, key)
  73. if rc:
  74. break
  75. return rc
  76. def ochasattrs(subschema, oc, mustormay, attrs):
  77. key = mustormay + "dict"
  78. ret = []
  79. for attr in attrs:
  80. if not ochasattr(subschema, oc, mustormay, attr, key):
  81. ret.append(attr)
  82. return ret
  83. def mycmp(v1, v2):
  84. v1ary, v2ary = [v1], [v2]
  85. if isinstance(v1, list) or isinstance(v1, tuple):
  86. v1ary, v2ary = list(set([x.lower() for x in v1])), list(set([x.lower() for x in v2]))
  87. if not len(v1ary) == len(v2ary):
  88. return False
  89. for v1, v2 in zip(v1ary, v2ary):
  90. if isinstance(v1, six.string_types):
  91. if not len(v1) == len(v2):
  92. return False
  93. if not v1 == v2:
  94. return False
  95. return True
  96. def ocgetdiffs(ldschema, oc1, oc2):
  97. fields = ['obsolete', 'names', 'desc', 'must', 'may', 'kind', 'sup']
  98. ret = ''
  99. for field in fields:
  100. v1, v2 = oc1.__dict__[field], oc2.__dict__[field]
  101. if field == 'may' or field == 'must':
  102. missing = ochasattrs(ldschema, oc1, field, oc2.__dict__[field])
  103. if missing:
  104. ret = ret + '\t%s is missing %s\n' % (field, missing)
  105. missing = ochasattrs(ldschema, oc2, field, oc1.__dict__[field])
  106. if missing:
  107. ret = ret + '\t%s is missing %s\n' % (field, missing)
  108. elif not mycmp(v1, v2):
  109. ret = ret + '\t%s differs: [%s] vs. [%s]\n' % (field, oc1.__dict__[field], oc2.__dict__[field])
  110. return ret
  111. def atgetparfield(subschema, at, field):
  112. v = None
  113. for nameoroid in at.sup:
  114. atpar = subschema.get_obj(attrclass, nameoroid)
  115. assert(atpar)
  116. v = atpar.__dict__.get(field, atgetparfield(subschema, atpar, field))
  117. if v is not None:
  118. break
  119. return v
  120. def atgetdiffs(ldschema, at1, at2):
  121. fields = ['names', 'desc', 'obsolete', 'sup', 'equality', 'ordering', 'substr', 'syntax',
  122. 'single_value', 'collective', 'no_user_mod', 'usage']
  123. if syntax_len_supported:
  124. fields.append('syntax_len')
  125. ret = ''
  126. for field in fields:
  127. v1 = at1.__dict__.get(field) or atgetparfield(ldschema, at1, field)
  128. v2 = at2.__dict__.get(field) or atgetparfield(ldschema, at2, field)
  129. if not mycmp(v1, v2):
  130. ret = ret + '\t%s differs: [%s] vs. [%s]\n' % (field, at1.__dict__[field], at2.__dict__[field])
  131. return ret
  132. def test_schema_comparewithfiles(topology):
  133. '''Compare the schema from ldap cn=schema with the schema files'''
  134. log.info('Running test_schema_comparewithfiles...')
  135. retval = True
  136. schemainst = topology.standalone
  137. ldschema = schemainst.schema.get_subschema()
  138. assert ldschema
  139. for fn in schemainst.schema.list_files():
  140. fschema = schemainst.schema.file_to_subschema(fn)
  141. if not fschema:
  142. log.warn("Unable to parse %s as a schema file - skipping" % fn)
  143. continue
  144. assert fschema
  145. for oid in fschema.listall(occlass):
  146. se = fschema.get_obj(occlass, oid)
  147. assert se
  148. ldse = ldschema.get_obj(occlass, oid)
  149. if not ldse:
  150. log.error("objectclass in %s but not in %s: %s" % (fn, DN_SCHEMA, se))
  151. retval = False
  152. continue
  153. ret = ocgetdiffs(ldschema, ldse, se)
  154. if ret:
  155. log.error("name %s oid %s\n%s" % (se.names[0], oid, ret))
  156. retval = False
  157. for oid in fschema.listall(attrclass):
  158. se = fschema.get_obj(attrclass, oid)
  159. assert se
  160. ldse = ldschema.get_obj(attrclass, oid)
  161. if not ldse:
  162. log.error("attributetype in %s but not in %s: %s" % (fn, DN_SCHEMA, se))
  163. retval = False
  164. continue
  165. ret = atgetdiffs(ldschema, ldse, se)
  166. if ret:
  167. log.error("name %s oid %s\n%s" % (se.names[0], oid, ret))
  168. retval = False
  169. assert retval
  170. log.info('test_schema_comparewithfiles: PASSED')
  171. if __name__ == '__main__':
  172. # Run isolated
  173. # -s for DEBUG mode
  174. CURRENT_FILE = os.path.realpath(__file__)
  175. pytest.main("-s %s" % CURRENT_FILE)