| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223 |
- # --- BEGIN COPYRIGHT BLOCK ---
- # Copyright (C) 2016 Red Hat, Inc.
- # All rights reserved.
- #
- # License: GPL (version 3 or any later version).
- # See LICENSE for details.
- # --- END COPYRIGHT BLOCK ---
- #
- import logging
- import re
- import ldap.sasl
- import pytest
- from lib389.tasks import *
- from lib389.topologies import topology_st
- log = logging.getLogger(__name__)
- USER_DN = 'uid=user1,%s' % (DEFAULT_SUFFIX)
- SCHEMA_RELOAD_COUNT = 10
- def task_complete(conn, task_dn):
- finished = False
- try:
- task_entry = conn.search_s(task_dn, ldap.SCOPE_BASE, 'objectclass=*')
- if not task_entry:
- log.fatal('wait_for_task: Search failed to find task: ' + task_dn)
- assert False
- if task_entry[0].hasAttr('nstaskexitcode'):
- # task is done
- finished = True
- except ldap.LDAPError as e:
- log.fatal('wait_for_task: Search failed: ' + e.message['desc'])
- assert False
- return finished
- def test_ticket47973(topology_st):
- """
- During the schema reload task there is a small window where the new schema is not loaded
- into the asi hashtables - this results in searches not returning entries.
- """
- log.info('Testing Ticket 47973 - Test the searches still work as expected during schema reload tasks')
- #
- # Add a user
- #
- try:
- topology_st.standalone.add_s(Entry((USER_DN, {
- 'objectclass': 'top extensibleObject'.split(),
- 'uid': 'user1'
- })))
- except ldap.LDAPError as e:
- log.error('Failed to add user1: error ' + e.message['desc'])
- assert False
- #
- # Run a series of schema_reload tasks while searching for our user. Since
- # this is a race condition, run it several times.
- #
- task_count = 0
- while task_count < SCHEMA_RELOAD_COUNT:
- #
- # Add a schema reload task
- #
- TASK_DN = 'cn=task-' + str(task_count) + ',cn=schema reload task, cn=tasks, cn=config'
- try:
- topology_st.standalone.add_s(Entry((TASK_DN, {
- 'objectclass': 'top extensibleObject'.split(),
- 'cn': 'task-' + str(task_count)
- })))
- except ldap.LDAPError as e:
- log.error('Failed to add task entry: error ' + e.message['desc'])
- assert False
- #
- # While we wait for the task to complete keep searching for our user
- #
- search_count = 0
- while search_count < 100:
- #
- # Now check the user is still being returned
- #
- try:
- entries = topology_st.standalone.search_s(DEFAULT_SUFFIX,
- ldap.SCOPE_SUBTREE,
- '(uid=user1)')
- if not entries or not entries[0]:
- log.fatal('User was not returned from search!')
- assert False
- except ldap.LDAPError as e:
- log.fatal('Unable to search for entry %s: error %s' % (USER_DN, e.message['desc']))
- assert False
- #
- # Check if task is complete
- #
- if task_complete(topology_st.standalone, TASK_DN):
- break
- search_count += 1
- task_count += 1
- def test_ticket47973_case(topology_st):
- log.info('Testing Ticket 47973 (case) - Test the cases in the original schema are preserved.')
- log.info('case 1 - Test the cases in the original schema are preserved.')
- tsfile = topology_st.standalone.schemadir + '/98test.ldif'
- tsfd = open(tsfile, "w")
- Mozattr0 = "MoZiLLaaTTRiBuTe"
- testschema = "dn: cn=schema\nattributetypes: ( 8.9.10.11.12.13.14 NAME '" + Mozattr0 + "' SYNTAX 1.3.6.1.4.1.1466.115.121.1.15 X-ORIGIN 'Mozilla Dummy Schema' )\nobjectclasses: ( 1.2.3.4.5.6.7 NAME 'MozillaObject' SUP top MUST ( objectclass $ cn ) MAY ( " + Mozattr0 + " ) X-ORIGIN 'user defined' )"
- tsfd.write(testschema)
- tsfd.close()
- try:
- # run the schema reload task with the default schemadir
- topology_st.standalone.tasks.schemaReload(schemadir=topology_st.standalone.schemadir,
- args={TASK_WAIT: False})
- except ValueError:
- log.error('Schema Reload task failed.')
- assert False
- time.sleep(5)
- try:
- schemaentry = topology_st.standalone.search_s("cn=schema", ldap.SCOPE_BASE,
- 'objectclass=top',
- ["objectclasses"])
- oclist = schemaentry[0].data.get("objectclasses")
- except ldap.LDAPError as e:
- log.error('Failed to get schema entry: error (%s)' % e.message['desc'])
- raise e
- found = 0
- for oc in oclist:
- log.info('OC: %s' % oc)
- moz = re.findall(Mozattr0, oc)
- if moz:
- found = 1
- log.info('case 1: %s is in the objectclasses list -- PASS' % Mozattr0)
- if found == 0:
- log.error('case 1: %s is not in the objectclasses list -- FAILURE' % Mozattr0)
- assert False
- log.info('case 2 - Duplicated schema except cases are not loaded.')
- tsfile = topology_st.standalone.schemadir + '/97test.ldif'
- tsfd = open(tsfile, "w")
- Mozattr1 = "MOZILLAATTRIBUTE"
- testschema = "dn: cn=schema\nattributetypes: ( 8.9.10.11.12.13.14 NAME '" + Mozattr1 + "' SYNTAX 1.3.6.1.4.1.1466.115.121.1.15 X-ORIGIN 'Mozilla Dummy Schema' )\nobjectclasses: ( 1.2.3.4.5.6.7 NAME 'MozillaObject' SUP top MUST ( objectclass $ cn ) MAY ( " + Mozattr1 + " ) X-ORIGIN 'user defined' )"
- tsfd.write(testschema)
- tsfd.close()
- try:
- # run the schema reload task with the default schemadir
- topology_st.standalone.tasks.schemaReload(schemadir=topology_st.standalone.schemadir,
- args={TASK_WAIT: False})
- except ValueError:
- log.error('Schema Reload task failed.')
- assert False
- time.sleep(5)
- try:
- schemaentry = topology_st.standalone.search_s("cn=schema", ldap.SCOPE_BASE,
- 'objectclass=top',
- ["objectclasses"])
- oclist = schemaentry[0].data.get("objectclasses")
- except ldap.LDAPError as e:
- log.error('Failed to get schema entry: error (%s)' % e.message['desc'])
- raise e
- for oc in oclist:
- log.info('OC: %s' % oc)
- moz = re.findall(Mozattr1, oc)
- if moz:
- log.error('case 2: %s is in the objectclasses list -- FAILURE' % Mozattr1)
- assert False
- log.info('case 2: %s is not in the objectclasses list -- PASS' % Mozattr1)
- Mozattr2 = "mozillaattribute"
- log.info('case 2-1: Use the custom schema with %s' % Mozattr2)
- name = "test user"
- try:
- topology_st.standalone.add_s(Entry(("cn=%s,%s" % (name, SUFFIX), {
- 'objectclass': "top person MozillaObject".split(),
- 'sn': name,
- 'cn': name,
- Mozattr2: name})))
- except ldap.LDAPError as e:
- log.error('Failed to add a test entry: error (%s)' % e.message['desc'])
- raise e
- try:
- testentry = topology_st.standalone.search_s(SUFFIX, ldap.SCOPE_SUBTREE,
- 'objectclass=mozillaobject',
- [Mozattr2])
- except ldap.LDAPError as e:
- log.error('Failed to get schema entry: error (%s)' % e.message['desc'])
- raise e
- mozattrval = testentry[0].data.get(Mozattr2)
- if mozattrval[0] == name:
- log.info('case 2-1: %s: %s found-- PASS' % (Mozattr2, name))
- else:
- log.info('case 2-1: %s: %s not found-- FAILURE' % (Mozattr2, mozattrval[0]))
- if __name__ == '__main__':
- # Run isolated
- # -s for DEBUG mode
- CURRENT_FILE = os.path.realpath(__file__)
- pytest.main("-s %s" % CURRENT_FILE)
|