acl_test.py 41 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059
  1. # --- BEGIN COPYRIGHT BLOCK ---
  2. # Copyright (C) 2015 Red Hat, Inc.
  3. # All rights reserved.
  4. #
  5. # License: GPL (version 3 or any later version).
  6. # See LICENSE for details.
  7. # --- END COPYRIGHT BLOCK ---
  8. #
  9. import os
  10. import sys
  11. import time
  12. import ldap
  13. import logging
  14. import pytest
  15. from lib389 import DirSrv, Entry, tools, tasks
  16. from lib389.tools import DirSrvTools
  17. from lib389._constants import *
  18. from lib389.properties import *
  19. from lib389.tasks import *
  20. from lib389.utils import *
  21. from ldap.controls.simple import GetEffectiveRightsControl
  22. logging.getLogger(__name__).setLevel(logging.DEBUG)
  23. log = logging.getLogger(__name__)
  24. #
  25. # important part. We can deploy Master1 and Master2 on different versions
  26. #
  27. installation1_prefix = None
  28. installation2_prefix = None
  29. TEST_REPL_DN = "cn=test_repl, %s" % SUFFIX
  30. STAGING_CN = "staged user"
  31. PRODUCTION_CN = "accounts"
  32. EXCEPT_CN = "excepts"
  33. STAGING_DN = "cn=%s,%s" % (STAGING_CN, SUFFIX)
  34. PRODUCTION_DN = "cn=%s,%s" % (PRODUCTION_CN, SUFFIX)
  35. PROD_EXCEPT_DN = "cn=%s,%s" % (EXCEPT_CN, PRODUCTION_DN)
  36. STAGING_PATTERN = "cn=%s*,%s" % (STAGING_CN[:2], SUFFIX)
  37. PRODUCTION_PATTERN = "cn=%s*,%s" % (PRODUCTION_CN[:2], SUFFIX)
  38. BAD_STAGING_PATTERN = "cn=bad*,%s" % (SUFFIX)
  39. BAD_PRODUCTION_PATTERN = "cn=bad*,%s" % (SUFFIX)
  40. BIND_CN = "bind_entry"
  41. BIND_DN = "cn=%s,%s" % (BIND_CN, SUFFIX)
  42. BIND_PW = "password"
  43. NEW_ACCOUNT = "new_account"
  44. MAX_ACCOUNTS = 20
  45. CONFIG_MODDN_ACI_ATTR = "nsslapd-moddn-aci"
  46. SRC_ENTRY_CN = "tuser"
  47. EXT_RDN = "01"
  48. DST_ENTRY_CN = SRC_ENTRY_CN + EXT_RDN
  49. SRC_ENTRY_DN = "cn=%s,%s" % (SRC_ENTRY_CN, SUFFIX)
  50. DST_ENTRY_DN = "cn=%s,%s" % (DST_ENTRY_CN, SUFFIX)
  51. class TopologyMaster1Master2(object):
  52. def __init__(self, master1, master2):
  53. master1.open()
  54. self.master1 = master1
  55. master2.open()
  56. self.master2 = master2
  57. @pytest.fixture(scope="module")
  58. def topology(request):
  59. """This fixture is used to create a replicated topology for the 'module'.
  60. The replicated topology is MASTER1 <-> Master2.
  61. """
  62. global installation1_prefix
  63. global installation2_prefix
  64. # allocate master1 on a given deployement
  65. master1 = DirSrv(verbose=False)
  66. if installation1_prefix:
  67. args_instance[SER_DEPLOYED_DIR] = installation1_prefix
  68. # Args for the master1 instance
  69. args_instance[SER_HOST] = HOST_MASTER_1
  70. args_instance[SER_PORT] = PORT_MASTER_1
  71. args_instance[SER_SERVERID_PROP] = SERVERID_MASTER_1
  72. args_master = args_instance.copy()
  73. master1.allocate(args_master)
  74. # allocate master1 on a given deployement
  75. master2 = DirSrv(verbose=False)
  76. if installation2_prefix:
  77. args_instance[SER_DEPLOYED_DIR] = installation2_prefix
  78. # Args for the consumer instance
  79. args_instance[SER_HOST] = HOST_MASTER_2
  80. args_instance[SER_PORT] = PORT_MASTER_2
  81. args_instance[SER_SERVERID_PROP] = SERVERID_MASTER_2
  82. args_master = args_instance.copy()
  83. master2.allocate(args_master)
  84. # Get the status of the instance and restart it if it exists
  85. instance_master1 = master1.exists()
  86. instance_master2 = master2.exists()
  87. # Remove all the instances
  88. if instance_master1:
  89. master1.delete()
  90. if instance_master2:
  91. master2.delete()
  92. # Create the instances
  93. master1.create()
  94. master1.open()
  95. master2.create()
  96. master2.open()
  97. #
  98. # Now prepare the Master-Consumer topology
  99. #
  100. # First Enable replication
  101. master1.replica.enableReplication(suffix=SUFFIX,
  102. role=REPLICAROLE_MASTER,
  103. replicaId=REPLICAID_MASTER_1)
  104. master2.replica.enableReplication(suffix=SUFFIX,
  105. role=REPLICAROLE_MASTER,
  106. replicaId=REPLICAID_MASTER_2)
  107. # Initialize the supplier->consumer
  108. properties = {RA_NAME: r'meTo_$host:$port',
  109. RA_BINDDN: defaultProperties[REPLICATION_BIND_DN],
  110. RA_BINDPW: defaultProperties[REPLICATION_BIND_PW],
  111. RA_METHOD: defaultProperties[REPLICATION_BIND_METHOD],
  112. RA_TRANSPORT_PROT: defaultProperties[REPLICATION_TRANSPORT]}
  113. repl_agreement = master1.agreement.create(suffix=SUFFIX,
  114. host=master2.host,
  115. port=master2.port,
  116. properties=properties)
  117. if not repl_agreement:
  118. log.fatal("Fail to create a replica agreement")
  119. sys.exit(1)
  120. log.debug("%s created" % repl_agreement)
  121. properties = {RA_NAME: r'meTo_$host:$port',
  122. RA_BINDDN: defaultProperties[REPLICATION_BIND_DN],
  123. RA_BINDPW: defaultProperties[REPLICATION_BIND_PW],
  124. RA_METHOD: defaultProperties[REPLICATION_BIND_METHOD],
  125. RA_TRANSPORT_PROT: defaultProperties[REPLICATION_TRANSPORT]}
  126. master2.agreement.create(suffix=SUFFIX,
  127. host=master1.host,
  128. port=master1.port,
  129. properties=properties)
  130. master1.agreement.init(SUFFIX, HOST_MASTER_2, PORT_MASTER_2)
  131. master1.waitForReplInit(repl_agreement)
  132. # Check replication is working fine
  133. if master1.testReplication(DEFAULT_SUFFIX, master2):
  134. log.info('Replication is working.')
  135. else:
  136. log.fatal('Replication is not working.')
  137. assert False
  138. def fin():
  139. master1.delete()
  140. master2.delete()
  141. request.addfinalizer(fin)
  142. # clear the tmp directory
  143. master1.clearTmpDir(__file__)
  144. # Here we have two instances master and consumer
  145. # with replication working.
  146. return TopologyMaster1Master2(master1, master2)
  147. def add_attr(topology, attr_name):
  148. """Adds attribute to the schema"""
  149. ATTR_VALUE = """(NAME '%s' \
  150. DESC 'Attribute filteri-Multi-Valued' \
  151. SYNTAX 1.3.6.1.4.1.1466.115.121.1.27)""" % attr_name
  152. mod = [(ldap.MOD_ADD, 'attributeTypes', ATTR_VALUE)]
  153. try:
  154. topology.master1.modify_s(DN_SCHEMA, mod)
  155. except ldap.LDAPError as e:
  156. log.fatal('Failed to add attr (%s): error (%s)' % (attr_name,
  157. e.message['desc']))
  158. assert False
  159. @pytest.fixture(params=["lang-ja", "binary", "phonetic"])
  160. def aci_with_attr_subtype(request, topology):
  161. """Adds and deletes an ACI in the DEFAULT_SUFFIX"""
  162. TARGET_ATTR = 'protectedOperation'
  163. USER_ATTR = 'allowedToPerform'
  164. SUBTYPE = request.param
  165. log.info("========Executing test with '%s' subtype========" % SUBTYPE)
  166. log.info(" Add a target attribute")
  167. add_attr(topology, TARGET_ATTR)
  168. log.info(" Add a user attribute")
  169. add_attr(topology, USER_ATTR)
  170. ACI_TARGET = '(targetattr=%s;%s)' % (TARGET_ATTR, SUBTYPE)
  171. ACI_ALLOW = '(version 3.0; acl "test aci for subtypes"; allow (read) '
  172. ACI_SUBJECT = 'userattr = "%s;%s#GROUPDN";)' % (USER_ATTR, SUBTYPE)
  173. ACI_BODY = ACI_TARGET + ACI_ALLOW + ACI_SUBJECT
  174. log.info(" Add an ACI with attribute subtype")
  175. mod = [(ldap.MOD_ADD, 'aci', ACI_BODY)]
  176. try:
  177. topology.master1.modify_s(DEFAULT_SUFFIX, mod)
  178. except ldap.LDAPError as e:
  179. log.fatal('Failed to add ACI: error (%s)' % (e.message['desc']))
  180. assert False
  181. def fin():
  182. log.info(" Finally, delete an ACI with the '%s' subtype" %
  183. SUBTYPE)
  184. mod = [(ldap.MOD_DELETE, 'aci', ACI_BODY)]
  185. try:
  186. topology.master1.modify_s(DEFAULT_SUFFIX, mod)
  187. except ldap.LDAPError as e:
  188. log.fatal('Failed to delete ACI: error (%s)' % (e.message['desc']))
  189. assert False
  190. request.addfinalizer(fin)
  191. return ACI_BODY
  192. def test_aci_attr_subtype_targetattr(topology, aci_with_attr_subtype):
  193. """Checks, that ACIs allow attribute subtypes in the targetattr keyword
  194. Test description:
  195. 1. Define two attributes in the schema
  196. - first will be a targetattr
  197. - second will be a userattr
  198. 2. Add an ACI with an attribute subtype
  199. - or language subtype
  200. - or binary subtype
  201. - or pronunciation subtype
  202. """
  203. log.info(" Search for the added attribute")
  204. try:
  205. entries = topology.master1.search_s(DEFAULT_SUFFIX,
  206. ldap.SCOPE_BASE,
  207. '(objectclass=*)', ['aci'])
  208. entry = str(entries[0])
  209. assert aci_with_attr_subtype in entry
  210. log.info(" The added attribute was found")
  211. except ldap.LDAPError as e:
  212. log.fatal('Search failed, error: ' + e.message['desc'])
  213. assert False
  214. def _bind_manager(topology):
  215. topology.master1.log.info("Bind as %s " % DN_DM)
  216. topology.master1.simple_bind_s(DN_DM, PASSWORD)
  217. def _bind_normal(topology):
  218. # bind as bind_entry
  219. topology.master1.log.info("Bind as %s" % BIND_DN)
  220. topology.master1.simple_bind_s(BIND_DN, BIND_PW)
  221. def _moddn_aci_deny_tree(topology, mod_type=None,
  222. target_from=STAGING_DN, target_to=PROD_EXCEPT_DN):
  223. """It denies the access moddn_to in cn=except,cn=accounts,SUFFIX"""
  224. assert mod_type is not None
  225. ACI_TARGET_FROM = ""
  226. ACI_TARGET_TO = ""
  227. if target_from:
  228. ACI_TARGET_FROM = "(target_from = \"ldap:///%s\")" % (target_from)
  229. if target_to:
  230. ACI_TARGET_TO = "(target_to = \"ldap:///%s\")" % (target_to)
  231. ACI_ALLOW = "(version 3.0; acl \"Deny MODDN to prod_except\"; deny (moddn)"
  232. ACI_SUBJECT = " userdn = \"ldap:///%s\";)" % BIND_DN
  233. ACI_BODY = ACI_TARGET_TO + ACI_TARGET_FROM + ACI_ALLOW + ACI_SUBJECT
  234. mod = [(mod_type, 'aci', ACI_BODY)]
  235. #topology.master1.modify_s(SUFFIX, mod)
  236. topology.master1.log.info("Add a DENY aci under %s " % PROD_EXCEPT_DN)
  237. topology.master1.modify_s(PROD_EXCEPT_DN, mod)
  238. def _write_aci_staging(topology, mod_type=None):
  239. assert mod_type is not None
  240. ACI_TARGET = "(targetattr= \"cn\")(target=\"ldap:///cn=*,%s\")" % STAGING_DN
  241. ACI_ALLOW = "(version 3.0; acl \"write staging entries\"; allow (write)"
  242. ACI_SUBJECT = " userdn = \"ldap:///%s\";)" % BIND_DN
  243. ACI_BODY = ACI_TARGET + ACI_ALLOW + ACI_SUBJECT
  244. mod = [(mod_type, 'aci', ACI_BODY)]
  245. topology.master1.modify_s(SUFFIX, mod)
  246. def _write_aci_production(topology, mod_type=None):
  247. assert mod_type is not None
  248. ACI_TARGET = "(targetattr= \"cn\")(target=\"ldap:///cn=*,%s\")" % PRODUCTION_DN
  249. ACI_ALLOW = "(version 3.0; acl \"write production entries\"; allow (write)"
  250. ACI_SUBJECT = " userdn = \"ldap:///%s\";)" % BIND_DN
  251. ACI_BODY = ACI_TARGET + ACI_ALLOW + ACI_SUBJECT
  252. mod = [(mod_type, 'aci', ACI_BODY)]
  253. topology.master1.modify_s(SUFFIX, mod)
  254. def _moddn_aci_staging_to_production(topology, mod_type=None,
  255. target_from=STAGING_DN, target_to=PRODUCTION_DN):
  256. assert mod_type is not None
  257. ACI_TARGET_FROM = ""
  258. ACI_TARGET_TO = ""
  259. if target_from:
  260. ACI_TARGET_FROM = "(target_from = \"ldap:///%s\")" % (target_from)
  261. if target_to:
  262. ACI_TARGET_TO = "(target_to = \"ldap:///%s\")" % (target_to)
  263. ACI_ALLOW = "(version 3.0; acl \"MODDN from staging to production\"; allow (moddn)"
  264. ACI_SUBJECT = " userdn = \"ldap:///%s\";)" % BIND_DN
  265. ACI_BODY = ACI_TARGET_FROM + ACI_TARGET_TO + ACI_ALLOW + ACI_SUBJECT
  266. mod = [(mod_type, 'aci', ACI_BODY)]
  267. topology.master1.modify_s(SUFFIX, mod)
  268. _write_aci_staging(topology, mod_type=mod_type)
  269. def _moddn_aci_from_production_to_staging(topology, mod_type=None):
  270. assert mod_type is not None
  271. ACI_TARGET = "(target_from = \"ldap:///%s\") (target_to = \"ldap:///%s\")" % (
  272. PRODUCTION_DN, STAGING_DN)
  273. ACI_ALLOW = "(version 3.0; acl \"MODDN from production to staging\"; allow (moddn)"
  274. ACI_SUBJECT = " userdn = \"ldap:///%s\";)" % BIND_DN
  275. ACI_BODY = ACI_TARGET + ACI_ALLOW + ACI_SUBJECT
  276. mod = [(mod_type, 'aci', ACI_BODY)]
  277. topology.master1.modify_s(SUFFIX, mod)
  278. _write_aci_production(topology, mod_type=mod_type)
  279. @pytest.fixture(scope="module")
  280. def moddn_setup(topology):
  281. """Creates
  282. - a staging DIT
  283. - a production DIT
  284. - add accounts in staging DIT
  285. - enable ACL logging (commented for performance reason)
  286. """
  287. topology.master1.log.info("\n\n######## INITIALIZATION ########\n")
  288. # entry used to bind with
  289. topology.master1.log.info("Add %s" % BIND_DN)
  290. topology.master1.add_s(Entry((BIND_DN, {
  291. 'objectclass': "top person".split(),
  292. 'sn': BIND_CN,
  293. 'cn': BIND_CN,
  294. 'userpassword': BIND_PW})))
  295. # DIT for staging
  296. topology.master1.log.info("Add %s" % STAGING_DN)
  297. topology.master1.add_s(Entry((STAGING_DN, {
  298. 'objectclass': "top organizationalRole".split(),
  299. 'cn': STAGING_CN,
  300. 'description': "staging DIT"})))
  301. # DIT for production
  302. topology.master1.log.info("Add %s" % PRODUCTION_DN)
  303. topology.master1.add_s(Entry((PRODUCTION_DN, {
  304. 'objectclass': "top organizationalRole".split(),
  305. 'cn': PRODUCTION_CN,
  306. 'description': "production DIT"})))
  307. # DIT for production/except
  308. topology.master1.log.info("Add %s" % PROD_EXCEPT_DN)
  309. topology.master1.add_s(Entry((PROD_EXCEPT_DN, {
  310. 'objectclass': "top organizationalRole".split(),
  311. 'cn': EXCEPT_CN,
  312. 'description': "production except DIT"})))
  313. # enable acl error logging
  314. #mod = [(ldap.MOD_REPLACE, 'nsslapd-errorlog-level', '128')]
  315. #topology.master1.modify_s(DN_CONFIG, mod)
  316. #topology.master2.modify_s(DN_CONFIG, mod)
  317. # add dummy entries in the staging DIT
  318. for cpt in range(MAX_ACCOUNTS):
  319. name = "%s%d" % (NEW_ACCOUNT, cpt)
  320. topology.master1.add_s(Entry(("cn=%s,%s" % (name, STAGING_DN), {
  321. 'objectclass': "top person".split(),
  322. 'sn': name,
  323. 'cn': name})))
  324. def test_mode_default_add_deny(topology, moddn_setup):
  325. """This test case checks
  326. that the ADD operation fails (no ADD aci on production)
  327. """
  328. topology.master1.log.info("\n\n######## mode moddn_aci : ADD (should fail) ########\n")
  329. _bind_normal(topology)
  330. #
  331. # First try to add an entry in production => INSUFFICIENT_ACCESS
  332. #
  333. try:
  334. topology.master1.log.info("Try to add %s" % PRODUCTION_DN)
  335. name = "%s%d" % (NEW_ACCOUNT, 0)
  336. topology.master1.add_s(Entry(("cn=%s,%s" % (name, PRODUCTION_DN), {
  337. 'objectclass': "top person".split(),
  338. 'sn': name,
  339. 'cn': name})))
  340. assert 0 # this is an error, we should not be allowed to add an entry in production
  341. except Exception as e:
  342. topology.master1.log.info("Exception (expected): %s" % type(e).__name__)
  343. assert isinstance(e, ldap.INSUFFICIENT_ACCESS)
  344. def test_mode_default_delete_deny(topology, moddn_setup):
  345. """This test case checks
  346. that the DEL operation fails (no 'delete' aci on production)
  347. """
  348. topology.master1.log.info("\n\n######## DELETE (should fail) ########\n")
  349. _bind_normal(topology)
  350. #
  351. # Second try to delete an entry in staging => INSUFFICIENT_ACCESS
  352. #
  353. try:
  354. topology.master1.log.info("Try to delete %s" % STAGING_DN)
  355. name = "%s%d" % (NEW_ACCOUNT, 0)
  356. topology.master1.delete_s("cn=%s,%s" % (name, STAGING_DN))
  357. assert 0 # this is an error, we should not be allowed to add an entry in production
  358. except Exception as e:
  359. topology.master1.log.info("Exception (expected): %s" % type(e).__name__)
  360. assert isinstance(e, ldap.INSUFFICIENT_ACCESS)
  361. @pytest.mark.parametrize("index,tfrom,tto,failure",
  362. [(0, STAGING_DN, PRODUCTION_DN, False),
  363. (1, STAGING_DN, PRODUCTION_DN, False),
  364. (2, STAGING_DN, BAD_PRODUCTION_PATTERN, True),
  365. (3, STAGING_PATTERN, PRODUCTION_DN, False),
  366. (4, BAD_STAGING_PATTERN, PRODUCTION_DN, True),
  367. (5, STAGING_PATTERN, PRODUCTION_PATTERN, False),
  368. (6, None, PRODUCTION_PATTERN, False),
  369. (7, STAGING_PATTERN, None, False),
  370. (8, None, None, False)])
  371. def test_moddn_staging_prod(topology, moddn_setup,
  372. index, tfrom, tto, failure):
  373. """This test case MOVE entry NEW_ACCOUNT0 from staging to prod
  374. target_to/target_from: equality filter
  375. """
  376. topology.master1.log.info("\n\n######## MOVE staging -> Prod (%s) ########\n" % index)
  377. _bind_normal(topology)
  378. old_rdn = "cn=%s%s" % (NEW_ACCOUNT, index)
  379. old_dn = "%s,%s" % (old_rdn, STAGING_DN)
  380. new_rdn = old_rdn
  381. new_superior = PRODUCTION_DN
  382. #
  383. # Try to rename without the apropriate ACI => INSUFFICIENT_ACCESS
  384. #
  385. try:
  386. topology.master1.log.info("Try to MODDN %s -> %s,%s" % (old_dn, new_rdn, new_superior))
  387. topology.master1.rename_s(old_dn, new_rdn, newsuperior=new_superior)
  388. assert 0
  389. except AssertionError:
  390. topology.master1.log.info("Exception (not really expected exception but that is fine as it fails to rename)")
  391. except Exception as e:
  392. topology.master1.log.info("Exception (expected): %s" % type(e).__name__)
  393. assert isinstance(e, ldap.INSUFFICIENT_ACCESS)
  394. # successfull MOD with the ACI
  395. topology.master1.log.info("\n\n######## MOVE to and from equality filter ########\n")
  396. _bind_manager(topology)
  397. _moddn_aci_staging_to_production(topology, mod_type=ldap.MOD_ADD,
  398. target_from=tfrom, target_to=tto)
  399. _bind_normal(topology)
  400. try:
  401. topology.master1.log.info("Try to MODDN %s -> %s,%s" % (old_dn, new_rdn, new_superior))
  402. topology.master1.rename_s(old_dn, new_rdn, newsuperior=new_superior)
  403. except Exception as e:
  404. topology.master1.log.info("Exception (expected): %s" % type(e).__name__)
  405. if failure:
  406. assert isinstance(e, ldap.INSUFFICIENT_ACCESS)
  407. # successfull MOD with the both ACI
  408. _bind_manager(topology)
  409. _moddn_aci_staging_to_production(topology, mod_type=ldap.MOD_DELETE,
  410. target_from=tfrom, target_to=tto)
  411. _bind_normal(topology)
  412. def test_moddn_staging_prod_9(topology, moddn_setup):
  413. """This test case disable the 'moddn' right so a MODDN requires a 'add' right
  414. to be successfull.
  415. It fails to MOVE entry NEW_ACCOUNT9 from staging to prod.
  416. Add a 'add' right to prod.
  417. Then it succeeds to MOVE NEW_ACCOUNT9 from staging to prod.
  418. Then enable the 'moddn' right so a MODDN requires a 'moddn' right
  419. It fails to MOVE entry NEW_ACCOUNT10 from staging to prod.
  420. Add a 'moddn' right to prod.
  421. Then it succeeds to MOVE NEW_ACCOUNT10 from staging to prod.
  422. """
  423. topology.master1.log.info("\n\n######## MOVE staging -> Prod (9) ########\n")
  424. _bind_normal(topology)
  425. old_rdn = "cn=%s9" % NEW_ACCOUNT
  426. old_dn = "%s,%s" % (old_rdn, STAGING_DN)
  427. new_rdn = old_rdn
  428. new_superior = PRODUCTION_DN
  429. #
  430. # Try to rename without the apropriate ACI => INSUFFICIENT_ACCESS
  431. #
  432. try:
  433. topology.master1.log.info("Try to MODDN %s -> %s,%s" % (old_dn, new_rdn, new_superior))
  434. topology.master1.rename_s(old_dn, new_rdn, newsuperior=new_superior)
  435. assert 0
  436. except AssertionError:
  437. topology.master1.log.info("Exception (not really expected exception but that is fine as it fails to rename)")
  438. except Exception as e:
  439. topology.master1.log.info("Exception (expected): %s" % type(e).__name__)
  440. assert isinstance(e, ldap.INSUFFICIENT_ACCESS)
  441. #############
  442. # Now do tests with no support of moddn aci
  443. #############
  444. topology.master1.log.info("Disable the moddn right")
  445. _bind_manager(topology)
  446. mod = [(ldap.MOD_REPLACE, CONFIG_MODDN_ACI_ATTR, 'off')]
  447. topology.master1.modify_s(DN_CONFIG, mod)
  448. # Add the moddn aci that will not be evaluated because of the config flag
  449. topology.master1.log.info("\n\n######## MOVE to and from equality filter ########\n")
  450. _bind_manager(topology)
  451. _moddn_aci_staging_to_production(topology, mod_type=ldap.MOD_ADD,
  452. target_from=STAGING_DN, target_to=PRODUCTION_DN)
  453. _bind_normal(topology)
  454. # It will fail because it will test the ADD right
  455. try:
  456. topology.master1.log.info("Try to MODDN %s -> %s,%s" % (old_dn, new_rdn, new_superior))
  457. topology.master1.rename_s(old_dn, new_rdn, newsuperior=new_superior)
  458. assert 0
  459. except AssertionError:
  460. topology.master1.log.info("Exception (not really expected exception but that is fine as it fails to rename)")
  461. except Exception as e:
  462. topology.master1.log.info("Exception (expected): %s" % type(e).__name__)
  463. assert isinstance(e, ldap.INSUFFICIENT_ACCESS)
  464. # remove the moddn aci
  465. _bind_manager(topology)
  466. _moddn_aci_staging_to_production(topology, mod_type=ldap.MOD_DELETE,
  467. target_from=STAGING_DN, target_to=PRODUCTION_DN)
  468. _bind_normal(topology)
  469. #
  470. # add the 'add' right to the production DN
  471. # Then do a successfull moddn
  472. #
  473. ACI_ALLOW = "(version 3.0; acl \"ADD rights to allow moddn\"; allow (add)"
  474. ACI_SUBJECT = " userdn = \"ldap:///%s\";)" % BIND_DN
  475. ACI_BODY = ACI_ALLOW + ACI_SUBJECT
  476. _bind_manager(topology)
  477. mod = [(ldap.MOD_ADD, 'aci', ACI_BODY)]
  478. topology.master1.modify_s(PRODUCTION_DN, mod)
  479. _write_aci_staging(topology, mod_type=ldap.MOD_ADD)
  480. _bind_normal(topology)
  481. topology.master1.log.info("Try to MODDN %s -> %s,%s" % (old_dn, new_rdn, new_superior))
  482. topology.master1.rename_s(old_dn, new_rdn, newsuperior=new_superior)
  483. _bind_manager(topology)
  484. mod = [(ldap.MOD_DELETE, 'aci', ACI_BODY)]
  485. topology.master1.modify_s(PRODUCTION_DN, mod)
  486. _write_aci_staging(topology, mod_type=ldap.MOD_DELETE)
  487. _bind_normal(topology)
  488. #############
  489. # Now do tests with support of moddn aci
  490. #############
  491. topology.master1.log.info("Enable the moddn right")
  492. _bind_manager(topology)
  493. mod = [(ldap.MOD_REPLACE, CONFIG_MODDN_ACI_ATTR, 'on')]
  494. topology.master1.modify_s(DN_CONFIG, mod)
  495. topology.master1.log.info("\n\n######## MOVE staging -> Prod (10) ########\n")
  496. _bind_normal(topology)
  497. old_rdn = "cn=%s10" % NEW_ACCOUNT
  498. old_dn = "%s,%s" % (old_rdn, STAGING_DN)
  499. new_rdn = old_rdn
  500. new_superior = PRODUCTION_DN
  501. #
  502. # Try to rename without the apropriate ACI => INSUFFICIENT_ACCESS
  503. #
  504. try:
  505. topology.master1.log.info("Try to MODDN %s -> %s,%s" % (old_dn, new_rdn, new_superior))
  506. topology.master1.rename_s(old_dn, new_rdn, newsuperior=new_superior)
  507. assert 0
  508. except AssertionError:
  509. topology.master1.log.info("Exception (not really expected exception but that is fine as it fails to rename)")
  510. except Exception as e:
  511. topology.master1.log.info("Exception (expected): %s" % type(e).__name__)
  512. assert isinstance(e, ldap.INSUFFICIENT_ACCESS)
  513. #
  514. # add the 'add' right to the production DN
  515. # Then do a failing moddn
  516. #
  517. ACI_ALLOW = "(version 3.0; acl \"ADD rights to allow moddn\"; allow (add)"
  518. ACI_SUBJECT = " userdn = \"ldap:///%s\";)" % BIND_DN
  519. ACI_BODY = ACI_ALLOW + ACI_SUBJECT
  520. _bind_manager(topology)
  521. mod = [(ldap.MOD_ADD, 'aci', ACI_BODY)]
  522. topology.master1.modify_s(PRODUCTION_DN, mod)
  523. _write_aci_staging(topology, mod_type=ldap.MOD_ADD)
  524. _bind_normal(topology)
  525. try:
  526. topology.master1.log.info("Try to MODDN %s -> %s,%s" % (old_dn, new_rdn, new_superior))
  527. topology.master1.rename_s(old_dn, new_rdn, newsuperior=new_superior)
  528. assert 0
  529. except AssertionError:
  530. topology.master1.log.info("Exception (not really expected exception but that is fine as it fails to rename)")
  531. except Exception as e:
  532. topology.master1.log.info("Exception (expected): %s" % type(e).__name__)
  533. assert isinstance(e, ldap.INSUFFICIENT_ACCESS)
  534. _bind_manager(topology)
  535. mod = [(ldap.MOD_DELETE, 'aci', ACI_BODY)]
  536. topology.master1.modify_s(PRODUCTION_DN, mod)
  537. _write_aci_staging(topology, mod_type=ldap.MOD_DELETE)
  538. _bind_normal(topology)
  539. # Add the moddn aci that will be evaluated because of the config flag
  540. topology.master1.log.info("\n\n######## MOVE to and from equality filter ########\n")
  541. _bind_manager(topology)
  542. _moddn_aci_staging_to_production(topology, mod_type=ldap.MOD_ADD,
  543. target_from=STAGING_DN, target_to=PRODUCTION_DN)
  544. _bind_normal(topology)
  545. topology.master1.log.info("Try to MODDN %s -> %s,%s" % (old_dn, new_rdn, new_superior))
  546. topology.master1.rename_s(old_dn, new_rdn, newsuperior=new_superior)
  547. # remove the moddn aci
  548. _bind_manager(topology)
  549. _moddn_aci_staging_to_production(topology, mod_type=ldap.MOD_DELETE,
  550. target_from=STAGING_DN, target_to=PRODUCTION_DN)
  551. _bind_normal(topology)
  552. def test_moddn_prod_staging(topology, moddn_setup):
  553. """This test checks that we can move ACCOUNT11 from staging to prod
  554. but not move back ACCOUNT11 from prod to staging
  555. """
  556. topology.master1.log.info("\n\n######## MOVE staging -> Prod (11) ########\n")
  557. _bind_normal(topology)
  558. old_rdn = "cn=%s11" % NEW_ACCOUNT
  559. old_dn = "%s,%s" % (old_rdn, STAGING_DN)
  560. new_rdn = old_rdn
  561. new_superior = PRODUCTION_DN
  562. #
  563. # Try to rename without the apropriate ACI => INSUFFICIENT_ACCESS
  564. #
  565. try:
  566. topology.master1.log.info("Try to MODDN %s -> %s,%s" % (old_dn, new_rdn, new_superior))
  567. topology.master1.rename_s(old_dn, new_rdn, newsuperior=new_superior)
  568. assert 0
  569. except AssertionError:
  570. topology.master1.log.info("Exception (not really expected exception but that is fine as it fails to rename)")
  571. except Exception as e:
  572. topology.master1.log.info("Exception (expected): %s" % type(e).__name__)
  573. assert isinstance(e, ldap.INSUFFICIENT_ACCESS)
  574. # successfull MOD with the ACI
  575. topology.master1.log.info("\n\n######## MOVE to and from equality filter ########\n")
  576. _bind_manager(topology)
  577. _moddn_aci_staging_to_production(topology, mod_type=ldap.MOD_ADD,
  578. target_from=STAGING_DN, target_to=PRODUCTION_DN)
  579. _bind_normal(topology)
  580. topology.master1.log.info("Try to MODDN %s -> %s,%s" % (old_dn, new_rdn, new_superior))
  581. topology.master1.rename_s(old_dn, new_rdn, newsuperior=new_superior)
  582. # Now check we can not move back the entry to staging
  583. old_rdn = "cn=%s11" % NEW_ACCOUNT
  584. old_dn = "%s,%s" % (old_rdn, PRODUCTION_DN)
  585. new_rdn = old_rdn
  586. new_superior = STAGING_DN
  587. # add the write right because we want to check the moddn
  588. _bind_manager(topology)
  589. _write_aci_production(topology, mod_type=ldap.MOD_ADD)
  590. _bind_normal(topology)
  591. try:
  592. topology.master1.log.info("Try to move back MODDN %s -> %s,%s" % (old_dn, new_rdn, new_superior))
  593. topology.master1.rename_s(old_dn, new_rdn, newsuperior=new_superior)
  594. assert 0
  595. except AssertionError:
  596. topology.master1.log.info("Exception (not really expected exception but that is fine as it fails to rename)")
  597. except Exception as e:
  598. topology.master1.log.info("Exception (expected): %s" % type(e).__name__)
  599. assert isinstance(e, ldap.INSUFFICIENT_ACCESS)
  600. _bind_manager(topology)
  601. _write_aci_production(topology, mod_type=ldap.MOD_DELETE)
  602. _bind_normal(topology)
  603. # successfull MOD with the both ACI
  604. _bind_manager(topology)
  605. _moddn_aci_staging_to_production(topology, mod_type=ldap.MOD_DELETE,
  606. target_from=STAGING_DN, target_to=PRODUCTION_DN)
  607. _bind_normal(topology)
  608. def test_check_repl_M2_to_M1(topology, moddn_setup):
  609. """Checks that replication is still working M2->M1, using ACCOUNT12"""
  610. topology.master1.log.info("Bind as %s (M2)" % DN_DM)
  611. topology.master2.simple_bind_s(DN_DM, PASSWORD)
  612. rdn = "cn=%s12" % NEW_ACCOUNT
  613. dn = "%s,%s" % (rdn, STAGING_DN)
  614. # First wait for the ACCOUNT19 entry being replicated on M2
  615. loop = 0
  616. while loop <= 10:
  617. try:
  618. ent = topology.master2.getEntry(dn, ldap.SCOPE_BASE, "(objectclass=*)")
  619. break
  620. except ldap.NO_SUCH_OBJECT:
  621. time.sleep(1)
  622. loop += 1
  623. assert loop <= 10
  624. attribute = 'description'
  625. tested_value = 'Hello world'
  626. mod = [(ldap.MOD_ADD, attribute, tested_value)]
  627. topology.master1.log.info("Update (M2) %s (%s)" % (dn, attribute))
  628. topology.master2.modify_s(dn, mod)
  629. loop = 0
  630. while loop <= 10:
  631. ent = topology.master1.getEntry(dn, ldap.SCOPE_BASE, "(objectclass=*)")
  632. assert ent is not None
  633. if ent.hasAttr(attribute) and (ent.getValue(attribute) == tested_value):
  634. break
  635. time.sleep(1)
  636. loop += 1
  637. assert loop < 10
  638. topology.master1.log.info("Update %s (%s) replicated on M1" % (dn, attribute))
  639. def test_moddn_staging_prod_except(topology, moddn_setup):
  640. """This test case MOVE entry NEW_ACCOUNT13 from staging to prod
  641. but fails to move entry NEW_ACCOUNT14 from staging to prod_except
  642. """
  643. topology.master1.log.info("\n\n######## MOVE staging -> Prod (13) ########\n")
  644. _bind_normal(topology)
  645. old_rdn = "cn=%s13" % NEW_ACCOUNT
  646. old_dn = "%s,%s" % (old_rdn, STAGING_DN)
  647. new_rdn = old_rdn
  648. new_superior = PRODUCTION_DN
  649. #
  650. # Try to rename without the apropriate ACI => INSUFFICIENT_ACCESS
  651. #
  652. try:
  653. topology.master1.log.info("Try to MODDN %s -> %s,%s" % (old_dn, new_rdn, new_superior))
  654. topology.master1.rename_s(old_dn, new_rdn, newsuperior=new_superior)
  655. assert 0
  656. except AssertionError:
  657. topology.master1.log.info("Exception (not really expected exception but that is fine as it fails to rename)")
  658. except Exception as e:
  659. topology.master1.log.info("Exception (expected): %s" % type(e).__name__)
  660. assert isinstance(e, ldap.INSUFFICIENT_ACCESS)
  661. # successfull MOD with the ACI
  662. topology.master1.log.info("\n\n######## MOVE to and from equality filter ########\n")
  663. _bind_manager(topology)
  664. _moddn_aci_staging_to_production(topology, mod_type=ldap.MOD_ADD,
  665. target_from=STAGING_DN, target_to=PRODUCTION_DN)
  666. _moddn_aci_deny_tree(topology, mod_type=ldap.MOD_ADD)
  667. _bind_normal(topology)
  668. topology.master1.log.info("Try to MODDN %s -> %s,%s" % (old_dn, new_rdn, new_superior))
  669. topology.master1.rename_s(old_dn, new_rdn, newsuperior=new_superior)
  670. #
  671. # Now try to move an entry under except
  672. #
  673. topology.master1.log.info("\n\n######## MOVE staging -> Prod/Except (14) ########\n")
  674. old_rdn = "cn=%s14" % NEW_ACCOUNT
  675. old_dn = "%s,%s" % (old_rdn, STAGING_DN)
  676. new_rdn = old_rdn
  677. new_superior = PROD_EXCEPT_DN
  678. try:
  679. topology.master1.log.info("Try to MODDN %s -> %s,%s" % (old_dn, new_rdn, new_superior))
  680. topology.master1.rename_s(old_dn, new_rdn, newsuperior=new_superior)
  681. assert 0
  682. except AssertionError:
  683. topology.master1.log.info("Exception (not really expected exception but that is fine as it fails to rename)")
  684. except Exception as e:
  685. topology.master1.log.info("Exception (expected): %s" % type(e).__name__)
  686. assert isinstance(e, ldap.INSUFFICIENT_ACCESS)
  687. # successfull MOD with the both ACI
  688. _bind_manager(topology)
  689. _moddn_aci_staging_to_production(topology, mod_type=ldap.MOD_DELETE,
  690. target_from=STAGING_DN, target_to=PRODUCTION_DN)
  691. _moddn_aci_deny_tree(topology, mod_type=ldap.MOD_DELETE)
  692. _bind_normal(topology)
  693. def test_mode_default_ger_no_moddn(topology, moddn_setup):
  694. topology.master1.log.info("\n\n######## mode moddn_aci : GER no moddn ########\n")
  695. request_ctrl = GetEffectiveRightsControl(criticality=True, authzId="dn: " + BIND_DN)
  696. msg_id = topology.master1.search_ext(PRODUCTION_DN,
  697. ldap.SCOPE_SUBTREE,
  698. "objectclass=*",
  699. serverctrls=[request_ctrl])
  700. rtype, rdata, rmsgid, response_ctrl = topology.master1.result3(msg_id)
  701. #ger={}
  702. value = ''
  703. for dn, attrs in rdata:
  704. topology.master1.log.info("dn: %s" % dn)
  705. value = attrs['entryLevelRights'][0]
  706. topology.master1.log.info("######## entryLevelRights: %r" % value)
  707. assert 'n' not in value
  708. def test_mode_default_ger_with_moddn(topology, moddn_setup):
  709. """This test case adds the moddn aci and check ger contains 'n'"""
  710. topology.master1.log.info("\n\n######## mode moddn_aci: GER with moddn ########\n")
  711. # successfull MOD with the ACI
  712. _bind_manager(topology)
  713. _moddn_aci_staging_to_production(topology, mod_type=ldap.MOD_ADD,
  714. target_from=STAGING_DN, target_to=PRODUCTION_DN)
  715. _bind_normal(topology)
  716. request_ctrl = GetEffectiveRightsControl(criticality=True, authzId="dn: " + BIND_DN)
  717. msg_id = topology.master1.search_ext(PRODUCTION_DN,
  718. ldap.SCOPE_SUBTREE,
  719. "objectclass=*",
  720. serverctrls=[request_ctrl])
  721. rtype, rdata, rmsgid, response_ctrl = topology.master1.result3(msg_id)
  722. #ger={}
  723. value = ''
  724. for dn, attrs in rdata:
  725. topology.master1.log.info("dn: %s" % dn)
  726. value = attrs['entryLevelRights'][0]
  727. topology.master1.log.info("######## entryLevelRights: %r" % value)
  728. assert 'n' in value
  729. # successfull MOD with the both ACI
  730. _bind_manager(topology)
  731. _moddn_aci_staging_to_production(topology, mod_type=ldap.MOD_DELETE,
  732. target_from=STAGING_DN, target_to=PRODUCTION_DN)
  733. _bind_normal(topology)
  734. def test_mode_switch_default_to_legacy(topology, moddn_setup):
  735. """This test switch the server from default mode to legacy"""
  736. topology.master1.log.info("\n\n######## Disable the moddn aci mod ########\n")
  737. _bind_manager(topology)
  738. mod = [(ldap.MOD_REPLACE, CONFIG_MODDN_ACI_ATTR, 'off')]
  739. topology.master1.modify_s(DN_CONFIG, mod)
  740. def test_mode_legacy_ger_no_moddn1(topology, moddn_setup):
  741. topology.master1.log.info("\n\n######## mode legacy 1: GER no moddn ########\n")
  742. request_ctrl = GetEffectiveRightsControl(criticality=True, authzId="dn: " + BIND_DN)
  743. msg_id = topology.master1.search_ext(PRODUCTION_DN,
  744. ldap.SCOPE_SUBTREE,
  745. "objectclass=*",
  746. serverctrls=[request_ctrl])
  747. rtype, rdata, rmsgid, response_ctrl = topology.master1.result3(msg_id)
  748. #ger={}
  749. value = ''
  750. for dn, attrs in rdata:
  751. topology.master1.log.info("dn: %s" % dn)
  752. value = attrs['entryLevelRights'][0]
  753. topology.master1.log.info("######## entryLevelRights: %r" % value)
  754. assert 'n' not in value
  755. def test_mode_legacy_ger_no_moddn2(topology, moddn_setup):
  756. topology.master1.log.info("\n\n######## mode legacy 2: GER no moddn ########\n")
  757. # successfull MOD with the ACI
  758. _bind_manager(topology)
  759. _moddn_aci_staging_to_production(topology, mod_type=ldap.MOD_ADD,
  760. target_from=STAGING_DN, target_to=PRODUCTION_DN)
  761. _bind_normal(topology)
  762. request_ctrl = GetEffectiveRightsControl(criticality=True, authzId="dn: " + BIND_DN)
  763. msg_id = topology.master1.search_ext(PRODUCTION_DN,
  764. ldap.SCOPE_SUBTREE,
  765. "objectclass=*",
  766. serverctrls=[request_ctrl])
  767. rtype, rdata, rmsgid, response_ctrl = topology.master1.result3(msg_id)
  768. #ger={}
  769. value = ''
  770. for dn, attrs in rdata:
  771. topology.master1.log.info("dn: %s" % dn)
  772. value = attrs['entryLevelRights'][0]
  773. topology.master1.log.info("######## entryLevelRights: %r" % value)
  774. assert 'n' not in value
  775. # successfull MOD with the both ACI
  776. _bind_manager(topology)
  777. _moddn_aci_staging_to_production(topology, mod_type=ldap.MOD_DELETE,
  778. target_from=STAGING_DN, target_to=PRODUCTION_DN)
  779. _bind_normal(topology)
  780. def test_mode_legacy_ger_with_moddn(topology, moddn_setup):
  781. topology.master1.log.info("\n\n######## mode legacy : GER with moddn ########\n")
  782. # being allowed to read/write the RDN attribute use to allow the RDN
  783. ACI_TARGET = "(target = \"ldap:///%s\")(targetattr=\"cn\")" % (PRODUCTION_DN)
  784. ACI_ALLOW = "(version 3.0; acl \"MODDN production changing the RDN attribute\"; allow (read,search,write)"
  785. ACI_SUBJECT = " userdn = \"ldap:///%s\";)" % BIND_DN
  786. ACI_BODY = ACI_TARGET + ACI_ALLOW + ACI_SUBJECT
  787. # successfull MOD with the ACI
  788. _bind_manager(topology)
  789. mod = [(ldap.MOD_ADD, 'aci', ACI_BODY)]
  790. topology.master1.modify_s(SUFFIX, mod)
  791. _bind_normal(topology)
  792. request_ctrl = GetEffectiveRightsControl(criticality=True, authzId="dn: " + BIND_DN)
  793. msg_id = topology.master1.search_ext(PRODUCTION_DN,
  794. ldap.SCOPE_SUBTREE,
  795. "objectclass=*",
  796. serverctrls=[request_ctrl])
  797. rtype, rdata, rmsgid, response_ctrl = topology.master1.result3(msg_id)
  798. #ger={}
  799. value = ''
  800. for dn, attrs in rdata:
  801. topology.master1.log.info("dn: %s" % dn)
  802. value = attrs['entryLevelRights'][0]
  803. topology.master1.log.info("######## entryLevelRights: %r" % value)
  804. assert 'n' in value
  805. # successfull MOD with the both ACI
  806. _bind_manager(topology)
  807. mod = [(ldap.MOD_DELETE, 'aci', ACI_BODY)]
  808. topology.master1.modify_s(SUFFIX, mod)
  809. #_bind_normal(topology)
  810. @pytest.fixture(scope="module")
  811. def rdn_write_setup(topology):
  812. topology.master1.log.info("\n\n######## Add entry tuser ########\n")
  813. topology.master1.add_s(Entry((SRC_ENTRY_DN, {
  814. 'objectclass': "top person".split(),
  815. 'sn': SRC_ENTRY_CN,
  816. 'cn': SRC_ENTRY_CN})))
  817. def test_rdn_write_get_ger(topology, rdn_write_setup):
  818. ANONYMOUS_DN = ""
  819. topology.master1.log.info("\n\n######## GER rights for anonymous ########\n")
  820. request_ctrl = GetEffectiveRightsControl(criticality=True,
  821. authzId="dn:" + ANONYMOUS_DN)
  822. msg_id = topology.master1.search_ext(SUFFIX,
  823. ldap.SCOPE_SUBTREE,
  824. "objectclass=*",
  825. serverctrls=[request_ctrl])
  826. rtype, rdata, rmsgid, response_ctrl = topology.master1.result3(msg_id)
  827. value = ''
  828. for dn, attrs in rdata:
  829. topology.master1.log.info("dn: %s" % dn)
  830. for value in attrs['entryLevelRights']:
  831. topology.master1.log.info("######## entryLevelRights: %r" % value)
  832. assert 'n' not in value
  833. def test_rdn_write_modrdn_anonymous(topology, rdn_write_setup):
  834. ANONYMOUS_DN = ""
  835. topology.master1.close()
  836. topology.master1.binddn = ANONYMOUS_DN
  837. topology.master1.open()
  838. msg_id = topology.master1.search_ext("", ldap.SCOPE_BASE, "objectclass=*")
  839. rtype, rdata, rmsgid, response_ctrl = topology.master1.result3(msg_id)
  840. for dn, attrs in rdata:
  841. topology.master1.log.info("dn: %s" % dn)
  842. for attr in attrs:
  843. topology.master1.log.info("######## %r: %r" % (attr, attrs[attr]))
  844. try:
  845. topology.master1.rename_s(SRC_ENTRY_DN, "cn=%s" % DST_ENTRY_CN, delold=True)
  846. except Exception as e:
  847. topology.master1.log.info("Exception (expected): %s" % type(e).__name__)
  848. isinstance(e, ldap.INSUFFICIENT_ACCESS)
  849. try:
  850. topology.master1.getEntry(DST_ENTRY_DN, ldap.SCOPE_BASE, "objectclass=*")
  851. assert False
  852. except Exception as e:
  853. topology.master1.log.info("The entry was not renamed (expected)")
  854. isinstance(e, ldap.NO_SUCH_OBJECT)
  855. _bind_manager(topology)
  856. if __name__ == '__main__':
  857. # Run isolated
  858. # -s for DEBUG mode
  859. CURRENT_FILE = os.path.realpath(__file__)
  860. pytest.main("-s %s" % CURRENT_FILE)