浏览代码

Merge pull request #2132 from dnephin/fix_build_against_swarm

Fix build against swarm
Aanand Prasad 10 年之前
父节点
当前提交
bf2faf6cdb

+ 2 - 2
compose/cli/log_printer.py

@@ -6,8 +6,8 @@ from itertools import cycle
 
 from . import colors
 from .multiplexer import Multiplexer
-from .utils import split_buffer
 from compose import utils
+from compose.utils import split_buffer
 
 
 class LogPrinter(object):
@@ -75,7 +75,7 @@ def build_no_log_generator(container, prefix, color_func):
 def build_log_generator(container, prefix, color_func):
     # Attach to container before log printer starts running
     stream = container.attach(stdout=True, stderr=True,  stream=True, logs=True)
-    line_generator = split_buffer(stream, u'\n')
+    line_generator = split_buffer(stream)
 
     for line in line_generator:
         yield prefix + line

+ 0 - 26
compose/cli/utils.py

@@ -7,7 +7,6 @@ import platform
 import ssl
 import subprocess
 
-import six
 from docker import version as docker_py_version
 from six.moves import input
 
@@ -36,31 +35,6 @@ def yesno(prompt, default=None):
         return None
 
 
-def split_buffer(reader, separator):
-    """
-    Given a generator which yields strings and a separator string,
-    joins all input, splits on the separator and yields each chunk.
-
-    Unlike string.split(), each chunk includes the trailing
-    separator, except for the last one if none was found on the end
-    of the input.
-    """
-    buffered = six.text_type('')
-    separator = six.text_type(separator)
-
-    for data in reader:
-        buffered += data.decode('utf-8')
-        while True:
-            index = buffered.find(separator)
-            if index == -1:
-                break
-            yield buffered[:index + 1]
-            buffered = buffered[index + 1:]
-
-    if len(buffered) > 0:
-        yield buffered
-
-
 def call_silently(*args, **kwargs):
     """
     Like subprocess.call(), but redirects stdout and stderr to /dev/null.

+ 0 - 3
compose/container.py

@@ -212,9 +212,6 @@ class Container(object):
     def attach(self, *args, **kwargs):
         return self.client.attach(self.id, *args, **kwargs)
 
-    def attach_socket(self, **kwargs):
-        return self.client.attach_socket(self.id, **kwargs)
-
     def __repr__(self):
         return '<Container: %s (%s)>' % (self.name, self.id[:6])
 

+ 1 - 8
compose/progress_stream.py

@@ -1,7 +1,3 @@
-import json
-
-import six
-
 from compose import utils
 
 
@@ -16,10 +12,7 @@ def stream_output(output, stream):
     lines = {}
     diff = 0
 
-    for chunk in output:
-        if six.PY3:
-            chunk = chunk.decode('utf-8')
-        event = json.loads(chunk)
+    for event in utils.json_stream(output):
         all_events.append(event)
 
         if 'progress' in event or 'progressDetail' in event:

+ 1 - 0
compose/service.py

@@ -34,6 +34,7 @@ from .progress_stream import StreamOutputError
 from .utils import json_hash
 from .utils import parallel_execute
 
+
 log = logging.getLogger(__name__)
 
 

+ 68 - 0
compose/utils.py

@@ -1,6 +1,7 @@
 import codecs
 import hashlib
 import json
+import json.decoder
 import logging
 import sys
 from threading import Thread
@@ -13,6 +14,8 @@ from six.moves.queue import Queue
 
 log = logging.getLogger(__name__)
 
+json_decoder = json.JSONDecoder()
+
 
 def parallel_execute(objects, obj_callable, msg_index, msg):
     """
@@ -83,6 +86,71 @@ def get_output_stream(stream):
     return codecs.getwriter('utf-8')(stream)
 
 
+def stream_as_text(stream):
+    """Given a stream of bytes or text, if any of the items in the stream
+    are bytes convert them to text.
+
+    This function can be removed once docker-py returns text streams instead
+    of byte streams.
+    """
+    for data in stream:
+        if not isinstance(data, six.text_type):
+            data = data.decode('utf-8')
+        yield data
+
+
+def line_splitter(buffer, separator=u'\n'):
+    index = buffer.find(six.text_type(separator))
+    if index == -1:
+        return None, None
+    return buffer[:index + 1], buffer[index + 1:]
+
+
+def split_buffer(stream, splitter=None, decoder=lambda a: a):
+    """Given a generator which yields strings and a splitter function,
+    joins all input, splits on the separator and yields each chunk.
+
+    Unlike string.split(), each chunk includes the trailing
+    separator, except for the last one if none was found on the end
+    of the input.
+    """
+    splitter = splitter or line_splitter
+    buffered = six.text_type('')
+
+    for data in stream_as_text(stream):
+        buffered += data
+        while True:
+            item, rest = splitter(buffered)
+            if not item:
+                break
+
+            buffered = rest
+            yield item
+
+    if buffered:
+        yield decoder(buffered)
+
+
+def json_splitter(buffer):
+    """Attempt to parse a json object from a buffer. If there is at least one
+    object, return it and the rest of the buffer, otherwise return None.
+    """
+    try:
+        obj, index = json_decoder.raw_decode(buffer)
+        rest = buffer[json.decoder.WHITESPACE.match(buffer, index).end():]
+        return obj, rest
+    except ValueError:
+        return None, None
+
+
+def json_stream(stream):
+    """Given a stream of text, return a stream of json objects.
+    This handles streams which are inconsistently buffered (some entries may
+    be newline delimited, and others are not).
+    """
+    return split_buffer(stream_as_text(stream), json_splitter, json_decoder.decode)
+
+
 def write_out_msg(stream, lines, msg_index, msg, status="done"):
     """
     Using special ANSI code characters we can write out the msg over the top of

+ 2 - 2
tests/unit/split_buffer_test.py

@@ -2,7 +2,7 @@ from __future__ import absolute_import
 from __future__ import unicode_literals
 
 from .. import unittest
-from compose.cli.utils import split_buffer
+from compose.utils import split_buffer
 
 
 class SplitBufferTest(unittest.TestCase):
@@ -47,7 +47,7 @@ class SplitBufferTest(unittest.TestCase):
         self.assert_produces(reader, [string])
 
     def assert_produces(self, reader, expectations):
-        split = split_buffer(reader(), u'\n')
+        split = split_buffer(reader())
 
         for (actual, expected) in zip(split, expectations):
             self.assertEqual(type(actual), type(expected))

+ 16 - 0
tests/unit/utils_test.py

@@ -0,0 +1,16 @@
+from .. import unittest
+from compose import utils
+
+
+class JsonSplitterTestCase(unittest.TestCase):
+
+    def test_json_splitter_no_object(self):
+        data = '{"foo": "bar'
+        self.assertEqual(utils.json_splitter(data), (None, None))
+
+    def test_json_splitter_with_object(self):
+        data = '{"foo": "bar"}\n  \n{"next": "obj"}'
+        self.assertEqual(
+            utils.json_splitter(data),
+            ({'foo': 'bar'}, '{"next": "obj"}')
+        )