|
@@ -5,11 +5,15 @@ import codecs
|
|
|
import hashlib
|
|
|
import json
|
|
|
import json.decoder
|
|
|
+import logging
|
|
|
|
|
|
import six
|
|
|
|
|
|
+from .errors import StreamParseError
|
|
|
+
|
|
|
|
|
|
json_decoder = json.JSONDecoder()
|
|
|
+log = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
|
def get_output_stream(stream):
|
|
@@ -60,13 +64,21 @@ def split_buffer(stream, splitter=None, decoder=lambda a: a):
|
|
|
yield item
|
|
|
|
|
|
if buffered:
|
|
|
- yield decoder(buffered)
|
|
|
+ try:
|
|
|
+ yield decoder(buffered)
|
|
|
+ except Exception as e:
|
|
|
+ log.error(
|
|
|
+ 'Compose tried decoding the following data chunk, but failed:'
|
|
|
+ '\n%s' % repr(buffered)
|
|
|
+ )
|
|
|
+ raise StreamParseError(e)
|
|
|
|
|
|
|
|
|
def json_splitter(buffer):
|
|
|
"""Attempt to parse a json object from a buffer. If there is at least one
|
|
|
object, return it and the rest of the buffer, otherwise return None.
|
|
|
"""
|
|
|
+ buffer = buffer.strip()
|
|
|
try:
|
|
|
obj, index = json_decoder.raw_decode(buffer)
|
|
|
rest = buffer[json.decoder.WHITESPACE.match(buffer, index).end():]
|