| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394 |
- # --- BEGIN COPYRIGHT BLOCK ---
- # Copyright (C) 2015 Red Hat, Inc.
- # All rights reserved.
- #
- # License: GPL (version 3 or any later version).
- # See LICENSE for details.
- # --- END COPYRIGHT BLOCK ---
- #
- import os
- import sys
- import time
- import ldap
- import logging
- import pytest
- from lib389 import DirSrv, Entry, tools, tasks
- from lib389.tools import DirSrvTools
- from lib389._constants import *
- from lib389.properties import *
- from lib389.tasks import *
- from lib389.utils import *
# Module-wide logger, switched to DEBUG so every step of the test is traced.
log = logging.getLogger(__name__)
log.setLevel(logging.DEBUG)

# Optional installation prefix of the server under test; when set, the
# topology fixture stores it in args_instance[SER_DEPLOYED_DIR].
installation1_prefix = None

# DN of the configuration entry of the uid index in the userRoot backend.
UID_INDEX = 'cn=uid,cn=index,cn=userRoot,cn=ldbm database,cn=plugins,cn=config'
class TopologyStandalone(object):
    '''Holder for the single standalone server instance used by the tests.'''

    def __init__(self, standalone):
        '''
        Open a connection on standalone and keep the instance.

        standalone: the server object; it must provide an open() method.
        '''
        # The connection is opened first, so a failing open() never leaves
        # a half-initialised topology behind.
        standalone.open()
        self.standalone = standalone
@pytest.fixture(scope="module")
def topology(request):
    '''
    Create a fresh standalone instance and return it wrapped in a
    TopologyStandalone.

    An already existing instance with the same settings is deleted first.
    The request argument is not used by the fixture body.
    '''
    if installation1_prefix:
        args_instance[SER_DEPLOYED_DIR] = installation1_prefix

    # Creating standalone instance ...
    standalone = DirSrv(verbose=False)
    for key, value in ((SER_HOST, HOST_STANDALONE),
                       (SER_PORT, PORT_STANDALONE),
                       (SER_SERVERID_PROP, SERVERID_STANDALONE),
                       (SER_CREATION_SUFFIX, DEFAULT_SUFFIX)):
        args_instance[key] = value
    standalone.allocate(args_instance.copy())

    # Start from scratch: drop any instance left over from a previous run.
    if standalone.exists():
        standalone.delete()
    standalone.create()
    standalone.open()

    # Clear out the tmp dir
    standalone.clearTmpDir(__file__)

    return TopologyStandalone(standalone)
def _ldap_error_desc(error):
    '''
    Return the 'desc' text carried by an ldap.LDAPError.

    The description is read from error.args[0] rather than error.message,
    because BaseException.message does not exist on Python 3.  Falls back to
    str(error) when the exception does not carry the usual info dictionary.
    '''
    try:
        return error.args[0]['desc']
    except (IndexError, KeyError, TypeError):
        return str(error)


def _fail(message):
    '''
    Log message as an error, then abort the running test with it.

    pytest.fail is used instead of "assert False" so that the failure is
    still raised when Python runs with -O (which strips assert statements).
    '''
    log.error(message)
    pytest.fail(message)


def _modify_uid_index(topology, mod_op, settings, action):
    '''
    Apply mod_op to every (attribute, value) pair of settings on UID_INDEX.

    topology: object whose standalone attribute is the server connection.
    mod_op:   ldap.MOD_ADD or ldap.MOD_DELETE.
    settings: list of (attribute, value) pairs, sent in the given order.
    action:   'add' or 'delete'; only used to word the failure message.

    The test is failed if the server rejects the modification.
    '''
    mods = [(mod_op, attr, value) for attr, value in settings]
    try:
        topology.standalone.modify_s(UID_INDEX, mods)
    except ldap.LDAPError as e:
        _fail('Failed to %s substr lengths: error %s'
              % (action, _ldap_error_desc(e)))


