12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273 |
- from __future__ import absolute_import
- from __future__ import unicode_literals
- import six
- from docker.errors import APIError
- from compose.parallel import parallel_execute
# Names of the fake "services" used as parallel_execute inputs in the tests
# below.
web = 'web'
db = 'db'
data_volume = 'data_volume'
cache = 'cache'

# Every object handed to parallel_execute in the dependency-aware tests.
objects = [web, db, data_volume, cache]

# Dependency graph: each key maps to the objects that the tests expect to be
# processed before it (web -> db -> data_volume, and web -> cache).
deps = {
    web: [db, cache],
    db: [data_volume],
    data_volume: [],
    cache: [],
}
def test_parallel_execute():
    """Check that parallel_execute applies ``func`` to every input object.

    Five integers are doubled; the results are sorted before comparison, so
    the test does not depend on the order in which results come back.
    """
    results = parallel_execute(
        objects=[1, 2, 3, 4, 5],
        func=lambda x: x * 2,
        get_name=six.text_type,
        msg="Doubling",
    )

    assert sorted(results) == [2, 4, 6, 8, 10]
def test_parallel_execute_with_deps():
    """Check that objects are processed after the dependencies in ``deps``.

    ``process`` records the order in which objects are handled; the
    assertions then verify that every object was processed and that each
    dependency appears in the log before the object depending on it.
    """
    log = []

    def process(x):
        log.append(x)

    parallel_execute(
        objects=objects,
        func=process,
        get_name=lambda obj: obj,
        msg="Processing",
        get_deps=lambda obj: deps[obj],
    )

    # Every object was processed (order-insensitive check).
    assert sorted(log) == sorted(objects)

    # Dependencies must have been processed before their dependents.
    assert log.index(data_volume) < log.index(db)
    assert log.index(db) < log.index(web)
    assert log.index(cache) < log.index(web)
def test_parallel_execute_with_upstream_errors():
    """Check that a failing dependency prevents its dependents from running.

    Processing ``data_volume`` raises ``APIError``. The test expects only
    ``cache`` to be logged: ``data_volume`` fails before it is recorded, and
    ``db`` and ``web``, which depend on it directly or through ``db``, are
    expected not to be processed at all.
    """
    log = []

    def process(x):
        # Compare by value: identity ("is") on strings only works when the
        # very same object is passed through, which is an implementation
        # detail rather than a guarantee.
        if x == data_volume:
            raise APIError(None, None, "Something went wrong")
        log.append(x)

    parallel_execute(
        objects=objects,
        func=process,
        get_name=lambda obj: obj,
        msg="Processing",
        get_deps=lambda obj: deps[obj],
    )

    assert log == [cache]
|