浏览代码

Fix split buffer with inconsistently delimited json objects.

Signed-off-by: Daniel Nephin <[email protected]>
Daniel Nephin 10 年之前
父节点
当前提交
15d0c60a73
共有 6 个文件被更改,包括 62 次插入和 22 次删除
  1. 1 4
      compose/progress_stream.py
  2. 1 2
      compose/service.py
  3. 41 11
      compose/utils.py
  4. 2 4
      tests/integration/testcases.py
  5. 1 1
      tests/unit/split_buffer_test.py
  6. 16 0
      tests/unit/utils_test.py

+ 1 - 4
compose/progress_stream.py

@@ -1,5 +1,3 @@
-import json
-
 from compose import utils
 
 
@@ -14,8 +12,7 @@ def stream_output(output, stream):
     lines = {}
     diff = 0
 
-    for chunk in utils.stream_as_text(output):
-        event = json.loads(chunk)
+    for event in utils.json_stream(output):
         all_events.append(event)
 
         if 'progress' in event or 'progressDetail' in event:

+ 1 - 2
compose/service.py

@@ -33,7 +33,6 @@ from .progress_stream import stream_output
 from .progress_stream import StreamOutputError
 from .utils import json_hash
 from .utils import parallel_execute
-from .utils import split_buffer
 
 
 log = logging.getLogger(__name__)
@@ -724,7 +723,7 @@ class Service(object):
         )
 
         try:
-            all_events = stream_output(split_buffer(build_output), sys.stdout)
+            all_events = stream_output(build_output, sys.stdout)
         except StreamOutputError as e:
             raise BuildError(self, six.text_type(e))
 

+ 41 - 11
compose/utils.py

@@ -1,6 +1,7 @@
 import codecs
 import hashlib
 import json
+import json.decoder
 import logging
 import sys
 from threading import Thread
@@ -13,6 +14,8 @@ from six.moves.queue import Queue
 
 log = logging.getLogger(__name__)
 
+json_decoder = json.JSONDecoder()
+
 
 def parallel_execute(objects, obj_callable, msg_index, msg):
     """
@@ -96,29 +99,56 @@ def stream_as_text(stream):
         yield data
 
 
-def split_buffer(reader, separator=u'\n'):
-    """
-    Given a generator which yields strings and a separator string,
def line_splitter(buffer, separator=u'\n'):
    """Split one separator-terminated chunk off the front of *buffer*.

    Returns ``(chunk, remainder)`` where the chunk keeps its trailing
    separator, or ``(None, None)`` when the buffer does not yet contain
    the separator.
    """
    # six.text_type() keeps the search consistent under py2 when a bytes
    # separator is passed in.
    position = buffer.find(six.text_type(separator))
    if position < 0:
        return None, None
    split_at = position + 1
    return buffer[:split_at], buffer[split_at:]
+
+
def split_buffer(stream, splitter=None, decoder=lambda a: a):
    """Given a generator which yields strings and a splitter function,
    joins all input, splits on the separator and yields each chunk.

    Unlike string.split(), each chunk includes the trailing
    separator, except for the last one if none was found on the end
    of the input.

    :param stream: iterable of text (or bytes; it is decoded via
        stream_as_text before buffering)
    :param splitter: callable(buffer) -> (item, rest); returns
        (None, None) when no complete item is available yet.
        Defaults to line_splitter.
    :param decoder: applied only to any trailing remainder that never
        produced a complete item (e.g. parse a final, unterminated
        JSON document). Defaults to identity.
    """
    splitter = splitter or line_splitter
    buffered = six.text_type('')

    for data in stream_as_text(stream):
        buffered += data
        while True:
            item, rest = splitter(buffered)
            # Splitters signal "no complete item yet" with None. Testing
            # truthiness here would wrongly stop on a valid falsy item
            # (e.g. an empty JSON object {} from json_splitter), leaving
            # it stuck in the buffer.
            if item is None:
                break

            buffered = rest
            yield item

    if buffered:
        yield decoder(buffered)
+
+
def json_splitter(buffer):
    """Attempt to parse a json object from a buffer. If there is at least one
    object, return it and the rest of the buffer, otherwise return
    ``(None, None)``.

    :param buffer: text possibly containing one or more concatenated
        JSON documents, optionally separated by whitespace.
    """
    try:
        obj, index = json.JSONDecoder().raw_decode(buffer)
        # Skip the inter-object whitespace with the public str API instead
        # of the private json.decoder.WHITESPACE implementation detail,
        # which is not part of the stdlib's documented interface.
        rest = buffer[index:].lstrip()
        return obj, rest
    except ValueError:
        # raw_decode raises ValueError (JSONDecodeError) on incomplete or
        # invalid input — report "no object available yet".
        return None, None
+
+
def json_stream(stream):
    """Given a stream of text, return a stream of json objects.
    This handles streams which are inconsistently buffered (some entries may
    be newline delimited, and others are not).

    :param stream: iterable of text/bytes chunks, e.g. docker build output.
    """
    # split_buffer already runs the stream through stream_as_text, so
    # wrapping it here as well was redundant (NOTE(review): assumes
    # stream_as_text passes already-decoded text through unchanged —
    # confirm against its definition).
    return split_buffer(stream, json_splitter, json_decoder.decode)
 
 
 def write_out_msg(stream, lines, msg_index, msg, status="done"):

+ 2 - 4
tests/integration/testcases.py

@@ -9,8 +9,6 @@ from compose.config.config import ServiceLoader
 from compose.const import LABEL_PROJECT
 from compose.progress_stream import stream_output
 from compose.service import Service
-from compose.utils import split_buffer
-from compose.utils import stream_as_text
 
 
 def pull_busybox(client):
@@ -73,5 +71,5 @@ class DockerClientTestCase(unittest.TestCase):
 
     def check_build(self, *args, **kwargs):
         kwargs.setdefault('rm', True)
-        build_output = stream_as_text(self.client.build(*args, **kwargs))
-        stream_output(split_buffer(build_output), open('/dev/null', 'w'))
+        build_output = self.client.build(*args, **kwargs)
+        stream_output(build_output, open('/dev/null', 'w'))

+ 1 - 1
tests/unit/split_buffer_test.py

@@ -47,7 +47,7 @@ class SplitBufferTest(unittest.TestCase):
         self.assert_produces(reader, [string])
 
     def assert_produces(self, reader, expectations):
-        split = split_buffer(reader(), u'\n')
+        split = split_buffer(reader())
 
         for (actual, expected) in zip(split, expectations):
             self.assertEqual(type(actual), type(expected))

+ 16 - 0
tests/unit/utils_test.py

@@ -0,0 +1,16 @@
+from .. import unittest
+from compose import utils
+
+
class JsonSplitterTestCase(unittest.TestCase):
    """Unit tests for utils.json_splitter."""

    def test_json_splitter_no_object(self):
        """An incomplete document yields (None, None)."""
        data = '{"foo": "bar'
        result = utils.json_splitter(data)
        self.assertEqual(result, (None, None))

    def test_json_splitter_with_object(self):
        """A complete object is parsed and whitespace before the
        remainder is stripped."""
        data = '{"foo": "bar"}\n  \n{"next": "obj"}'
        expected = ({'foo': 'bar'}, '{"next": "obj"}')
        self.assertEqual(utils.json_splitter(data), expected)