parallel_test.py
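
"""Unit tests for compose.parallel: parallel_execute and related helpers."""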
from __future__ import absolute_import
from __future__ import unicode_literals

from threading import Lock

import six
from docker.errors import APIError

from compose.parallel import parallel_execute
from compose.parallel import parallel_execute_iter
from compose.parallel import ParallelStreamWriter
from compose.parallel import UpstreamError


# A small dependency graph shared by the tests below:
# web depends on db and cache, and db depends on data_volume.
web = 'web'
db = 'db'
data_volume = 'data_volume'
cache = 'cache'

objects = [web, db, data_volume, cache]

deps = {
    web: [db, cache],
    db: [data_volume],
    data_volume: [],
    cache: [],
}


def get_deps(obj):
    # Dependencies are (dependency, ready_check) pairs; None means no
    # readiness check is performed for the dependency.
    return [(dep, None) for dep in deps[obj]]
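

# Results should come back for every object, in no guaranteed order.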
def test_parallel_execute():
    results, errors = parallel_execute(
        objects=[1, 2, 3, 4, 5],
        func=lambda x: x * 2,
        get_name=six.text_type,
        msg="Doubling",
    )

    assert sorted(results) == [2, 4, 6, 8, 10]
    assert errors == {}
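

# With limit=1 only one worker runs at a time, so a non-blocking
# lock.acquire() inside the task function should always succeed.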
def test_parallel_execute_with_limit():
    limit = 1
    tasks = 20
    lock = Lock()

    def f(obj):
        locked = lock.acquire(False)
        # We should always get the lock because, with limit=1, we're the
        # only thread running.
        assert locked
        lock.release()
        return None

    results, errors = parallel_execute(
        objects=list(range(tasks)),
        func=f,
        get_name=six.text_type,
        msg="Testing",
        limit=limit,
    )

    assert results == tasks * [None]
    assert errors == {}
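

# Dependencies must be processed before their dependents.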
def test_parallel_execute_with_deps():
    log = []

    def process(x):
        log.append(x)

    parallel_execute(
        objects=objects,
        func=process,
        get_name=lambda obj: obj,
        msg="Processing",
        get_deps=get_deps,
    )

    assert sorted(log) == sorted(objects)
    assert log.index(data_volume) < log.index(db)
    assert log.index(db) < log.index(web)
    assert log.index(cache) < log.index(web)
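

# When an object fails, its transitive dependents are never processed and
# are reported with UpstreamError rather than the original exception.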
def test_parallel_execute_with_upstream_errors():
    log = []

    def process(x):
        if x is data_volume:
            raise APIError(None, None, "Something went wrong")
        log.append(x)

    parallel_execute(
        objects=objects,
        func=process,
        get_name=lambda obj: obj,
        msg="Processing",
        get_deps=get_deps,
    )

    # Only cache runs: data_volume fails, which blocks db and web.
    assert log == [cache]

    events = [
        (obj, result, type(exception))
        for obj, result, exception
        in parallel_execute_iter(objects, process, get_deps, None)
    ]

    assert (cache, None, type(None)) in events
    assert (data_volume, None, APIError) in events
    assert (db, None, UpstreamError) in events
    assert (web, None, UpstreamError) in events
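

# ParallelStreamWriter pads object names so the '...' status markers
# line up in the same column across lines of output.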
def test_parallel_execute_alignment(capsys):
    results, errors = parallel_execute(
        objects=["short", "a very long name"],
        func=lambda x: x,
        get_name=six.text_type,
        msg="Aligning",
    )

    assert errors == {}

    _, err = capsys.readouterr()
    a, b = err.split('\n')[:2]
    assert a.index('...') == b.index('...')
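

# With ANSI disabled, lines are not rewritten in place, so each object
# produces separate output lines; all four should still align.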
def test_parallel_execute_alignment_noansi(capsys):
    ParallelStreamWriter.set_noansi()
    results, errors = parallel_execute(
        objects=["short", "a very long name"],
        func=lambda x: x,
        get_name=six.text_type,
        msg="Aligning",
    )

    assert errors == {}

    _, err = capsys.readouterr()
    a, b, c, d = err.split('\n')[:4]
    assert a.index('...') == b.index('...') == c.index('...') == d.index('...')