ticket47950_test.py 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223
  1. # --- BEGIN COPYRIGHT BLOCK ---
  2. # Copyright (C) 2015 Red Hat, Inc.
  3. # All rights reserved.
  4. #
  5. # License: GPL (version 3 or any later version).
  6. # See LICENSE for details.
  7. # --- END COPYRIGHT BLOCK ---
  8. #
  9. import os
  10. import sys
  11. import time
  12. import ldap
  13. import logging
  14. import pytest
  15. from lib389 import DirSrv, Entry, tools, tasks
  16. from lib389.tools import DirSrvTools
  17. from lib389._constants import *
  18. from lib389.properties import *
  19. from lib389.tasks import *
  20. log = logging.getLogger(__name__)
  21. installation_prefix = None
  22. USER1_DN = "uid=user1,%s" % DEFAULT_SUFFIX
  23. USER2_DN = "uid=user2,%s" % DEFAULT_SUFFIX
  24. class TopologyStandalone(object):
  25. def __init__(self, standalone):
  26. standalone.open()
  27. self.standalone = standalone
  28. @pytest.fixture(scope="module")
  29. def topology(request):
  30. '''
  31. This fixture is used to standalone topology for the 'module'.
  32. '''
  33. global installation_prefix
  34. if installation_prefix:
  35. args_instance[SER_DEPLOYED_DIR] = installation_prefix
  36. standalone = DirSrv(verbose=False)
  37. # Args for the standalone instance
  38. args_instance[SER_HOST] = HOST_STANDALONE
  39. args_instance[SER_PORT] = PORT_STANDALONE
  40. args_instance[SER_SERVERID_PROP] = SERVERID_STANDALONE
  41. args_standalone = args_instance.copy()
  42. standalone.allocate(args_standalone)
  43. # Get the status of the instance and restart it if it exists
  44. instance_standalone = standalone.exists()
  45. # Remove the instance
  46. if instance_standalone:
  47. standalone.delete()
  48. # Create the instance
  49. standalone.create()
  50. # Used to retrieve configuration information (dbdir, confdir...)
  51. standalone.open()
  52. def fin():
  53. standalone.delete()
  54. request.addfinalizer(fin)
  55. # Here we have standalone instance up and running
  56. return TopologyStandalone(standalone)
  57. def test_ticket47950(topology):
  58. """
  59. Testing nsslapd-plugin-binddn-tracking does not cause issues around
  60. access control and reconfiguring replication/repl agmt.
  61. """
  62. log.info('Testing Ticket 47950 - Testing nsslapd-plugin-binddn-tracking')
  63. #
  64. # Turn on bind dn tracking
  65. #
  66. try:
  67. topology.standalone.modify_s("cn=config", [(ldap.MOD_REPLACE, 'nsslapd-plugin-binddn-tracking', 'on')])
  68. log.info('nsslapd-plugin-binddn-tracking enabled.')
  69. except ldap.LDAPError as e:
  70. log.error('Failed to enable bind dn tracking: ' + e.message['desc'])
  71. assert False
  72. #
  73. # Add two users
  74. #
  75. try:
  76. topology.standalone.add_s(Entry((USER1_DN, {
  77. 'objectclass': "top person inetuser".split(),
  78. 'userpassword': "password",
  79. 'sn': "1",
  80. 'cn': "user 1"})))
  81. log.info('Added test user %s' % USER1_DN)
  82. except ldap.LDAPError as e:
  83. log.error('Failed to add %s: %s' % (USER1_DN, e.message['desc']))
  84. assert False
  85. try:
  86. topology.standalone.add_s(Entry((USER2_DN, {
  87. 'objectclass': "top person inetuser".split(),
  88. 'sn': "2",
  89. 'cn': "user 2"})))
  90. log.info('Added test user %s' % USER2_DN)
  91. except ldap.LDAPError as e:
  92. log.error('Failed to add user1: ' + e.message['desc'])
  93. assert False
  94. #
  95. # Add an aci
  96. #
  97. try:
  98. acival = '(targetattr ="cn")(version 3.0;acl "Test bind dn tracking"' + \
  99. ';allow (all) (userdn = "ldap:///%s");)' % USER1_DN
  100. topology.standalone.modify_s(DEFAULT_SUFFIX, [(ldap.MOD_ADD, 'aci', acival)])
  101. log.info('Added aci')
  102. except ldap.LDAPError as e:
  103. log.error('Failed to add aci: ' + e.message['desc'])
  104. assert False
  105. #
  106. # Make modification as user
  107. #
  108. try:
  109. topology.standalone.simple_bind_s(USER1_DN, "password")
  110. log.info('Bind as user %s successful' % USER1_DN)
  111. except ldap.LDAPError as e:
  112. log.error('Failed to bind as user1: ' + e.message['desc'])
  113. assert False
  114. try:
  115. topology.standalone.modify_s(USER2_DN, [(ldap.MOD_REPLACE, 'cn', 'new value')])
  116. log.info('%s successfully modified user %s' % (USER1_DN, USER2_DN))
  117. except ldap.LDAPError as e:
  118. log.error('Failed to update user2: ' + e.message['desc'])
  119. assert False
  120. #
  121. # Setup replica and create a repl agmt
  122. #
  123. try:
  124. topology.standalone.simple_bind_s(DN_DM, PASSWORD)
  125. log.info('Bind as %s successful' % DN_DM)
  126. except ldap.LDAPError as e:
  127. log.error('Failed to bind as rootDN: ' + e.message['desc'])
  128. assert False
  129. try:
  130. topology.standalone.replica.enableReplication(suffix=DEFAULT_SUFFIX, role=REPLICAROLE_MASTER,
  131. replicaId=REPLICAID_MASTER_1)
  132. log.info('Successfully enabled replication.')
  133. except ValueError:
  134. log.error('Failed to enable replication')
  135. assert False
  136. properties = {RA_NAME: r'test plugin internal bind dn',
  137. RA_BINDDN: defaultProperties[REPLICATION_BIND_DN],
  138. RA_BINDPW: defaultProperties[REPLICATION_BIND_PW],
  139. RA_METHOD: defaultProperties[REPLICATION_BIND_METHOD],
  140. RA_TRANSPORT_PROT: defaultProperties[REPLICATION_TRANSPORT]}
  141. try:
  142. repl_agreement = topology.standalone.agreement.create(suffix=DEFAULT_SUFFIX, host="127.0.0.1",
  143. port="7777", properties=properties)
  144. log.info('Successfully created replication agreement')
  145. except InvalidArgumentError as e:
  146. log.error('Failed to create replication agreement: ' + e.message['desc'])
  147. assert False
  148. #
  149. # modify replica
  150. #
  151. try:
  152. properties = {REPLICA_ID: "7"}
  153. topology.standalone.replica.setProperties(DEFAULT_SUFFIX, None, None, properties)
  154. log.info('Successfully modified replica')
  155. except ldap.LDAPError as e:
  156. log.error('Failed to update replica config: ' + e.message['desc'])
  157. assert False
  158. #
  159. # modify repl agmt
  160. #
  161. try:
  162. properties = {RA_CONSUMER_PORT: "8888"}
  163. topology.standalone.agreement.setProperties(None, repl_agreement, None, properties)
  164. log.info('Successfully modified replication agreement')
  165. except ValueError:
  166. log.error('Failed to update replica agreement: ' + repl_agreement)
  167. assert False
  168. def test_ticket47953_final(topology):
  169. log.info('Testcase PASSED')
  170. def run_isolated():
  171. '''
  172. run_isolated is used to run these test cases independently of a test scheduler (xunit, py.test..)
  173. To run isolated without py.test, you need to
  174. - edit this file and comment '@pytest.fixture' line before 'topology' function.
  175. - set the installation prefix
  176. - run this program
  177. '''
  178. global installation_prefix
  179. installation_prefix = None
  180. topo = topology(True)
  181. test_ticket47950(topo)
  182. test_ticket47953_final(topo)
  183. if __name__ == '__main__':
  184. run_isolated()