# --- BEGIN COPYRIGHT BLOCK ---
# Copyright (C) 2020 Red Hat, Inc.
# All rights reserved.
#
# License: GPL (version 3 or any later version).
# See LICENSE for details.
# --- END COPYRIGHT BLOCK ---
#
import os

import ldap
import pytest
from ldap.controls.psearch import PersistentSearchControl, EntryChangeNotificationControl
from lib389._constants import DEFAULT_SUFFIX
from lib389.idm.group import Groups
from lib389.topologies import topology_st

# Module-level pytest marker: applies the "tier1" mark to every test in this file.
pytestmark = pytest.mark.tier1
  16. def _run_psearch(inst, msg_id):
  17. """Run a search with EntryChangeNotificationControl"""
  18. results = []
  19. while True:
  20. try:
  21. _, data, _, _, _, _ = inst.result4(msgid=msg_id, all=0, timeout=1.0, add_ctrls=1, add_intermediates=1,
  22. resp_ctrl_classes={EntryChangeNotificationControl.controlType:EntryChangeNotificationControl})
  23. # See if there are any entry changes
  24. for dn, entry, srv_ctrls in data:
  25. ecn_ctrls = filter(lambda c: c.controlType == EntryChangeNotificationControl.controlType, srv_ctrls)
  26. if ecn_ctrls:
  27. inst.log.info('%s has changed!' % dn)
  28. results.append(dn)
  29. except ldap.TIMEOUT:
  30. # There are no more results, so we timeout.
  31. inst.log.info('No more results')
  32. return results
  33. def test_psearch(topology_st):
  34. """Check basic Persistent Search control functionality
  35. :id: 4b395ef4-c3ff-49d1-a680-b9fdffa633bd
  36. :setup: Standalone instance
  37. :steps:
  38. 1. Run an extended search with a Persistent Search control
  39. 2. Create a new group (could be any entry)
  40. 3. Run an extended search with a Persistent Search control again
  41. 4. Check that entry DN is in the result
  42. :expectedresults:
  43. 1. Operation should be successful
  44. 2. Group should be successfully created
  45. 3. Operation should be successful
  46. 4. Entry DN should be in the result
  47. """
  48. # Create the search control
  49. psc = PersistentSearchControl()
  50. # do a search extended with the control
  51. msg_id = topology_st.standalone.search_ext(base=DEFAULT_SUFFIX, scope=ldap.SCOPE_SUBTREE, attrlist=['*'], serverctrls=[psc])
  52. # Get the result for the message id with result4
  53. _run_psearch(topology_st.standalone, msg_id)
  54. # Change an entry / add one
  55. groups = Groups(topology_st.standalone, DEFAULT_SUFFIX)
  56. group = groups.create(properties={'cn': 'group1', 'description': 'testgroup'})
  57. # Now run the result again and see what's there.
  58. results = _run_psearch(topology_st.standalone, msg_id)
  59. # assert our group is in the changeset.
  60. assert(group.dn.lower() == results[0])
  61. if __name__ == '__main__':
  62. # Run isolated
  63. # -s for DEBUG mode
  64. CURRENT_FILE = os.path.realpath(__file__)
  65. pytest.main("-s %s" % CURRENT_FILE)