parallel_test.py 1.4 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273
  1. from __future__ import absolute_import
  2. from __future__ import unicode_literals
  3. import six
  4. from docker.errors import APIError
  5. from compose.parallel import parallel_execute
  6. web = 'web'
  7. db = 'db'
  8. data_volume = 'data_volume'
  9. cache = 'cache'
  10. objects = [web, db, data_volume, cache]
  11. deps = {
  12. web: [db, cache],
  13. db: [data_volume],
  14. data_volume: [],
  15. cache: [],
  16. }
  17. def test_parallel_execute():
  18. results = parallel_execute(
  19. objects=[1, 2, 3, 4, 5],
  20. func=lambda x: x * 2,
  21. get_name=six.text_type,
  22. msg="Doubling",
  23. )
  24. assert sorted(results) == [2, 4, 6, 8, 10]
  25. def test_parallel_execute_with_deps():
  26. log = []
  27. def process(x):
  28. log.append(x)
  29. parallel_execute(
  30. objects=objects,
  31. func=process,
  32. get_name=lambda obj: obj,
  33. msg="Processing",
  34. get_deps=lambda obj: deps[obj],
  35. )
  36. assert sorted(log) == sorted(objects)
  37. assert log.index(data_volume) < log.index(db)
  38. assert log.index(db) < log.index(web)
  39. assert log.index(cache) < log.index(web)
  40. def test_parallel_execute_with_upstream_errors():
  41. log = []
  42. def process(x):
  43. if x is data_volume:
  44. raise APIError(None, None, "Something went wrong")
  45. log.append(x)
  46. parallel_execute(
  47. objects=objects,
  48. func=process,
  49. get_name=lambda obj: obj,
  50. msg="Processing",
  51. get_deps=lambda obj: deps[obj],
  52. )
  53. assert log == [cache]