| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444 |
- # --- BEGIN COPYRIGHT BLOCK ---
- # Copyright (C) 2016 Red Hat, Inc.
- # All rights reserved.
- #
- # License: GPL (version 3 or any later version).
- # See LICENSE for details.
- # --- END COPYRIGHT BLOCK ---
- #
- import os
- import sys
- import time
- import ldap
- import logging
- import pytest
- from lib389 import DirSrv, Entry, tools, tasks
- from lib389.tools import DirSrvTools
- from lib389._constants import *
- from lib389.properties import *
- from lib389.tasks import *
- from lib389.utils import *
- logging.getLogger(__name__).setLevel(logging.DEBUG)
- log = logging.getLogger(__name__)
- installation1_prefix = None
- CONFIG_DN = 'cn=config'
- BOU = 'BOU'
- BINDOU = 'ou=%s,%s' % (BOU, DEFAULT_SUFFIX)
- BUID = 'buser123'
- TUID = 'tuser0'
- BINDDN = 'uid=%s,%s' % (BUID, BINDOU)
- BINDPW = BUID
- TESTDN = 'uid=%s,ou=people,%s' % (TUID, DEFAULT_SUFFIX)
- TESTPW = TUID
- BOGUSDN = 'uid=bogus,%s' % DEFAULT_SUFFIX
- BOGUSDN2 = 'uid=bogus,ou=people,%s' % DEFAULT_SUFFIX
- BOGUSSUFFIX = 'uid=bogus,ou=people,dc=bogus'
- GROUPOU = 'ou=groups,%s' % DEFAULT_SUFFIX
- BOGUSOU = 'ou=OU,%s' % DEFAULT_SUFFIX
- logging.getLogger(__name__).setLevel(logging.DEBUG)
- log = logging.getLogger(__name__)
- installation1_prefix = None
- class TopologyStandalone(object):
- def __init__(self, standalone):
- standalone.open()
- self.standalone = standalone
- @pytest.fixture(scope="module")
- def topology(request):
- global installation1_prefix
- if installation1_prefix:
- args_instance[SER_DEPLOYED_DIR] = installation1_prefix
- # Creating standalone instance ...
- standalone = DirSrv(verbose=False)
- args_instance[SER_HOST] = HOST_STANDALONE
- args_instance[SER_PORT] = PORT_STANDALONE
- args_instance[SER_SERVERID_PROP] = SERVERID_STANDALONE
- args_instance[SER_CREATION_SUFFIX] = DEFAULT_SUFFIX
- args_standalone = args_instance.copy()
- standalone.allocate(args_standalone)
- instance_standalone = standalone.exists()
- if instance_standalone:
- standalone.delete()
- standalone.create()
- standalone.open()
- # Delete each instance in the end
- def fin():
- standalone.delete()
- request.addfinalizer(fin)
- # Clear out the tmp dir
- standalone.clearTmpDir(__file__)
- return TopologyStandalone(standalone)
- def pattern_accesslog(file, log_pattern):
- try:
- pattern_accesslog.last_pos += 1
- except AttributeError:
- pattern_accesslog.last_pos = 0
- found = None
- file.seek(pattern_accesslog.last_pos)
- # Use a while true iteration because 'for line in file: hit a
- # python bug that break file.tell()
- while True:
- line = file.readline()
- found = log_pattern.search(line)
- if ((line == '') or (found)):
- break
- pattern_accesslog.last_pos = file.tell()
- if found:
- return line
- else:
- return None
- def check_op_result(server, op, dn, superior, exists, rc):
- targetdn = dn
- if op == 'search':
- if exists:
- opstr = 'Searching existing entry'
- else:
- opstr = 'Searching non-existing entry'
- elif op == 'add':
- if exists:
- opstr = 'Adding existing entry'
- else:
- opstr = 'Adding non-existing entry'
- elif op == 'modify':
- if exists:
- opstr = 'Modifying existing entry'
- else:
- opstr = 'Modifying non-existing entry'
- elif op == 'modrdn':
- if superior != None:
- targetdn = superior
- if exists:
- opstr = 'Moving to existing superior'
- else:
- opstr = 'Moving to non-existing superior'
- else:
- if exists:
- opstr = 'Renaming existing entry'
- else:
- opstr = 'Renaming non-existing entry'
- elif op == 'delete':
- if exists:
- opstr = 'Deleting existing entry'
- else:
- opstr = 'Deleting non-existing entry'
- if ldap.SUCCESS == rc:
- expstr = 'be ok'
- else:
- expstr = 'fail with %s' % rc.__name__
- log.info('%s %s, which should %s.' % (opstr, targetdn, expstr))
- hit = 0
- try:
- if op == 'search':
- centry = server.search_s(dn, ldap.SCOPE_BASE, 'objectclass=*')
- elif op == 'add':
- server.add_s(Entry((dn, {'objectclass': 'top extensibleObject'.split(),
- 'cn': 'test entry'})))
- elif op == 'modify':
- server.modify_s(dn, [(ldap.MOD_REPLACE, 'description', 'test')])
- elif op == 'modrdn':
- if superior != None:
- server.rename_s(dn, 'uid=new', newsuperior=superior, delold=1)
- else:
- server.rename_s(dn, 'uid=new', delold=1)
- elif op == 'delete':
- server.delete_s(dn)
- else:
- log.fatal('Unknown operation %s' % op)
- assert False
- except ldap.LDAPError as e:
- hit = 1
- log.info("Exception (expected): %s" % type(e).__name__)
- log.info('Desc ' + e.message['desc'])
- assert isinstance(e, rc)
- if e.message.has_key('matched'):
- log.info('Matched is returned: ' + e.message['matched'])
- if rc != ldap.NO_SUCH_OBJECT:
- assert False
- if ldap.SUCCESS == rc:
- if op == 'search':
- log.info('Search should return none')
- assert len(centry) == 0
- else:
- if 0 == hit:
- log.info('Expected to fail with %s, but passed' % rc.__name__)
- assert False
- log.info('PASSED\n')
- def test_ticket1347760(topology):
- """
- Prevent revealing the entry info to whom has no access rights.
- """
- log.info('Testing Bug 1347760 - Information disclosure via repeated use of LDAP ADD operation, etc.')
- log.info('Disabling accesslog logbuffering')
- topology.standalone.modify_s(CONFIG_DN, [(ldap.MOD_REPLACE, 'nsslapd-accesslog-logbuffering', 'off')])
- log.info('Bind as {%s,%s}' % (DN_DM, PASSWORD))
- topology.standalone.simple_bind_s(DN_DM, PASSWORD)
- log.info('Adding ou=%s a bind user belongs to.' % BOU)
- topology.standalone.add_s(Entry((BINDOU, {
- 'objectclass': 'top organizationalunit'.split(),
- 'ou': BOU})))
- log.info('Adding a bind user.')
- topology.standalone.add_s(Entry((BINDDN,
- {'objectclass': "top person organizationalPerson inetOrgPerson".split(),
- 'cn': 'bind user',
- 'sn': 'user',
- 'userPassword': BINDPW})))
- log.info('Adding a test user.')
- topology.standalone.add_s(Entry((TESTDN,
- {'objectclass': "top person organizationalPerson inetOrgPerson".split(),
- 'cn': 'test user',
- 'sn': 'user',
- 'userPassword': TESTPW})))
- log.info('Deleting aci in %s.' % DEFAULT_SUFFIX)
- topology.standalone.modify_s(DEFAULT_SUFFIX, [(ldap.MOD_DELETE, 'aci', None)])
- log.info('Bind case 1. the bind user has no rights to read the entry itself, bind should be successful.')
- log.info('Bind as {%s,%s} who has no access rights.' % (BINDDN, BINDPW))
- try:
- topology.standalone.simple_bind_s(BINDDN, BINDPW)
- except ldap.LDAPError as e:
- log.info('Desc ' + e.message['desc'])
- assert False
- file_path = os.path.join(topology.standalone.prefix, 'var/log/dirsrv/slapd-%s/access' % topology.standalone.serverid)
- file_obj = open(file_path, "r")
- log.info('Access log path: %s' % file_path)
- log.info('Bind case 2-1. the bind user does not exist, bind should fail with error %s' % ldap.INVALID_CREDENTIALS.__name__)
- log.info('Bind as {%s,%s} who does not exist.' % (BOGUSDN, 'bogus'))
- try:
- topology.standalone.simple_bind_s(BOGUSDN, 'bogus')
- except ldap.LDAPError as e:
- log.info("Exception (expected): %s" % type(e).__name__)
- log.info('Desc ' + e.message['desc'])
- assert isinstance(e, ldap.INVALID_CREDENTIALS)
- regex = re.compile('No such entry')
- cause = pattern_accesslog(file_obj, regex)
- if cause == None:
- log.fatal('Cause not found - %s' % cause)
- assert False
- else:
- log.info('Cause found - %s' % cause)
- log.info('Bind case 2-2. the bind user\'s suffix does not exist, bind should fail with error %s' % ldap.INVALID_CREDENTIALS.__name__)
- log.info('Bind as {%s,%s} who does not exist.' % (BOGUSSUFFIX, 'bogus'))
- try:
- topology.standalone.simple_bind_s(BOGUSSUFFIX, 'bogus')
- except ldap.LDAPError as e:
- log.info("Exception (expected): %s" % type(e).__name__)
- log.info('Desc ' + e.message['desc'])
- assert isinstance(e, ldap.INVALID_CREDENTIALS)
- regex = re.compile('No such suffix')
- cause = pattern_accesslog(file_obj, regex)
- if cause == None:
- log.fatal('Cause not found - %s' % cause)
- assert False
- else:
- log.info('Cause found - %s' % cause)
- log.info('Bind case 2-3. the bind user\'s password is wrong, bind should fail with error %s' % ldap.INVALID_CREDENTIALS.__name__)
- log.info('Bind as {%s,%s} who does not exist.' % (BINDDN, 'bogus'))
- try:
- topology.standalone.simple_bind_s(BINDDN, 'bogus')
- except ldap.LDAPError as e:
- log.info("Exception (expected): %s" % type(e).__name__)
- log.info('Desc ' + e.message['desc'])
- assert isinstance(e, ldap.INVALID_CREDENTIALS)
- regex = re.compile('Invalid credentials')
- cause = pattern_accesslog(file_obj, regex)
- if cause == None:
- log.fatal('Cause not found - %s' % cause)
- assert False
- else:
- log.info('Cause found - %s' % cause)
- log.info('Adding aci for %s to %s.' % (BINDDN, BINDOU))
- acival = '(targetattr="*")(version 3.0; acl "%s"; allow(all) userdn = "ldap:///%s";)' % (BUID, BINDDN)
- log.info('aci: %s' % acival)
- log.info('Bind as {%s,%s}' % (DN_DM, PASSWORD))
- topology.standalone.simple_bind_s(DN_DM, PASSWORD)
- topology.standalone.modify_s(BINDOU, [(ldap.MOD_ADD, 'aci', acival)])
- log.info('Bind case 3. the bind user has the right to read the entry itself, bind should be successful.')
- log.info('Bind as {%s,%s} which should be ok.\n' % (BINDDN, BINDPW))
- topology.standalone.simple_bind_s(BINDDN, BINDPW)
- log.info('The following operations are against the subtree the bind user %s has no rights.' % BINDDN)
- # Search
- exists = True
- rc = ldap.SUCCESS
- log.info('Search case 1. the bind user has no rights to read the search entry, it should return no search results with %s' % rc)
- check_op_result(topology.standalone, 'search', TESTDN, None, exists, rc)
- exists = False
- rc = ldap.SUCCESS
- log.info('Search case 2-1. the search entry does not exist, the search should return no search results with %s' % rc.__name__)
- check_op_result(topology.standalone, 'search', BOGUSDN, None, exists, rc)
- exists = False
- rc = ldap.SUCCESS
- log.info('Search case 2-2. the search entry does not exist, the search should return no search results with %s' % rc.__name__)
- check_op_result(topology.standalone, 'search', BOGUSDN2, None, exists, rc)
- # Add
- exists = True
- rc = ldap.INSUFFICIENT_ACCESS
- log.info('Add case 1. the bind user has no rights AND the adding entry exists, it should fail with %s' % rc.__name__)
- check_op_result(topology.standalone, 'add', TESTDN, None, exists, rc)
- exists = False
- rc = ldap.INSUFFICIENT_ACCESS
- log.info('Add case 2-1. the bind user has no rights AND the adding entry does not exist, it should fail with %s' % rc.__name__)
- check_op_result(topology.standalone, 'add', BOGUSDN, None, exists, rc)
- exists = False
- rc = ldap.INSUFFICIENT_ACCESS
- log.info('Add case 2-2. the bind user has no rights AND the adding entry does not exist, it should fail with %s' % rc.__name__)
- check_op_result(topology.standalone, 'add', BOGUSDN2, None, exists, rc)
- # Modify
- exists = True
- rc = ldap.INSUFFICIENT_ACCESS
- log.info('Modify case 1. the bind user has no rights AND the modifying entry exists, it should fail with %s' % rc.__name__)
- check_op_result(topology.standalone, 'modify', TESTDN, None, exists, rc)
- exists = False
- rc = ldap.INSUFFICIENT_ACCESS
- log.info('Modify case 2-1. the bind user has no rights AND the modifying entry does not exist, it should fail with %s' % rc.__name__)
- check_op_result(topology.standalone, 'modify', BOGUSDN, None, exists, rc)
- exists = False
- rc = ldap.INSUFFICIENT_ACCESS
- log.info('Modify case 2-2. the bind user has no rights AND the modifying entry does not exist, it should fail with %s' % rc.__name__)
- check_op_result(topology.standalone, 'modify', BOGUSDN2, None, exists, rc)
- # Modrdn
- exists = True
- rc = ldap.INSUFFICIENT_ACCESS
- log.info('Modrdn case 1. the bind user has no rights AND the renaming entry exists, it should fail with %s' % rc.__name__)
- check_op_result(topology.standalone, 'modrdn', TESTDN, None, exists, rc)
- exists = False
- rc = ldap.INSUFFICIENT_ACCESS
- log.info('Modrdn case 2-1. the bind user has no rights AND the renaming entry does not exist, it should fail with %s' % rc.__name__)
- check_op_result(topology.standalone, 'modrdn', BOGUSDN, None, exists, rc)
- exists = False
- rc = ldap.INSUFFICIENT_ACCESS
- log.info('Modrdn case 2-2. the bind user has no rights AND the renaming entry does not exist, it should fail with %s' % rc.__name__)
- check_op_result(topology.standalone, 'modrdn', BOGUSDN2, None, exists, rc)
- exists = True
- rc = ldap.INSUFFICIENT_ACCESS
- log.info('Modrdn case 3. the bind user has no rights AND the node moving an entry to exists, it should fail with %s' % rc.__name__)
- check_op_result(topology.standalone, 'modrdn', TESTDN, GROUPOU, exists, rc)
- exists = False
- rc = ldap.INSUFFICIENT_ACCESS
- log.info('Modrdn case 4-1. the bind user has no rights AND the node moving an entry to does not, it should fail with %s' % rc.__name__)
- check_op_result(topology.standalone, 'modrdn', TESTDN, BOGUSOU, exists, rc)
- exists = False
- rc = ldap.INSUFFICIENT_ACCESS
- log.info('Modrdn case 4-2. the bind user has no rights AND the node moving an entry to does not, it should fail with %s' % rc.__name__)
- check_op_result(topology.standalone, 'modrdn', TESTDN, BOGUSOU, exists, rc)
- # Delete
- exists = True
- rc = ldap.INSUFFICIENT_ACCESS
- log.info('Delete case 1. the bind user has no rights AND the deleting entry exists, it should fail with %s' % rc.__name__)
- check_op_result(topology.standalone, 'delete', TESTDN, None, exists, rc)
- exists = False
- rc = ldap.INSUFFICIENT_ACCESS
- log.info('Delete case 2-1. the bind user has no rights AND the deleting entry does not exist, it should fail with %s' % rc.__name__)
- check_op_result(topology.standalone, 'delete', BOGUSDN, None, exists, rc)
- exists = False
- rc = ldap.INSUFFICIENT_ACCESS
- log.info('Delete case 2-2. the bind user has no rights AND the deleting entry does not exist, it should fail with %s' % rc.__name__)
- check_op_result(topology.standalone, 'delete', BOGUSDN2, None, exists, rc)
- log.info('EXTRA: Check no regressions')
- log.info('Adding aci for %s to %s.' % (BINDDN, DEFAULT_SUFFIX))
- acival = '(targetattr="*")(version 3.0; acl "%s-all"; allow(all) userdn = "ldap:///%s";)' % (BUID, BINDDN)
- log.info('Bind as {%s,%s}' % (DN_DM, PASSWORD))
- topology.standalone.simple_bind_s(DN_DM, PASSWORD)
- topology.standalone.modify_s(DEFAULT_SUFFIX, [(ldap.MOD_ADD, 'aci', acival)])
- log.info('Bind as {%s,%s}.' % (BINDDN, BINDPW))
- try:
- topology.standalone.simple_bind_s(BINDDN, BINDPW)
- except ldap.LDAPError as e:
- log.info('Desc ' + e.message['desc'])
- assert False
- exists = False
- rc = ldap.NO_SUCH_OBJECT
- log.info('Search case. the search entry does not exist, the search should fail with %s' % rc.__name__)
- check_op_result(topology.standalone, 'search', BOGUSDN2, None, exists, rc)
- file_obj.close()
- exists = True
- rc = ldap.ALREADY_EXISTS
- log.info('Add case. the adding entry already exists, it should fail with %s' % rc.__name__)
- check_op_result(topology.standalone, 'add', TESTDN, None, exists, rc)
- exists = False
- rc = ldap.NO_SUCH_OBJECT
- log.info('Modify case. the modifying entry does not exist, it should fail with %s' % rc.__name__)
- check_op_result(topology.standalone, 'modify', BOGUSDN, None, exists, rc)
- exists = False
- rc = ldap.NO_SUCH_OBJECT
- log.info('Modrdn case 1. the renaming entry does not exist, it should fail with %s' % rc.__name__)
- check_op_result(topology.standalone, 'modrdn', BOGUSDN, None, exists, rc)
- exists = False
- rc = ldap.NO_SUCH_OBJECT
- log.info('Modrdn case 2. the node moving an entry to does not, it should fail with %s' % rc.__name__)
- check_op_result(topology.standalone, 'modrdn', TESTDN, BOGUSOU, exists, rc)
- exists = False
- rc = ldap.NO_SUCH_OBJECT
- log.info('Delete case. the deleting entry does not exist, it should fail with %s' % rc.__name__)
- check_op_result(topology.standalone, 'delete', BOGUSDN, None, exists, rc)
- log.info('SUCCESS')
- if __name__ == '__main__':
- # Run isolated
- # -s for DEBUG mode
- CURRENT_FILE = os.path.realpath(__file__)
- pytest.main("-s %s" % CURRENT_FILE)
|