parallel_test.py

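"""Unit tests for ``compose.parallel``: result and error collection from
``parallel_execute``, concurrency limits (explicit and via the
``COMPOSE_PARALLEL_LIMIT`` environment variable), dependency-ordered
execution, upstream error propagation, and ``ParallelStreamWriter``
output formatting."""
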
from __future__ import absolute_import
from __future__ import unicode_literals

import os
from threading import Lock

import six
from docker.errors import APIError

from compose.parallel import get_configured_limit
from compose.parallel import parallel_execute
from compose.parallel import parallel_execute_iter
from compose.parallel import ParallelStreamWriter
from compose.parallel import UpstreamError


web = 'web'
db = 'db'
data_volume = 'data_volume'
cache = 'cache'

objects = [web, db, data_volume, cache]
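
# Dependency graph for the ordering tests: web depends on db and cache,
# and db depends on data_volume; data_volume and cache have no deps.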
deps = {
    web: [db, cache],
    db: [data_volume],
    data_volume: [],
    cache: [],
}


def get_deps(obj):
    return [(dep, None) for dep in deps[obj]]
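

# Happy path: every result is collected and the error map stays empty.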
def test_parallel_execute():
    results, errors = parallel_execute(
        objects=[1, 2, 3, 4, 5],
        func=lambda x: x * 2,
        get_name=six.text_type,
        msg="Doubling",
    )

    assert sorted(results) == [2, 4, 6, 8, 10]
    assert errors == {}
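

# limit=1 forces serial execution: each task grabs a shared lock with a
# non-blocking acquire, which can only succeed if no other worker runs.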
def test_parallel_execute_with_limit():
    limit = 1
    tasks = 20
    lock = Lock()

    def f(obj):
        locked = lock.acquire(False)
        # we should always get the lock because we're the only thread running
        assert locked
        lock.release()
        return None

    results, errors = parallel_execute(
        objects=list(range(tasks)),
        func=f,
        get_name=six.text_type,
        msg="Testing",
        limit=limit,
    )

    assert results == tasks * [None]
    assert errors == {}
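

# Same serial-execution check, but with the limit coming from the
# COMPOSE_PARALLEL_LIMIT environment variable instead of a keyword argument.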
def test_parallel_execute_with_global_limit():
    os.environ['COMPOSE_PARALLEL_LIMIT'] = '1'
    tasks = 20
    lock = Lock()

    def f(obj):
        locked = lock.acquire(False)
        # we should always get the lock because we're the only thread running
        assert locked
        lock.release()
        return None

    try:
        assert get_configured_limit() == 1

        results, errors = parallel_execute(
            objects=list(range(tasks)),
            func=f,
            get_name=six.text_type,
            msg="Testing",
        )
    finally:
        # Unset the variable so the global limit does not leak into other tests.
        del os.environ['COMPOSE_PARALLEL_LIMIT']

    assert results == tasks * [None]
    assert errors == {}
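

# Dependencies must finish before their dependents: data_volume before db,
# and both db and cache before web.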
def test_parallel_execute_with_deps():
    log = []

    def process(x):
        log.append(x)

    parallel_execute(
        objects=objects,
        func=process,
        get_name=lambda obj: obj,
        msg="Processing",
        get_deps=get_deps,
    )

    assert sorted(log) == sorted(objects)
    assert log.index(data_volume) < log.index(db)
    assert log.index(db) < log.index(web)
    assert log.index(cache) < log.index(web)
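

# When data_volume fails, db and web (which depend on it) are skipped with
# UpstreamError, so only the independent cache object is processed.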
def test_parallel_execute_with_upstream_errors():
    log = []

    def process(x):
        if x is data_volume:
            raise APIError(None, None, "Something went wrong")
        log.append(x)

    parallel_execute(
        objects=objects,
        func=process,
        get_name=lambda obj: obj,
        msg="Processing",
        get_deps=get_deps,
    )

    assert log == [cache]

    events = [
        (obj, result, type(exception))
        for obj, result, exception
        in parallel_execute_iter(objects, process, get_deps, None)
    ]

    assert (cache, None, type(None)) in events
    assert (data_volume, None, APIError) in events
    assert (db, None, UpstreamError) in events
    assert (web, None, UpstreamError) in events
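

# Status lines are padded so the '...' separator lines up across names of
# different lengths.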
def test_parallel_execute_alignment(capsys):
    results, errors = parallel_execute(
        objects=["short", "a very long name"],
        func=lambda x: x,
        get_name=six.text_type,
        msg="Aligning",
    )

    assert errors == {}

    _, err = capsys.readouterr()
    a, b = err.split('\n')[:2]
    assert a.index('...') == b.index('...')
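

# With ANSI output enabled, the writer emits escape sequences (e.g. for
# cursor movement) on stderr.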
def test_parallel_execute_ansi(capsys):
    ParallelStreamWriter.set_noansi(value=False)

    results, errors = parallel_execute(
        objects=["something", "something more"],
        func=lambda x: x,
        get_name=six.text_type,
        msg="Control characters",
    )

    assert errors == {}

    _, err = capsys.readouterr()
    assert "\x1b" in err
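

# In noansi mode the same run must not write any escape sequences.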
def test_parallel_execute_noansi(capsys):
    ParallelStreamWriter.set_noansi()

    results, errors = parallel_execute(
        objects=["something", "something more"],
        func=lambda x: x,
        get_name=six.text_type,
        msg="Control characters",
    )

    assert errors == {}

    _, err = capsys.readouterr()
    assert "\x1b" not in err