Browse Source

Fix bufferpool puts (ref #4976) (#6125)

* Fix bufferpool puts (ref #4976)

There was a logic error in Put() which made us put all large blocks into
segment zero, where we subsequently did not look for them.

I also added a lowest threshold, as we otherwise allocate a 128KiB
buffer when we need 24 bytes for a header and such.

* wip

* wip

* wip

* wip

* wip

* wip

* wip

* wip

* wip

* wip

* smaller stress

* cap/len

* wip

* wip
Jakob Borg 6 years ago
parent
commit
6755a9ca63
2 changed files with 199 additions and 24 deletions
  1. 67 24
      lib/protocol/bufferpool.go
  2. 132 0
      lib/protocol/bufferpool_test.go

+ 67 - 24
lib/protocol/bufferpool.go

@@ -2,56 +2,71 @@
 
 package protocol
 
-import "sync"
+import (
+	"fmt"
+	"sync"
+	"sync/atomic"
+)
 
 // Global pool to get buffers from. Requires Blocksizes to be initialised,
 // therefore it is initialized in the same init() as BlockSizes
 var BufferPool bufferPool
 
 type bufferPool struct {
-	pools []sync.Pool
+	puts   int64
+	skips  int64
+	misses int64
+	pools  []sync.Pool
+	hits   []int64 // start of slice allocation is always aligned
 }
 
 func newBufferPool() bufferPool {
-	return bufferPool{make([]sync.Pool, len(BlockSizes))}
+	return bufferPool{
+		pools: make([]sync.Pool, len(BlockSizes)),
+		hits:  make([]int64, len(BlockSizes)),
+	}
 }
 
 func (p *bufferPool) Get(size int) []byte {
 	// Too big, isn't pooled
 	if size > MaxBlockSize {
+		atomic.AddInt64(&p.skips, 1)
 		return make([]byte, size)
 	}
-	var i int
-	for i = range BlockSizes {
-		if size <= BlockSizes[i] {
-			break
-		}
-	}
-	var bs []byte
+
 	// Try the fitting and all bigger pools
-	for j := i; j < len(BlockSizes); j++ {
+	bkt := getBucketForLen(size)
+	for j := bkt; j < len(BlockSizes); j++ {
 		if intf := p.pools[j].Get(); intf != nil {
-			bs = *intf.(*[]byte)
+			atomic.AddInt64(&p.hits[j], 1)
+			bs := *intf.(*[]byte)
 			return bs[:size]
 		}
 	}
-	// All pools are empty, must allocate.
-	return make([]byte, BlockSizes[i])[:size]
+
+	atomic.AddInt64(&p.misses, 1)
+
+	// All pools are empty, must allocate. For very small slices where we
+	// didn't have a block to reuse, just allocate a small slice instead of
+	// a large one. We won't be able to reuse it, but avoid some overhead.
+	if size < MinBlockSize/64 {
+		return make([]byte, size)
+	}
+	return make([]byte, BlockSizes[bkt])[:size]
 }
 
-// Put makes the given byte slice availabe again in the global pool
+// Put makes the given byte slice available again in the global pool.
+// You must only Put() slices that were returned by Get() or Upgrade().
 func (p *bufferPool) Put(bs []byte) {
-	c := cap(bs)
-	// Don't buffer huge byte slices
-	if c > 2*MaxBlockSize {
+	// Don't buffer slices outside of our pool range
+	if cap(bs) > MaxBlockSize || cap(bs) < MinBlockSize {
+		atomic.AddInt64(&p.skips, 1)
 		return
 	}
-	for i := range BlockSizes {
-		if c >= BlockSizes[i] {
-			p.pools[i].Put(&bs)
-			return
-		}
-	}
+
+	atomic.AddInt64(&p.puts, 1)
+	bkt := putBucketForCap(cap(bs))
+	p.pools[bkt].Put(&bs)
 }
 
 // Upgrade grows the buffer to the requested size, while attempting to reuse
@@ -67,3 +82,31 @@ func (p *bufferPool) Upgrade(bs []byte, size int) []byte {
 	p.Put(bs)
 	return p.Get(size)
 }
+
+// getBucketForLen returns the bucket where we should get a slice of a
+// certain length. Each bucket is guaranteed to hold slices that are
+// precisely the block size for that bucket, so if the block size is larger
+// than our size we are good.
+func getBucketForLen(len int) int {
+	for i, blockSize := range BlockSizes {
+		if len <= blockSize {
+			return i
+		}
+	}
+
+	panic(fmt.Sprintf("bug: tried to get impossible block len %d", len))
+}
+
+// putBucketForCap returns the bucket where we should put a slice of a
+// certain capacity. Each bucket is guaranteed to hold slices that are
+// precisely the block size for that bucket, so we just find the matching
+// one.
+func putBucketForCap(cap int) int {
+	for i, blockSize := range BlockSizes {
+		if cap == blockSize {
+			return i
+		}
+	}
+
+	panic(fmt.Sprintf("bug: tried to put impossible block cap %d", cap))
+}

+ 132 - 0
lib/protocol/bufferpool_test.go

@@ -0,0 +1,132 @@
+// Copyright (C) 2019 The Protocol Authors.
+
+package protocol
+
+import (
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/syncthing/syncthing/lib/rand"
+)
+
+func TestGetBucketNumbers(t *testing.T) {
+	cases := []struct {
+		size   int
+		bkt    int
+		panics bool
+	}{
+		{size: 1024, bkt: 0},
+		{size: MinBlockSize, bkt: 0},
+		{size: MinBlockSize + 1, bkt: 1},
+		{size: 2*MinBlockSize - 1, bkt: 1},
+		{size: 2 * MinBlockSize, bkt: 1},
+		{size: 2*MinBlockSize + 1, bkt: 2},
+		{size: MaxBlockSize, bkt: len(BlockSizes) - 1},
+		{size: MaxBlockSize + 1, panics: true},
+	}
+
+	for _, tc := range cases {
+		if tc.panics {
+			shouldPanic(t, func() { getBucketForLen(tc.size) })
+		} else {
+			res := getBucketForLen(tc.size)
+			if res != tc.bkt {
+				t.Errorf("block of size %d should get from bucket %d, not %d", tc.size, tc.bkt, res)
+			}
+		}
+	}
+}
+
+func TestPutBucketNumbers(t *testing.T) {
+	cases := []struct {
+		size   int
+		bkt    int
+		panics bool
+	}{
+		{size: 1024, panics: true},
+		{size: MinBlockSize, bkt: 0},
+		{size: MinBlockSize + 1, panics: true},
+		{size: 2 * MinBlockSize, bkt: 1},
+		{size: MaxBlockSize, bkt: len(BlockSizes) - 1},
+		{size: MaxBlockSize + 1, panics: true},
+	}
+
+	for _, tc := range cases {
+		if tc.panics {
+			shouldPanic(t, func() { putBucketForCap(tc.size) })
+		} else {
+			res := putBucketForCap(tc.size)
+			if res != tc.bkt {
+				t.Errorf("block of size %d should put into bucket %d, not %d", tc.size, tc.bkt, res)
+			}
+		}
+	}
+}
+
+func TestStressBufferPool(t *testing.T) {
+	if testing.Short() {
+		t.Skip()
+	}
+
+	const routines = 10
+	const runtime = 2 * time.Second
+
+	bp := newBufferPool()
+	t0 := time.Now()
+
+	var wg sync.WaitGroup
+	fail := make(chan struct{}, routines)
+	for i := 0; i < routines; i++ {
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			for time.Since(t0) < runtime {
+				blocks := make([][]byte, 10)
+				for i := range blocks {
+					// Request a block of random size with the range
+					// covering smaller-than-min to larger-than-max and
+					// everything in between.
+					want := rand.Intn(1.5 * MaxBlockSize)
+					blocks[i] = bp.Get(want)
+					if len(blocks[i]) != want {
+						fail <- struct{}{}
+						return
+					}
+				}
+				for i := range blocks {
+					bp.Put(blocks[i])
+				}
+			}
+		}()
+	}
+
+	wg.Wait()
+	select {
+	case <-fail:
+		t.Fatal("a block was bad size")
+	default:
+	}
+
+	t.Log(bp.puts, bp.skips, bp.misses, bp.hits)
+	if bp.puts == 0 || bp.skips == 0 || bp.misses == 0 {
+		t.Error("didn't exercise some paths")
+	}
+	var hits int64
+	for _, h := range bp.hits {
+		hits += h
+	}
+	if hits == 0 {
+		t.Error("didn't exercise some paths")
+	}
+}
+
+func shouldPanic(t *testing.T, fn func()) {
+	defer func() {
+		if r := recover(); r == nil {
+			t.Errorf("did not panic")
+		}
+	}()
+
+	fn()
+}