1
0

ticket47950_test.py 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223
  1. # --- BEGIN COPYRIGHT BLOCK ---
  2. # Copyright (C) 2015 Red Hat, Inc.
  3. # All rights reserved.
  4. #
  5. # License: GPL (version 3 or any later version).
  6. # See LICENSE for details.
  7. # --- END COPYRIGHT BLOCK ---
  8. #
  9. import os
  10. import sys
  11. import time
  12. import ldap
  13. import logging
  14. import pytest
  15. from lib389 import DirSrv, Entry, tools, tasks
  16. from lib389.tools import DirSrvTools
  17. from lib389._constants import *
  18. from lib389.properties import *
  19. from lib389.tasks import *
  20. log = logging.getLogger(__name__)
  21. installation_prefix = None
  22. USER1_DN = "uid=user1,%s" % DEFAULT_SUFFIX
  23. USER2_DN = "uid=user2,%s" % DEFAULT_SUFFIX
  24. class TopologyStandalone(object):
  25. def __init__(self, standalone):
  26. standalone.open()
  27. self.standalone = standalone
  28. @pytest.fixture(scope="module")
  29. def topology(request):
  30. '''
  31. This fixture is used to standalone topology for the 'module'.
  32. '''
  33. global installation_prefix
  34. if installation_prefix:
  35. args_instance[SER_DEPLOYED_DIR] = installation_prefix
  36. standalone = DirSrv(verbose=False)
  37. # Args for the standalone instance
  38. args_instance[SER_HOST] = HOST_STANDALONE
  39. args_instance[SER_PORT] = PORT_STANDALONE
  40. args_instance[SER_SERVERID_PROP] = SERVERID_STANDALONE
  41. args_standalone = args_instance.copy()
  42. standalone.allocate(args_standalone)
  43. # Get the status of the instance and restart it if it exists
  44. instance_standalone = standalone.exists()
  45. # Remove the instance
  46. if instance_standalone:
  47. standalone.delete()
  48. # Create the instance
  49. standalone.create()
  50. # Used to retrieve configuration information (dbdir, confdir...)
  51. standalone.open()
  52. # clear the tmp directory
  53. standalone.clearTmpDir(__file__)
  54. # Here we have standalone instance up and running
  55. return TopologyStandalone(standalone)
  56. def test_ticket47950(topology):
  57. """
  58. Testing nsslapd-plugin-binddn-tracking does not cause issues around
  59. access control and reconfiguring replication/repl agmt.
  60. """
  61. log.info('Testing Ticket 47950 - Testing nsslapd-plugin-binddn-tracking')
  62. #
  63. # Turn on bind dn tracking
  64. #
  65. try:
  66. topology.standalone.modify_s("cn=config", [(ldap.MOD_REPLACE, 'nsslapd-plugin-binddn-tracking', 'on')])
  67. log.info('nsslapd-plugin-binddn-tracking enabled.')
  68. except ldap.LDAPError as e:
  69. log.error('Failed to enable bind dn tracking: ' + e.message['desc'])
  70. assert False
  71. #
  72. # Add two users
  73. #
  74. try:
  75. topology.standalone.add_s(Entry((USER1_DN, {
  76. 'objectclass': "top person inetuser".split(),
  77. 'userpassword': "password",
  78. 'sn': "1",
  79. 'cn': "user 1"})))
  80. log.info('Added test user %s' % USER1_DN)
  81. except ldap.LDAPError as e:
  82. log.error('Failed to add %s: %s' % (USER1_DN, e.message['desc']))
  83. assert False
  84. try:
  85. topology.standalone.add_s(Entry((USER2_DN, {
  86. 'objectclass': "top person inetuser".split(),
  87. 'sn': "2",
  88. 'cn': "user 2"})))
  89. log.info('Added test user %s' % USER2_DN)
  90. except ldap.LDAPError as e:
  91. log.error('Failed to add user1: ' + e.message['desc'])
  92. assert False
  93. #
  94. # Add an aci
  95. #
  96. try:
  97. acival = '(targetattr ="cn")(version 3.0;acl "Test bind dn tracking"' + \
  98. ';allow (all) (userdn = "ldap:///%s");)' % USER1_DN
  99. topology.standalone.modify_s(DEFAULT_SUFFIX, [(ldap.MOD_ADD, 'aci', acival)])
  100. log.info('Added aci')
  101. except ldap.LDAPError as e:
  102. log.error('Failed to add aci: ' + e.message['desc'])
  103. assert False
  104. #
  105. # Make modification as user
  106. #
  107. try:
  108. topology.standalone.simple_bind_s(USER1_DN, "password")
  109. log.info('Bind as user %s successful' % USER1_DN)
  110. except ldap.LDAPError as e:
  111. log.error('Failed to bind as user1: ' + e.message['desc'])
  112. assert False
  113. try:
  114. topology.standalone.modify_s(USER2_DN, [(ldap.MOD_REPLACE, 'cn', 'new value')])
  115. log.info('%s successfully modified user %s' % (USER1_DN, USER2_DN))
  116. except ldap.LDAPError as e:
  117. log.error('Failed to update user2: ' + e.message['desc'])
  118. assert False
  119. #
  120. # Setup replica and create a repl agmt
  121. #
  122. try:
  123. topology.standalone.simple_bind_s(DN_DM, PASSWORD)
  124. log.info('Bind as %s successful' % DN_DM)
  125. except ldap.LDAPError as e:
  126. log.error('Failed to bind as rootDN: ' + e.message['desc'])
  127. assert False
  128. try:
  129. topology.standalone.replica.enableReplication(suffix=DEFAULT_SUFFIX, role=REPLICAROLE_MASTER,
  130. replicaId=REPLICAID_MASTER_1)
  131. log.info('Successfully enabled replication.')
  132. except ValueError:
  133. log.error('Failed to enable replication')
  134. assert False
  135. properties = {RA_NAME: r'test plugin internal bind dn',
  136. RA_BINDDN: defaultProperties[REPLICATION_BIND_DN],
  137. RA_BINDPW: defaultProperties[REPLICATION_BIND_PW],
  138. RA_METHOD: defaultProperties[REPLICATION_BIND_METHOD],
  139. RA_TRANSPORT_PROT: defaultProperties[REPLICATION_TRANSPORT]}
  140. try:
  141. repl_agreement = topology.standalone.agreement.create(suffix=DEFAULT_SUFFIX, host="127.0.0.1",
  142. port="7777", properties=properties)
  143. log.info('Successfully created replication agreement')
  144. except InvalidArgumentError as e:
  145. log.error('Failed to create replication agreement: ' + e.message['desc'])
  146. assert False
  147. #
  148. # modify replica
  149. #
  150. try:
  151. properties = {REPLICA_ID: "7"}
  152. topology.standalone.replica.setProperties(DEFAULT_SUFFIX, None, None, properties)
  153. log.info('Successfully modified replica')
  154. except ldap.LDAPError as e:
  155. log.error('Failed to update replica config: ' + e.message['desc'])
  156. assert False
  157. #
  158. # modify repl agmt
  159. #
  160. try:
  161. properties = {RA_CONSUMER_PORT: "8888"}
  162. topology.standalone.agreement.setProperties(None, repl_agreement, None, properties)
  163. log.info('Successfully modified replication agreement')
  164. except ValueError:
  165. log.error('Failed to update replica agreement: ' + repl_agreement)
  166. assert False
  167. def test_ticket47953_final(topology):
  168. topology.standalone.delete()
  169. log.info('Testcase PASSED')
  170. def run_isolated():
  171. '''
  172. run_isolated is used to run these test cases independently of a test scheduler (xunit, py.test..)
  173. To run isolated without py.test, you need to
  174. - edit this file and comment '@pytest.fixture' line before 'topology' function.
  175. - set the installation prefix
  176. - run this program
  177. '''
  178. global installation_prefix
  179. installation_prefix = None
  180. topo = topology(True)
  181. test_ticket47950(topo)
  182. test_ticket47953_final(topo)
  183. if __name__ == '__main__':
  184. run_isolated()