def _add_test_user(topology, uid, cn, sn, givenname):
    '''
    Add the inetOrgPerson entry uid=<uid>,<SUFFIX>.

    The uid is also stored as the mail value.  The test is failed if the
    server rejects the add.
    '''
    user_dn = 'uid=%s,%s' % (uid, SUFFIX)
    try:
        topology.standalone.add_s(Entry((user_dn, {
            'objectclass': 'top person organizationalPerson inetOrgPerson'.split(),
            'cn': cn,
            'sn': sn,
            'givenname': givenname,
            'mail': uid})))
    except ldap.LDAPError as e:
        _fail('Failed to add %s: error %s' % (user_dn, _ldap_error_desc(e)))


def _assert_substr_index_used(topology, search_filter, success_message):
    '''
    Check in the access log that the search on search_filter used an index.

    topology:        object whose standalone attribute provides accesslog,
                     the path of the access log file.
    search_filter:   filter text without parentheses, e.g. 'uid=a*'.
    success_message: text logged when the check passes.

    The first SRCH line mentioning both SUFFIX and search_filter gives the
    "conn=N op=M" id of the search; the first following RESULT line with
    that id is then inspected.  The test is failed when either line is
    missing, when nentries is 0 or absent, or when the RESULT line carries
    notes=A or notes=U (reported as "substr index was not used").
    '''
    accesslog = topology.standalone.accesslog
    srch_re = re.compile(r'.*\s+(conn=\d+ op=\d+)\s+SRCH .*')

    conn_op = None
    result_line = None
    # The log is read directly instead of spawning "egrep | egrep" pipes,
    # which were never closed.
    with open(accesslog) as logfile:
        for line in logfile:
            if conn_op is None:
                if SUFFIX in line and search_filter in line:
                    match = srch_re.match(line)
                    if match:
                        conn_op = match.group(1)
            # Only lines after the SRCH line are considered: the same id may
            # presumably occur earlier in the log, from a run before one of
            # the restarts.  The trailing blank keeps "op=2" from matching
            # "op=20".
            elif (conn_op + ' ') in line and 'RESULT' in line:
                result_line = line
                break

    if conn_op is None:
        _fail('Search with "(%s)" is not logged in %s'
              % (search_filter, accesslog))
    log.info('match: %s' % conn_op)

    if result_line is None:
        _fail('Search result of "(%s)" is not logged in %s'
              % (search_filter, accesslog))
    log.info('l1: %s' % result_line)

    nentries = re.match(r'.*nentries=(\d+)\s+.*', result_line)
    if nentries is None:
        _fail('No nentries value in the result of "(%s)": %s'
              % (search_filter, result_line))
    log.info('match: nentries=%s' % nentries.group(1))
    if nentries.group(1) == "0":
        _fail('Entry %s not found.' % search_filter)
    log.info('Entry %s found.' % search_filter)

    notes = re.match(r'.*(notes=[AU]).*', result_line)
    if notes:
        _fail('%s - substr index was not used' % notes.group(1))
    log.info(success_message)


def test_ticket48109_0(topology):
    '''
    Set SubStr lengths to cn=uid,cn=index,...
      objectClass: extensibleObject
      nsIndexType: sub
      nsSubStrBegin: 2
      nsSubStrEnd: 2
    '''
    log.info('Test case 0')
    settings = [('objectClass', 'extensibleObject'),
                ('nsIndexType', 'sub'),
                ('nsSubStrBegin', '2'),
                ('nsSubStrEnd', '2')]

    # add substr setting to UID_INDEX
    _modify_uid_index(topology, ldap.MOD_ADD, settings, 'add')

    # restart the server to apply the indexing
    topology.standalone.restart(timeout=10)

    # add a test user
    _add_test_user(topology, 'auser0', 'a user0', 'user0', 'a')

    entries = topology.standalone.search_s(SUFFIX, ldap.SCOPE_SUBTREE, '(uid=a*)')
    assert len(entries) == 1

    # restart the server to check the access log
    topology.standalone.restart(timeout=10)

    _assert_substr_index_used(topology, 'uid=a*',
                              'Test case 0 - OK - substr index used')

    # clean up substr setting to UID_INDEX
    _modify_uid_index(topology, ldap.MOD_DELETE, settings, 'delete')


