ticket48234_test.py 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139
  1. import os
  2. import sys
  3. import time
  4. import ldap
  5. import logging
  6. import pytest
  7. from lib389 import DirSrv, Entry, tools, tasks
  8. from lib389.tools import DirSrvTools
  9. from lib389._constants import *
  10. from lib389.properties import *
  11. from lib389.tasks import *
  12. from lib389.utils import *
  13. logging.getLogger(__name__).setLevel(logging.DEBUG)
  14. log = logging.getLogger(__name__)
  15. installation1_prefix = None
  16. class TopologyStandalone(object):
  17. def __init__(self, standalone):
  18. standalone.open()
  19. self.standalone = standalone
  20. @pytest.fixture(scope="module")
  21. def topology(request):
  22. global installation1_prefix
  23. if installation1_prefix:
  24. args_instance[SER_DEPLOYED_DIR] = installation1_prefix
  25. # Creating standalone instance ...
  26. standalone = DirSrv(verbose=False)
  27. args_instance[SER_HOST] = HOST_STANDALONE
  28. args_instance[SER_PORT] = PORT_STANDALONE
  29. args_instance[SER_SERVERID_PROP] = SERVERID_STANDALONE
  30. args_instance[SER_CREATION_SUFFIX] = DEFAULT_SUFFIX
  31. args_standalone = args_instance.copy()
  32. standalone.allocate(args_standalone)
  33. instance_standalone = standalone.exists()
  34. if instance_standalone:
  35. standalone.delete()
  36. standalone.create()
  37. standalone.open()
  38. # Delete each instance in the end
  39. def fin():
  40. standalone.delete()
  41. request.addfinalizer(fin)
  42. # Clear out the tmp dir
  43. standalone.clearTmpDir(__file__)
  44. return TopologyStandalone(standalone)
  45. def add_ou_entry(server, name, myparent):
  46. dn = 'ou=%s,%s' % (name, myparent)
  47. server.add_s(Entry((dn, {'objectclass': ['top', 'organizationalunit'],
  48. 'ou': name})))
  49. def add_user_entry(server, name, pw, myparent):
  50. dn = 'cn=%s,%s' % (name, myparent)
  51. server.add_s(Entry((dn, {'objectclass': ['top', 'person'],
  52. 'sn': name,
  53. 'cn': name,
  54. 'telephonenumber': '+1 222 333-4444',
  55. 'userpassword': pw})))
  56. def test_ticket48234(topology):
  57. """
  58. Test aci which contains an extensible filter.
  59. shutdown
  60. """
  61. log.info('Bind as root DN')
  62. try:
  63. topology.standalone.simple_bind_s(DN_DM, PASSWORD)
  64. except ldap.LDAPError as e:
  65. topology.standalone.log.error('Root DN failed to authenticate: ' + e.message['desc'])
  66. assert False
  67. ouname = 'outest'
  68. username = 'admin'
  69. passwd = 'Password'
  70. deniedattr = 'telephonenumber'
  71. log.info('Add aci which contains extensible filter.')
  72. aci_text = ('(targetattr = "%s")' % (deniedattr) +
  73. '(target = "ldap:///%s")' % (DEFAULT_SUFFIX) +
  74. '(version 3.0;acl "admin-tel-matching-rule-outest";deny (all)' +
  75. '(userdn = "ldap:///%s??sub?(&(cn=%s)(ou:dn:=%s))");)' % (DEFAULT_SUFFIX, username, ouname))
  76. try:
  77. topology.standalone.modify_s(DEFAULT_SUFFIX, [(ldap.MOD_ADD, 'aci', aci_text)])
  78. except ldap.LDAPError as e:
  79. log.error('Failed to add aci: (%s) error %s' % (aci_text, e.message['desc']))
  80. assert False
  81. log.info('Add entries ...')
  82. for idx in range(0, 2):
  83. ou0 = 'OU%d' % idx
  84. log.info('adding %s under %s...' % (ou0, DEFAULT_SUFFIX))
  85. add_ou_entry(topology.standalone, ou0, DEFAULT_SUFFIX)
  86. parent = 'ou=%s,%s' % (ou0, DEFAULT_SUFFIX)
  87. log.info('adding %s under %s...' % (ouname, parent))
  88. add_ou_entry(topology.standalone, ouname, parent)
  89. for idx in range(0, 2):
  90. parent = 'ou=%s,ou=OU%d,%s' % (ouname, idx, DEFAULT_SUFFIX)
  91. log.info('adding %s under %s...' % (username, parent))
  92. add_user_entry(topology.standalone, username, passwd, parent)
  93. binddn = 'cn=%s,%s' % (username, parent)
  94. log.info('Bind as user %s' % binddn)
  95. try:
  96. topology.standalone.simple_bind_s(binddn, passwd)
  97. except ldap.LDAPError as e:
  98. topology.standalone.log.error(bindn + ' failed to authenticate: ' + e.message['desc'])
  99. assert False
  100. filter = '(cn=%s)' % username
  101. try:
  102. entries = topology.standalone.search_s(DEFAULT_SUFFIX, ldap.SCOPE_SUBTREE, filter, [deniedattr, 'dn'])
  103. assert 2 == len(entries)
  104. for idx in range(0, 1):
  105. if entries[idx].hasAttr(deniedattr):
  106. log.fatal('aci with extensible filter failed -- %s')
  107. assert False
  108. except ldap.LDAPError as e:
  109. topology.standalone.log.error('Search (%s, %s) failed: ' % (DEFAULT_SUFFIX, filter) + e.message['desc'])
  110. assert False
  111. log.info('Test complete')
  112. if __name__ == '__main__':
  113. # Run isolated
  114. # -s for DEBUG mode
  115. CURRENT_FILE = os.path.realpath(__file__)
  116. pytest.main("-s %s" % CURRENT_FILE)