Browse Source

lib/model, lib/protocol: Remove FileInfoBatch reuse behavior (#9399)

Jakob Borg 1 year ago
parent
commit
96c30f8387
4 changed files with 119 additions and 19 deletions
  1. 7 5
      lib/db/util.go
  2. 84 0
      lib/model/indexhandler_test.go
  3. 24 4
      lib/protocol/protocol.go
  4. 4 10
      lib/protocol/wireformat.go

+ 7 - 5
lib/db/util.go

@@ -22,11 +22,10 @@ type FileInfoBatch struct {
 	flushFn func([]protocol.FileInfo) error
 	flushFn func([]protocol.FileInfo) error
 }
 }
 
 
+// NewFileInfoBatch returns a new FileInfoBatch that calls fn when it's time
+// to flush.
 func NewFileInfoBatch(fn func([]protocol.FileInfo) error) *FileInfoBatch {
 func NewFileInfoBatch(fn func([]protocol.FileInfo) error) *FileInfoBatch {
-	return &FileInfoBatch{
-		infos:   make([]protocol.FileInfo, 0, MaxBatchSizeFiles),
-		flushFn: fn,
-	}
+	return &FileInfoBatch{flushFn: fn}
 }
 }
 
 
 func (b *FileInfoBatch) SetFlushFunc(fn func([]protocol.FileInfo) error) {
 func (b *FileInfoBatch) SetFlushFunc(fn func([]protocol.FileInfo) error) {
@@ -34,6 +33,9 @@ func (b *FileInfoBatch) SetFlushFunc(fn func([]protocol.FileInfo) error) {
 }
 }
 
 
 func (b *FileInfoBatch) Append(f protocol.FileInfo) {
 func (b *FileInfoBatch) Append(f protocol.FileInfo) {
+	if b.infos == nil {
+		b.infos = make([]protocol.FileInfo, 0, MaxBatchSizeFiles)
+	}
 	b.infos = append(b.infos, f)
 	b.infos = append(b.infos, f)
 	b.size += f.ProtoSize()
 	b.size += f.ProtoSize()
 }
 }
@@ -61,7 +63,7 @@ func (b *FileInfoBatch) Flush() error {
 }
 }
 
 
 func (b *FileInfoBatch) Reset() {
 func (b *FileInfoBatch) Reset() {
-	b.infos = b.infos[:0]
+	b.infos = nil
 	b.size = 0
 	b.size = 0
 }
 }
 
 

+ 84 - 0
lib/model/indexhandler_test.go

@@ -0,0 +1,84 @@
+// Copyright (C) 2024 The Syncthing Authors.
+//
+// This Source Code Form is subject to the terms of the Mozilla Public
+// License, v. 2.0. If a copy of the MPL was not distributed with this file,
+// You can obtain one at https://mozilla.org/MPL/2.0/.
+
+package model_test
+
+import (
+	"context"
+	"fmt"
+	"io"
+	"testing"
+
+	"github.com/syncthing/syncthing/lib/db"
+	"github.com/syncthing/syncthing/lib/model/mocks"
+	"github.com/syncthing/syncthing/lib/protocol"
+	protomock "github.com/syncthing/syncthing/lib/protocol/mocks"
+	"github.com/syncthing/syncthing/lib/testutil"
+)
+
+func TestIndexhandlerConcurrency(t *testing.T) {
+	// Verify that sending a lot of index update messages using the
+	// FileInfoBatch works and doesn't trigger the race detector.
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	ar, aw := io.Pipe()
+	br, bw := io.Pipe()
+	ci := &protomock.ConnectionInfo{}
+
+	m1 := &mocks.Model{}
+	c1 := protocol.NewConnection(protocol.EmptyDeviceID, ar, bw, testutil.NoopCloser{}, m1, ci, protocol.CompressionNever, nil, nil)
+	c1.Start()
+	defer c1.Close(io.EOF)
+
+	m2 := &mocks.Model{}
+	c2 := protocol.NewConnection(protocol.EmptyDeviceID, br, aw, testutil.NoopCloser{}, m2, ci, protocol.CompressionNever, nil, nil)
+	c2.Start()
+	defer c2.Close(io.EOF)
+
+	c1.ClusterConfig(protocol.ClusterConfig{})
+	c2.ClusterConfig(protocol.ClusterConfig{})
+	c1.Index(ctx, "foo", nil)
+	c2.Index(ctx, "foo", nil)
+
+	const msgs = 5e2
+	const files = 1e3
+
+	recvd := 0
+	m2.IndexUpdateCalls(func(_ protocol.Connection, idxUp *protocol.IndexUpdate) error {
+		for j := 0; j < files; j++ {
+			if n := idxUp.Files[j].Name; n != fmt.Sprintf("f%d-%d", recvd, j) {
+				t.Error("wrong filename", n)
+			}
+		}
+		recvd++
+		return nil
+	})
+
+	b1 := db.NewFileInfoBatch(func(fs []protocol.FileInfo) error {
+		return c1.IndexUpdate(ctx, "foo", fs)
+	})
+	for i := 0; i < msgs; i++ {
+		for j := 0; j < files; j++ {
+			b1.Append(protocol.FileInfo{
+				Name:   fmt.Sprintf("f%d-%d", i, j),
+				Blocks: []protocol.BlockInfo{{Hash: make([]byte, 32)}},
+			})
+		}
+		if err := b1.Flush(); err != nil {
+			t.Fatal(err)
+		}
+	}
+
+	c1.Close(io.EOF)
+	c2.Close(io.EOF)
+	<-c1.Closed()
+	<-c2.Closed()
+
+	if recvd != msgs-1 {
+		t.Error("didn't receive all expected messages")
+	}
+}

