1
0

ticket47963_test.py 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199
  1. # --- BEGIN COPYRIGHT BLOCK ---
  2. # Copyright (C) 2015 Red Hat, Inc.
  3. # All rights reserved.
  4. #
  5. # License: GPL (version 3 or any later version).
  6. # See LICENSE for details.
  7. # --- END COPYRIGHT BLOCK ---
  8. #
  9. import os
  10. import sys
  11. import time
  12. import ldap
  13. import logging
  14. import pytest
  15. from lib389 import DirSrv, Entry, tools, tasks
  16. from lib389.tools import DirSrvTools
  17. from lib389._constants import *
  18. from lib389.properties import *
  19. from lib389.tasks import *
  20. logging.getLogger(__name__).setLevel(logging.DEBUG)
  21. log = logging.getLogger(__name__)
  22. installation1_prefix = None
  23. class TopologyStandalone(object):
  24. def __init__(self, standalone):
  25. standalone.open()
  26. self.standalone = standalone
  27. @pytest.fixture(scope="module")
  28. def topology(request):
  29. global installation1_prefix
  30. if installation1_prefix:
  31. args_instance[SER_DEPLOYED_DIR] = installation1_prefix
  32. # Creating standalone instance ...
  33. standalone = DirSrv(verbose=False)
  34. args_instance[SER_HOST] = HOST_STANDALONE
  35. args_instance[SER_PORT] = PORT_STANDALONE
  36. args_instance[SER_SERVERID_PROP] = SERVERID_STANDALONE
  37. args_instance[SER_CREATION_SUFFIX] = DEFAULT_SUFFIX
  38. args_standalone = args_instance.copy()
  39. standalone.allocate(args_standalone)
  40. instance_standalone = standalone.exists()
  41. if instance_standalone:
  42. standalone.delete()
  43. standalone.create()
  44. standalone.open()
  45. # Clear out the tmp dir
  46. standalone.clearTmpDir(__file__)
  47. return TopologyStandalone(standalone)
  48. def test_ticket47963(topology):
  49. '''
  50. Test that the memberOf plugin works correctly after setting:
  51. memberofskipnested: on
  52. '''
  53. PLUGIN_DN = 'cn=' + PLUGIN_MEMBER_OF + ',cn=plugins,cn=config'
  54. USER_DN = 'uid=test_user,' + DEFAULT_SUFFIX
  55. GROUP_DN1 = 'cn=group1,' + DEFAULT_SUFFIX
  56. GROUP_DN2 = 'cn=group2,' + DEFAULT_SUFFIX
  57. GROUP_DN3 = 'cn=group3,' + DEFAULT_SUFFIX
  58. #
  59. # Enable the plugin and configure the skiop nest attribute, then restart the server
  60. #
  61. topology.standalone.plugins.enable(name=PLUGIN_MEMBER_OF)
  62. try:
  63. topology.standalone.modify_s(PLUGIN_DN, [(ldap.MOD_REPLACE, 'memberofskipnested', 'on')])
  64. except ldap.LDAPError as e:
  65. log.error('test_automember: Failed to modify config entry: error ' + e.message['desc'])
  66. assert False
  67. topology.standalone.restart(timeout=10)
  68. #
  69. # Add our groups, users, memberships, etc
  70. #
  71. try:
  72. topology.standalone.add_s(Entry((USER_DN, {
  73. 'objectclass': 'top extensibleObject'.split(),
  74. 'uid': 'test_user'
  75. })))
  76. except ldap.LDAPError as e:
  77. log.error('Failed to add teset user: error ' + e.message['desc'])
  78. assert False
  79. try:
  80. topology.standalone.add_s(Entry((GROUP_DN1, {
  81. 'objectclass': 'top groupOfNames groupOfUniqueNames extensibleObject'.split(),
  82. 'cn': 'group1',
  83. 'member': USER_DN
  84. })))
  85. except ldap.LDAPError as e:
  86. log.error('Failed to add group1: error ' + e.message['desc'])
  87. assert False
  88. try:
  89. topology.standalone.add_s(Entry((GROUP_DN2, {
  90. 'objectclass': 'top groupOfNames groupOfUniqueNames extensibleObject'.split(),
  91. 'cn': 'group2',
  92. 'member': USER_DN
  93. })))
  94. except ldap.LDAPError as e:
  95. log.error('Failed to add group2: error ' + e.message['desc'])
  96. assert False
  97. # Add group with no member(yet)
  98. try:
  99. topology.standalone.add_s(Entry((GROUP_DN3, {
  100. 'objectclass': 'top groupOfNames groupOfUniqueNames extensibleObject'.split(),
  101. 'cn': 'group'
  102. })))
  103. except ldap.LDAPError as e:
  104. log.error('Failed to add group3: error ' + e.message['desc'])
  105. assert False
  106. time.sleep(1)
  107. #
  108. # Test we have the correct memberOf values in the user entry
  109. #
  110. try:
  111. member_filter = ('(&(memberOf=' + GROUP_DN1 + ')(memberOf=' + GROUP_DN2 + '))')
  112. entries = topology.standalone.search_s(USER_DN, ldap.SCOPE_BASE, member_filter)
  113. if not entries:
  114. log.fatal('User is missing expected memberOf attrs')
  115. assert False
  116. except ldap.LDAPError as e:
  117. log.fatal('Search for user1 failed: ' + e.message['desc'])
  118. assert False
  119. # Add the user to the group
  120. try:
  121. topology.standalone.modify_s(GROUP_DN3, [(ldap.MOD_ADD, 'member', USER_DN)])
  122. except ldap.LDAPError as e:
  123. log.error('Failed to member to group: error ' + e.message['desc'])
  124. assert False
  125. time.sleep(1)
  126. # Check that the test user is a "memberOf" all three groups
  127. try:
  128. member_filter = ('(&(memberOf=' + GROUP_DN1 + ')(memberOf=' + GROUP_DN2 +
  129. ')(memberOf=' + GROUP_DN3 + '))')
  130. entries = topology.standalone.search_s(USER_DN, ldap.SCOPE_BASE, member_filter)
  131. if not entries:
  132. log.fatal('User is missing expected memberOf attrs')
  133. assert False
  134. except ldap.LDAPError as e:
  135. log.fatal('Search for user1 failed: ' + e.message['desc'])
  136. assert False
  137. #
  138. # Delete group2, and check memberOf values in the user entry
  139. #
  140. try:
  141. topology.standalone.delete_s(GROUP_DN2)
  142. except ldap.LDAPError as e:
  143. log.error('Failed to delete test group2: ' + e.message['desc'])
  144. assert False
  145. time.sleep(1)
  146. try:
  147. member_filter = ('(&(memberOf=' + GROUP_DN1 + ')(memberOf=' + GROUP_DN3 + '))')
  148. entries = topology.standalone.search_s(USER_DN, ldap.SCOPE_BASE, member_filter)
  149. if not entries:
  150. log.fatal('User incorrect memberOf attrs')
  151. assert False
  152. except ldap.LDAPError as e:
  153. log.fatal('Search for user1 failed: ' + e.message['desc'])
  154. assert False
  155. log.info('Test complete')
  156. def test_ticket47963_final(topology):
  157. topology.standalone.delete()
  158. log.info('Testcase PASSED')
  159. def run_isolated():
  160. global installation1_prefix
  161. installation1_prefix = None
  162. topo = topology(True)
  163. test_ticket47963(topo)
  164. test_ticket47963_final(topo)
  165. if __name__ == '__main__':
  166. run_isolated()