# --- BEGIN COPYRIGHT BLOCK ---
# Copyright (C) 2016 Red Hat, Inc.
# All rights reserved.
#
# License: GPL (version 3 or any later version).
# See LICENSE for details.
# --- END COPYRIGHT BLOCK ---
#
import os
import sys
import time
import logging

import ldap
import ldapurl
import pytest
import pyasn1
import pyasn1_modules
from ldap.ldapobject import SimpleLDAPObject
from ldap.syncrepl import SyncreplConsumer

from lib389 import DirSrv, Entry, tools, tasks
from lib389.tools import DirSrvTools
from lib389._constants import *
from lib389.properties import *
from lib389.tasks import *
from lib389.utils import *

  26. logging.getLogger(__name__).setLevel(logging.DEBUG)
  27. log = logging.getLogger(__name__)
  28. installation1_prefix = None
  29. class TopologyStandalone(object):
  30. def __init__(self, standalone):
  31. standalone.open()
  32. self.standalone = standalone
  33. class SyncObject(SimpleLDAPObject, SyncreplConsumer):
  34. def __init__(self, uri):
  35. # Init the ldap connection
  36. SimpleLDAPObject.__init__(self, uri)
  37. def sync_search(self, test_cookie):
  38. self.syncrepl_search('dc=example,dc=com', ldap.SCOPE_SUBTREE,
  39. filterstr='(objectclass=*)', mode='refreshOnly',
  40. cookie=test_cookie)
  41. def poll(self):
  42. self.syncrepl_poll(all=1)
  43. @pytest.fixture(scope="module")
  44. def topology(request):
  45. global installation1_prefix
  46. if installation1_prefix:
  47. args_instance[SER_DEPLOYED_DIR] = installation1_prefix
  48. # Creating standalone instance ...
  49. standalone = DirSrv(verbose=False)
  50. args_instance[SER_HOST] = HOST_STANDALONE
  51. args_instance[SER_PORT] = PORT_STANDALONE
  52. args_instance[SER_SERVERID_PROP] = SERVERID_STANDALONE
  53. args_instance[SER_CREATION_SUFFIX] = DEFAULT_SUFFIX
  54. args_standalone = args_instance.copy()
  55. standalone.allocate(args_standalone)
  56. instance_standalone = standalone.exists()
  57. if instance_standalone:
  58. standalone.delete()
  59. standalone.create()
  60. standalone.open()
  61. # Clear out the tmp dir
  62. standalone.clearTmpDir(__file__)
  63. return TopologyStandalone(standalone)
  64. def test_ticket48013(topology):
  65. '''
  66. Content Synchonization: Test that invalid cookies are caught
  67. '''
  68. cookies = ('#', '##', 'a#a#a', 'a#a#1')
  69. # Enable dynamic plugins
  70. try:
  71. topology.standalone.modify_s(DN_CONFIG, [(ldap.MOD_REPLACE, 'nsslapd-dynamic-plugins', 'on')])
  72. except ldap.LDAPError as e:
  73. ldap.error('Failed to enable dynamic plugin!' + e.message['desc'])
  74. assert False
  75. # Enable retro changelog
  76. topology.standalone.plugins.enable(name=PLUGIN_RETRO_CHANGELOG)
  77. # Enbale content sync plugin
  78. topology.standalone.plugins.enable(name=PLUGIN_REPL_SYNC)
  79. # Set everything up
  80. ldap_url = ldapurl.LDAPUrl('ldap://%s:%s' % (HOST_STANDALONE,
  81. PORT_STANDALONE))
  82. ldap_connection = SyncObject(ldap_url.initializeUrl())
  83. # Authenticate
  84. try:
  85. ldap_connection.simple_bind_s(DN_DM, PASSWORD)
  86. except ldap.LDAPError as e:
  87. print('Login to LDAP server failed: %s' % e.message['desc'])
  88. assert False
  89. # Test invalid cookies
  90. for invalid_cookie in cookies:
  91. log.info('Testing cookie: %s' % invalid_cookie)
  92. try:
  93. ldap_connection.sync_search(invalid_cookie)
  94. ldap_connection.poll()
  95. log.fatal('Invalid cookie accepted!')
  96. assert False
  97. except Exception as e:
  98. log.info('Invalid cookie correctly rejected: %s' % e.message['info'])
  99. pass
  100. # Success
  101. log.info('Test complete')
  102. def test_ticket48013_final(topology):
  103. topology.standalone.delete()
  104. log.info('Testcase PASSED')
  105. def run_isolated():
  106. global installation1_prefix
  107. installation1_prefix = None
  108. topo = topology(True)
  109. test_ticket48013(topo)
  110. test_ticket48013_final(topo)
  111. if __name__ == '__main__':
  112. run_isolated()