Browse Source

logs: fix for missing output on container exit (#10925)

We can't assume we receive container logs line by line. Some framework won't buffer output and will send char by char, and we also can receive looong lines which get buffered to 32kb and then cut into multiple logs.

This assumes we will catch container streams being closed before we receive a die event for container, which could be subject to race condition, but at least the impact here is minimal and the fix works for reproduction examples provided in linked issues.

Signed-off-by: Nicolas De Loof <[email protected]>
Nicolas De loof 2 years ago
parent
commit
6204fb1c94
4 changed files with 16 additions and 28 deletions
  1. 3 1
      pkg/compose/attach.go
  2. 3 3
      pkg/compose/logs_test.go
  3. 3 9
      pkg/utils/writer.go
  4. 7 15
      pkg/utils/writer_test.go

+ 3 - 1
pkg/compose/attach.go

@@ -98,7 +98,7 @@ func (s *composeService) attachContainer(ctx context.Context, container moby.Con
 	return err
 }
 
-func (s *composeService) attachContainerStreams(ctx context.Context, container string, tty bool, stdin io.ReadCloser, stdout, stderr io.Writer) (func(), chan bool, error) {
+func (s *composeService) attachContainerStreams(ctx context.Context, container string, tty bool, stdin io.ReadCloser, stdout, stderr io.WriteCloser) (func(), chan bool, error) {
 	detached := make(chan bool)
 	var (
 		restore = func() { /* noop */ }
@@ -140,6 +140,8 @@ func (s *composeService) attachContainerStreams(ctx context.Context, container s
 
 	if stdout != nil {
 		go func() {
+			defer stdout.Close() //nolint:errcheck
+			defer stderr.Close() //nolint:errcheck
 			if tty {
 				io.Copy(stdout, streamOut) //nolint:errcheck
 			} else {

+ 3 - 3
pkg/compose/logs_test.go

@@ -71,9 +71,9 @@ func TestComposeService_Logs_Demux(t *testing.T) {
 	c1Stdout := stdcopy.NewStdWriter(c1Writer, stdcopy.Stdout)
 	c1Stderr := stdcopy.NewStdWriter(c1Writer, stdcopy.Stderr)
 	go func() {
-		_, err := c1Stdout.Write([]byte("hello\n stdout"))
+		_, err := c1Stdout.Write([]byte("hello stdout\n"))
 		assert.NoError(t, err, "Writing to fake stdout")
-		_, err = c1Stderr.Write([]byte("hello\n stderr"))
+		_, err = c1Stderr.Write([]byte("hello stderr\n"))
 		assert.NoError(t, err, "Writing to fake stderr")
 		_ = c1Writer.Close()
 	}()
@@ -94,7 +94,7 @@ func TestComposeService_Logs_Demux(t *testing.T) {
 
 	require.Equal(
 		t,
-		[]string{"hello", " stdout", "hello", " stderr"},
+		[]string{"hello stdout", "hello stderr"},
 		consumer.LogsForContainer("c"),
 	)
 }

+ 3 - 9
pkg/utils/writer.go

@@ -43,17 +43,11 @@ func (s *splitWriter) Write(b []byte) (int, error) {
 	for {
 		b = s.buffer.Bytes()
 		index := bytes.Index(b, []byte{'\n'})
-		if index > 0 {
-			line := s.buffer.Next(index + 1)
-			s.consumer(string(line[:len(line)-1]))
-		} else {
-			line := s.buffer.String()
-			s.buffer.Reset()
-			if len(line) > 0 {
-				s.consumer(line)
-			}
+		if index < 0 {
 			break
 		}
+		line := s.buffer.Next(index + 1)
+		s.consumer(string(line[:len(line)-1]))
 	}
 	return n, nil
 }

+ 7 - 15
pkg/utils/writer_test.go

@@ -28,21 +28,13 @@ func TestSplitWriter(t *testing.T) {
 	w := GetWriter(func(line string) {
 		lines = append(lines, line)
 	})
-	w.Write([]byte("hello\n"))
-	w.Write([]byte("world\n"))
-	w.Write([]byte("!"))
-	assert.DeepEqual(t, lines, []string{"hello", "world", "!"})
-
-}
-
-//nolint:errcheck
-func TestSplitWriterNoEOL(t *testing.T) {
-	var lines []string
-	w := GetWriter(func(line string) {
-		lines = append(lines, line)
-	})
-	w.Write([]byte("hello\n"))
-	w.Write([]byte("world!"))
+	w.Write([]byte("h"))
+	w.Write([]byte("e"))
+	w.Write([]byte("l"))
+	w.Write([]byte("l"))
+	w.Write([]byte("o"))
+	w.Write([]byte("\n"))
+	w.Write([]byte("world!\n"))
 	assert.DeepEqual(t, lines, []string{"hello", "world!"})
 
 }