def test_ticket48109_1(topology):
    '''
    Set SubStr lengths to cn=uid,cn=index,...
      nsIndexType: sub
      nsMatchingRule: nsSubStrBegin=2
      nsMatchingRule: nsSubStrEnd=2
    '''
    log.info('Test case 1')
    settings = [('nsIndexType', 'sub'),
                ('nsMatchingRule', 'nssubstrbegin=2'),
                ('nsMatchingRule', 'nssubstrend=2')]

    # add substr setting to UID_INDEX
    _modify_uid_index(topology, ldap.MOD_ADD, settings, 'add')

    # restart the server to apply the indexing
    topology.standalone.restart(timeout=10)

    # add a test user
    _add_test_user(topology, 'buser1', 'b user1', 'user1', 'b')

    entries = topology.standalone.search_s(SUFFIX, ldap.SCOPE_SUBTREE, '(uid=b*)')
    assert len(entries) == 1

    # restart the server to check the access log
    topology.standalone.restart(timeout=10)

    _assert_substr_index_used(topology, 'uid=b*',
                              'Test case 1 - OK - substr index used')

    # clean up substr setting to UID_INDEX
    _modify_uid_index(topology, ldap.MOD_DELETE, settings, 'delete')


def test_ticket48109_2(topology):
    '''
    Set SubStr conflict formats/lengths to cn=uid,cn=index,...
      objectClass: extensibleObject
      nsIndexType: sub
      nsMatchingRule: nsSubStrBegin=3
      nsMatchingRule: nsSubStrEnd=3
      nsSubStrBegin: 2
      nsSubStrEnd: 2
    nsSubStr{Begin,End} are honored.
    '''
    log.info('Test case 2')
    settings = [('nsIndexType', 'sub'),
                ('nsMatchingRule', 'nssubstrbegin=3'),
                ('nsMatchingRule', 'nssubstrend=3'),
                ('objectClass', 'extensibleObject'),
                ('nsSubStrBegin', '2'),
                ('nsSubStrEnd', '2')]

    # add substr setting to UID_INDEX
    _modify_uid_index(topology, ldap.MOD_ADD, settings, 'add')

    # restart the server to apply the indexing
    topology.standalone.restart(timeout=10)

    # add a test user
    _add_test_user(topology, 'cuser2', 'c user2', 'user2', 'c')

    entries = topology.standalone.search_s(SUFFIX, ldap.SCOPE_SUBTREE, '(uid=c*)')
    assert len(entries) == 1

    entries = topology.standalone.search_s(SUFFIX, ldap.SCOPE_SUBTREE, '(uid=*2)')
    assert len(entries) == 1

    # restart the server to check the access log
    topology.standalone.restart(timeout=10)

    _assert_substr_index_used(topology, 'uid=c*',
                              'Test case 2-1 - OK - correct substr index used')
    _assert_substr_index_used(topology, 'uid=*2',
                              'Test case 2-2 - OK - correct substr index used')

    # clean up substr setting to UID_INDEX
    _modify_uid_index(topology, ldap.MOD_DELETE, settings, 'delete')

    log.info('Test complete')
def test_ticket48109_final(topology):
    '''Delete the standalone instance once the test cases have run.'''
    instance = topology.standalone
    instance.delete()
    log.info('Testcase PASSED')
def run_isolated():
    '''
    Run the whole scenario without the pytest runner.

    Resets installation1_prefix to None, builds the topology by calling the
    fixture function directly, then runs the test cases in order, ending
    with the one that deletes the instance.

    NOTE(review): recent pytest releases appear to reject direct calls to
    fixture functions such as topology(True) - confirm against the pytest
    version in use.
    '''
    global installation1_prefix
    installation1_prefix = None

    topo = topology(True)
    for test_case in (test_ticket48109_0,
                      test_ticket48109_1,
                      test_ticket48109_2,
                      test_ticket48109_final):
        test_case(topo)


# Allow running this file as a plain script.
if __name__ == '__main__':
    run_isolated()
|