+ 24 - 4
lib/protocol/protocol.go

@@ -154,15 +154,35 @@ type RequestResponse interface {
 }
 }
 
 
 type Connection interface {
 type Connection interface {
-	Start()
-	SetFolderPasswords(passwords map[string]string)
-	Close(err error)
-	DeviceID() DeviceID
+	// Send an index message. The connection will read and marshal the
+	// parameters asynchronously, so they should not be modified after
+	// calling Index().
 	Index(ctx context.Context, folder string, files []FileInfo) error
 	Index(ctx context.Context, folder string, files []FileInfo) error
+
+	// Send an index update message. The connection will read and marshal
+	// the parameters asynchronously, so they should not be modified after
+	// calling IndexUpdate().
 	IndexUpdate(ctx context.Context, folder string, files []FileInfo) error
 	IndexUpdate(ctx context.Context, folder string, files []FileInfo) error
+
+	// Send a request message. The connection will read and marshal the
+	// parameters asynchronously, so they should not be modified after
+	// calling Request().
 	Request(ctx context.Context, folder string, name string, blockNo int, offset int64, size int, hash []byte, weakHash uint32, fromTemporary bool) ([]byte, error)
 	Request(ctx context.Context, folder string, name string, blockNo int, offset int64, size int, hash []byte, weakHash uint32, fromTemporary bool) ([]byte, error)
+
+	// Send a cluster configuration message. The connection will read and
+	// marshal the message asynchronously, so it should not be modified
+	// after calling ClusterConfig().
 	ClusterConfig(config ClusterConfig)
 	ClusterConfig(config ClusterConfig)
+
+	// Send a download progress message. The connection will read and
+	// marshal the parameters asynchronously, so they should not be modified
+	// after calling DownloadProgress().
 	DownloadProgress(ctx context.Context, folder string, updates []FileDownloadProgressUpdate)
 	DownloadProgress(ctx context.Context, folder string, updates []FileDownloadProgressUpdate)
+
+	Start()
+	SetFolderPasswords(passwords map[string]string)
+	Close(err error)
+	DeviceID() DeviceID
 	Statistics() Statistics
 	Statistics() Statistics
 	Closed() <-chan struct{}
 	Closed() <-chan struct{}
 	ConnectionInfo
 	ConnectionInfo

+ 4 - 10
lib/protocol/wireformat.go

@@ -14,25 +14,19 @@ type wireFormatConnection struct {
 }
 }
 
 
 func (c wireFormatConnection) Index(ctx context.Context, folder string, fs []FileInfo) error {
 func (c wireFormatConnection) Index(ctx context.Context, folder string, fs []FileInfo) error {
-	var myFs = make([]FileInfo, len(fs))
-	copy(myFs, fs)
-
 	for i := range fs {
 	for i := range fs {
-		myFs[i].Name = norm.NFC.String(filepath.ToSlash(myFs[i].Name))
+		fs[i].Name = norm.NFC.String(filepath.ToSlash(fs[i].Name))
 	}
 	}
 
 
-	return c.Connection.Index(ctx, folder, myFs)
+	return c.Connection.Index(ctx, folder, fs)
 }
 }
 
 
 func (c wireFormatConnection) IndexUpdate(ctx context.Context, folder string, fs []FileInfo) error {
 func (c wireFormatConnection) IndexUpdate(ctx context.Context, folder string, fs []FileInfo) error {
-	var myFs = make([]FileInfo, len(fs))
-	copy(myFs, fs)
-
 	for i := range fs {
 	for i := range fs {
-		myFs[i].Name = norm.NFC.String(filepath.ToSlash(myFs[i].Name))
+		fs[i].Name = norm.NFC.String(filepath.ToSlash(fs[i].Name))
 	}
 	}
 
 
-	return c.Connection.IndexUpdate(ctx, folder, myFs)
+	return c.Connection.IndexUpdate(ctx, folder, fs)
 }
 }
 
 
 func (c wireFormatConnection) Request(ctx context.Context, folder string, name string, blockNo int, offset int64, size int, hash []byte, weakHash uint32, fromTemporary bool) ([]byte, error) {
 func (c wireFormatConnection) Request(ctx context.Context, folder string, name string, blockNo int, offset int64, size int, hash []byte, weakHash uint32, fromTemporary bool) ([]byte, error) {