ticket47963_test.py 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199
  1. # --- BEGIN COPYRIGHT BLOCK ---
  2. # Copyright (C) 2015 Red Hat, Inc.
  3. # All rights reserved.
  4. #
  5. # License: GPL (version 3 or any later version).
  6. # See LICENSE for details.
  7. # --- END COPYRIGHT BLOCK ---
  8. #
  9. import os
  10. import sys
  11. import time
  12. import ldap
  13. import logging
  14. import pytest
  15. from lib389 import DirSrv, Entry, tools, tasks
  16. from lib389.tools import DirSrvTools
  17. from lib389._constants import *
  18. from lib389.properties import *
  19. from lib389.tasks import *
  20. logging.getLogger(__name__).setLevel(logging.DEBUG)
  21. log = logging.getLogger(__name__)
  22. installation1_prefix = None
  23. class TopologyStandalone(object):
  24. def __init__(self, standalone):
  25. standalone.open()
  26. self.standalone = standalone
  27. @pytest.fixture(scope="module")
  28. def topology(request):
  29. global installation1_prefix
  30. if installation1_prefix:
  31. args_instance[SER_DEPLOYED_DIR] = installation1_prefix
  32. # Creating standalone instance ...
  33. standalone = DirSrv(verbose=False)
  34. args_instance[SER_HOST] = HOST_STANDALONE
  35. args_instance[SER_PORT] = PORT_STANDALONE
  36. args_instance[SER_SERVERID_PROP] = SERVERID_STANDALONE
  37. args_instance[SER_CREATION_SUFFIX] = DEFAULT_SUFFIX
  38. args_standalone = args_instance.copy()
  39. standalone.allocate(args_standalone)
  40. instance_standalone = standalone.exists()
  41. if instance_standalone:
  42. standalone.delete()
  43. standalone.create()
  44. standalone.open()
  45. def fin():
  46. standalone.delete()
  47. request.addfinalizer(fin)
  48. return TopologyStandalone(standalone)
  49. def test_ticket47963(topology):
  50. '''
  51. Test that the memberOf plugin works correctly after setting:
  52. memberofskipnested: on
  53. '''
  54. PLUGIN_DN = 'cn=' + PLUGIN_MEMBER_OF + ',cn=plugins,cn=config'
  55. USER_DN = 'uid=test_user,' + DEFAULT_SUFFIX
  56. GROUP_DN1 = 'cn=group1,' + DEFAULT_SUFFIX
  57. GROUP_DN2 = 'cn=group2,' + DEFAULT_SUFFIX
  58. GROUP_DN3 = 'cn=group3,' + DEFAULT_SUFFIX
  59. #
  60. # Enable the plugin and configure the skiop nest attribute, then restart the server
  61. #
  62. topology.standalone.plugins.enable(name=PLUGIN_MEMBER_OF)
  63. try:
  64. topology.standalone.modify_s(PLUGIN_DN, [(ldap.MOD_REPLACE, 'memberofskipnested', 'on')])
  65. except ldap.LDAPError as e:
  66. log.error('test_automember: Failed to modify config entry: error ' + e.message['desc'])
  67. assert False
  68. topology.standalone.restart(timeout=10)
  69. #
  70. # Add our groups, users, memberships, etc
  71. #
  72. try:
  73. topology.standalone.add_s(Entry((USER_DN, {
  74. 'objectclass': 'top extensibleObject'.split(),
  75. 'uid': 'test_user'
  76. })))
  77. except ldap.LDAPError as e:
  78. log.error('Failed to add teset user: error ' + e.message['desc'])
  79. assert False
  80. try:
  81. topology.standalone.add_s(Entry((GROUP_DN1, {
  82. 'objectclass': 'top groupOfNames groupOfUniqueNames extensibleObject'.split(),
  83. 'cn': 'group1',
  84. 'member': USER_DN
  85. })))
  86. except ldap.LDAPError as e:
  87. log.error('Failed to add group1: error ' + e.message['desc'])
  88. assert False
  89. try:
  90. topology.standalone.add_s(Entry((GROUP_DN2, {
  91. 'objectclass': 'top groupOfNames groupOfUniqueNames extensibleObject'.split(),
  92. 'cn': 'group2',
  93. 'member': USER_DN
  94. })))
  95. except ldap.LDAPError as e:
  96. log.error('Failed to add group2: error ' + e.message['desc'])
  97. assert False
  98. # Add group with no member(yet)
  99. try:
  100. topology.standalone.add_s(Entry((GROUP_DN3, {
  101. 'objectclass': 'top groupOfNames groupOfUniqueNames extensibleObject'.split(),
  102. 'cn': 'group'
  103. })))
  104. except ldap.LDAPError as e:
  105. log.error('Failed to add group3: error ' + e.message['desc'])
  106. assert False
  107. time.sleep(1)
  108. #
  109. # Test we have the correct memberOf values in the user entry
  110. #
  111. try:
  112. member_filter = ('(&(memberOf=' + GROUP_DN1 + ')(memberOf=' + GROUP_DN2 + '))')
  113. entries = topology.standalone.search_s(USER_DN, ldap.SCOPE_BASE, member_filter)
  114. if not entries:
  115. log.fatal('User is missing expected memberOf attrs')
  116. assert False
  117. except ldap.LDAPError as e:
  118. log.fatal('Search for user1 failed: ' + e.message['desc'])
  119. assert False
  120. # Add the user to the group
  121. try:
  122. topology.standalone.modify_s(GROUP_DN3, [(ldap.MOD_ADD, 'member', USER_DN)])
  123. except ldap.LDAPError as e:
  124. log.error('Failed to member to group: error ' + e.message['desc'])
  125. assert False
  126. time.sleep(1)
  127. # Check that the test user is a "memberOf" all three groups
  128. try:
  129. member_filter = ('(&(memberOf=' + GROUP_DN1 + ')(memberOf=' + GROUP_DN2 +
  130. ')(memberOf=' + GROUP_DN3 + '))')
  131. entries = topology.standalone.search_s(USER_DN, ldap.SCOPE_BASE, member_filter)
  132. if not entries:
  133. log.fatal('User is missing expected memberOf attrs')
  134. assert False
  135. except ldap.LDAPError as e:
  136. log.fatal('Search for user1 failed: ' + e.message['desc'])
  137. assert False
  138. #
  139. # Delete group2, and check memberOf values in the user entry
  140. #
  141. try:
  142. topology.standalone.delete_s(GROUP_DN2)
  143. except ldap.LDAPError as e:
  144. log.error('Failed to delete test group2: ' + e.message['desc'])
  145. assert False
  146. time.sleep(1)
  147. try:
  148. member_filter = ('(&(memberOf=' + GROUP_DN1 + ')(memberOf=' + GROUP_DN3 + '))')
  149. entries = topology.standalone.search_s(USER_DN, ldap.SCOPE_BASE, member_filter)
  150. if not entries:
  151. log.fatal('User incorrect memberOf attrs')
  152. assert False
  153. except ldap.LDAPError as e:
  154. log.fatal('Search for user1 failed: ' + e.message['desc'])
  155. assert False
  156. log.info('Test complete')
  157. def test_ticket47963_final(topology):
  158. log.info('Testcase PASSED')
  159. def run_isolated():
  160. global installation1_prefix
  161. installation1_prefix = None
  162. topo = topology(True)
  163. test_ticket47963(topo)
  164. test_ticket47963_final(topo)
  165. if __name__ == '__main__':
  166. run_isolated()