瀏覽代碼

Revamp ParallelStreamWriter to fix display issues.

Signed-off-by: Matthieu Nottale <[email protected]>
Matthieu Nottale 7 年之前
父節點
當前提交
d2b5d59dd8
共有 5 個文件被更改,包括 66 次插入51 次删除
  1. 57 49
      compose/parallel.py
  2. 1 2
      compose/service.py
  3. 3 0
      tests/integration/service_test.py
  4. 3 0
      tests/unit/parallel_test.py
  5. 2 0
      tests/unit/service_test.py

+ 57 - 49
compose/parallel.py

@@ -43,55 +43,60 @@ class GlobalLimit(object):
         cls.global_limiter = Semaphore(value)
 
 
-def parallel_execute(objects, func, get_name, msg, get_deps=None, limit=None, parent_objects=None):
-    """Runs func on objects in parallel while ensuring that func is
-    ran on object only after it is ran on all its dependencies.
-
-    get_deps called on object must return a collection with its dependencies.
-    get_name called on object must return its name.
+def parallel_execute_watch(events, writer, errors, results, msg, get_name):
+    """ Watch events from a parallel execution, update status and fill errors and results.
+        Returns exception to re-raise.
     """
-    objects = list(objects)
-    stream = get_output_stream(sys.stderr)
-
-    writer = ParallelStreamWriter(stream, msg)
-
-    display_objects = list(parent_objects) if parent_objects else objects
-
-    for obj in display_objects:
-        writer.add_object(get_name(obj))
-
-    # write data in a second loop to consider all objects for width alignment
-    # and avoid duplicates when parent_objects exists
-    for obj in objects:
-        writer.write_initial(get_name(obj))
-
-    events = parallel_execute_iter(objects, func, get_deps, limit)
-
-    errors = {}
-    results = []
     error_to_reraise = None
-
     for obj, result, exception in events:
         if exception is None:
-            writer.write(get_name(obj), 'done', green)
+            writer.write(msg, get_name(obj), 'done', green)
             results.append(result)
         elif isinstance(exception, ImageNotFound):
             # This is to bubble up ImageNotFound exceptions to the client so we
             # can prompt the user if they want to rebuild.
             errors[get_name(obj)] = exception.explanation
-            writer.write(get_name(obj), 'error', red)
+            writer.write(msg, get_name(obj), 'error', red)
             error_to_reraise = exception
         elif isinstance(exception, APIError):
             errors[get_name(obj)] = exception.explanation
-            writer.write(get_name(obj), 'error', red)
+            writer.write(msg, get_name(obj), 'error', red)
         elif isinstance(exception, (OperationFailedError, HealthCheckFailed, NoHealthCheckConfigured)):
             errors[get_name(obj)] = exception.msg
-            writer.write(get_name(obj), 'error', red)
+            writer.write(msg, get_name(obj), 'error', red)
         elif isinstance(exception, UpstreamError):
-            writer.write(get_name(obj), 'error', red)
+            writer.write(msg, get_name(obj), 'error', red)
         else:
             errors[get_name(obj)] = exception
             error_to_reraise = exception
+    return error_to_reraise
+
+
+def parallel_execute(objects, func, get_name, msg, get_deps=None, limit=None):
+    """Runs func on objects in parallel while ensuring that func is
+    ran on object only after it is ran on all its dependencies.
+
+    get_deps called on object must return a collection with its dependencies.
+    get_name called on object must return its name.
+    """
+    objects = list(objects)
+    stream = get_output_stream(sys.stderr)
+
+    if ParallelStreamWriter.instance:
+        writer = ParallelStreamWriter.instance
+    else:
+        writer = ParallelStreamWriter(stream)
+
+    for obj in objects:
+        writer.add_object(msg, get_name(obj))
+    for obj in objects:
+        writer.write_initial(msg, get_name(obj))
+
+    events = parallel_execute_iter(objects, func, get_deps, limit)
+
+    errors = {}
+    results = []
+    error_to_reraise = parallel_execute_watch(events, writer, errors, results, msg, get_name)
 
     for obj_name, error in errors.items():
         stream.write("\nERROR: for {}  {}\n".format(obj_name, error))
@@ -253,55 +258,58 @@ class ParallelStreamWriter(object):
 
     noansi = False
     lock = Lock()
+    instance = None
 
     @classmethod
     def set_noansi(cls, value=True):
         cls.noansi = value
 
-    def __init__(self, stream, msg):
+    def __init__(self, stream):
         self.stream = stream
-        self.msg = msg
         self.lines = []
         self.width = 0
+        ParallelStreamWriter.instance = self
 
-    def add_object(self, obj_index):
-        self.lines.append(obj_index)
-        self.width = max(self.width, len(obj_index))
+    def add_object(self, msg, obj_index):
+        if msg is None:
+            return
+        self.lines.append(msg + obj_index)
+        self.width = max(self.width, len(msg + ' ' + obj_index))
 
-    def write_initial(self, obj_index):
-        if self.msg is None:
+    def write_initial(self, msg, obj_index):
+        if msg is None:
             return
-        self.stream.write("{} {:<{width}} ... \r\n".format(
-            self.msg, self.lines[self.lines.index(obj_index)], width=self.width))
+        self.stream.write("{:<{width}} ... \r\n".format(
+            msg + ' ' + obj_index, width=self.width))
         self.stream.flush()
 
-    def _write_ansi(self, obj_index, status):
+    def _write_ansi(self, msg, obj_index, status):
         self.lock.acquire()
-        position = self.lines.index(obj_index)
+        position = self.lines.index(msg + obj_index)
         diff = len(self.lines) - position
         # move up
         self.stream.write("%c[%dA" % (27, diff))
         # erase
         self.stream.write("%c[2K\r" % 27)
-        self.stream.write("{} {:<{width}} ... {}\r".format(self.msg, obj_index,
+        self.stream.write("{:<{width}} ... {}\r".format(msg + ' ' + obj_index,
                           status, width=self.width))
         # move back down
         self.stream.write("%c[%dB" % (27, diff))
         self.stream.flush()
         self.lock.release()
 
-    def _write_noansi(self, obj_index, status):
-        self.stream.write("{} {:<{width}} ... {}\r\n".format(self.msg, obj_index,
+    def _write_noansi(self, msg, obj_index, status):
+        self.stream.write("{:<{width}} ... {}\r\n".format(msg + ' ' + obj_index,
                           status, width=self.width))
         self.stream.flush()
 
-    def write(self, obj_index, status, color_func):
-        if self.msg is None:
+    def write(self, msg, obj_index, status, color_func):
+        if msg is None:
             return
         if self.noansi:
-            self._write_noansi(obj_index, status)
+            self._write_noansi(msg, obj_index, status)
         else:
-            self._write_ansi(obj_index, color_func(status))
+            self._write_ansi(msg, obj_index, color_func(status))
 
 
 def parallel_operation(containers, operation, options, message):

+ 1 - 2
compose/service.py

@@ -402,8 +402,7 @@ class Service(object):
                 [ServiceName(self.project, self.name, index) for index in range(i, i + scale)],
                 lambda service_name: create_and_start(self, service_name.number),
                 lambda service_name: self.get_container_name(service_name.service, service_name.number),
-                "Creating",
-                parent_objects=project_services
+                "Creating"
             )
             for error in errors.values():
                 raise OperationFailedError(error)

+ 3 - 0
tests/integration/service_test.py

@@ -35,6 +35,7 @@ from compose.const import LABEL_SERVICE
 from compose.const import LABEL_VERSION
 from compose.container import Container
 from compose.errors import OperationFailedError
+from compose.parallel import ParallelStreamWriter
 from compose.project import OneOffFilter
 from compose.service import ConvergencePlan
 from compose.service import ConvergenceStrategy
@@ -1197,6 +1198,7 @@ class ServiceTest(DockerClientTestCase):
         service.create_container(number=next_number)
         service.create_container(number=next_number + 1)
 
+        ParallelStreamWriter.instance = None
         with mock.patch('sys.stderr', new_callable=StringIO) as mock_stderr:
             service.scale(2)
         for container in service.containers():
@@ -1220,6 +1222,7 @@ class ServiceTest(DockerClientTestCase):
         for container in service.containers():
             assert not container.is_running
 
+        ParallelStreamWriter.instance = None
         with mock.patch('sys.stderr', new_callable=StringIO) as mock_stderr:
             service.scale(2)
 

+ 3 - 0
tests/unit/parallel_test.py

@@ -143,6 +143,7 @@ class ParallelTest(unittest.TestCase):
 
 
 def test_parallel_execute_alignment(capsys):
+    ParallelStreamWriter.instance = None
     results, errors = parallel_execute(
         objects=["short", "a very long name"],
         func=lambda x: x,
@@ -158,6 +159,7 @@ def test_parallel_execute_alignment(capsys):
 
 
 def test_parallel_execute_ansi(capsys):
+    ParallelStreamWriter.instance = None
     ParallelStreamWriter.set_noansi(value=False)
     results, errors = parallel_execute(
         objects=["something", "something more"],
@@ -173,6 +175,7 @@ def test_parallel_execute_ansi(capsys):
 
 
 def test_parallel_execute_noansi(capsys):
+    ParallelStreamWriter.instance = None
     ParallelStreamWriter.set_noansi()
     results, errors = parallel_execute(
         objects=["something", "something more"],

+ 2 - 0
tests/unit/service_test.py

@@ -20,6 +20,7 @@ from compose.const import LABEL_PROJECT
 from compose.const import LABEL_SERVICE
 from compose.const import SECRETS_PATH
 from compose.container import Container
+from compose.parallel import ParallelStreamWriter
 from compose.project import OneOffFilter
 from compose.service import build_ulimits
 from compose.service import build_volume_binding
@@ -727,6 +728,7 @@ class ServiceTest(unittest.TestCase):
     @mock.patch('compose.service.log', autospec=True)
     def test_only_log_warning_when_host_ports_clash(self, mock_log):
         self.mock_client.inspect_image.return_value = {'Id': 'abcd'}
+        ParallelStreamWriter.instance = None
         name = 'foo'
         service = Service(
             name,