# ticket47950_test.py (6.8 KB)
  1. import os
  2. import sys
  3. import time
  4. import ldap
  5. import logging
  6. import pytest
  7. from lib389 import DirSrv, Entry, tools, tasks
  8. from lib389.tools import DirSrvTools
  9. from lib389._constants import *
  10. from lib389.properties import *
  11. from lib389.tasks import *
  12. log = logging.getLogger(__name__)
  13. installation_prefix = None
  14. USER1_DN = "uid=user1,%s" % DEFAULT_SUFFIX
  15. USER2_DN = "uid=user2,%s" % DEFAULT_SUFFIX
  16. class TopologyStandalone(object):
  17. def __init__(self, standalone):
  18. standalone.open()
  19. self.standalone = standalone
  20. @pytest.fixture(scope="module")
  21. def topology(request):
  22. '''
  23. This fixture is used to standalone topology for the 'module'.
  24. '''
  25. global installation_prefix
  26. if installation_prefix:
  27. args_instance[SER_DEPLOYED_DIR] = installation_prefix
  28. standalone = DirSrv(verbose=False)
  29. # Args for the standalone instance
  30. args_instance[SER_HOST] = HOST_STANDALONE
  31. args_instance[SER_PORT] = PORT_STANDALONE
  32. args_instance[SER_SERVERID_PROP] = SERVERID_STANDALONE
  33. args_standalone = args_instance.copy()
  34. standalone.allocate(args_standalone)
  35. # Get the status of the instance and restart it if it exists
  36. instance_standalone = standalone.exists()
  37. # Remove the instance
  38. if instance_standalone:
  39. standalone.delete()
  40. # Create the instance
  41. standalone.create()
  42. # Used to retrieve configuration information (dbdir, confdir...)
  43. standalone.open()
  44. # clear the tmp directory
  45. standalone.clearTmpDir(__file__)
  46. # Here we have standalone instance up and running
  47. return TopologyStandalone(standalone)
  48. def test_ticket47950(topology):
  49. """
  50. Testing nsslapd-plugin-binddn-tracking does not cause issues around
  51. access control and reconfiguring replication/repl agmt.
  52. """
  53. log.info('Testing Ticket 47950 - Testing nsslapd-plugin-binddn-tracking')
  54. #
  55. # Turn on bind dn tracking
  56. #
  57. try:
  58. topology.standalone.modify_s("cn=config", [(ldap.MOD_REPLACE, 'nsslapd-plugin-binddn-tracking', 'on')])
  59. log.info('nsslapd-plugin-binddn-tracking enabled.')
  60. except ldap.LDAPError, e:
  61. log.error('Failed to enable bind dn tracking: ' + e.message['desc'])
  62. assert False
  63. #
  64. # Add two users
  65. #
  66. try:
  67. topology.standalone.add_s(Entry((USER1_DN, {
  68. 'objectclass': "top person inetuser".split(),
  69. 'userpassword': "password",
  70. 'sn': "1",
  71. 'cn': "user 1"})))
  72. log.info('Added test user %s' % USER1_DN)
  73. except ldap.LDAPError, e:
  74. log.error('Failed to add %s: %s' % (USER1_DN, e.message['desc']))
  75. assert False
  76. try:
  77. topology.standalone.add_s(Entry((USER2_DN, {
  78. 'objectclass': "top person inetuser".split(),
  79. 'sn': "2",
  80. 'cn': "user 2"})))
  81. log.info('Added test user %s' % USER2_DN)
  82. except ldap.LDAPError, e:
  83. log.error('Failed to add user1: ' + e.message['desc'])
  84. assert False
  85. #
  86. # Add an aci
  87. #
  88. try:
  89. acival = '(targetattr ="cn")(version 3.0;acl "Test bind dn tracking"' + \
  90. ';allow (all) (userdn = "ldap:///%s");)' % USER1_DN
  91. topology.standalone.modify_s(DEFAULT_SUFFIX, [(ldap.MOD_ADD, 'aci', acival)])
  92. log.info('Added aci')
  93. except ldap.LDAPError, e:
  94. log.error('Failed to add aci: ' + e.message['desc'])
  95. assert False
  96. #
  97. # Make modification as user
  98. #
  99. try:
  100. topology.standalone.simple_bind_s(USER1_DN, "password")
  101. log.info('Bind as user %s successful' % USER1_DN)
  102. except ldap.LDAPError, e:
  103. log.error('Failed to bind as user1: ' + e.message['desc'])
  104. assert False
  105. try:
  106. topology.standalone.modify_s(USER2_DN, [(ldap.MOD_REPLACE, 'cn', 'new value')])
  107. log.info('%s successfully modified user %s' % (USER1_DN, USER2_DN))
  108. except ldap.LDAPError, e:
  109. log.error('Failed to update user2: ' + e.message['desc'])
  110. assert False
  111. #
  112. # Setup replica and create a repl agmt
  113. #
  114. try:
  115. topology.standalone.simple_bind_s(DN_DM, PASSWORD)
  116. log.info('Bind as %s successful' % DN_DM)
  117. except ldap.LDAPError, e:
  118. log.error('Failed to bind as rootDN: ' + e.message['desc'])
  119. assert False
  120. try:
  121. topology.standalone.replica.enableReplication(suffix=DEFAULT_SUFFIX, role=REPLICAROLE_MASTER,
  122. replicaId=REPLICAID_MASTER_1)
  123. log.info('Successfully enabled replication.')
  124. except ValueError:
  125. log.error('Failed to enable replication')
  126. assert False
  127. properties = {RA_NAME: r'test plugin internal bind dn',
  128. RA_BINDDN: defaultProperties[REPLICATION_BIND_DN],
  129. RA_BINDPW: defaultProperties[REPLICATION_BIND_PW],
  130. RA_METHOD: defaultProperties[REPLICATION_BIND_METHOD],
  131. RA_TRANSPORT_PROT: defaultProperties[REPLICATION_TRANSPORT]}
  132. try:
  133. repl_agreement = topology.standalone.agreement.create(suffix=DEFAULT_SUFFIX, host="127.0.0.1",
  134. port="7777", properties=properties)
  135. log.info('Successfully created replication agreement')
  136. except InvalidArgumentError, e:
  137. log.error('Failed to create replication agreement: ' + e.message['desc'])
  138. assert False
  139. #
  140. # modify replica
  141. #
  142. try:
  143. properties = {REPLICA_ID: "7"}
  144. topology.standalone.replica.setProperties(DEFAULT_SUFFIX, None, None, properties)
  145. log.info('Successfully modified replica')
  146. except ldap.LDAPError, e:
  147. log.error('Failed to update replica config: ' + e.message['desc'])
  148. assert False
  149. #
  150. # modify repl agmt
  151. #
  152. try:
  153. properties = {RA_CONSUMER_PORT: "8888"}
  154. topology.standalone.agreement.setProperties(None, repl_agreement, None, properties)
  155. log.info('Successfully modified replication agreement')
  156. except ValueError:
  157. log.error('Failed to update replica agreement: ' + repl_agreement)
  158. assert False
  159. def test_ticket47953_final(topology):
  160. topology.standalone.delete()
  161. log.info('Testcase PASSED')
  162. def run_isolated():
  163. '''
  164. run_isolated is used to run these test cases independently of a test scheduler (xunit, py.test..)
  165. To run isolated without py.test, you need to
  166. - edit this file and comment '@pytest.fixture' line before 'topology' function.
  167. - set the installation prefix
  168. - run this program
  169. '''
  170. global installation_prefix
  171. installation_prefix = None
  172. topo = topology(True)
  173. test_ticket47950(topo)
  174. test_ticket47953_final(topo)
# Script entry point: when the file is executed directly (rather than
# imported), run the test cases through run_isolated().
if __name__ == '__main__':
    run_isolated()