parallel_test.py 1.9 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091
  1. from __future__ import absolute_import
  2. from __future__ import unicode_literals
  3. import six
  4. from docker.errors import APIError
  5. from compose.parallel import parallel_execute
  6. from compose.parallel import parallel_execute_iter
  7. from compose.parallel import UpstreamError
  8. web = 'web'
  9. db = 'db'
  10. data_volume = 'data_volume'
  11. cache = 'cache'
  12. objects = [web, db, data_volume, cache]
  13. deps = {
  14. web: [db, cache],
  15. db: [data_volume],
  16. data_volume: [],
  17. cache: [],
  18. }
  19. def get_deps(obj):
  20. return [(dep, None) for dep in deps[obj]]
  21. def test_parallel_execute():
  22. results, errors = parallel_execute(
  23. objects=[1, 2, 3, 4, 5],
  24. func=lambda x: x * 2,
  25. get_name=six.text_type,
  26. msg="Doubling",
  27. )
  28. assert sorted(results) == [2, 4, 6, 8, 10]
  29. assert errors == {}
  30. def test_parallel_execute_with_deps():
  31. log = []
  32. def process(x):
  33. log.append(x)
  34. parallel_execute(
  35. objects=objects,
  36. func=process,
  37. get_name=lambda obj: obj,
  38. msg="Processing",
  39. get_deps=get_deps,
  40. )
  41. assert sorted(log) == sorted(objects)
  42. assert log.index(data_volume) < log.index(db)
  43. assert log.index(db) < log.index(web)
  44. assert log.index(cache) < log.index(web)
  45. def test_parallel_execute_with_upstream_errors():
  46. log = []
  47. def process(x):
  48. if x is data_volume:
  49. raise APIError(None, None, "Something went wrong")
  50. log.append(x)
  51. parallel_execute(
  52. objects=objects,
  53. func=process,
  54. get_name=lambda obj: obj,
  55. msg="Processing",
  56. get_deps=get_deps,
  57. )
  58. assert log == [cache]
  59. events = [
  60. (obj, result, type(exception))
  61. for obj, result, exception
  62. in parallel_execute_iter(objects, process, get_deps, None)
  63. ]
  64. assert (cache, None, type(None)) in events
  65. assert (data_volume, None, APIError) in events
  66. assert (db, None, UpstreamError) in events
  67. assert (web, None, UpstreamError) in events