regression_test.py 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181
  1. # --- BEGIN COPYRIGHT BLOCK ---
  2. # Copyright (C) 2020 Red Hat, Inc.
  3. # All rights reserved.
  4. #
  5. # License: GPL (version 3 or any later version).
  6. # See LICENSE for details.
  7. # --- END COPYRIGHT BLOCK ---
  8. #
  9. import os
  10. import pytest
  11. from lib389.tasks import *
  12. from lib389.utils import *
  13. from lib389.topologies import topology_m2
  14. from lib389._constants import *
  15. from lib389.replica import ReplicationManager
  16. pytestmark = [pytest.mark.tier1,
  17. pytest.mark.skipif(ds_is_older('1.3.5'), reason="Not implemented")]
  18. logging.getLogger(__name__).setLevel(logging.DEBUG)
  19. log = logging.getLogger(__name__)
  20. ISSUER = 'cn=CAcert'
  21. CACERT = 'CAcertificate'
  22. M1SERVERCERT = 'Server-Cert1'
  23. M2SERVERCERT = 'Server-Cert2'
  24. M1LDAPSPORT = '41636'
  25. M2LDAPSPORT = '42636'
  26. M1SUBJECT = 'CN=' + os.uname()[1] + ',OU=389 Directory Server'
  27. M2SUBJECT = 'CN=' + os.uname()[1] + ',OU=390 Directory Server'
  28. def add_entry(server, name, rdntmpl, start, num):
  29. log.info("\n######################### Adding %d entries to %s ######################\n" % (num, name))
  30. for i in range(num):
  31. ii = start + i
  32. dn = '%s%d,%s' % (rdntmpl, ii, DEFAULT_SUFFIX)
  33. server.add_s(Entry((dn, {'objectclass': 'top person extensibleObject'.split(),
  34. 'uid': '%s%d' % (rdntmpl, ii),
  35. 'cn': '%s user%d' % (name, ii),
  36. 'sn': 'user%d' % (ii)})))
  37. def check_pems(confdir, mycacert, myservercert, myserverkey, notexist):
  38. log.info("\n######################### Check PEM files (%s, %s, %s)%s in %s ######################\n"
  39. % (mycacert, myservercert, myserverkey, notexist, confdir))
  40. global cacert
  41. cacert = f"{mycacert}.pem"
  42. if os.path.isfile(cacert):
  43. if notexist == "":
  44. log.info('%s is successfully generated.' % cacert)
  45. else:
  46. log.info('%s is incorrecly generated.' % cacert)
  47. assert False
  48. else:
  49. if notexist == "":
  50. log.fatal('%s is not generated.' % cacert)
  51. assert False
  52. else:
  53. log.info('%s is correctly not generated.' % cacert)
  54. servercert = f"{myservercert}.pem"
  55. if os.path.isfile(servercert):
  56. if notexist == "":
  57. log.info('%s is successfully generated.' % servercert)
  58. else:
  59. log.info('%s is incorrecly generated.' % servercert)
  60. assert False
  61. else:
  62. if notexist == "":
  63. log.fatal('%s was not generated.' % servercert)
  64. assert False
  65. else:
  66. log.info('%s is correctly not generated.' % servercert)
  67. serverkey = f"{myserverkey}.pem"
  68. if os.path.isfile(serverkey):
  69. if notexist == "":
  70. log.info('%s is successfully generated.' % serverkey)
  71. else:
  72. log.info('%s is incorrectly generated.' % serverkey)
  73. assert False
  74. else:
  75. if notexist == "":
  76. log.fatal('%s was not generated.' % serverkey)
  77. assert False
  78. else:
  79. log.info('%s is correctly not generated.' % serverkey)
  80. def relocate_pem_files(topology_m2):
  81. log.info("######################### Relocate PEM files on master1 ######################")
  82. certdir_prefix = "/dev/shm"
  83. mycacert = os.path.join(certdir_prefix, "MyCA")
  84. topology_m2.ms["master1"].encryption.set('CACertExtractFile', mycacert)
  85. myservercert = os.path.join(certdir_prefix, "MyServerCert1")
  86. myserverkey = os.path.join(certdir_prefix, "MyServerKey1")
  87. topology_m2.ms["master1"].rsa.apply_mods([(ldap.MOD_REPLACE, 'ServerCertExtractFile', myservercert),
  88. (ldap.MOD_REPLACE, 'ServerKeyExtractFile', myserverkey)])
  89. log.info("##### restart master1")
  90. topology_m2.ms["master1"].restart()
  91. check_pems(certdir_prefix, mycacert, myservercert, myserverkey, "")
  92. @pytest.mark.ds47536
  93. def test_openldap_no_nss_crypto(topology_m2):
  94. """Check that we allow usage of OpenLDAP libraries
  95. that don't use NSS for crypto
  96. :id: 0a622f3d-8ba5-4df2-a1de-1fb2237da40a
  97. :setup: Replication with two masters:
  98. master_1 ----- startTLS -----> master_2;
  99. master_1 <-- TLS_clientAuth -- master_2;
  100. nsslapd-extract-pemfiles set to 'on' on both masters
  101. without specifying cert names
  102. :steps:
  103. 1. Add 5 users to master 1 and 2
  104. 2. Check that the users were successfully replicated
  105. 3. Relocate PEM files on master 1
  106. 4. Check PEM files in master 1 config directory
  107. 5. Add 5 users more to master 1 and 2
  108. 6. Check that the users were successfully replicated
  109. 7. Export userRoot on master 1
  110. :expectedresults:
  111. 1. Users should be successfully added
  112. 2. Users should be successfully replicated
  113. 3. Operation should be successful
  114. 4. PEM files should be found
  115. 5. Users should be successfully added
  116. 6. Users should be successfully replicated
  117. 7. Operation should be successful
  118. """
  119. log.info("Ticket 47536 - Allow usage of OpenLDAP libraries that don't use NSS for crypto")
  120. m1 = topology_m2.ms["master1"]
  121. m2 = topology_m2.ms["master2"]
  122. [i.enable_tls() for i in topology_m2]
  123. repl = ReplicationManager(DEFAULT_SUFFIX)
  124. repl.test_replication(m1, m2)
  125. add_entry(m1, 'master1', 'uid=m1user', 0, 5)
  126. add_entry(m2, 'master2', 'uid=m2user', 0, 5)
  127. repl.wait_for_replication(m1, m2)
  128. repl.wait_for_replication(m2, m1)
  129. log.info('##### Searching for entries on master1...')
  130. entries = m1.search_s(DEFAULT_SUFFIX, ldap.SCOPE_SUBTREE, '(uid=*)')
  131. assert 11 == len(entries)
  132. log.info('##### Searching for entries on master2...')
  133. entries = m2.search_s(DEFAULT_SUFFIX, ldap.SCOPE_SUBTREE, '(uid=*)')
  134. assert 11 == len(entries)
  135. relocate_pem_files(topology_m2)
  136. add_entry(m1, 'master1', 'uid=m1user', 10, 5)
  137. add_entry(m2, 'master2', 'uid=m2user', 10, 5)
  138. repl.wait_for_replication(m1, m2)
  139. repl.wait_for_replication(m2, m1)
  140. log.info('##### Searching for entries on master1...')
  141. entries = m1.search_s(DEFAULT_SUFFIX, ldap.SCOPE_SUBTREE, '(uid=*)')
  142. assert 21 == len(entries)
  143. log.info('##### Searching for entries on master2...')
  144. entries = m2.search_s(DEFAULT_SUFFIX, ldap.SCOPE_SUBTREE, '(uid=*)')
  145. assert 21 == len(entries)
  146. output_file = os.path.join(m1.get_ldif_dir(), "master1.ldif")
  147. m1.tasks.exportLDIF(benamebase='userRoot', output_file=output_file, args={'wait': True})
  148. log.info("Ticket 47536 - PASSED")
  149. if __name__ == '__main__':
  150. # Run isolated
  151. # -s for DEBUG mode
  152. CURRENT_FILE = os.path.realpath(__file__)
  153. pytest.main("-s %s" % CURRENT_FILE)