
Progress markers are not shown correctly for docker-compose up (fixes #4801)

Signed-off-by: Guillermo Arribas <[email protected]>
Guillermo Arribas, 8 years ago
parent commit 57eb1c463f
4 changed files with 53 additions and 20 deletions
  1. compose/parallel.py (+16 -7)
  2. compose/project.py (+24 -1)
  3. compose/service.py (+12 -11)
  4. tests/unit/service_test.py (+1 -1)

compose/parallel.py (+16 -7)

@@ -26,7 +26,7 @@ log = logging.getLogger(__name__)
 STOP = object()
 
 
-def parallel_execute(objects, func, get_name, msg, get_deps=None, limit=None):
+def parallel_execute(objects, func, get_name, msg, get_deps=None, limit=None, parent_objects=None):
     """Runs func on objects in parallel while ensuring that func is
     ran on object only after it is ran on all its dependencies.
 
@@ -37,9 +37,19 @@ def parallel_execute(objects, func, get_name, msg, get_deps=None, limit=None):
     stream = get_output_stream(sys.stderr)
 
     writer = ParallelStreamWriter(stream, msg)
-    for obj in objects:
+
+    if parent_objects:
+        display_objects = list(parent_objects)
+    else:
+        display_objects = objects
+
+    for obj in display_objects:
         writer.add_object(get_name(obj))
-    writer.write_initial()
+
+    # write data in a second loop to consider all objects for width alignment
+    # and avoid duplicates when parent_objects exists
+    for obj in objects:
+        writer.write_initial(get_name(obj))
 
     events = parallel_execute_iter(objects, func, get_deps, limit)
 
@@ -237,12 +247,11 @@ class ParallelStreamWriter(object):
         self.lines.append(obj_index)
         self.width = max(self.width, len(obj_index))
 
-    def write_initial(self):
+    def write_initial(self, obj_index):
         if self.msg is None:
             return
-        for line in self.lines:
-            self.stream.write("{} {:<{width}} ... \r\n".format(self.msg, line,
-                              width=self.width))
+        self.stream.write("{} {:<{width}} ... \r\n".format(
+            self.msg, self.lines[self.lines.index(obj_index)], width=self.width))
         self.stream.flush()
 
     def _write_ansi(self, obj_index, status):
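
The effect of the parallel.py change is easier to see in isolation: the names to display (all of parent_objects when it is given) are registered first, so the column width covers the longest name in the whole project, and an initial "..." line is then written only for the objects this particular call processes. A minimal, runnable sketch of that behaviour, using a simplified stand-in class and made-up container names rather than the real ParallelStreamWriter:

    import sys

    class TinyWriter(object):
        """Simplified stand-in for ParallelStreamWriter, for illustration only."""

        def __init__(self, stream, msg):
            self.stream = stream
            self.msg = msg
            self.lines = []
            self.width = 0

        def add_object(self, obj_index):
            # as in add_object above: remember the name and widen the column
            self.lines.append(obj_index)
            self.width = max(self.width, len(obj_index))

        def write_initial(self, obj_index):
            # as in the new write_initial: one "msg name ..." line per object
            self.stream.write("{} {:<{width}} ... \r\n".format(
                self.msg, obj_index, width=self.width))
            self.stream.flush()

    writer = TinyWriter(sys.stderr, "Creating")
    for name in ["myapp_web_1", "myapp_db_1", "myapp_db_2"]:  # e.g. the parent_objects names
        writer.add_object(name)                               # width now fits the longest name
    for name in ["myapp_web_1"]:                              # only the objects this call handles
        writer.write_initial(name)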

compose/project.py (+24 -1)

@@ -29,6 +29,7 @@ from .service import ConvergenceStrategy
 from .service import NetworkMode
 from .service import PidMode
 from .service import Service
+from .service import ServiceName
 from .service import ServiceNetworkMode
 from .service import ServicePidMode
 from .utils import microseconds_from_time_nano
@@ -190,6 +191,25 @@ class Project(object):
             service.remove_duplicate_containers()
         return services
 
+    def get_scaled_services(self, services, scale_override):
+        """
+        Returns a list of this project's services as scaled ServiceName objects.
+
+        services: a list of Service objects
+        scale_override: a dict with the scale to apply to each service (k: service_name, v: scale)
+        """
+        service_names = []
+        for service in services:
+            if service.name in scale_override:
+                scale = scale_override[service.name]
+            else:
+                scale = service.scale_num
+
+            for i in range(1, scale + 1):
+                service_names.append(ServiceName(self.name, service.name, i))
+
+        return service_names
+
     def get_links(self, service_dict):
         links = []
         if 'links' in service_dict:
@@ -430,15 +450,18 @@ class Project(object):
         for svc in services:
             svc.ensure_image_exists(do_build=do_build)
         plans = self._get_convergence_plans(services, strategy)
+        scaled_services = self.get_scaled_services(services, scale_override)
 
         def do(service):
+
             return service.execute_convergence_plan(
                 plans[service.name],
                 timeout=timeout,
                 detached=detached,
                 scale_override=scale_override.get(service.name),
                 rescale=rescale,
-                start=start
+                start=start,
+                project_services=scaled_services
             )
 
         def get_deps(service):
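
get_scaled_services expands each service into one entry per expected container, honoring scale_override where present, so the progress writer can be told about every container in the project up front. A standalone sketch of that numbering, assuming ServiceName is a (project, service, number) namedtuple as its use in this diff suggests (the project and service names below are made up):

    from collections import namedtuple

    # assumption: ServiceName carries (project, service, number), matching its use above
    ServiceName = namedtuple('ServiceName', 'project service number')

    def scaled_service_names(project_name, services, scale_override):
        """services: list of (service_name, default_scale) pairs; scale_override: name -> scale."""
        names = []
        for service, default in services:
            scale = scale_override.get(service, default)
            for i in range(1, scale + 1):
                names.append(ServiceName(project_name, service, i))
        return names

    # scaled_service_names('myapp', [('web', 1), ('db', 1)], {'db': 2})
    # -> [ServiceName(project='myapp', service='web', number=1),
    #     ServiceName(project='myapp', service='db', number=1),
    #     ServiceName(project='myapp', service='db', number=2)]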

compose/service.py (+12 -11)

@@ -378,11 +378,11 @@ class Service(object):
 
         return has_diverged
 
-    def _execute_convergence_create(self, scale, detached, start):
+    def _execute_convergence_create(self, scale, detached, start, project_services=None):
             i = self._next_container_number()
 
             def create_and_start(service, n):
-                container = service.create_container(number=n)
+                container = service.create_container(number=n, quiet=True)
                 if not detached:
                     container.attach_log_stream()
                 if start:
@@ -390,10 +390,11 @@ class Service(object):
                 return container
 
             containers, errors = parallel_execute(
-                range(i, i + scale),
-                lambda n: create_and_start(self, n),
-                lambda n: self.get_container_name(n),
+                [ServiceName(self.project, self.name, index) for index in range(i, i + scale)],
+                lambda service_name: create_and_start(self, service_name.number),
+                lambda service_name: self.get_container_name(service_name.service, service_name.number),
                 "Creating",
+                parent_objects=project_services
             )
             for error in errors.values():
                 raise OperationFailedError(error)
@@ -432,7 +433,7 @@ class Service(object):
             if start:
                 _, errors = parallel_execute(
                     containers,
-                    lambda c: self.start_container_if_stopped(c, attach_logs=not detached),
+                    lambda c: self.start_container_if_stopped(c, attach_logs=not detached, quiet=True),
                     lambda c: c.name,
                     "Starting",
                 )
@@ -459,7 +460,7 @@ class Service(object):
         )
 
     def execute_convergence_plan(self, plan, timeout=None, detached=False,
-                                 start=True, scale_override=None, rescale=True):
+                                 start=True, scale_override=None, rescale=True, project_services=None):
         (action, containers) = plan
         scale = scale_override if scale_override is not None else self.scale_num
         containers = sorted(containers, key=attrgetter('number'))
@@ -468,7 +469,7 @@ class Service(object):
 
         if action == 'create':
             return self._execute_convergence_create(
-                scale, detached, start
+                scale, detached, start, project_services
             )
 
         # The create action always needs an initial scale, but otherwise,
@@ -741,7 +742,7 @@ class Service(object):
         container_options.update(override_options)
 
         if not container_options.get('name'):
-            container_options['name'] = self.get_container_name(number, one_off)
+            container_options['name'] = self.get_container_name(self.name, number, one_off)
 
         container_options.setdefault('detach', True)
 
@@ -960,12 +961,12 @@ class Service(object):
     def custom_container_name(self):
         return self.options.get('container_name')
 
-    def get_container_name(self, number, one_off=False):
+    def get_container_name(self, service_name, number, one_off=False):
         if self.custom_container_name and not one_off:
             return self.custom_container_name
 
         container_name = build_container_name(
-            self.project, self.name, number, one_off,
+            self.project, service_name, number, one_off,
         )
         ext_links_origins = [l.split(':')[0] for l in self.options.get('external_links', [])]
         if container_name in ext_links_origins:
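
Passing the service name into get_container_name explicitly lets a name be computed from a ServiceName tuple before any container exists, which is what allows the "Creating" lines to be printed up front. A hedged sketch of the naming convention being relied on, assuming the usual <project>_<service>[_run]_<number> scheme of Compose at the time (build_container_name itself is not shown in this diff):

    def build_container_name_sketch(project, service, number, one_off=False):
        # assumption: mirrors the <project>_<service>[_run]_<number> convention;
        # the real build_container_name is defined elsewhere in compose/service.py
        bits = [project, service]
        if one_off:
            bits.append('run')
        return '_'.join(bits + [str(number)])

    # build_container_name_sketch('myapp', 'web', 1)               -> 'myapp_web_1'
    # build_container_name_sketch('myapp', 'web', 2, one_off=True) -> 'myapp_web_run_2'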

tests/unit/service_test.py (+1 -1)

@@ -179,7 +179,7 @@ class ServiceTest(unittest.TestCase):
             external_links=['default_foo_1']
         )
         with self.assertRaises(DependencyError):
-            service.get_container_name(1)
+            service.get_container_name('foo', 1)
 
     def test_mem_reservation(self):
         self.mock_client.create_host_config.return_value = {}