ticket47653_test.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435
  1. import os
  2. import sys
  3. import time
  4. import ldap
  5. import logging
  6. import socket
  7. import time
  8. import logging
  9. import pytest
  10. from lib389 import DirSrv, Entry, tools
  11. from lib389.tools import DirSrvTools
  12. from lib389._constants import *
  13. from lib389.properties import *
  14. from constants import *
  15. log = logging.getLogger(__name__)
  16. installation_prefix = None
  17. OC_NAME = 'OCticket47653'
  18. MUST = "(postalAddress $ postalCode)"
  19. MAY = "(member $ street)"
  20. OTHER_NAME = 'other_entry'
  21. MAX_OTHERS = 10
  22. BIND_NAME = 'bind_entry'
  23. BIND_DN = 'cn=%s, %s' % (BIND_NAME, SUFFIX)
  24. BIND_PW = 'password'
  25. ENTRY_NAME = 'test_entry'
  26. ENTRY_DN = 'cn=%s, %s' % (ENTRY_NAME, SUFFIX)
  27. ENTRY_OC = "top person %s" % OC_NAME
  28. def _oc_definition(oid_ext, name, must=None, may=None):
  29. oid = "1.2.3.4.5.6.7.8.9.10.%d" % oid_ext
  30. desc = 'To test ticket 47490'
  31. sup = 'person'
  32. if not must:
  33. must = MUST
  34. if not may:
  35. may = MAY
  36. new_oc = "( %s NAME '%s' DESC '%s' SUP %s AUXILIARY MUST %s MAY %s )" % (oid, name, desc, sup, must, may)
  37. return new_oc
  38. class TopologyStandalone(object):
  39. def __init__(self, standalone):
  40. standalone.open()
  41. self.standalone = standalone
  42. @pytest.fixture(scope="module")
  43. def topology(request):
  44. '''
  45. This fixture is used to standalone topology for the 'module'.
  46. At the beginning, It may exists a standalone instance.
  47. It may also exists a backup for the standalone instance.
  48. Principle:
  49. If standalone instance exists:
  50. restart it
  51. If backup of standalone exists:
  52. create/rebind to standalone
  53. restore standalone instance from backup
  54. else:
  55. Cleanup everything
  56. remove instance
  57. remove backup
  58. Create instance
  59. Create backup
  60. '''
  61. global installation_prefix
  62. if installation_prefix:
  63. args_instance[SER_DEPLOYED_DIR] = installation_prefix
  64. standalone = DirSrv(verbose=False)
  65. # Args for the standalone instance
  66. args_instance[SER_HOST] = HOST_STANDALONE
  67. args_instance[SER_PORT] = PORT_STANDALONE
  68. args_instance[SER_SERVERID_PROP] = SERVERID_STANDALONE
  69. args_standalone = args_instance.copy()
  70. standalone.allocate(args_standalone)
  71. # Get the status of the backups
  72. backup_standalone = standalone.checkBackupFS()
  73. # Get the status of the instance and restart it if it exists
  74. instance_standalone = standalone.exists()
  75. if instance_standalone:
  76. # assuming the instance is already stopped, just wait 5 sec max
  77. standalone.stop(timeout=5)
  78. standalone.start(timeout=10)
  79. if backup_standalone:
  80. # The backup exist, assuming it is correct
  81. # we just re-init the instance with it
  82. if not instance_standalone:
  83. standalone.create()
  84. # Used to retrieve configuration information (dbdir, confdir...)
  85. standalone.open()
  86. # restore standalone instance from backup
  87. standalone.stop(timeout=10)
  88. standalone.restoreFS(backup_standalone)
  89. standalone.start(timeout=10)
  90. else:
  91. # We should be here only in two conditions
  92. # - This is the first time a test involve standalone instance
  93. # - Something weird happened (instance/backup destroyed)
  94. # so we discard everything and recreate all
  95. # Remove the backup. So even if we have a specific backup file
  96. # (e.g backup_standalone) we clear backup that an instance may have created
  97. if backup_standalone:
  98. standalone.clearBackupFS()
  99. # Remove the instance
  100. if instance_standalone:
  101. standalone.delete()
  102. # Create the instance
  103. standalone.create()
  104. # Used to retrieve configuration information (dbdir, confdir...)
  105. standalone.open()
  106. # Time to create the backups
  107. standalone.stop(timeout=10)
  108. standalone.backupfile = standalone.backupFS()
  109. standalone.start(timeout=10)
  110. # clear the tmp directory
  111. standalone.clearTmpDir(__file__)
  112. #
  113. # Here we have standalone instance up and running
  114. # Either coming from a backup recovery
  115. # or from a fresh (re)init
  116. # Time to return the topology
  117. return TopologyStandalone(standalone)
  118. def test_ticket47653_init(topology):
  119. """
  120. It adds
  121. - Objectclass with MAY 'member'
  122. - an entry ('bind_entry') with which we bind to test the 'SELFDN' operation
  123. It deletes the anonymous aci
  124. """
  125. topology.standalone.log.info("Add %s that allows 'member' attribute" % OC_NAME)
  126. new_oc = _oc_definition(2, OC_NAME, must=MUST, may=MAY)
  127. topology.standalone.schema.add_schema('objectClasses', new_oc)
  128. # entry used to bind with
  129. topology.standalone.log.info("Add %s" % BIND_DN)
  130. topology.standalone.add_s(Entry((BIND_DN, {
  131. 'objectclass': "top person".split(),
  132. 'sn': BIND_NAME,
  133. 'cn': BIND_NAME,
  134. 'userpassword': BIND_PW})))
  135. # enable acl error logging
  136. mod = [(ldap.MOD_REPLACE, 'nsslapd-errorlog-level', '128')]
  137. topology.standalone.modify_s(DN_CONFIG, mod)
  138. # get read of anonymous ACI for use 'read-search' aci in SEARCH test
  139. ACI_ANONYMOUS = "(targetattr!=\"userPassword\")(version 3.0; acl \"Enable anonymous access\"; allow (read, search, compare) userdn=\"ldap:///anyone\";)"
  140. mod = [(ldap.MOD_DELETE, 'aci', ACI_ANONYMOUS)]
  141. topology.standalone.modify_s(SUFFIX, mod)
  142. # add dummy entries
  143. for cpt in range(MAX_OTHERS):
  144. name = "%s%d" % (OTHER_NAME, cpt)
  145. topology.standalone.add_s(Entry(("cn=%s,%s" % (name, SUFFIX), {
  146. 'objectclass': "top person".split(),
  147. 'sn': name,
  148. 'cn': name})))
  149. def test_ticket47653_add(topology):
  150. '''
  151. It checks that, bound as bind_entry,
  152. - we can not ADD an entry without the proper SELFDN aci.
  153. - with the proper ACI we can not ADD with 'member' attribute
  154. - with the proper ACI and 'member' it succeeds to ADD
  155. '''
  156. topology.standalone.log.info("\n\n######################### ADD ######################\n")
  157. # bind as bind_entry
  158. topology.standalone.log.info("Bind as %s" % BIND_DN)
  159. topology.standalone.simple_bind_s(BIND_DN, BIND_PW)
  160. # Prepare the entry with multivalued members
  161. entry_with_members = Entry(ENTRY_DN)
  162. entry_with_members.setValues('objectclass', 'top', 'person', 'OCticket47653')
  163. entry_with_members.setValues('sn', ENTRY_NAME)
  164. entry_with_members.setValues('cn', ENTRY_NAME)
  165. entry_with_members.setValues('postalAddress', 'here')
  166. entry_with_members.setValues('postalCode', '1234')
  167. members = []
  168. for cpt in range(MAX_OTHERS):
  169. name = "%s%d" % (OTHER_NAME, cpt)
  170. members.append("cn=%s,%s" % (name, SUFFIX))
  171. members.append(BIND_DN)
  172. entry_with_members.setValues('member', members)
  173. # Prepare the entry with one member
  174. entry_with_member = Entry(ENTRY_DN)
  175. entry_with_member.setValues('objectclass', 'top', 'person', 'OCticket47653')
  176. entry_with_member.setValues('sn', ENTRY_NAME)
  177. entry_with_member.setValues('cn', ENTRY_NAME)
  178. entry_with_member.setValues('postalAddress', 'here')
  179. entry_with_member.setValues('postalCode', '1234')
  180. member = []
  181. member.append(BIND_DN)
  182. entry_with_member.setValues('member', member)
  183. # entry to add WITH member being BIND_DN but WITHOUT the ACI -> ldap.INSUFFICIENT_ACCESS
  184. try:
  185. topology.standalone.log.info("Try to add Add %s (aci is missing): %r" % (ENTRY_DN, entry_with_member))
  186. topology.standalone.add_s(entry_with_member)
  187. except Exception as e:
  188. topology.standalone.log.info("Exception (expected): %s" % type(e).__name__)
  189. assert isinstance(e, ldap.INSUFFICIENT_ACCESS)
  190. # Ok Now add the proper ACI
  191. topology.standalone.log.info("Bind as %s and add the ADD SELFDN aci" % DN_DM)
  192. topology.standalone.simple_bind_s(DN_DM, PASSWORD)
  193. ACI_TARGET = "(target = \"ldap:///cn=*,%s\")" % SUFFIX
  194. ACI_TARGETFILTER = "(targetfilter =\"(objectClass=%s)\")" % OC_NAME
  195. ACI_ALLOW = "(version 3.0; acl \"SelfDN add\"; allow (add)"
  196. ACI_SUBJECT = " userattr = \"member#selfDN\";)"
  197. ACI_BODY = ACI_TARGET + ACI_TARGETFILTER + ACI_ALLOW + ACI_SUBJECT
  198. mod = [(ldap.MOD_ADD, 'aci', ACI_BODY)]
  199. topology.standalone.modify_s(SUFFIX, mod)
  200. # bind as bind_entry
  201. topology.standalone.log.info("Bind as %s" % BIND_DN)
  202. topology.standalone.simple_bind_s(BIND_DN, BIND_PW)
  203. # entry to add WITHOUT member and WITH the ACI -> ldap.INSUFFICIENT_ACCESS
  204. try:
  205. topology.standalone.log.info("Try to add Add %s (member is missing)" % ENTRY_DN)
  206. topology.standalone.add_s(Entry((ENTRY_DN, {
  207. 'objectclass': ENTRY_OC.split(),
  208. 'sn': ENTRY_NAME,
  209. 'cn': ENTRY_NAME,
  210. 'postalAddress': 'here',
  211. 'postalCode': '1234'})))
  212. except Exception as e:
  213. topology.standalone.log.info("Exception (expected): %s" % type(e).__name__)
  214. assert isinstance(e, ldap.INSUFFICIENT_ACCESS)
  215. # entry to add WITH memberS and WITH the ACI -> ldap.INSUFFICIENT_ACCESS
  216. # member should contain only one value
  217. try:
  218. topology.standalone.log.info("Try to add Add %s (with several member values)" % ENTRY_DN)
  219. topology.standalone.add_s(entry_with_members)
  220. except Exception as e:
  221. topology.standalone.log.info("Exception (expected): %s" % type(e).__name__)
  222. assert isinstance(e, ldap.INSUFFICIENT_ACCESS)
  223. topology.standalone.log.info("Try to add Add %s should be successful" % ENTRY_DN)
  224. topology.standalone.add_s(entry_with_member)
  225. def test_ticket47653_search(topology):
  226. '''
  227. It checks that, bound as bind_entry,
  228. - we can not search an entry without the proper SELFDN aci.
  229. - adding the ACI, we can search the entry
  230. '''
  231. topology.standalone.log.info("\n\n######################### SEARCH ######################\n")
  232. # bind as bind_entry
  233. topology.standalone.log.info("Bind as %s" % BIND_DN)
  234. topology.standalone.simple_bind_s(BIND_DN, BIND_PW)
  235. # entry to search WITH member being BIND_DN but WITHOUT the ACI -> no entry returned
  236. topology.standalone.log.info("Try to search %s (aci is missing)" % ENTRY_DN)
  237. ents = topology.standalone.search_s(ENTRY_DN, ldap.SCOPE_BASE, 'objectclass=*')
  238. assert len(ents) == 0
  239. # Ok Now add the proper ACI
  240. topology.standalone.log.info("Bind as %s and add the READ/SEARCH SELFDN aci" % DN_DM)
  241. topology.standalone.simple_bind_s(DN_DM, PASSWORD)
  242. ACI_TARGET = "(target = \"ldap:///cn=*,%s\")" % SUFFIX
  243. ACI_TARGETATTR = "(targetattr = *)"
  244. ACI_TARGETFILTER = "(targetfilter =\"(objectClass=%s)\")" % OC_NAME
  245. ACI_ALLOW = "(version 3.0; acl \"SelfDN search-read\"; allow (read, search, compare)"
  246. ACI_SUBJECT = " userattr = \"member#selfDN\";)"
  247. ACI_BODY = ACI_TARGET + ACI_TARGETATTR + ACI_TARGETFILTER + ACI_ALLOW + ACI_SUBJECT
  248. mod = [(ldap.MOD_ADD, 'aci', ACI_BODY)]
  249. topology.standalone.modify_s(SUFFIX, mod)
  250. # bind as bind_entry
  251. topology.standalone.log.info("Bind as %s" % BIND_DN)
  252. topology.standalone.simple_bind_s(BIND_DN, BIND_PW)
  253. # entry to search with the proper aci
  254. topology.standalone.log.info("Try to search %s should be successful" % ENTRY_DN)
  255. ents = topology.standalone.search_s(ENTRY_DN, ldap.SCOPE_BASE, 'objectclass=*')
  256. assert len(ents) == 1
  257. def test_ticket47653_modify(topology):
  258. '''
  259. It checks that, bound as bind_entry,
  260. - we can not modify an entry without the proper SELFDN aci.
  261. - adding the ACI, we can modify the entry
  262. '''
  263. # bind as bind_entry
  264. topology.standalone.log.info("Bind as %s" % BIND_DN)
  265. topology.standalone.simple_bind_s(BIND_DN, BIND_PW)
  266. topology.standalone.log.info("\n\n######################### MODIFY ######################\n")
  267. # entry to modify WITH member being BIND_DN but WITHOUT the ACI -> ldap.INSUFFICIENT_ACCESS
  268. try:
  269. topology.standalone.log.info("Try to modify %s (aci is missing)" % ENTRY_DN)
  270. mod = [(ldap.MOD_REPLACE, 'postalCode', '9876')]
  271. topology.standalone.modify_s(ENTRY_DN, mod)
  272. except Exception as e:
  273. topology.standalone.log.info("Exception (expected): %s" % type(e).__name__)
  274. assert isinstance(e, ldap.INSUFFICIENT_ACCESS)
  275. # Ok Now add the proper ACI
  276. topology.standalone.log.info("Bind as %s and add the WRITE SELFDN aci" % DN_DM)
  277. topology.standalone.simple_bind_s(DN_DM, PASSWORD)
  278. ACI_TARGET = "(target = \"ldap:///cn=*,%s\")" % SUFFIX
  279. ACI_TARGETATTR = "(targetattr = *)"
  280. ACI_TARGETFILTER = "(targetfilter =\"(objectClass=%s)\")" % OC_NAME
  281. ACI_ALLOW = "(version 3.0; acl \"SelfDN write\"; allow (write)"
  282. ACI_SUBJECT = " userattr = \"member#selfDN\";)"
  283. ACI_BODY = ACI_TARGET + ACI_TARGETATTR + ACI_TARGETFILTER + ACI_ALLOW + ACI_SUBJECT
  284. mod = [(ldap.MOD_ADD, 'aci', ACI_BODY)]
  285. topology.standalone.modify_s(SUFFIX, mod)
  286. # bind as bind_entry
  287. topology.standalone.log.info("Bind as %s" % BIND_DN)
  288. topology.standalone.simple_bind_s(BIND_DN, BIND_PW)
  289. # modify the entry and checks the value
  290. topology.standalone.log.info("Try to modify %s. It should succeeds" % ENTRY_DN)
  291. mod = [(ldap.MOD_REPLACE, 'postalCode', '1928')]
  292. topology.standalone.modify_s(ENTRY_DN, mod)
  293. ents = topology.standalone.search_s(ENTRY_DN, ldap.SCOPE_BASE, 'objectclass=*')
  294. assert len(ents) == 1
  295. assert ents[0].postalCode == '1928'
  296. def test_ticket47653_delete(topology):
  297. '''
  298. It checks that, bound as bind_entry,
  299. - we can not delete an entry without the proper SELFDN aci.
  300. - adding the ACI, we can delete the entry
  301. '''
  302. topology.standalone.log.info("\n\n######################### DELETE ######################\n")
  303. # bind as bind_entry
  304. topology.standalone.log.info("Bind as %s" % BIND_DN)
  305. topology.standalone.simple_bind_s(BIND_DN, BIND_PW)
  306. # entry to delete WITH member being BIND_DN but WITHOUT the ACI -> ldap.INSUFFICIENT_ACCESS
  307. try:
  308. topology.standalone.log.info("Try to delete %s (aci is missing)" % ENTRY_DN)
  309. topology.standalone.delete_s(ENTRY_DN)
  310. except Exception as e:
  311. topology.standalone.log.info("Exception (expected): %s" % type(e).__name__)
  312. assert isinstance(e, ldap.INSUFFICIENT_ACCESS)
  313. # Ok Now add the proper ACI
  314. topology.standalone.log.info("Bind as %s and add the READ/SEARCH SELFDN aci" % DN_DM)
  315. topology.standalone.simple_bind_s(DN_DM, PASSWORD)
  316. ACI_TARGET = "(target = \"ldap:///cn=*,%s\")" % SUFFIX
  317. ACI_TARGETFILTER = "(targetfilter =\"(objectClass=%s)\")" % OC_NAME
  318. ACI_ALLOW = "(version 3.0; acl \"SelfDN delete\"; allow (delete)"
  319. ACI_SUBJECT = " userattr = \"member#selfDN\";)"
  320. ACI_BODY = ACI_TARGET + ACI_TARGETFILTER + ACI_ALLOW + ACI_SUBJECT
  321. mod = [(ldap.MOD_ADD, 'aci', ACI_BODY)]
  322. topology.standalone.modify_s(SUFFIX, mod)
  323. # bind as bind_entry
  324. topology.standalone.log.info("Bind as %s" % BIND_DN)
  325. topology.standalone.simple_bind_s(BIND_DN, BIND_PW)
  326. # entry to search with the proper aci
  327. topology.standalone.log.info("Try to delete %s should be successful" % ENTRY_DN)
  328. topology.standalone.delete_s(ENTRY_DN)
  329. def test_ticket47653_final(topology):
  330. topology.standalone.delete()
  331. def run_isolated():
  332. '''
  333. run_isolated is used to run these test cases independently of a test scheduler (xunit, py.test..)
  334. To run isolated without py.test, you need to
  335. - edit this file and comment '@pytest.fixture' line before 'topology' function.
  336. - set the installation prefix
  337. - run this program
  338. '''
  339. global installation_prefix
  340. installation_prefix = None
  341. topo = topology(True)
  342. test_ticket47653_init(topo)
  343. test_ticket47653_add(topo)
  344. test_ticket47653_search(topo)
  345. test_ticket47653_modify(topo)
  346. test_ticket47653_delete(topo)
  347. test_ticket47653_final(topo)
  348. if __name__ == '__main__':
  349. run_isolated()