ticket48013_test.py 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134
  1. import os
  2. import sys
  3. import time
  4. import ldap
  5. import logging
  6. import pytest
  7. import pyasn1
  8. import pyasn1_modules
  9. import ldap,ldapurl
  10. from ldap.ldapobject import SimpleLDAPObject
  11. from ldap.syncrepl import SyncreplConsumer
  12. from lib389 import DirSrv, Entry, tools, tasks
  13. from lib389.tools import DirSrvTools
  14. from lib389._constants import *
  15. from lib389.properties import *
  16. from lib389.tasks import *
  17. from lib389.utils import *
  18. logging.getLogger(__name__).setLevel(logging.DEBUG)
  19. log = logging.getLogger(__name__)
  20. installation1_prefix = None
  21. class TopologyStandalone(object):
  22. def __init__(self, standalone):
  23. standalone.open()
  24. self.standalone = standalone
  25. class SyncObject(SimpleLDAPObject, SyncreplConsumer):
  26. def __init__(self, uri):
  27. # Init the ldap connection
  28. SimpleLDAPObject.__init__(self, uri)
  29. def sync_search(self, test_cookie):
  30. self.syncrepl_search('dc=example,dc=com', ldap.SCOPE_SUBTREE,
  31. filterstr='(objectclass=*)', mode='refreshOnly',
  32. cookie=test_cookie)
  33. def poll(self):
  34. self.syncrepl_poll(all=1)
  35. @pytest.fixture(scope="module")
  36. def topology(request):
  37. global installation1_prefix
  38. if installation1_prefix:
  39. args_instance[SER_DEPLOYED_DIR] = installation1_prefix
  40. # Creating standalone instance ...
  41. standalone = DirSrv(verbose=False)
  42. args_instance[SER_HOST] = HOST_STANDALONE
  43. args_instance[SER_PORT] = PORT_STANDALONE
  44. args_instance[SER_SERVERID_PROP] = SERVERID_STANDALONE
  45. args_instance[SER_CREATION_SUFFIX] = DEFAULT_SUFFIX
  46. args_standalone = args_instance.copy()
  47. standalone.allocate(args_standalone)
  48. instance_standalone = standalone.exists()
  49. if instance_standalone:
  50. standalone.delete()
  51. standalone.create()
  52. standalone.open()
  53. # Clear out the tmp dir
  54. standalone.clearTmpDir(__file__)
  55. return TopologyStandalone(standalone)
  56. def test_ticket48013(topology):
  57. '''
  58. Content Synchonization: Test that invalid cookies are caught
  59. '''
  60. cookies = ('#', '##', 'a#a#a', 'a#a#1')
  61. # Enable dynamic plugins
  62. try:
  63. topology.standalone.modify_s(DN_CONFIG, [(ldap.MOD_REPLACE, 'nsslapd-dynamic-plugins', 'on')])
  64. except ldap.LDAPError as e:
  65. ldap.error('Failed to enable dynamic plugin!' + e.message['desc'])
  66. assert False
  67. # Enable retro changelog
  68. topology.standalone.plugins.enable(name=PLUGIN_RETRO_CHANGELOG)
  69. # Enbale content sync plugin
  70. topology.standalone.plugins.enable(name=PLUGIN_REPL_SYNC)
  71. # Set everything up
  72. ldap_url = ldapurl.LDAPUrl('ldap://localhost:31389')
  73. ldap_connection = SyncObject(ldap_url.initializeUrl())
  74. # Authenticate
  75. try:
  76. ldap_connection.simple_bind_s(DN_DM, PASSWORD)
  77. except ldap.LDAPError as e:
  78. print('Login to LDAP server failed: %s' % e.message['desc'])
  79. assert False
  80. # Test invalid cookies
  81. for invalid_cookie in cookies:
  82. log.info('Testing cookie: %s' % invalid_cookie)
  83. try:
  84. ldap_connection.sync_search(invalid_cookie)
  85. ldap_connection.poll()
  86. log.fatal('Invalid cookie accepted!')
  87. assert False
  88. except Exception as e:
  89. log.info('Invalid cookie correctly rejected: %s' % e.message['info'])
  90. pass
  91. # Success
  92. log.info('Test complete')
  93. def test_ticket48013_final(topology):
  94. topology.standalone.delete()
  95. log.info('Testcase PASSED')
  96. def run_isolated():
  97. global installation1_prefix
  98. installation1_prefix = None
  99. topo = topology(True)
  100. test_ticket48013(topo)
  101. test_ticket48013_final(topo)
  102. if __name__ == '__main__':
  103. run_isolated()