ticket47664_test.py 7.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217
  1. import os
  2. import sys
  3. import time
  4. import ldap
  5. import logging
  6. import pytest
  7. from lib389 import DirSrv, Entry, tools, tasks
  8. from lib389.tools import DirSrvTools
  9. from lib389._constants import *
  10. from lib389.properties import *
  11. from lib389.tasks import *
  12. from ldap.controls import SimplePagedResultsControl
  13. from ldap.controls.simple import GetEffectiveRightsControl
# Module-level logger, used for messages that are not routed through the
# server instance's own logger.
log = logging.getLogger(__name__)
# Optional installation prefix; when set, the topology fixture copies it into
# args_instance[SER_DEPLOYED_DIR].
installation_prefix = None
# Suffix created by the test and the name of the backend that holds it.
MYSUFFIX = 'o=ticket47664.org'
MYSUFFIXBE = 'ticket47664'
# File name of the generated LDIF; it is combined with the tmp directory path.
_MYLDIF = 'ticket47664.ldif'
# Filter used by both the plain search and the paged search.
SEARCHFILTER = '(objectclass=*)'
  20. class TopologyStandalone(object):
  21. def __init__(self, standalone):
  22. standalone.open()
  23. self.standalone = standalone
  24. @pytest.fixture(scope="module")
  25. def topology(request):
  26. '''
  27. This fixture is used to standalone topology for the 'module'.
  28. '''
  29. global installation_prefix
  30. if installation_prefix:
  31. args_instance[SER_DEPLOYED_DIR] = installation_prefix
  32. standalone = DirSrv(verbose=False)
  33. # Args for the standalone instance
  34. args_instance[SER_HOST] = HOST_STANDALONE
  35. args_instance[SER_PORT] = PORT_STANDALONE
  36. args_instance[SER_SERVERID_PROP] = SERVERID_STANDALONE
  37. args_standalone = args_instance.copy()
  38. standalone.allocate(args_standalone)
  39. # Get the status of the instance and restart it if it exists
  40. instance_standalone = standalone.exists()
  41. # Remove the instance
  42. if instance_standalone:
  43. standalone.delete()
  44. # Create the instance
  45. standalone.create()
  46. # Used to retrieve configuration information (dbdir, confdir...)
  47. standalone.open()
  48. # clear the tmp directory
  49. standalone.clearTmpDir(__file__)
  50. # Here we have standalone instance up and running
  51. return TopologyStandalone(standalone)
  52. def test_ticket47664_run(topology):
  53. """
  54. Import 20 entries
  55. Search with Simple Paged Results Control (pagesize = 4) + Get Effective Rights Control (attrs list = ['cn'])
  56. If Get Effective Rights attribute (attributeLevelRights for 'cn') is returned 4 attrs / page AND
  57. the page count == 20/4, then the fix is verified.
  58. """
  59. log.info('Testing Ticket 47664 - paged results control is not working in some cases when we have a subsuffix')
  60. # bind as directory manager
  61. topology.standalone.log.info("Bind as %s" % DN_DM)
  62. topology.standalone.simple_bind_s(DN_DM, PASSWORD)
  63. topology.standalone.log.info("\n\n######################### SETUP SUFFIX o=ticket47664.org ######################\n")
  64. topology.standalone.backend.create(MYSUFFIX, {BACKEND_NAME: MYSUFFIXBE})
  65. topology.standalone.mappingtree.create(MYSUFFIX, bename=MYSUFFIXBE)
  66. topology.standalone.log.info("\n\n######################### Generate Test data ######################\n")
  67. # get tmp dir
  68. mytmp = topology.standalone.getDir(__file__, TMP_DIR)
  69. if mytmp is None:
  70. mytmp = "/tmp"
  71. MYLDIF = '%s%s' % (mytmp, _MYLDIF)
  72. os.system('ls %s' % MYLDIF)
  73. os.system('rm -f %s' % MYLDIF)
  74. if hasattr(topology.standalone, 'prefix'):
  75. prefix = topology.standalone.prefix
  76. else:
  77. prefix = None
  78. dbgen_prog = prefix + '/bin/dbgen.pl'
  79. topology.standalone.log.info('dbgen_prog: %s' % dbgen_prog)
  80. os.system('%s -s %s -o %s -n 14' % (dbgen_prog, MYSUFFIX, MYLDIF))
  81. cmdline = 'egrep dn: %s | wc -l' % MYLDIF
  82. p = os.popen(cmdline, "r")
  83. dnnumstr = p.readline()
  84. dnnum = int(dnnumstr)
  85. topology.standalone.log.info("We have %d entries.\n", dnnum)
  86. topology.standalone.log.info("\n\n######################### Import Test data ######################\n")
  87. args = {TASK_WAIT: True}
  88. importTask = Tasks(topology.standalone)
  89. importTask.importLDIF(MYSUFFIX, MYSUFFIXBE, MYLDIF, args)
  90. topology.standalone.log.info("\n\n######################### SEARCH ALL ######################\n")
  91. topology.standalone.log.info("Bind as %s and add the READ/SEARCH SELFDN aci" % DN_DM)
  92. topology.standalone.simple_bind_s(DN_DM, PASSWORD)
  93. entries = topology.standalone.search_s(MYSUFFIX, ldap.SCOPE_SUBTREE, SEARCHFILTER)
  94. topology.standalone.log.info("Returned %d entries.\n", len(entries))
  95. #print entries
  96. assert dnnum == len(entries)
  97. topology.standalone.log.info('%d entries are successfully imported.' % dnnum)
  98. topology.standalone.log.info("\n\n######################### SEARCH WITH SIMPLE PAGED RESULTS CONTROL ######################\n")
  99. page_size = 4
  100. spr_req_ctrl = SimplePagedResultsControl(True, size=page_size, cookie='')
  101. ger_req_ctrl = GetEffectiveRightsControl(True, "dn: " + DN_DM)
  102. known_ldap_resp_ctrls = {
  103. SimplePagedResultsControl.controlType: SimplePagedResultsControl,
  104. }
  105. topology.standalone.log.info("Calling search_ext...")
  106. msgid = topology.standalone.search_ext(MYSUFFIX,
  107. ldap.SCOPE_SUBTREE,
  108. SEARCHFILTER,
  109. ['cn'],
  110. serverctrls=[spr_req_ctrl, ger_req_ctrl])
  111. attrlevelrightscnt = 0
  112. pageddncnt = 0
  113. pages = 0
  114. while True:
  115. pages += 1
  116. topology.standalone.log.info("Getting page %d" % pages)
  117. rtype, rdata, rmsgid, responcectrls = topology.standalone.result3(msgid, resp_ctrl_classes=known_ldap_resp_ctrls)
  118. topology.standalone.log.info("%d results" % len(rdata))
  119. pageddncnt += len(rdata)
  120. topology.standalone.log.info("Results:")
  121. for dn, attrs in rdata:
  122. topology.standalone.log.info("dn: %s" % dn)
  123. topology.standalone.log.info("attributeLevelRights: %s" % attrs['attributeLevelRights'][0])
  124. if attrs['attributeLevelRights'][0] != "":
  125. attrlevelrightscnt += 1
  126. pctrls = [
  127. c for c in responcectrls if c.controlType == SimplePagedResultsControl.controlType
  128. ]
  129. if not pctrls:
  130. topology.standalone.log.info('Warning: Server ignores RFC 2696 control.')
  131. break
  132. if pctrls[0].cookie:
  133. spr_req_ctrl.cookie = pctrls[0].cookie
  134. topology.standalone.log.info("cookie: %s" % spr_req_ctrl.cookie)
  135. msgid = topology.standalone.search_ext(MYSUFFIX,
  136. ldap.SCOPE_SUBTREE,
  137. SEARCHFILTER,
  138. ['cn'],
  139. serverctrls=[spr_req_ctrl, ger_req_ctrl])
  140. else:
  141. topology.standalone.log.info("No cookie")
  142. break
  143. topology.standalone.log.info("Paged result search returned %d entries in %d pages.\n", pageddncnt, pages)
  144. assert dnnum == len(entries)
  145. assert dnnum == attrlevelrightscnt
  146. assert pages == (dnnum / page_size)
  147. topology.standalone.log.info("ticket47664 was successfully verified.")
  148. def test_ticket47664_final(topology):
  149. topology.standalone.delete()
  150. log.info('Testcase PASSED')
  151. def run_isolated():
  152. '''
  153. run_isolated is used to run these test cases independently of a test scheduler (xunit, py.test..)
  154. To run isolated without py.test, you need to
  155. - edit this file and comment '@pytest.fixture' line before 'topology' function.
  156. - set the installation prefix
  157. - run this program
  158. '''
  159. global installation_prefix
  160. installation_prefix = None
  161. topo = topology(True)
  162. test_ticket47664_run(topo)
  163. test_ticket47664_final(topo)
# When executed as a script, run the test cases directly via run_isolated().
if __name__ == '__main__':
    run_isolated()