ticket47963_test.py 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191
  1. import os
  2. import sys
  3. import time
  4. import ldap
  5. import logging
  6. import pytest
  7. from lib389 import DirSrv, Entry, tools, tasks
  8. from lib389.tools import DirSrvTools
  9. from lib389._constants import *
  10. from lib389.properties import *
  11. from lib389.tasks import *
  12. logging.getLogger(__name__).setLevel(logging.DEBUG)
  13. log = logging.getLogger(__name__)
  14. installation1_prefix = None
  15. class TopologyStandalone(object):
  16. def __init__(self, standalone):
  17. standalone.open()
  18. self.standalone = standalone
  19. @pytest.fixture(scope="module")
  20. def topology(request):
  21. global installation1_prefix
  22. if installation1_prefix:
  23. args_instance[SER_DEPLOYED_DIR] = installation1_prefix
  24. # Creating standalone instance ...
  25. standalone = DirSrv(verbose=False)
  26. args_instance[SER_HOST] = HOST_STANDALONE
  27. args_instance[SER_PORT] = PORT_STANDALONE
  28. args_instance[SER_SERVERID_PROP] = SERVERID_STANDALONE
  29. args_instance[SER_CREATION_SUFFIX] = DEFAULT_SUFFIX
  30. args_standalone = args_instance.copy()
  31. standalone.allocate(args_standalone)
  32. instance_standalone = standalone.exists()
  33. if instance_standalone:
  34. standalone.delete()
  35. standalone.create()
  36. standalone.open()
  37. # Clear out the tmp dir
  38. standalone.clearTmpDir(__file__)
  39. return TopologyStandalone(standalone)
  40. def test_ticket47963(topology):
  41. '''
  42. Test that the memberOf plugin works correctly after setting:
  43. memberofskipnested: on
  44. '''
  45. PLUGIN_DN = 'cn=' + PLUGIN_MEMBER_OF + ',cn=plugins,cn=config'
  46. USER_DN = 'uid=test_user,' + DEFAULT_SUFFIX
  47. GROUP_DN1 = 'cn=group1,' + DEFAULT_SUFFIX
  48. GROUP_DN2 = 'cn=group2,' + DEFAULT_SUFFIX
  49. GROUP_DN3 = 'cn=group3,' + DEFAULT_SUFFIX
  50. #
  51. # Enable the plugin and configure the skiop nest attribute, then restart the server
  52. #
  53. topology.standalone.plugins.enable(name=PLUGIN_MEMBER_OF)
  54. try:
  55. topology.standalone.modify_s(PLUGIN_DN, [(ldap.MOD_REPLACE, 'memberofskipnested', 'on')])
  56. except ldap.LDAPError, e:
  57. log.error('test_automember: Failed to modify config entry: error ' + e.message['desc'])
  58. assert False
  59. topology.standalone.restart(timeout=10)
  60. #
  61. # Add our groups, users, memberships, etc
  62. #
  63. try:
  64. topology.standalone.add_s(Entry((USER_DN, {
  65. 'objectclass': 'top extensibleObject'.split(),
  66. 'uid': 'test_user'
  67. })))
  68. except ldap.LDAPError, e:
  69. log.error('Failed to add teset user: error ' + e.message['desc'])
  70. assert False
  71. try:
  72. topology.standalone.add_s(Entry((GROUP_DN1, {
  73. 'objectclass': 'top groupOfNames groupOfUniqueNames extensibleObject'.split(),
  74. 'cn': 'group1',
  75. 'member': USER_DN
  76. })))
  77. except ldap.LDAPError, e:
  78. log.error('Failed to add group1: error ' + e.message['desc'])
  79. assert False
  80. try:
  81. topology.standalone.add_s(Entry((GROUP_DN2, {
  82. 'objectclass': 'top groupOfNames groupOfUniqueNames extensibleObject'.split(),
  83. 'cn': 'group2',
  84. 'member': USER_DN
  85. })))
  86. except ldap.LDAPError, e:
  87. log.error('Failed to add group2: error ' + e.message['desc'])
  88. assert False
  89. # Add group with no member(yet)
  90. try:
  91. topology.standalone.add_s(Entry((GROUP_DN3, {
  92. 'objectclass': 'top groupOfNames groupOfUniqueNames extensibleObject'.split(),
  93. 'cn': 'group'
  94. })))
  95. except ldap.LDAPError, e:
  96. log.error('Failed to add group3: error ' + e.message['desc'])
  97. assert False
  98. time.sleep(1)
  99. #
  100. # Test we have the correct memberOf values in the user entry
  101. #
  102. try:
  103. member_filter = ('(&(memberOf=' + GROUP_DN1 + ')(memberOf=' + GROUP_DN2 + '))')
  104. entries = topology.standalone.search_s(USER_DN, ldap.SCOPE_BASE, member_filter)
  105. if not entries:
  106. log.fatal('User is missing expected memberOf attrs')
  107. assert False
  108. except ldap.LDAPError, e:
  109. log.fatal('Search for user1 failed: ' + e.message['desc'])
  110. assert False
  111. # Add the user to the group
  112. try:
  113. topology.standalone.modify_s(GROUP_DN3, [(ldap.MOD_ADD, 'member', USER_DN)])
  114. except ldap.LDAPError, e:
  115. log.error('Failed to member to group: error ' + e.message['desc'])
  116. assert False
  117. time.sleep(1)
  118. # Check that the test user is a "memberOf" all three groups
  119. try:
  120. member_filter = ('(&(memberOf=' + GROUP_DN1 + ')(memberOf=' + GROUP_DN2 +
  121. ')(memberOf=' + GROUP_DN3 + '))')
  122. entries = topology.standalone.search_s(USER_DN, ldap.SCOPE_BASE, member_filter)
  123. if not entries:
  124. log.fatal('User is missing expected memberOf attrs')
  125. assert False
  126. except ldap.LDAPError, e:
  127. log.fatal('Search for user1 failed: ' + e.message['desc'])
  128. assert False
  129. #
  130. # Delete group2, and check memberOf values in the user entry
  131. #
  132. try:
  133. topology.standalone.delete_s(GROUP_DN2)
  134. except ldap.LDAPError, e:
  135. log.error('Failed to delete test group2: ' + e.message['desc'])
  136. assert False
  137. time.sleep(1)
  138. try:
  139. member_filter = ('(&(memberOf=' + GROUP_DN1 + ')(memberOf=' + GROUP_DN3 + '))')
  140. entries = topology.standalone.search_s(USER_DN, ldap.SCOPE_BASE, member_filter)
  141. if not entries:
  142. log.fatal('User incorrect memberOf attrs')
  143. assert False
  144. except ldap.LDAPError, e:
  145. log.fatal('Search for user1 failed: ' + e.message['desc'])
  146. assert False
  147. log.info('Test complete')
  148. def test_ticket47963_final(topology):
  149. topology.standalone.delete()
  150. log.info('Testcase PASSED')
  151. def run_isolated():
  152. global installation1_prefix
  153. installation1_prefix = None
  154. topo = topology(True)
  155. test_ticket47963(topo)
  156. test_ticket47963_final(topo)
  157. if __name__ == '__main__':
  158. run_isolated()