123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190 |
- from __future__ import absolute_import
- from __future__ import unicode_literals
- import unittest
- from threading import Lock
- import six
- from docker.errors import APIError
- from compose.parallel import GlobalLimit
- from compose.parallel import parallel_execute
- from compose.parallel import parallel_execute_iter
- from compose.parallel import ParallelStreamWriter
- from compose.parallel import UpstreamError
- web = 'web'
- db = 'db'
- data_volume = 'data_volume'
- cache = 'cache'
- objects = [web, db, data_volume, cache]
- deps = {
- web: [db, cache],
- db: [data_volume],
- data_volume: [],
- cache: [],
- }
- def get_deps(obj):
- return [(dep, None) for dep in deps[obj]]
- class ParallelTest(unittest.TestCase):
- def test_parallel_execute(self):
- results, errors = parallel_execute(
- objects=[1, 2, 3, 4, 5],
- func=lambda x: x * 2,
- get_name=six.text_type,
- msg="Doubling",
- )
- assert sorted(results) == [2, 4, 6, 8, 10]
- assert errors == {}
- def test_parallel_execute_with_limit(self):
- limit = 1
- tasks = 20
- lock = Lock()
- def f(obj):
- locked = lock.acquire(False)
- # we should always get the lock because we're the only thread running
- assert locked
- lock.release()
- return None
- results, errors = parallel_execute(
- objects=list(range(tasks)),
- func=f,
- get_name=six.text_type,
- msg="Testing",
- limit=limit,
- )
- assert results == tasks * [None]
- assert errors == {}
- def test_parallel_execute_with_global_limit(self):
- GlobalLimit.set_global_limit(1)
- self.addCleanup(GlobalLimit.set_global_limit, None)
- tasks = 20
- lock = Lock()
- def f(obj):
- locked = lock.acquire(False)
- # we should always get the lock because we're the only thread running
- assert locked
- lock.release()
- return None
- results, errors = parallel_execute(
- objects=list(range(tasks)),
- func=f,
- get_name=six.text_type,
- msg="Testing",
- )
- assert results == tasks * [None]
- assert errors == {}
- def test_parallel_execute_with_deps(self):
- log = []
- def process(x):
- log.append(x)
- parallel_execute(
- objects=objects,
- func=process,
- get_name=lambda obj: obj,
- msg="Processing",
- get_deps=get_deps,
- )
- assert sorted(log) == sorted(objects)
- assert log.index(data_volume) < log.index(db)
- assert log.index(db) < log.index(web)
- assert log.index(cache) < log.index(web)
- def test_parallel_execute_with_upstream_errors(self):
- log = []
- def process(x):
- if x is data_volume:
- raise APIError(None, None, "Something went wrong")
- log.append(x)
- parallel_execute(
- objects=objects,
- func=process,
- get_name=lambda obj: obj,
- msg="Processing",
- get_deps=get_deps,
- )
- assert log == [cache]
- events = [
- (obj, result, type(exception))
- for obj, result, exception
- in parallel_execute_iter(objects, process, get_deps, None)
- ]
- assert (cache, None, type(None)) in events
- assert (data_volume, None, APIError) in events
- assert (db, None, UpstreamError) in events
- assert (web, None, UpstreamError) in events
- def test_parallel_execute_alignment(capsys):
- ParallelStreamWriter.instance = None
- results, errors = parallel_execute(
- objects=["short", "a very long name"],
- func=lambda x: x,
- get_name=six.text_type,
- msg="Aligning",
- )
- assert errors == {}
- _, err = capsys.readouterr()
- a, b = err.split('\n')[:2]
- assert a.index('...') == b.index('...')
- def test_parallel_execute_ansi(capsys):
- ParallelStreamWriter.instance = None
- ParallelStreamWriter.set_noansi(value=False)
- results, errors = parallel_execute(
- objects=["something", "something more"],
- func=lambda x: x,
- get_name=six.text_type,
- msg="Control characters",
- )
- assert errors == {}
- _, err = capsys.readouterr()
- assert "\x1b" in err
- def test_parallel_execute_noansi(capsys):
- ParallelStreamWriter.instance = None
- ParallelStreamWriter.set_noansi()
- results, errors = parallel_execute(
- objects=["something", "something more"],
- func=lambda x: x,
- get_name=six.text_type,
- msg="Control characters",
- )
- assert errors == {}
- _, err = capsys.readouterr()
- assert "\x1b" not in err
|