parallel_test.py

from __future__ import absolute_import
from __future__ import unicode_literals

from threading import Lock

import six
from docker.errors import APIError

from compose.parallel import parallel_execute
from compose.parallel import parallel_execute_iter
from compose.parallel import UpstreamError


web = 'web'
db = 'db'
data_volume = 'data_volume'
cache = 'cache'

objects = [web, db, data_volume, cache]

deps = {
    web: [db, cache],
    db: [data_volume],
    data_volume: [],
    cache: [],
}
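

# Note: compose.parallel expects get_deps to return (dependency, ready_check)
# pairs. The second slot (None throughout this file) is presumably a callable
# used to decide whether a finished dependency is actually ready; these tests
# only exercise ordering and error propagation, so no ready check is needed.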
def get_deps(obj):
    return [(dep, None) for dep in deps[obj]]


def test_parallel_execute():
    results, errors = parallel_execute(
        objects=[1, 2, 3, 4, 5],
        func=lambda x: x * 2,
        get_name=six.text_type,
        msg="Doubling",
    )

    assert sorted(results) == [2, 4, 6, 8, 10]
    assert errors == {}


def test_parallel_execute_with_limit():
    limit = 1
    tasks = 20
    lock = Lock()

    def f(obj):
        locked = lock.acquire(False)
        # we should always get the lock because we're the only thread running
        assert locked
        lock.release()
        return None

    results, errors = parallel_execute(
        objects=list(range(tasks)),
        func=f,
        get_name=six.text_type,
        msg="Testing",
        limit=limit,
    )

    assert results == tasks * [None]
    assert errors == {}


def test_parallel_execute_with_deps():
    log = []

    def process(x):
        log.append(x)

    parallel_execute(
        objects=objects,
        func=process,
        get_name=lambda obj: obj,
        msg="Processing",
        get_deps=get_deps,
    )

    assert sorted(log) == sorted(objects)

    assert log.index(data_volume) < log.index(db)
    assert log.index(db) < log.index(web)
    assert log.index(cache) < log.index(web)
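

# parallel_execute_iter yields (object, result, exception) triples as objects
# are processed; when an object fails, its dependents are not run and are
# reported with an UpstreamError instead, as the assertions below verify.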
def test_parallel_execute_with_upstream_errors():
    log = []

    def process(x):
        if x is data_volume:
            raise APIError(None, None, "Something went wrong")
        log.append(x)

    parallel_execute(
        objects=objects,
        func=process,
        get_name=lambda obj: obj,
        msg="Processing",
        get_deps=get_deps,
    )

    assert log == [cache]

    events = [
        (obj, result, type(exception))
        for obj, result, exception
        in parallel_execute_iter(objects, process, get_deps, None)
    ]

    assert (cache, None, type(None)) in events
    assert (data_volume, None, APIError) in events
    assert (db, None, UpstreamError) in events
    assert (web, None, UpstreamError) in events
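

# The two alignment tests check that the parallel progress writer pads object
# names so the '...' markers line up in a column on stderr. In no-ANSI mode
# the writer presumably emits separate "in progress" and "done" lines per
# object, which is why the second test inspects four lines for two objects.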
def test_parallel_execute_alignment(capsys):
    results, errors = parallel_execute(
        objects=["short", "a very long name"],
        func=lambda x: x,
        get_name=six.text_type,
        msg="Aligning",
    )

    assert errors == {}

    _, err = capsys.readouterr()
    a, b = err.split('\n')[:2]
    assert a.index('...') == b.index('...')


def test_parallel_execute_alignment_noansi(capsys):
    results, errors = parallel_execute(
        objects=["short", "a very long name"],
        func=lambda x: x,
        get_name=six.text_type,
        msg="Aligning",
        noansi=True,
    )

    assert errors == {}

    _, err = capsys.readouterr()
    a, b, c, d = err.split('\n')[:4]
    assert a.index('...') == b.index('...') == c.index('...') == d.index('...')