| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187 | from __future__ import absolute_importfrom __future__ import unicode_literalsimport unittestfrom threading import Lockimport sixfrom docker.errors import APIErrorfrom compose.parallel import GlobalLimitfrom compose.parallel import parallel_executefrom compose.parallel import parallel_execute_iterfrom compose.parallel import ParallelStreamWriterfrom compose.parallel import UpstreamErrorweb = 'web'db = 'db'data_volume = 'data_volume'cache = 'cache'objects = [web, db, data_volume, cache]deps = {    web: [db, cache],    db: [data_volume],    data_volume: [],    cache: [],}def get_deps(obj):    return [(dep, None) for dep in deps[obj]]class ParallelTest(unittest.TestCase):    def test_parallel_execute(self):        results, errors = parallel_execute(            objects=[1, 2, 3, 4, 5],            func=lambda x: x * 2,            get_name=six.text_type,            msg="Doubling",        )        assert sorted(results) == [2, 4, 6, 8, 10]        assert errors == {}    def test_parallel_execute_with_limit(self):        limit = 1        tasks = 20        lock = Lock()        def f(obj):            locked = lock.acquire(False)            # we should always get the lock because we're the only thread running            assert locked            lock.release()            return None        results, errors = parallel_execute(            objects=list(range(tasks)),            func=f,            get_name=six.text_type,            msg="Testing",            limit=limit,        )        assert results == tasks * [None]        assert errors == {}    def test_parallel_execute_with_global_limit(self):        GlobalLimit.set_global_limit(1)        self.addCleanup(GlobalLimit.set_global_limit, None)        tasks = 20        lock = Lock()        def f(obj):            locked = lock.acquire(False)            # we should always get the lock because we're the only thread running            assert locked            lock.release()            return None        results, errors = parallel_execute(            objects=list(range(tasks)),            func=f,            get_name=six.text_type,            msg="Testing",        )        assert results == tasks * [None]        assert errors == {}    def test_parallel_execute_with_deps(self):        log = []        def process(x):            log.append(x)        parallel_execute(            objects=objects,            func=process,            get_name=lambda obj: obj,            msg="Processing",            get_deps=get_deps,        )        assert sorted(log) == sorted(objects)        assert log.index(data_volume) < log.index(db)        assert log.index(db) < log.index(web)        assert log.index(cache) < log.index(web)    def test_parallel_execute_with_upstream_errors(self):        log = []        def process(x):            if x is data_volume:                raise APIError(None, None, "Something went wrong")            log.append(x)        parallel_execute(            objects=objects,            func=process,            get_name=lambda obj: obj,            msg="Processing",            get_deps=get_deps,        )        assert log == [cache]        events = [            (obj, result, type(exception))            for obj, result, exception            in parallel_execute_iter(objects, process, get_deps, None)        ]        assert (cache, None, type(None)) in events        assert (data_volume, None, APIError) in events        assert (db, None, UpstreamError) in events        assert (web, None, UpstreamError) in eventsdef test_parallel_execute_alignment(capsys):    results, errors = parallel_execute(        objects=["short", "a very long name"],        func=lambda x: x,        get_name=six.text_type,        msg="Aligning",    )    assert errors == {}    _, err = capsys.readouterr()    a, b = err.split('\n')[:2]    assert a.index('...') == b.index('...')def test_parallel_execute_ansi(capsys):    ParallelStreamWriter.set_noansi(value=False)    results, errors = parallel_execute(        objects=["something", "something more"],        func=lambda x: x,        get_name=six.text_type,        msg="Control characters",    )    assert errors == {}    _, err = capsys.readouterr()    assert "\x1b" in errdef test_parallel_execute_noansi(capsys):    ParallelStreamWriter.set_noansi()    results, errors = parallel_execute(        objects=["something", "something more"],        func=lambda x: x,        get_name=six.text_type,        msg="Control characters",    )    assert errors == {}    _, err = capsys.readouterr()    assert "\x1b" not in err
 |