| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152 |
- /*
- Copyright 2023 Docker Compose CLI authors
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
- package sync
- import (
- "context"
- "errors"
- "io"
- "sync"
- "testing"
- "time"
- "github.com/stretchr/testify/require"
- )
- func TestLossyMultiWriter(t *testing.T) {
- ctx, cancel := context.WithCancel(context.Background())
- t.Cleanup(cancel)
- const count = 5
- readers := make([]*bufReader, count)
- writers := make([]*io.PipeWriter, count)
- for i := 0; i < count; i++ {
- r, w := io.Pipe()
- readers[i] = newBufReader(ctx, r)
- writers[i] = w
- }
- w := newLossyMultiWriter(writers...)
- t.Cleanup(w.Close)
- n, err := w.Write([]byte("hello world"))
- require.Equal(t, 11, n)
- require.NoError(t, err)
- for i := range readers {
- readers[i].waitForWrite(t)
- require.Equal(t, "hello world", string(readers[i].contents()))
- readers[i].reset()
- }
- // even if a writer fails (in this case simulated by closing the receiving end of the pipe),
- // write operations should continue to return nil error but the writer should be closed
- // with an error
- const failIndex = 3
- require.NoError(t, readers[failIndex].r.CloseWithError(errors.New("oh no")))
- n, err = w.Write([]byte("hello"))
- require.Equal(t, 5, n)
- require.NoError(t, err)
- for i := range readers {
- readers[i].waitForWrite(t)
- if i == failIndex {
- err := readers[i].error()
- require.EqualError(t, err, "io: read/write on closed pipe")
- require.Empty(t, readers[i].contents())
- } else {
- require.Equal(t, "hello", string(readers[i].contents()))
- }
- }
- // perform another write, verify there's still no errors
- n, err = w.Write([]byte(" world"))
- require.Equal(t, 6, n)
- require.NoError(t, err)
- }
// bufReader is a test helper that drains the read end of an io.Pipe on a
// background goroutine, accumulating everything it reads so the test can
// inspect it later.
type bufReader struct {
	// ctx bounds the lifetime of the consume goroutine.
	ctx context.Context
	// r is the pipe end being drained.
	r *io.PipeReader
	// writeSync receives one value per successful read and is closed when
	// the consume goroutine exits.
	writeSync chan struct{}

	// mu guards err and data.
	mu   sync.Mutex
	err  error
	data []byte
}

// newBufReader wraps r in a bufReader and immediately starts draining it in
// a new goroutine.
func newBufReader(ctx context.Context, r *io.PipeReader) *bufReader {
	b := new(bufReader)
	b.ctx = ctx
	b.r = r
	b.writeSync = make(chan struct{})
	go b.consume()
	return b
}

// waitForWrite blocks until the consume goroutine either signals a completed
// read or has exited (closing writeSync). It fails the test if neither
// happens within 50ms.
func (b *bufReader) waitForWrite(t testing.TB) {
	t.Helper()
	timeout := time.After(50 * time.Millisecond)
	select {
	case <-timeout:
		t.Fatal("timed out waiting for write")
	case <-b.writeSync:
	}
}

// consume reads from the pipe until EOF, a read error, or context
// cancellation. Bytes read are appended to data; any non-EOF error is stored
// in err. writeSync is closed on exit so pending waiters are released.
func (b *bufReader) consume() {
	defer close(b.writeSync)
	for {
		chunk := make([]byte, 512)
		n, readErr := b.r.Read(chunk)
		if n != 0 {
			b.mu.Lock()
			b.data = append(b.data, chunk[:n]...)
			b.mu.Unlock()
		}

		switch {
		case errors.Is(readErr, io.EOF):
			// clean end of stream: nothing to record
			return
		case readErr != nil:
			b.mu.Lock()
			b.err = readErr
			b.mu.Unlock()
			return
		}

		// prevent goroutine leak, tie lifetime to the test
		select {
		case <-b.ctx.Done():
			return
		case b.writeSync <- struct{}{}:
		}
	}
}

// contents returns the bytes accumulated so far.
func (b *bufReader) contents() []byte {
	b.mu.Lock()
	data := b.data
	b.mu.Unlock()
	return data
}

// reset discards the bytes accumulated so far.
func (b *bufReader) reset() {
	b.mu.Lock()
	b.data = nil
	b.mu.Unlock()
}

// error returns the non-EOF read error recorded by consume, or nil if none
// has occurred.
func (b *bufReader) error() error {
	b.mu.Lock()
	err := b.err
	b.mu.Unlock()
	return err
}
|