# parallel_test.py
from __future__ import absolute_import
from __future__ import unicode_literals

import unittest
from threading import Lock

import six
from docker.errors import APIError

from compose.parallel import GlobalLimit
from compose.parallel import parallel_execute
from compose.parallel import parallel_execute_iter
from compose.parallel import ParallelStreamWriter
from compose.parallel import UpstreamError
  12. web = 'web'
  13. db = 'db'
  14. data_volume = 'data_volume'
  15. cache = 'cache'
  16. objects = [web, db, data_volume, cache]
  17. deps = {
  18. web: [db, cache],
  19. db: [data_volume],
  20. data_volume: [],
  21. cache: [],
  22. }
  23. def get_deps(obj):
  24. return [(dep, None) for dep in deps[obj]]
  25. class ParallelTest(unittest.TestCase):
  26. def test_parallel_execute(self):
  27. results, errors = parallel_execute(
  28. objects=[1, 2, 3, 4, 5],
  29. func=lambda x: x * 2,
  30. get_name=six.text_type,
  31. msg="Doubling",
  32. )
  33. assert sorted(results) == [2, 4, 6, 8, 10]
  34. assert errors == {}
  35. def test_parallel_execute_with_limit(self):
  36. limit = 1
  37. tasks = 20
  38. lock = Lock()
  39. def f(obj):
  40. locked = lock.acquire(False)
  41. # we should always get the lock because we're the only thread running
  42. assert locked
  43. lock.release()
  44. return None
  45. results, errors = parallel_execute(
  46. objects=list(range(tasks)),
  47. func=f,
  48. get_name=six.text_type,
  49. msg="Testing",
  50. limit=limit,
  51. )
  52. assert results == tasks * [None]
  53. assert errors == {}
  54. def test_parallel_execute_with_global_limit(self):
  55. GlobalLimit.set_global_limit(1)
  56. self.addCleanup(GlobalLimit.set_global_limit, None)
  57. tasks = 20
  58. lock = Lock()
  59. def f(obj):
  60. locked = lock.acquire(False)
  61. # we should always get the lock because we're the only thread running
  62. assert locked
  63. lock.release()
  64. return None
  65. results, errors = parallel_execute(
  66. objects=list(range(tasks)),
  67. func=f,
  68. get_name=six.text_type,
  69. msg="Testing",
  70. )
  71. assert results == tasks * [None]
  72. assert errors == {}
  73. def test_parallel_execute_with_deps(self):
  74. log = []
  75. def process(x):
  76. log.append(x)
  77. parallel_execute(
  78. objects=objects,
  79. func=process,
  80. get_name=lambda obj: obj,
  81. msg="Processing",
  82. get_deps=get_deps,
  83. )
  84. assert sorted(log) == sorted(objects)
  85. assert log.index(data_volume) < log.index(db)
  86. assert log.index(db) < log.index(web)
  87. assert log.index(cache) < log.index(web)
  88. def test_parallel_execute_with_upstream_errors(self):
  89. log = []
  90. def process(x):
  91. if x is data_volume:
  92. raise APIError(None, None, "Something went wrong")
  93. log.append(x)
  94. parallel_execute(
  95. objects=objects,
  96. func=process,
  97. get_name=lambda obj: obj,
  98. msg="Processing",
  99. get_deps=get_deps,
  100. )
  101. assert log == [cache]
  102. events = [
  103. (obj, result, type(exception))
  104. for obj, result, exception
  105. in parallel_execute_iter(objects, process, get_deps, None)
  106. ]
  107. assert (cache, None, type(None)) in events
  108. assert (data_volume, None, APIError) in events
  109. assert (db, None, UpstreamError) in events
  110. assert (web, None, UpstreamError) in events
  111. def test_parallel_execute_alignment(capsys):
  112. ParallelStreamWriter.instance = None
  113. results, errors = parallel_execute(
  114. objects=["short", "a very long name"],
  115. func=lambda x: x,
  116. get_name=six.text_type,
  117. msg="Aligning",
  118. )
  119. assert errors == {}
  120. _, err = capsys.readouterr()
  121. a, b = err.split('\n')[:2]
  122. assert a.index('...') == b.index('...')
  123. def test_parallel_execute_ansi(capsys):
  124. ParallelStreamWriter.instance = None
  125. ParallelStreamWriter.set_noansi(value=False)
  126. results, errors = parallel_execute(
  127. objects=["something", "something more"],
  128. func=lambda x: x,
  129. get_name=six.text_type,
  130. msg="Control characters",
  131. )
  132. assert errors == {}
  133. _, err = capsys.readouterr()
  134. assert "\x1b" in err
  135. def test_parallel_execute_noansi(capsys):
  136. ParallelStreamWriter.instance = None
  137. ParallelStreamWriter.set_noansi()
  138. results, errors = parallel_execute(
  139. objects=["something", "something more"],
  140. func=lambda x: x,
  141. get_name=six.text_type,
  142. msg="Control characters",
  143. )
  144. assert errors == {}
  145. _, err = capsys.readouterr()
  146. assert "\x1b" not in err