# ticket48366_test.py (7.3 KB)
  1. # --- BEGIN COPYRIGHT BLOCK ---
  2. # Copyright (C) 2015 Red Hat, Inc.
  3. # All rights reserved.
  4. #
  5. # License: GPL (version 3 or any later version).
  6. # See LICENSE for details.
  7. # --- END COPYRIGHT BLOCK ---
  8. #
  9. import os
  10. import sys
  11. import time
  12. import ldap
  13. import logging
  14. import pytest
  15. from lib389 import DirSrv, Entry, tools
  16. from lib389.tools import DirSrvTools
  17. from lib389._constants import *
  18. from lib389.properties import *
  19. from ldap.controls.simple import ProxyAuthzControl
  # Module-level logger for this test file
  log = logging.getLogger(__name__)

  # Password given to both accounts created by the init test
  USER_PW = 'password'

  # Accounts used in test: the regular user and the user allowed to proxy
  TEST_USER_DN = 'cn=test,ou=people,%s' % SUFFIX
  PROXY_USER_DN = 'cn=proxy,ou=people,%s' % SUFFIX

  # subtrees used in test
  SUBTREE_GREEN = "ou=green,%s" % SUFFIX
  SUBTREE_RED = "ou=red,%s" % SUFFIX
  SUBTREES = (SUBTREE_GREEN, SUBTREE_RED)
  class TopologyStandalone(object):
      """Holder passing the standalone server instance to the tests.

      Building the holder calls ``open()`` on the given instance, which is
      afterwards available through the ``standalone`` attribute.
      """

      def __init__(self, standalone):
          instance = standalone
          # open() runs before the attribute is set, so a failing open()
          # leaves no ``standalone`` attribute behind
          instance.open()
          self.standalone = instance
  @pytest.fixture(scope="module")
  def topology(request):
      """Module-scoped fixture returning a newly created standalone instance.

      An instance that already exists for the same settings is deleted
      first, then a new one is created and opened.  A finalizer registered
      on ``request`` deletes the instance again.

      :param request: pytest fixture request, used to register the finalizer
      :return: TopologyStandalone wrapping the instance
      """
      server = DirSrv(verbose=False)

      # Args for the standalone instance; they are written into the shared
      # args_instance mapping and the instance is allocated from a copy of it
      args_instance[SER_HOST] = HOST_STANDALONE
      args_instance[SER_PORT] = PORT_STANDALONE
      args_instance[SER_SERVERID_PROP] = SERVERID_STANDALONE
      server.allocate(args_instance.copy())

      # Remove the instance if it already exists, so the test starts from
      # a newly created one
      if server.exists():
          server.delete()

      # Create the instance
      server.create()

      # Used to retrieve configuration information (dbdir, confdir...)
      server.open()

      def _remove_instance():
          server.delete()

      request.addfinalizer(_remove_instance)

      # At this point the standalone instance has been created and opened
      return TopologyStandalone(server)
  def test_ticket48366_init(topology):
      """
      Prepare the data used by the search tests of this module.

      It creates the green and the red subtree
      It creates a test user and a proxy user
      It removes the ACIs of the suffix and adds two ACIs targeting a set
      of attrs in the green subtree: one allowing read, search and compare
      to the test user, one allowing proxy to the proxy user
      It creates identical entries (test0, test1) in both subtrees
      """
      inst = topology.standalone

      inst.log.info("Add subtree: %s" % SUBTREE_GREEN)
      # NOTE(review): the 'ou' value differs from the RDN of SUBTREE_GREEN
      # ("green") - confirm this is intended
      inst.add_s(Entry((SUBTREE_GREEN, {
          'objectclass': "top organizationalunit".split(),
          'ou': "green_one"})))
      inst.log.info("Add subtree: %s" % SUBTREE_RED)
      inst.add_s(Entry((SUBTREE_RED, {
          'objectclass': "top organizationalunit".split(),
          'ou': "red"})))

      # add proxy user and test user
      inst.log.info("Add %s" % TEST_USER_DN)
      inst.add_s(Entry((TEST_USER_DN, {
          'objectclass': "top person".split(),
          'sn': 'test',
          'cn': 'test',
          'userpassword': USER_PW})))
      inst.log.info("Add %s" % PROXY_USER_DN)
      inst.add_s(Entry((PROXY_USER_DN, {
          'objectclass': "top person".split(),
          'sn': 'proxy',
          'cn': 'proxy',
          'userpassword': USER_PW})))

      # enable acl error logging
      # mod = [(ldap.MOD_REPLACE, 'nsslapd-errorlog-level', '128')]
      # topology.standalone.modify_s(DN_CONFIG, mod)

      # get rid of default ACIs
      mod = [(ldap.MOD_DELETE, 'aci', None)]
      inst.modify_s(SUFFIX, mod)

      # Ok Now add the proper ACIs; both share target and targetattr and
      # only differ in the granted rights and in the subject
      ACI_TARGET = "(target = \"ldap:///%s\")" % SUBTREE_GREEN
      ACI_TARGETATTR = "(targetattr = \"objectclass || cn || sn || uid || givenname \")"

      ACI_ALLOW = "(version 3.0; acl \"Allow search-read to green subtree\"; allow (read, search, compare)"
      ACI_SUBJECT = " userdn = \"ldap:///%s\";)" % TEST_USER_DN
      ACI_BODY = ACI_TARGET + ACI_TARGETATTR + ACI_ALLOW + ACI_SUBJECT
      mod = [(ldap.MOD_ADD, 'aci', ACI_BODY)]
      inst.modify_s(SUFFIX, mod)

      ACI_ALLOW = "(version 3.0; acl \"Allow use pf proxy auth to green subtree\"; allow (proxy)"
      ACI_SUBJECT = " userdn = \"ldap:///%s\";)" % PROXY_USER_DN
      ACI_BODY = ACI_TARGET + ACI_TARGETATTR + ACI_ALLOW + ACI_SUBJECT
      mod = [(ldap.MOD_ADD, 'aci', ACI_BODY)]
      inst.modify_s(SUFFIX, mod)

      # One entry per index is added to every subtree
      entry_indexes = range(2)
      log.info("Adding %d test entries...", len(entry_indexes) * len(SUBTREES))
      for idx in entry_indexes:
          name = "%s%d" % ('test', idx)
          mail = "%s@example.com" % name
          for subtree in SUBTREES:
              inst.add_s(Entry(("cn=%s,%s" % (name, subtree), {
                  'objectclass': "top person organizationalPerson inetOrgPerson".split(),
                  'sn': name,
                  'cn': name,
                  'uid': name,
                  'givenname': 'test',
                  'mail': mail,
                  'description': 'description',
                  'employeenumber': "%d" % idx,
                  'telephonenumber': "%d%d%d" % (idx, idx, idx),
                  'mobile': "%d%d%d" % (idx, idx, idx),
                  'l': 'MV',
                  'title': 'Engineer'})))
  def test_ticket48366_search_user(topology):
      """
      Search for uid=test1 below the suffix as the test user, as the proxy
      user, and as the proxy user authorizing as the test user through the
      proxied authorization control, and check the number of entries found.
      """
      proxy_ctrl = ProxyAuthzControl(criticality=True, authzId="dn: "+TEST_USER_DN)

      # Both accounts were added with USER_PW as userpassword, so that is
      # the password to bind with

      # searching as test user should return one entry from the green subtree
      topology.standalone.simple_bind_s(TEST_USER_DN, USER_PW)
      ents = topology.standalone.search_s(SUFFIX, ldap.SCOPE_SUBTREE, 'uid=test1')
      assert (len(ents) == 1)

      # searching as proxy user should return no entry
      topology.standalone.simple_bind_s(PROXY_USER_DN, USER_PW)
      ents = topology.standalone.search_s(SUFFIX, ldap.SCOPE_SUBTREE, 'uid=test1')
      assert (len(ents) == 0)

      # searching as proxy user, authorizing as test user should return 1 entry
      ents = topology.standalone.search_ext_s(SUFFIX, ldap.SCOPE_SUBTREE, 'uid=test1', serverctrls=[proxy_ctrl])
      assert (len(ents) == 1)
  def test_ticket48366_search_dm(topology):
      """
      Search for uid=test1 below the suffix as directory manager, first
      directly and then with the proxied authorization control naming the
      test user and the proxy user, and check the number of entries found.
      """
      server = topology.standalone

      # a plain search as directory manager should return one entry from
      # each of the two subtrees
      server.simple_bind_s(DN_DM, PASSWORD)
      found = server.search_s(SUFFIX, ldap.SCOPE_SUBTREE, 'uid=test1')
      assert (len(found) == 2)

      # as directory manager proxying the test user one entry is expected,
      # proxying the proxy user no entry is expected
      expectations = ((TEST_USER_DN, 1), (PROXY_USER_DN, 0))
      for authz_dn, expected in expectations:
          proxy_ctrl = ProxyAuthzControl(criticality=True, authzId="dn: "+authz_dn)
          found = server.search_ext_s(SUFFIX, ldap.SCOPE_SUBTREE, 'uid=test1', serverctrls=[proxy_ctrl])
          assert (len(found) == expected)
  if __name__ == '__main__':
      # Run isolated: only this file is handed to pytest
      # -s turns off output capturing (DEBUG mode)
      CURRENT_FILE = os.path.realpath(__file__)
      # pytest.main() takes its arguments as a list; passing them as
      # separate items also keeps a path containing spaces in one piece
      pytest.main(["-s", CURRENT_FILE])