ticket47653_test.py 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432
  1. import os
  2. import sys
  3. import time
  4. import ldap
  5. import logging
  6. import socket
  7. import time
  8. import logging
  9. import pytest
  10. from lib389 import DirSrv, Entry, tools
  11. from lib389.tools import DirSrvTools
  12. from lib389._constants import *
  13. from lib389.properties import *
  14. from constants import *
  15. log = logging.getLogger(__name__)
  16. installation_prefix = None
  17. OC_NAME = 'OCticket47653'
  18. MUST = "(postalAddress $ postalCode)"
  19. MAY = "(member $ street)"
  20. OTHER_NAME = 'other_entry'
  21. MAX_OTHERS = 10
  22. BIND_NAME = 'bind_entry'
  23. BIND_DN = 'cn=%s, %s' % (BIND_NAME, SUFFIX)
  24. BIND_PW = 'password'
  25. ENTRY_NAME = 'test_entry'
  26. ENTRY_DN = 'cn=%s, %s' % (ENTRY_NAME, SUFFIX)
  27. ENTRY_OC = "top person %s" % OC_NAME
  28. def _oc_definition(oid_ext, name, must=None, may=None):
  29. oid = "1.2.3.4.5.6.7.8.9.10.%d" % oid_ext
  30. desc = 'To test ticket 47490'
  31. sup = 'person'
  32. if not must:
  33. must = MUST
  34. if not may:
  35. may = MAY
  36. new_oc = "( %s NAME '%s' DESC '%s' SUP %s AUXILIARY MUST %s MAY %s )" % (oid, name, desc, sup, must, may)
  37. return new_oc
  38. class TopologyStandalone(object):
  39. def __init__(self, standalone):
  40. standalone.open()
  41. self.standalone = standalone
  42. @pytest.fixture(scope="module")
  43. def topology(request):
  44. '''
  45. This fixture is used to standalone topology for the 'module'.
  46. At the beginning, It may exists a standalone instance.
  47. It may also exists a backup for the standalone instance.
  48. Principle:
  49. If standalone instance exists:
  50. restart it
  51. If backup of standalone exists:
  52. create/rebind to standalone
  53. restore standalone instance from backup
  54. else:
  55. Cleanup everything
  56. remove instance
  57. remove backup
  58. Create instance
  59. Create backup
  60. '''
  61. global installation_prefix
  62. if installation_prefix:
  63. args_instance[SER_DEPLOYED_DIR] = installation_prefix
  64. standalone = DirSrv(verbose=False)
  65. # Args for the standalone instance
  66. args_instance[SER_HOST] = HOST_STANDALONE
  67. args_instance[SER_PORT] = PORT_STANDALONE
  68. args_instance[SER_SERVERID_PROP] = SERVERID_STANDALONE
  69. args_standalone = args_instance.copy()
  70. standalone.allocate(args_standalone)
  71. # Get the status of the backups
  72. backup_standalone = standalone.checkBackupFS()
  73. # Get the status of the instance and restart it if it exists
  74. instance_standalone = standalone.exists()
  75. if instance_standalone:
  76. # assuming the instance is already stopped, just wait 5 sec max
  77. standalone.stop(timeout=5)
  78. standalone.start(timeout=10)
  79. if backup_standalone:
  80. # The backup exist, assuming it is correct
  81. # we just re-init the instance with it
  82. if not instance_standalone:
  83. standalone.create()
  84. # Used to retrieve configuration information (dbdir, confdir...)
  85. standalone.open()
  86. # restore standalone instance from backup
  87. standalone.stop(timeout=10)
  88. standalone.restoreFS(backup_standalone)
  89. standalone.start(timeout=10)
  90. else:
  91. # We should be here only in two conditions
  92. # - This is the first time a test involve standalone instance
  93. # - Something weird happened (instance/backup destroyed)
  94. # so we discard everything and recreate all
  95. # Remove the backup. So even if we have a specific backup file
  96. # (e.g backup_standalone) we clear backup that an instance may have created
  97. if backup_standalone:
  98. standalone.clearBackupFS()
  99. # Remove the instance
  100. if instance_standalone:
  101. standalone.delete()
  102. # Create the instance
  103. standalone.create()
  104. # Used to retrieve configuration information (dbdir, confdir...)
  105. standalone.open()
  106. # Time to create the backups
  107. standalone.stop(timeout=10)
  108. standalone.backupfile = standalone.backupFS()
  109. standalone.start(timeout=10)
  110. #
  111. # Here we have standalone instance up and running
  112. # Either coming from a backup recovery
  113. # or from a fresh (re)init
  114. # Time to return the topology
  115. return TopologyStandalone(standalone)
  116. def test_ticket47653_init(topology):
  117. """
  118. It adds
  119. - Objectclass with MAY 'member'
  120. - an entry ('bind_entry') with which we bind to test the 'SELFDN' operation
  121. It deletes the anonymous aci
  122. """
  123. topology.standalone.log.info("Add %s that allows 'member' attribute" % OC_NAME)
  124. new_oc = _oc_definition(2, OC_NAME, must = MUST, may = MAY)
  125. topology.standalone.schema.add_schema('objectClasses', new_oc)
  126. # entry used to bind with
  127. topology.standalone.log.info("Add %s" % BIND_DN)
  128. topology.standalone.add_s(Entry((BIND_DN, {
  129. 'objectclass': "top person".split(),
  130. 'sn': BIND_NAME,
  131. 'cn': BIND_NAME,
  132. 'userpassword': BIND_PW})))
  133. # enable acl error logging
  134. mod = [(ldap.MOD_REPLACE, 'nsslapd-errorlog-level', '128')]
  135. topology.standalone.modify_s(DN_CONFIG, mod)
  136. # get read of anonymous ACI for use 'read-search' aci in SEARCH test
  137. ACI_ANONYMOUS = "(targetattr!=\"userPassword\")(version 3.0; acl \"Enable anonymous access\"; allow (read, search, compare) userdn=\"ldap:///anyone\";)"
  138. mod = [(ldap.MOD_DELETE, 'aci', ACI_ANONYMOUS)]
  139. topology.standalone.modify_s(SUFFIX, mod)
  140. # add dummy entries
  141. for cpt in range(MAX_OTHERS):
  142. name = "%s%d" % (OTHER_NAME, cpt)
  143. topology.standalone.add_s(Entry(("cn=%s,%s" % (name, SUFFIX), {
  144. 'objectclass': "top person".split(),
  145. 'sn': name,
  146. 'cn': name})))
  147. def test_ticket47653_add(topology):
  148. '''
  149. It checks that, bound as bind_entry,
  150. - we can not ADD an entry without the proper SELFDN aci.
  151. - with the proper ACI we can not ADD with 'member' attribute
  152. - with the proper ACI and 'member' it succeeds to ADD
  153. '''
  154. topology.standalone.log.info("\n\n######################### ADD ######################\n")
  155. # bind as bind_entry
  156. topology.standalone.log.info("Bind as %s" % BIND_DN)
  157. topology.standalone.simple_bind_s(BIND_DN, BIND_PW)
  158. # Prepare the entry with multivalued members
  159. entry_with_members = Entry(ENTRY_DN)
  160. entry_with_members.setValues('objectclass', 'top', 'person', 'OCticket47653')
  161. entry_with_members.setValues('sn', ENTRY_NAME)
  162. entry_with_members.setValues('cn', ENTRY_NAME)
  163. entry_with_members.setValues('postalAddress', 'here')
  164. entry_with_members.setValues('postalCode', '1234')
  165. members = []
  166. for cpt in range(MAX_OTHERS):
  167. name = "%s%d" % (OTHER_NAME, cpt)
  168. members.append("cn=%s,%s" % (name, SUFFIX))
  169. members.append(BIND_DN)
  170. entry_with_members.setValues('member', members)
  171. # Prepare the entry with one member
  172. entry_with_member = Entry(ENTRY_DN)
  173. entry_with_member.setValues('objectclass', 'top', 'person', 'OCticket47653')
  174. entry_with_member.setValues('sn', ENTRY_NAME)
  175. entry_with_member.setValues('cn', ENTRY_NAME)
  176. entry_with_member.setValues('postalAddress', 'here')
  177. entry_with_member.setValues('postalCode', '1234')
  178. member = []
  179. member.append(BIND_DN)
  180. entry_with_member.setValues('member', member)
  181. # entry to add WITH member being BIND_DN but WITHOUT the ACI -> ldap.INSUFFICIENT_ACCESS
  182. try:
  183. topology.standalone.log.info("Try to add Add %s (aci is missing): %r" % (ENTRY_DN, entry_with_member))
  184. topology.standalone.add_s(entry_with_member)
  185. except Exception as e:
  186. topology.standalone.log.info("Exception (expected): %s" % type(e).__name__)
  187. assert isinstance(e, ldap.INSUFFICIENT_ACCESS)
  188. # Ok Now add the proper ACI
  189. topology.standalone.log.info("Bind as %s and add the ADD SELFDN aci" % DN_DM)
  190. topology.standalone.simple_bind_s(DN_DM, PASSWORD)
  191. ACI_TARGET = "(target = \"ldap:///cn=*,%s\")" % SUFFIX
  192. ACI_TARGETFILTER = "(targetfilter =\"(objectClass=%s)\")" % OC_NAME
  193. ACI_ALLOW = "(version 3.0; acl \"SelfDN add\"; allow (add)"
  194. ACI_SUBJECT = " userattr = \"member#selfDN\";)"
  195. ACI_BODY = ACI_TARGET + ACI_TARGETFILTER + ACI_ALLOW + ACI_SUBJECT
  196. mod = [(ldap.MOD_ADD, 'aci', ACI_BODY)]
  197. topology.standalone.modify_s(SUFFIX, mod)
  198. # bind as bind_entry
  199. topology.standalone.log.info("Bind as %s" % BIND_DN)
  200. topology.standalone.simple_bind_s(BIND_DN, BIND_PW)
  201. # entry to add WITHOUT member and WITH the ACI -> ldap.INSUFFICIENT_ACCESS
  202. try:
  203. topology.standalone.log.info("Try to add Add %s (member is missing)" % ENTRY_DN)
  204. topology.standalone.add_s(Entry((ENTRY_DN, {
  205. 'objectclass': ENTRY_OC.split(),
  206. 'sn': ENTRY_NAME,
  207. 'cn': ENTRY_NAME,
  208. 'postalAddress': 'here',
  209. 'postalCode': '1234'})))
  210. except Exception as e:
  211. topology.standalone.log.info("Exception (expected): %s" % type(e).__name__)
  212. assert isinstance(e, ldap.INSUFFICIENT_ACCESS)
  213. # entry to add WITH memberS and WITH the ACI -> ldap.INSUFFICIENT_ACCESS
  214. # member should contain only one value
  215. try:
  216. topology.standalone.log.info("Try to add Add %s (with several member values)" % ENTRY_DN)
  217. topology.standalone.add_s(entry_with_members)
  218. except Exception as e:
  219. topology.standalone.log.info("Exception (expected): %s" % type(e).__name__)
  220. assert isinstance(e, ldap.INSUFFICIENT_ACCESS)
  221. topology.standalone.log.info("Try to add Add %s should be successful" % ENTRY_DN)
  222. topology.standalone.add_s(entry_with_member)
  223. def test_ticket47653_search(topology):
  224. '''
  225. It checks that, bound as bind_entry,
  226. - we can not search an entry without the proper SELFDN aci.
  227. - adding the ACI, we can search the entry
  228. '''
  229. topology.standalone.log.info("\n\n######################### SEARCH ######################\n")
  230. # bind as bind_entry
  231. topology.standalone.log.info("Bind as %s" % BIND_DN)
  232. topology.standalone.simple_bind_s(BIND_DN, BIND_PW)
  233. # entry to search WITH member being BIND_DN but WITHOUT the ACI -> no entry returned
  234. topology.standalone.log.info("Try to search %s (aci is missing)" % ENTRY_DN)
  235. ents = topology.standalone.search_s(ENTRY_DN, ldap.SCOPE_BASE, 'objectclass=*')
  236. assert len(ents) == 0
  237. # Ok Now add the proper ACI
  238. topology.standalone.log.info("Bind as %s and add the READ/SEARCH SELFDN aci" % DN_DM)
  239. topology.standalone.simple_bind_s(DN_DM, PASSWORD)
  240. ACI_TARGET = "(target = \"ldap:///cn=*,%s\")" % SUFFIX
  241. ACI_TARGETATTR = "(targetattr = *)"
  242. ACI_TARGETFILTER = "(targetfilter =\"(objectClass=%s)\")" % OC_NAME
  243. ACI_ALLOW = "(version 3.0; acl \"SelfDN search-read\"; allow (read, search, compare)"
  244. ACI_SUBJECT = " userattr = \"member#selfDN\";)"
  245. ACI_BODY = ACI_TARGET + ACI_TARGETATTR + ACI_TARGETFILTER + ACI_ALLOW + ACI_SUBJECT
  246. mod = [(ldap.MOD_ADD, 'aci', ACI_BODY)]
  247. topology.standalone.modify_s(SUFFIX, mod)
  248. # bind as bind_entry
  249. topology.standalone.log.info("Bind as %s" % BIND_DN)
  250. topology.standalone.simple_bind_s(BIND_DN, BIND_PW)
  251. # entry to search with the proper aci
  252. topology.standalone.log.info("Try to search %s should be successful" % ENTRY_DN)
  253. ents = topology.standalone.search_s(ENTRY_DN, ldap.SCOPE_BASE, 'objectclass=*')
  254. assert len(ents) == 1
  255. def test_ticket47653_modify(topology):
  256. '''
  257. It checks that, bound as bind_entry,
  258. - we can not modify an entry without the proper SELFDN aci.
  259. - adding the ACI, we can modify the entry
  260. '''
  261. # bind as bind_entry
  262. topology.standalone.log.info("Bind as %s" % BIND_DN)
  263. topology.standalone.simple_bind_s(BIND_DN, BIND_PW)
  264. topology.standalone.log.info("\n\n######################### MODIFY ######################\n")
  265. # entry to modify WITH member being BIND_DN but WITHOUT the ACI -> ldap.INSUFFICIENT_ACCESS
  266. try:
  267. topology.standalone.log.info("Try to modify %s (aci is missing)" % ENTRY_DN)
  268. mod = [(ldap.MOD_REPLACE, 'postalCode', '9876')]
  269. topology.standalone.modify_s(ENTRY_DN, mod)
  270. except Exception as e:
  271. topology.standalone.log.info("Exception (expected): %s" % type(e).__name__)
  272. assert isinstance(e, ldap.INSUFFICIENT_ACCESS)
  273. # Ok Now add the proper ACI
  274. topology.standalone.log.info("Bind as %s and add the WRITE SELFDN aci" % DN_DM)
  275. topology.standalone.simple_bind_s(DN_DM, PASSWORD)
  276. ACI_TARGET = "(target = \"ldap:///cn=*,%s\")" % SUFFIX
  277. ACI_TARGETATTR = "(targetattr = *)"
  278. ACI_TARGETFILTER = "(targetfilter =\"(objectClass=%s)\")" % OC_NAME
  279. ACI_ALLOW = "(version 3.0; acl \"SelfDN write\"; allow (write)"
  280. ACI_SUBJECT = " userattr = \"member#selfDN\";)"
  281. ACI_BODY = ACI_TARGET + ACI_TARGETATTR + ACI_TARGETFILTER + ACI_ALLOW + ACI_SUBJECT
  282. mod = [(ldap.MOD_ADD, 'aci', ACI_BODY)]
  283. topology.standalone.modify_s(SUFFIX, mod)
  284. # bind as bind_entry
  285. topology.standalone.log.info("Bind as %s" % BIND_DN)
  286. topology.standalone.simple_bind_s(BIND_DN, BIND_PW)
  287. # modify the entry and checks the value
  288. topology.standalone.log.info("Try to modify %s. It should succeeds" % ENTRY_DN)
  289. mod = [(ldap.MOD_REPLACE, 'postalCode', '1928')]
  290. topology.standalone.modify_s(ENTRY_DN, mod)
  291. ents = topology.standalone.search_s(ENTRY_DN, ldap.SCOPE_BASE, 'objectclass=*')
  292. assert len(ents) == 1
  293. assert ents[0].postalCode == '1928'
  294. def test_ticket47653_delete(topology):
  295. '''
  296. It checks that, bound as bind_entry,
  297. - we can not delete an entry without the proper SELFDN aci.
  298. - adding the ACI, we can delete the entry
  299. '''
  300. topology.standalone.log.info("\n\n######################### DELETE ######################\n")
  301. # bind as bind_entry
  302. topology.standalone.log.info("Bind as %s" % BIND_DN)
  303. topology.standalone.simple_bind_s(BIND_DN, BIND_PW)
  304. # entry to delete WITH member being BIND_DN but WITHOUT the ACI -> ldap.INSUFFICIENT_ACCESS
  305. try:
  306. topology.standalone.log.info("Try to delete %s (aci is missing)" % ENTRY_DN)
  307. topology.standalone.delete_s(ENTRY_DN)
  308. except Exception as e:
  309. topology.standalone.log.info("Exception (expected): %s" % type(e).__name__)
  310. assert isinstance(e, ldap.INSUFFICIENT_ACCESS)
  311. # Ok Now add the proper ACI
  312. topology.standalone.log.info("Bind as %s and add the READ/SEARCH SELFDN aci" % DN_DM)
  313. topology.standalone.simple_bind_s(DN_DM, PASSWORD)
  314. ACI_TARGET = "(target = \"ldap:///cn=*,%s\")" % SUFFIX
  315. ACI_TARGETFILTER = "(targetfilter =\"(objectClass=%s)\")" % OC_NAME
  316. ACI_ALLOW = "(version 3.0; acl \"SelfDN delete\"; allow (delete)"
  317. ACI_SUBJECT = " userattr = \"member#selfDN\";)"
  318. ACI_BODY = ACI_TARGET + ACI_TARGETFILTER + ACI_ALLOW + ACI_SUBJECT
  319. mod = [(ldap.MOD_ADD, 'aci', ACI_BODY)]
  320. topology.standalone.modify_s(SUFFIX, mod)
  321. # bind as bind_entry
  322. topology.standalone.log.info("Bind as %s" % BIND_DN)
  323. topology.standalone.simple_bind_s(BIND_DN, BIND_PW)
  324. # entry to search with the proper aci
  325. topology.standalone.log.info("Try to delete %s should be successful" % ENTRY_DN)
  326. topology.standalone.delete_s(ENTRY_DN)
  327. def test_ticket47653_final(topology):
  328. topology.standalone.stop(timeout=10)
  329. def run_isolated():
  330. '''
  331. run_isolated is used to run these test cases independently of a test scheduler (xunit, py.test..)
  332. To run isolated without py.test, you need to
  333. - edit this file and comment '@pytest.fixture' line before 'topology' function.
  334. - set the installation prefix
  335. - run this program
  336. '''
  337. global installation_prefix
  338. installation_prefix = None
  339. topo = topology(True)
  340. test_ticket47653_init(topo)
  341. test_ticket47653_add(topo)
  342. test_ticket47653_search(topo)
  343. test_ticket47653_modify(topo)
  344. test_ticket47653_delete(topo)
  345. test_ticket47653_final(topo)
  346. if __name__ == '__main__':
  347. run_isolated()