1
0

ticket47981_test.py 12 KB


  1. import os
  2. import sys
  3. import time
  4. import ldap
  5. import ldap.sasl
  6. import logging
  7. import socket
  8. import pytest
  9. from lib389 import DirSrv, Entry, tools, tasks
  10. from lib389.tools import DirSrvTools
  11. from lib389._constants import *
  12. from lib389.properties import *
  13. from lib389.tasks import *
  14. from constants import *
  15. log = logging.getLogger(__name__)
  16. installation_prefix = None
  17. BRANCH = 'ou=people,' + DEFAULT_SUFFIX
  18. USER_DN = 'uid=user1,%s' % (BRANCH)
  19. BRANCH_CONTAINER = 'cn=nsPwPolicyContainer,ou=people,dc=example,dc=com'
  20. BRANCH_COS_DEF = 'cn=nsPwPolicy_CoS,ou=people,dc=example,dc=com'
  21. BRANCH_PWP = 'cn=cn\\3DnsPwPolicyEntry\\2Cou\\3DPeople\\2Cdc\\3Dexample\\2Cdc\\3Dcom,' + \
  22. 'cn=nsPwPolicyContainer,ou=People,dc=example,dc=com'
  23. BRANCH_COS_TMPL = 'cn=cn\\3DnsPwTemplateEntry\\2Cou\\3DPeople\\2Cdc\\3Dexample\\2Cdc\\3Dcom,' + \
  24. 'cn=nsPwPolicyContainer,ou=People,dc=example,dc=com'
  25. SECOND_SUFFIX = 'o=netscaperoot'
  26. BE_NAME = 'netscaperoot'
  27. class TopologyStandalone(object):
  28. def __init__(self, standalone):
  29. standalone.open()
  30. self.standalone = standalone
  31. @pytest.fixture(scope="module")
  32. def topology(request):
  33. '''
  34. This fixture is used to standalone topology for the 'module'.
  35. At the beginning, It may exists a standalone instance.
  36. It may also exists a backup for the standalone instance.
  37. Principle:
  38. If standalone instance exists:
  39. restart it
  40. If backup of standalone exists:
  41. create/rebind to standalone
  42. restore standalone instance from backup
  43. else:
  44. Cleanup everything
  45. remove instance
  46. remove backup
  47. Create instance
  48. Create backup
  49. '''
  50. global installation_prefix
  51. if installation_prefix:
  52. args_instance[SER_DEPLOYED_DIR] = installation_prefix
  53. standalone = DirSrv(verbose=False)
  54. # Args for the standalone instance
  55. args_instance[SER_HOST] = HOST_STANDALONE
  56. args_instance[SER_PORT] = PORT_STANDALONE
  57. args_instance[SER_SERVERID_PROP] = SERVERID_STANDALONE
  58. args_standalone = args_instance.copy()
  59. standalone.allocate(args_standalone)
  60. # Get the status of the backups
  61. backup_standalone = standalone.checkBackupFS()
  62. # Get the status of the instance and restart it if it exists
  63. instance_standalone = standalone.exists()
  64. if instance_standalone:
  65. # assuming the instance is already stopped, just wait 5 sec max
  66. standalone.stop(timeout=5)
  67. standalone.start(timeout=10)
  68. if backup_standalone:
  69. # The backup exist, assuming it is correct
  70. # we just re-init the instance with it
  71. if not instance_standalone:
  72. standalone.create()
  73. # Used to retrieve configuration information (dbdir, confdir...)
  74. standalone.open()
  75. # restore standalone instance from backup
  76. standalone.stop(timeout=10)
  77. standalone.restoreFS(backup_standalone)
  78. standalone.start(timeout=10)
  79. else:
  80. # We should be here only in two conditions
  81. # - This is the first time a test involve standalone instance
  82. # - Something weird happened (instance/backup destroyed)
  83. # so we discard everything and recreate all
  84. # Remove the backup. So even if we have a specific backup file
  85. # (e.g backup_standalone) we clear backup that an instance may have created
  86. if backup_standalone:
  87. standalone.clearBackupFS()
  88. # Remove the instance
  89. if instance_standalone:
  90. standalone.delete()
  91. # Create the instance
  92. standalone.create()
  93. # Used to retrieve configuration information (dbdir, confdir...)
  94. standalone.open()
  95. # Time to create the backups
  96. standalone.stop(timeout=10)
  97. standalone.backupfile = standalone.backupFS()
  98. standalone.start(timeout=10)
  99. # clear the tmp directory
  100. standalone.clearTmpDir(__file__)
  101. #
  102. # Here we have standalone instance up and running
  103. # Either coming from a backup recovery
  104. # or from a fresh (re)init
  105. # Time to return the topology
  106. return TopologyStandalone(standalone)
  107. def addSubtreePwPolicy(inst):
  108. #
  109. # Add subtree policy to the people branch
  110. #
  111. try:
  112. inst.add_s(Entry((BRANCH_CONTAINER, {
  113. 'objectclass': 'top nsContainer'.split(),
  114. 'cn': 'nsPwPolicyContainer'
  115. })))
  116. except ldap.LDAPError, e:
  117. log.error('Failed to add subtree container for ou=people: error ' + e.message['desc'])
  118. assert False
  119. # Add the password policy subentry
  120. try:
  121. inst.add_s(Entry((BRANCH_PWP, {
  122. 'objectclass': 'top ldapsubentry passwordpolicy'.split(),
  123. 'cn': 'cn=nsPwPolicyEntry,ou=people,dc=example,dc=com',
  124. 'passwordMustChange': 'off',
  125. 'passwordExp': 'off',
  126. 'passwordHistory': 'off',
  127. 'passwordMinAge': '0',
  128. 'passwordChange': 'off',
  129. 'passwordStorageScheme': 'ssha'
  130. })))
  131. except ldap.LDAPError, e:
  132. log.error('Failed to add passwordpolicy: error ' + e.message['desc'])
  133. assert False
  134. # Add the COS template
  135. try:
  136. inst.add_s(Entry((BRANCH_COS_TMPL, {
  137. 'objectclass': 'top ldapsubentry costemplate extensibleObject'.split(),
  138. 'cn': 'cn=nsPwPolicyEntry,ou=people,dc=example,dc=com',
  139. 'cosPriority': '1',
  140. 'cn': 'cn=nsPwTemplateEntry,ou=people,dc=example,dc=com',
  141. 'pwdpolicysubentry': BRANCH_PWP
  142. })))
  143. except ldap.LDAPError, e:
  144. log.error('Failed to add COS template: error ' + e.message['desc'])
  145. assert False
  146. # Add the COS definition
  147. try:
  148. inst.add_s(Entry((BRANCH_COS_DEF, {
  149. 'objectclass': 'top ldapsubentry cosSuperDefinition cosPointerDefinition'.split(),
  150. 'cn': 'cn=nsPwPolicyEntry,ou=people,dc=example,dc=com',
  151. 'costemplatedn': BRANCH_COS_TMPL,
  152. 'cosAttribute': 'pwdpolicysubentry default operational-default'
  153. })))
  154. except ldap.LDAPError, e:
  155. log.error('Failed to add COS def: error ' + e.message['desc'])
  156. assert False
  157. time.sleep(0.5)
  158. def delSubtreePwPolicy(inst):
  159. try:
  160. inst.delete_s(BRANCH_COS_DEF)
  161. except ldap.LDAPError, e:
  162. log.error('Failed to delete COS def: error ' + e.message['desc'])
  163. assert False
  164. try:
  165. inst.delete_s(BRANCH_COS_TMPL)
  166. except ldap.LDAPError, e:
  167. log.error('Failed to delete COS template: error ' + e.message['desc'])
  168. assert False
  169. try:
  170. inst.delete_s(BRANCH_PWP)
  171. except ldap.LDAPError, e:
  172. log.error('Failed to delete COS password policy: error ' + e.message['desc'])
  173. assert False
  174. try:
  175. inst.delete_s(BRANCH_CONTAINER)
  176. except ldap.LDAPError, e:
  177. log.error('Failed to delete COS container: error ' + e.message['desc'])
  178. assert False
  179. time.sleep(0.5)
  180. def test_ticket47981(topology):
  181. """
  182. If there are multiple suffixes, and the last suffix checked does not contain any COS entries,
  183. while other suffixes do, then the vattr cache is not invalidated as it should be. Then any
  184. cached entries will still contain the old COS attributes/values.
  185. """
  186. log.info('Testing Ticket 47981 - Test that COS def changes are correctly reflected in affected users')
  187. #
  188. # Create a second backend that does not have any COS entries
  189. #
  190. log.info('Adding second suffix that will not contain any COS entries...\n')
  191. topology.standalone.backend.create(SECOND_SUFFIX, {BACKEND_NAME: BE_NAME})
  192. topology.standalone.mappingtree.create(SECOND_SUFFIX, bename=BE_NAME)
  193. try:
  194. topology.standalone.add_s(Entry((SECOND_SUFFIX, {
  195. 'objectclass': 'top organization'.split(),
  196. 'o': BE_NAME})))
  197. except ldap.ALREADY_EXISTS:
  198. pass
  199. except ldap.LDAPError, e:
  200. log.error('Failed to create suffix entry: error ' + e.message['desc'])
  201. assert False
  202. #
  203. # Add People branch, it might already exist
  204. #
  205. log.info('Add our test entries to the default suffix, and proceed with the test...')
  206. try:
  207. topology.standalone.add_s(Entry((BRANCH, {
  208. 'objectclass': 'top extensibleObject'.split(),
  209. 'ou': 'level4'
  210. })))
  211. except ldap.ALREADY_EXISTS:
  212. pass
  213. except ldap.LDAPError, e:
  214. log.error('Failed to add ou=people: error ' + e.message['desc'])
  215. assert False
  216. #
  217. # Add a user to the branch
  218. #
  219. try:
  220. topology.standalone.add_s(Entry((USER_DN, {
  221. 'objectclass': 'top extensibleObject'.split(),
  222. 'uid': 'user1'
  223. })))
  224. except ldap.LDAPError, e:
  225. log.error('Failed to add user1: error ' + e.message['desc'])
  226. assert False
  227. #
  228. # Enable password policy and add the subtree policy
  229. #
  230. try:
  231. topology.standalone.modify_s(DN_CONFIG, [(ldap.MOD_REPLACE, 'nsslapd-pwpolicy-local', 'on')])
  232. except ldap.LDAPError, e:
  233. log.error('Failed to set pwpolicy-local: error ' + e.message['desc'])
  234. assert False
  235. addSubtreePwPolicy(topology.standalone)
  236. #
  237. # Now check the user has its expected passwordPolicy subentry
  238. #
  239. try:
  240. entries = topology.standalone.search_s(USER_DN,
  241. ldap.SCOPE_BASE,
  242. '(objectclass=top)',
  243. ['pwdpolicysubentry', 'dn'])
  244. if not entries[0].hasAttr('pwdpolicysubentry'):
  245. log.fatal('User does not have expected pwdpolicysubentry!')
  246. assert False
  247. except ldap.LDAPError, e:
  248. log.fatal('Unable to search for entry %s: error %s' % (USER_DN, e.message['desc']))
  249. assert False
  250. #
  251. # Delete the password policy and make sure it is removed from the same user
  252. #
  253. delSubtreePwPolicy(topology.standalone)
  254. try:
  255. entries = topology.standalone.search_s(USER_DN, ldap.SCOPE_BASE, '(objectclass=top)', ['pwdpolicysubentry'])
  256. if entries[0].hasAttr('pwdpolicysubentry'):
  257. log.fatal('User unexpectedly does have the pwdpolicysubentry!')
  258. assert False
  259. except ldap.LDAPError, e:
  260. log.fatal('Unable to search for entry %s: error %s' % (USER_DN, e.message['desc']))
  261. assert False
  262. #
  263. # Add the subtree policvy back and see if the user now has it
  264. #
  265. addSubtreePwPolicy(topology.standalone)
  266. try:
  267. entries = topology.standalone.search_s(USER_DN, ldap.SCOPE_BASE, '(objectclass=top)', ['pwdpolicysubentry'])
  268. if not entries[0].hasAttr('pwdpolicysubentry'):
  269. log.fatal('User does not have expected pwdpolicysubentry!')
  270. assert False
  271. except ldap.LDAPError, e:
  272. log.fatal('Unable to search for entry %s: error %s' % (USER_DN, e.message['desc']))
  273. assert False
  274. # If we got here the test passed
  275. log.info('Test PASSED')
  276. def test_ticket47981_final(topology):
  277. topology.standalone.delete()
  278. def run_isolated():
  279. '''
  280. run_isolated is used to run these test cases independently of a test scheduler (xunit, py.test..)
  281. To run isolated without py.test, you need to
  282. - edit this file and comment '@pytest.fixture' line before 'topology' function.
  283. - set the installation prefix
  284. - run this program
  285. '''
  286. global installation_prefix
  287. installation_prefix = None
  288. topo = topology(True)
  289. test_ticket47981(topo)
  290. test_ticket47981_final(topo)
  291. if __name__ == '__main__':
  292. run_isolated()