parallel_test.py

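"""Unit tests for compose.parallel: parallel_execute scheduling (limits,
dependencies, upstream errors) and ParallelStreamWriter console output."""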
import unittest
from threading import Lock

import six
from docker.errors import APIError

from compose.parallel import GlobalLimit
from compose.parallel import parallel_execute
from compose.parallel import parallel_execute_iter
from compose.parallel import ParallelStreamWriter
from compose.parallel import UpstreamError


web = 'web'
db = 'db'
data_volume = 'data_volume'
cache = 'cache'

objects = [web, db, data_volume, cache]

deps = {
    web: [db, cache],
    db: [data_volume],
    data_volume: [],
    cache: [],
}


def get_deps(obj):
    return [(dep, None) for dep in deps[obj]]
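
# The fixtures above model a small dependency graph: web depends on db and
# cache, and db depends on data_volume, so data_volume must be processed
# before db, and db and cache before web. get_deps returns
# (dependency, ready_check) pairs; these tests always pass None as the
# ready check.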


class ParallelTest(unittest.TestCase):

    def test_parallel_execute(self):
        results, errors = parallel_execute(
            objects=[1, 2, 3, 4, 5],
            func=lambda x: x * 2,
            get_name=six.text_type,
            msg="Doubling",
        )

        assert sorted(results) == [2, 4, 6, 8, 10]
        assert errors == {}

    def test_parallel_execute_with_limit(self):
        limit = 1
        tasks = 20
        lock = Lock()

        def f(obj):
            locked = lock.acquire(False)
            # we should always get the lock because we're the only thread running
            assert locked
            lock.release()
            return None

        results, errors = parallel_execute(
            objects=list(range(tasks)),
            func=f,
            get_name=six.text_type,
            msg="Testing",
            limit=limit,
        )

        assert results == tasks * [None]
        assert errors == {}

    def test_parallel_execute_with_global_limit(self):
        GlobalLimit.set_global_limit(1)
        self.addCleanup(GlobalLimit.set_global_limit, None)
        tasks = 20
        lock = Lock()

        def f(obj):
            locked = lock.acquire(False)
            # we should always get the lock because we're the only thread running
            assert locked
            lock.release()
            return None

        results, errors = parallel_execute(
            objects=list(range(tasks)),
            func=f,
            get_name=six.text_type,
            msg="Testing",
        )

        assert results == tasks * [None]
        assert errors == {}

    def test_parallel_execute_with_deps(self):
        log = []

        def process(x):
            log.append(x)

        parallel_execute(
            objects=objects,
            func=process,
            get_name=lambda obj: obj,
            msg="Processing",
            get_deps=get_deps,
        )

        assert sorted(log) == sorted(objects)

        assert log.index(data_volume) < log.index(db)
        assert log.index(db) < log.index(web)
        assert log.index(cache) < log.index(web)

    def test_parallel_execute_with_upstream_errors(self):
        log = []

        def process(x):
            if x is data_volume:
                raise APIError(None, None, "Something went wrong")
            log.append(x)

        parallel_execute(
            objects=objects,
            func=process,
            get_name=lambda obj: obj,
            msg="Processing",
            get_deps=get_deps,
        )

        # Only cache runs: data_volume fails, so its dependents db and web
        # are skipped with an upstream error.
        assert log == [cache]

        events = [
            (obj, result, type(exception))
            for obj, result, exception
            in parallel_execute_iter(objects, process, get_deps, None)
        ]

        assert (cache, None, type(None)) in events
        assert (data_volume, None, APIError) in events
        assert (db, None, UpstreamError) in events
        assert (web, None, UpstreamError) in events
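

# The three tests below are module-level pytest functions (note the capsys
# fixture in place of self): they check ParallelStreamWriter's console
# output rather than scheduling behaviour.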
def test_parallel_execute_alignment(capsys):
    # Reset the singleton writer so the test starts with fresh output state.
    ParallelStreamWriter.instance = None
    results, errors = parallel_execute(
        objects=["short", "a very long name"],
        func=lambda x: x,
        get_name=six.text_type,
        msg="Aligning",
    )

    assert errors == {}

    _, err = capsys.readouterr()
    a, b = err.split('\n')[:2]
    assert a.index('...') == b.index('...')


def test_parallel_execute_ansi(capsys):
    ParallelStreamWriter.instance = None
    ParallelStreamWriter.set_noansi(value=False)
    results, errors = parallel_execute(
        objects=["something", "something more"],
        func=lambda x: x,
        get_name=six.text_type,
        msg="Control characters",
    )

    assert errors == {}

    _, err = capsys.readouterr()
    assert "\x1b" in err


def test_parallel_execute_noansi(capsys):
    ParallelStreamWriter.instance = None
    ParallelStreamWriter.set_noansi()
    results, errors = parallel_execute(
        objects=["something", "something more"],
        func=lambda x: x,
        get_name=six.text_type,
        msg="Control characters",
    )

    assert errors == {}

    _, err = capsys.readouterr()
    assert "\x1b" not in err
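
# These tests are written for pytest (the capsys fixture requires it), so a
# typical invocation would be something like:
#
#     pytest parallel_test.py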