test_schema.py 8.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271
  1. '''
  2. Created on Dec 18, 2013
  3. @author: rmeggins
  4. '''
  5. import os
  6. import sys
  7. import time
  8. import ldap
  9. from ldap.cidict import cidict
  10. from ldap.schema import SubSchema
  11. import logging
  12. import socket
  13. import time
  14. import logging
  15. import pytest
  16. import re
  17. from lib389 import DirSrv, Entry, tools
  18. from lib389.tools import DirSrvTools
  19. from lib389._constants import *
  20. from lib389.properties import *
  21. from constants import *
  22. logging.getLogger(__name__).setLevel(logging.DEBUG)
  23. log = logging.getLogger(__name__)
  24. installation_prefix = None
  25. class TopologyStandalone(object):
  26. def __init__(self, standalone):
  27. standalone.open()
  28. self.standalone = standalone
  29. @pytest.fixture(scope="module")
  30. def topology(request):
  31. '''
  32. This fixture is used to create a DirSrv instance for the 'module'.
  33. At the beginning, there may already be an instance.
  34. There may also be a backup for the instance.
  35. Principle:
  36. If instance exists:
  37. restart it
  38. If backup exists:
  39. create or rebind to instance
  40. restore instance from backup
  41. else:
  42. Cleanup everything
  43. remove instance
  44. remove backup
  45. Create instance
  46. Create backup
  47. '''
  48. global installation_prefix
  49. if installation_prefix:
  50. args_instance[SER_DEPLOYED_DIR] = installation_prefix
  51. schemainst = DirSrv(verbose=False)
  52. # Args for the master instance
  53. args_instance[SER_HOST] = HOST_STANDALONE
  54. args_instance[SER_PORT] = PORT_STANDALONE
  55. args_instance[SER_SERVERID_PROP] = SERVERID_STANDALONE
  56. schemainst.allocate(args_instance)
  57. # Get the status of the backups
  58. backup = schemainst.checkBackupFS()
  59. # Get the status of the instance and restart it if it exists
  60. if schemainst.exists():
  61. schemainst.stop(timeout=10)
  62. schemainst.start(timeout=10)
  63. if backup:
  64. # The backup exists, assuming it is correct
  65. # we just re-init the instance with it
  66. if not schemainst.exists():
  67. schemainst.create()
  68. # Used to retrieve configuration information (dbdir, confdir...)
  69. schemainst.open()
  70. # restore from backup
  71. schemainst.stop(timeout=10)
  72. schemainst.restoreFS(backup)
  73. schemainst.start(timeout=10)
  74. else:
  75. # We should be here only in two conditions
  76. # - This is the first test
  77. # so we need to create everything
  78. # - Something weird happened (instance/backup destroyed)
  79. # so we discard everything and recreate all
  80. # Remove the backup. So even if we have a specific backup file
  81. # (e.g backup) we clear all backups that an instance may have created
  82. if backup:
  83. schemainst.clearBackupFS()
  84. # Remove all the instances
  85. if schemainst.exists():
  86. schemainst.delete()
  87. # Create the instances
  88. schemainst.create()
  89. schemainst.open()
  90. # Time to create the backup
  91. schemainst.stop(timeout=10)
  92. schemainst.backupfile = schemainst.backupFS()
  93. schemainst.start(timeout=10)
  94. #
  95. return TopologyStandalone(schemainst)
  96. attrclass = ldap.schema.models.AttributeType
  97. occlass = ldap.schema.models.ObjectClass
  98. def ochasattr(subschema, oc, mustormay, attr, key):
  99. """See if the oc and any of its parents and ancestors have the
  100. given attr"""
  101. rc = False
  102. if not key in oc.__dict__:
  103. dd = cidict()
  104. for ii in oc.__dict__[mustormay]:
  105. dd[ii] = ii
  106. oc.__dict__[key] = dd
  107. if attr in oc.__dict__[key]:
  108. rc = True
  109. else:
  110. # look in parents
  111. for noroid in oc.sup:
  112. ocpar = subschema.get_obj(occlass, noroid)
  113. assert(ocpar)
  114. rc = ochasattr(subschema, ocpar, mustormay, attr, key)
  115. if rc:
  116. break
  117. return rc
  118. def ochasattrs(subschema, oc, mustormay, attrs):
  119. key = mustormay + "dict"
  120. ret = []
  121. for attr in attrs:
  122. if not ochasattr(subschema, oc, mustormay, attr, key):
  123. ret.append(attr)
  124. return ret
  125. def mycmp(v1, v2):
  126. v1ary, v2ary = [v1], [v2]
  127. if isinstance(v1, list) or isinstance(v1, tuple):
  128. v1ary, v2ary = list(set([x.lower() for x in v1])), list(set([x.lower() for x in v2]))
  129. if not len(v1ary) == len(v2ary):
  130. return False
  131. for v1, v2 in zip(v1ary, v2ary):
  132. if isinstance(v1, basestring):
  133. if not len(v1) == len(v2):
  134. return False
  135. if not v1 == v2:
  136. return False
  137. return True
  138. def ocgetdiffs(ldschema, oc1, oc2):
  139. fields = ['obsolete', 'names', 'desc', 'must', 'may', 'kind', 'sup']
  140. ret = ''
  141. for field in fields:
  142. v1, v2 = oc1.__dict__[field], oc2.__dict__[field]
  143. if field == 'may' or field == 'must':
  144. missing = ochasattrs(ldschema, oc1, field, oc2.__dict__[field])
  145. if missing:
  146. ret = ret + '\t%s is missing %s\n' % (field, missing)
  147. missing = ochasattrs(ldschema, oc2, field, oc1.__dict__[field])
  148. if missing:
  149. ret = ret + '\t%s is missing %s\n' % (field, missing)
  150. elif not mycmp(v1, v2):
  151. ret = ret + '\t%s differs: [%s] vs. [%s]\n' % (field, oc1.__dict__[field], oc2.__dict__[field])
  152. return ret
  153. def atgetparfield(subschema, at, field):
  154. v = None
  155. for nameoroid in at.sup:
  156. atpar = subschema.get_obj(attrclass, nameoroid)
  157. assert(atpar)
  158. v = atpar.__dict__.get(field, atgetparfield(subschema, atpar, field))
  159. if v is not None:
  160. break
  161. return v
  162. syntax_len_supported = False
  163. def atgetdiffs(ldschema, at1, at2):
  164. fields = ['names', 'desc', 'obsolete', 'sup', 'equality', 'ordering', 'substr', 'syntax',
  165. 'single_value', 'collective', 'no_user_mod', 'usage']
  166. if syntax_len_supported:
  167. fields.append('syntax_len')
  168. ret = ''
  169. for field in fields:
  170. v1 = at1.__dict__.get(field) or atgetparfield(ldschema, at1, field)
  171. v2 = at2.__dict__.get(field) or atgetparfield(ldschema, at2, field)
  172. if not mycmp(v1, v2):
  173. ret = ret + '\t%s differs: [%s] vs. [%s]\n' % (field, at1.__dict__[field], at2.__dict__[field])
  174. return ret
  175. def test_schema_comparewithfiles(topology):
  176. '''Compare the schema from ldap cn=schema with the schema files'''
  177. retval = True
  178. schemainst = topology.standalone
  179. ldschema = schemainst.schema.get_subschema()
  180. assert ldschema
  181. for fn in schemainst.schema.list_files():
  182. fschema = schemainst.schema.file_to_subschema(fn)
  183. if not fschema:
  184. log.warn("Unable to parse %s as a schema file - skipping" % fn)
  185. continue
  186. assert fschema
  187. for oid in fschema.listall(occlass):
  188. se = fschema.get_obj(occlass, oid)
  189. assert se
  190. ldse = ldschema.get_obj(occlass, oid)
  191. if not ldse:
  192. log.error("objectclass in %s but not in %s: %s" % (fn, DN_SCHEMA, se))
  193. retval = False
  194. continue
  195. ret = ocgetdiffs(ldschema, ldse, se)
  196. if ret:
  197. log.error("name %s oid %s\n%s" % (se.names[0], oid, ret))
  198. retval = False
  199. for oid in fschema.listall(attrclass):
  200. se = fschema.get_obj(attrclass, oid)
  201. assert se
  202. ldse = ldschema.get_obj(attrclass, oid)
  203. if not ldse:
  204. log.error("attributetype in %s but not in %s: %s" % (fn, DN_SCHEMA, se))
  205. retval = False
  206. continue
  207. ret = atgetdiffs(ldschema, ldse, se)
  208. if ret:
  209. log.error("name %s oid %s\n%s" % (se.names[0], oid, ret))
  210. retval = False
  211. assert retval
  212. def test_schema_final(topology):
  213. topology.standalone.delete()
  214. def run_isolated():
  215. '''
  216. run_isolated is used to run these test cases independently of a test scheduler (xunit, py.test..)
  217. To run isolated without py.test, you need to
  218. - edit this file and comment '@pytest.fixture' line before 'topology' function.
  219. - set the installation prefix
  220. - run this program
  221. '''
  222. global installation_prefix
  223. installation_prefix = os.environ.get('PREFIX')
  224. topo = topology(True)
  225. test_schema_comparewithfiles(topo)
  226. test_schema_final(topo)
  227. if __name__ == '__main__':
  228. run_isolated()