Răsfoiți Sursa

lib/model, lib/protocol: Add contexts sending indexes and download-progress (#6176)

Simon Frei 5 ani în urmă
părinte
comite
4d368a37e2

+ 6 - 6
lib/model/fakeconns_test.go

@@ -32,7 +32,7 @@ type fakeConnection struct {
 	fileData                 map[string][]byte
 	folder                   string
 	model                    *model
-	indexFn                  func(string, []protocol.FileInfo)
+	indexFn                  func(context.Context, string, []protocol.FileInfo)
 	requestFn                func(ctx context.Context, folder, name string, offset int64, size int, hash []byte, fromTemporary bool) ([]byte, error)
 	closeFn                  func(error)
 	mut                      sync.Mutex
@@ -64,20 +64,20 @@ func (f *fakeConnection) Option(string) string {
 	return ""
 }
 
-func (f *fakeConnection) Index(folder string, fs []protocol.FileInfo) error {
+func (f *fakeConnection) Index(ctx context.Context, folder string, fs []protocol.FileInfo) error {
 	f.mut.Lock()
 	defer f.mut.Unlock()
 	if f.indexFn != nil {
-		f.indexFn(folder, fs)
+		f.indexFn(ctx, folder, fs)
 	}
 	return nil
 }
 
-func (f *fakeConnection) IndexUpdate(folder string, fs []protocol.FileInfo) error {
+func (f *fakeConnection) IndexUpdate(ctx context.Context, folder string, fs []protocol.FileInfo) error {
 	f.mut.Lock()
 	defer f.mut.Unlock()
 	if f.indexFn != nil {
-		f.indexFn(folder, fs)
+		f.indexFn(ctx, folder, fs)
 	}
 	return nil
 }
@@ -109,7 +109,7 @@ func (f *fakeConnection) Statistics() protocol.Statistics {
 	return protocol.Statistics{}
 }
 
-func (f *fakeConnection) DownloadProgress(folder string, updates []protocol.FileDownloadProgressUpdate) {
+func (f *fakeConnection) DownloadProgress(_ context.Context, folder string, updates []protocol.FileDownloadProgressUpdate) {
 	f.downloadProgressMessages = append(f.downloadProgressMessages, downloadProgressMessage{
 		folder:  folder,
 		updates: updates,

+ 5 - 5
lib/model/model.go

@@ -1990,7 +1990,7 @@ func (s *indexSender) serve(ctx context.Context) {
 	defer l.Debugf("Exiting indexSender for %s to %s at %s: %v", s.folder, s.dev, s.conn, err)
 
 	// We need to send one index, regardless of whether there is something to send or not
-	err = s.sendIndexTo()
+	err = s.sendIndexTo(ctx)
 
 	// Subscribe to LocalIndexUpdated (we have new information to send) and
 	// DeviceDisconnected (it might be us who disconnected, so we should
@@ -2028,7 +2028,7 @@ func (s *indexSender) serve(ctx context.Context) {
 			continue
 		}
 
-		err = s.sendIndexTo()
+		err = s.sendIndexTo(ctx)
 
 		// Wait a short amount of time before entering the next loop. If there
 		// are continuous changes happening to the local index, this gives us
@@ -2046,16 +2046,16 @@ func (s *indexSender) Complete() bool { return true }
 
 // sendIndexTo sends file infos with a sequence number higher than prevSequence and
 // returns the highest sent sequence number.
-func (s *indexSender) sendIndexTo() error {
+func (s *indexSender) sendIndexTo(ctx context.Context) error {
 	initial := s.prevSequence == 0
 	batch := newFileInfoBatch(nil)
 	batch.flushFn = func(fs []protocol.FileInfo) error {
 		l.Debugf("%v: Sending %d files (<%d bytes)", s, len(batch.infos), batch.size)
 		if initial {
 			initial = false
-			return s.conn.Index(s.folder, fs)
+			return s.conn.Index(ctx, s.folder, fs)
 		}
-		return s.conn.IndexUpdate(s.folder, fs)
+		return s.conn.IndexUpdate(ctx, s.folder, fs)
 	}
 
 	var err error

+ 4 - 4
lib/model/progressemitter.go

@@ -85,7 +85,7 @@ func (t *ProgressEmitter) serve(ctx context.Context) {
 				lastCount = newCount
 				t.sendDownloadProgressEventLocked()
 				if len(t.connections) > 0 {
-					t.sendDownloadProgressMessagesLocked()
+					t.sendDownloadProgressMessagesLocked(ctx)
 				}
 			} else {
 				l.Debugln("progress emitter: nothing new")
@@ -114,7 +114,7 @@ func (t *ProgressEmitter) sendDownloadProgressEventLocked() {
 	l.Debugf("progress emitter: emitting %#v", output)
 }
 
-func (t *ProgressEmitter) sendDownloadProgressMessagesLocked() {
+func (t *ProgressEmitter) sendDownloadProgressMessagesLocked(ctx context.Context) {
 	for id, conn := range t.connections {
 		for _, folder := range t.foldersByConns[id] {
 			pullers, ok := t.registry[folder]
@@ -145,7 +145,7 @@ func (t *ProgressEmitter) sendDownloadProgressMessagesLocked() {
 			updates := state.update(folder, activePullers)
 
 			if len(updates) > 0 {
-				conn.DownloadProgress(folder, updates)
+				conn.DownloadProgress(ctx, folder, updates)
 			}
 		}
 	}
@@ -311,7 +311,7 @@ func (t *ProgressEmitter) clearLocked() {
 		}
 		for _, folder := range state.folders() {
 			if updates := state.cleanup(folder); len(updates) > 0 {
-				conn.DownloadProgress(folder, updates)
+				conn.DownloadProgress(context.Background(), folder, updates)
 			}
 		}
 	}

+ 2 - 1
lib/model/progressemitter_test.go

@@ -7,6 +7,7 @@
 package model
 
 import (
+	"context"
 	"fmt"
 	"os"
 	"path/filepath"
@@ -461,5 +462,5 @@ func TestSendDownloadProgressMessages(t *testing.T) {
 func sendMsgs(p *ProgressEmitter) {
 	p.mut.Lock()
 	defer p.mut.Unlock()
-	p.sendDownloadProgressMessagesLocked()
+	p.sendDownloadProgressMessagesLocked(context.Background())
 }

+ 19 - 19
lib/model/requests_test.go

@@ -38,7 +38,7 @@ func TestRequestSimple(t *testing.T) {
 	// the expected test file.
 	done := make(chan struct{})
 	fc.mut.Lock()
-	fc.indexFn = func(folder string, fs []protocol.FileInfo) {
+	fc.indexFn = func(_ context.Context, folder string, fs []protocol.FileInfo) {
 		select {
 		case <-done:
 			t.Error("More than one index update sent")
@@ -80,7 +80,7 @@ func TestSymlinkTraversalRead(t *testing.T) {
 	// the expected test file.
 	done := make(chan struct{})
 	fc.mut.Lock()
-	fc.indexFn = func(folder string, fs []protocol.FileInfo) {
+	fc.indexFn = func(_ context.Context, folder string, fs []protocol.FileInfo) {
 		select {
 		case <-done:
 			t.Error("More than one index update sent")
@@ -125,7 +125,7 @@ func TestSymlinkTraversalWrite(t *testing.T) {
 	badReq := make(chan string, 1)
 	badIdx := make(chan string, 1)
 	fc.mut.Lock()
-	fc.indexFn = func(folder string, fs []protocol.FileInfo) {
+	fc.indexFn = func(_ context.Context, folder string, fs []protocol.FileInfo) {
 		for _, f := range fs {
 			if f.Name == "symlink" {
 				done <- struct{}{}
@@ -183,7 +183,7 @@ func TestRequestCreateTmpSymlink(t *testing.T) {
 	goodIdx := make(chan struct{})
 	name := fs.TempName("testlink")
 	fc.mut.Lock()
-	fc.indexFn = func(folder string, fs []protocol.FileInfo) {
+	fc.indexFn = func(_ context.Context, folder string, fs []protocol.FileInfo) {
 		for _, f := range fs {
 			if f.Name == name {
 				if f.IsInvalid() {
@@ -240,7 +240,7 @@ func TestRequestVersioningSymlinkAttack(t *testing.T) {
 	// the expected test file.
 	idx := make(chan int)
 	fc.mut.Lock()
-	fc.indexFn = func(folder string, fs []protocol.FileInfo) {
+	fc.indexFn = func(_ context.Context, folder string, fs []protocol.FileInfo) {
 		idx <- len(fs)
 	}
 	fc.mut.Unlock()
@@ -339,7 +339,7 @@ func pullInvalidIgnored(t *testing.T, ft config.FolderType) {
 
 	done := make(chan struct{})
 	fc.mut.Lock()
-	fc.indexFn = func(folder string, fs []protocol.FileInfo) {
+	fc.indexFn = func(_ context.Context, folder string, fs []protocol.FileInfo) {
 		expected := map[string]struct{}{invIgn: {}, ign: {}, ignExisting: {}}
 		for _, f := range fs {
 			if _, ok := expected[f.Name]; !ok {
@@ -375,7 +375,7 @@ func pullInvalidIgnored(t *testing.T, ft config.FolderType) {
 	// The indexes will normally arrive in one update, but it is possible
 	// that they arrive in separate ones.
 	fc.mut.Lock()
-	fc.indexFn = func(folder string, fs []protocol.FileInfo) {
+	fc.indexFn = func(_ context.Context, folder string, fs []protocol.FileInfo) {
 		for _, f := range fs {
 			if _, ok := expected[f.Name]; !ok {
 				t.Errorf("Unexpected file %v was updated in index", f.Name)
@@ -434,7 +434,7 @@ func TestIssue4841(t *testing.T) {
 
 	received := make(chan []protocol.FileInfo)
 	fc.mut.Lock()
-	fc.indexFn = func(_ string, fs []protocol.FileInfo) {
+	fc.indexFn = func(_ context.Context, _ string, fs []protocol.FileInfo) {
 		received <- fs
 	}
 	fc.mut.Unlock()
@@ -483,7 +483,7 @@ func TestRescanIfHaveInvalidContent(t *testing.T) {
 
 	received := make(chan []protocol.FileInfo)
 	fc.mut.Lock()
-	fc.indexFn = func(_ string, fs []protocol.FileInfo) {
+	fc.indexFn = func(_ context.Context, _ string, fs []protocol.FileInfo) {
 		received <- fs
 	}
 	fc.mut.Unlock()
@@ -550,7 +550,7 @@ func TestParentDeletion(t *testing.T) {
 	fc.addFile(parent, 0777, protocol.FileInfoTypeDirectory, nil)
 	fc.addFile(child, 0777, protocol.FileInfoTypeDirectory, nil)
 	fc.mut.Lock()
-	fc.indexFn = func(folder string, fs []protocol.FileInfo) {
+	fc.indexFn = func(_ context.Context, folder string, fs []protocol.FileInfo) {
 		received <- fs
 	}
 	fc.mut.Unlock()
@@ -623,7 +623,7 @@ func TestRequestSymlinkWindows(t *testing.T) {
 
 	received := make(chan []protocol.FileInfo)
 	fc.mut.Lock()
-	fc.indexFn = func(folder string, fs []protocol.FileInfo) {
+	fc.indexFn = func(_ context.Context, folder string, fs []protocol.FileInfo) {
 		select {
 		case <-received:
 			t.Error("More than one index update sent")
@@ -693,7 +693,7 @@ func TestRequestRemoteRenameChanged(t *testing.T) {
 
 	received := make(chan []protocol.FileInfo)
 	fc.mut.Lock()
-	fc.indexFn = func(folder string, fs []protocol.FileInfo) {
+	fc.indexFn = func(_ context.Context, folder string, fs []protocol.FileInfo) {
 		select {
 		case <-received:
 			t.Error("More than one index update sent")
@@ -733,7 +733,7 @@ func TestRequestRemoteRenameChanged(t *testing.T) {
 	bFinalVersion := bIntermediateVersion.Copy().Update(fc.id.Short())
 	done := make(chan struct{})
 	fc.mut.Lock()
-	fc.indexFn = func(folder string, fs []protocol.FileInfo) {
+	fc.indexFn = func(_ context.Context, folder string, fs []protocol.FileInfo) {
 		select {
 		case <-done:
 			t.Error("Received more index updates than expected")
@@ -834,7 +834,7 @@ func TestRequestRemoteRenameConflict(t *testing.T) {
 
 	recv := make(chan int)
 	fc.mut.Lock()
-	fc.indexFn = func(folder string, fs []protocol.FileInfo) {
+	fc.indexFn = func(_ context.Context, folder string, fs []protocol.FileInfo) {
 		recv <- len(fs)
 	}
 	fc.mut.Unlock()
@@ -924,7 +924,7 @@ func TestRequestDeleteChanged(t *testing.T) {
 
 	done := make(chan struct{})
 	fc.mut.Lock()
-	fc.indexFn = func(folder string, fs []protocol.FileInfo) {
+	fc.indexFn = func(_ context.Context, folder string, fs []protocol.FileInfo) {
 		select {
 		case <-done:
 			t.Error("More than one index update sent")
@@ -947,7 +947,7 @@ func TestRequestDeleteChanged(t *testing.T) {
 	}
 
 	fc.mut.Lock()
-	fc.indexFn = func(folder string, fs []protocol.FileInfo) {
+	fc.indexFn = func(_ context.Context, folder string, fs []protocol.FileInfo) {
 		select {
 		case <-done:
 			t.Error("More than one index update sent")
@@ -1069,7 +1069,7 @@ func TestIgnoreDeleteUnignore(t *testing.T) {
 
 	done := make(chan struct{})
 	fc.mut.Lock()
-	fc.indexFn = func(folder string, fs []protocol.FileInfo) {
+	fc.indexFn = func(_ context.Context, folder string, fs []protocol.FileInfo) {
 		basicCheck(fs)
 		close(done)
 	}
@@ -1088,7 +1088,7 @@ func TestIgnoreDeleteUnignore(t *testing.T) {
 
 	done = make(chan struct{})
 	fc.mut.Lock()
-	fc.indexFn = func(folder string, fs []protocol.FileInfo) {
+	fc.indexFn = func(_ context.Context, folder string, fs []protocol.FileInfo) {
 		basicCheck(fs)
 		f := fs[0]
 		if !f.IsInvalid() {
@@ -1110,7 +1110,7 @@ func TestIgnoreDeleteUnignore(t *testing.T) {
 
 	done = make(chan struct{})
 	fc.mut.Lock()
-	fc.indexFn = func(folder string, fs []protocol.FileInfo) {
+	fc.indexFn = func(_ context.Context, folder string, fs []protocol.FileInfo) {
 		basicCheck(fs)
 		f := fs[0]
 		if f.IsInvalid() {

+ 9 - 9
lib/protocol/protocol.go

@@ -135,11 +135,11 @@ type Connection interface {
 	Close(err error)
 	ID() DeviceID
 	Name() string
-	Index(folder string, files []FileInfo) error
-	IndexUpdate(folder string, files []FileInfo) error
+	Index(ctx context.Context, folder string, files []FileInfo) error
+	IndexUpdate(ctx context.Context, folder string, files []FileInfo) error
 	Request(ctx context.Context, folder string, name string, offset int64, size int, hash []byte, weakHash uint32, fromTemporary bool) ([]byte, error)
 	ClusterConfig(config ClusterConfig)
-	DownloadProgress(folder string, updates []FileDownloadProgressUpdate)
+	DownloadProgress(ctx context.Context, folder string, updates []FileDownloadProgressUpdate)
 	Statistics() Statistics
 	Closed() bool
 }
@@ -249,14 +249,14 @@ func (c *rawConnection) Name() string {
 }
 
 // Index writes the list of file information to the connected peer device
-func (c *rawConnection) Index(folder string, idx []FileInfo) error {
+func (c *rawConnection) Index(ctx context.Context, folder string, idx []FileInfo) error {
 	select {
 	case <-c.closed:
 		return ErrClosed
 	default:
 	}
 	c.idxMut.Lock()
-	c.send(context.TODO(), &Index{
+	c.send(ctx, &Index{
 		Folder: folder,
 		Files:  idx,
 	}, nil)
@@ -265,14 +265,14 @@ func (c *rawConnection) Index(folder string, idx []FileInfo) error {
 }
 
 // IndexUpdate writes the list of file information to the connected peer device as an update
-func (c *rawConnection) IndexUpdate(folder string, idx []FileInfo) error {
+func (c *rawConnection) IndexUpdate(ctx context.Context, folder string, idx []FileInfo) error {
 	select {
 	case <-c.closed:
 		return ErrClosed
 	default:
 	}
 	c.idxMut.Lock()
-	c.send(context.TODO(), &IndexUpdate{
+	c.send(ctx, &IndexUpdate{
 		Folder: folder,
 		Files:  idx,
 	}, nil)
@@ -340,8 +340,8 @@ func (c *rawConnection) Closed() bool {
 }
 
 // DownloadProgress sends the progress updates for the files that are currently being downloaded.
-func (c *rawConnection) DownloadProgress(folder string, updates []FileDownloadProgressUpdate) {
-	c.send(context.TODO(), &DownloadProgress{
+func (c *rawConnection) DownloadProgress(ctx context.Context, folder string, updates []FileDownloadProgressUpdate) {
+	c.send(ctx, &DownloadProgress{
 		Folder:  folder,
 		Updates: updates,
 	}, nil)

+ 6 - 4
lib/protocol/protocol_test.go

@@ -75,10 +75,12 @@ func TestClose(t *testing.T) {
 		t.Error("Ping should not return true")
 	}
 
-	c0.Index("default", nil)
-	c0.Index("default", nil)
+	ctx := context.Background()
 
-	if _, err := c0.Request(context.Background(), "default", "foo", 0, 0, nil, 0, false); err == nil {
+	c0.Index(ctx, "default", nil)
+	c0.Index(ctx, "default", nil)
+
+	if _, err := c0.Request(ctx, "default", "foo", 0, 0, nil, 0, false); err == nil {
 		t.Error("Request should return an error")
 	}
 }
@@ -152,7 +154,7 @@ func TestCloseRace(t *testing.T) {
 	c0.ClusterConfig(ClusterConfig{})
 	c1.ClusterConfig(ClusterConfig{})
 
-	c1.Index("default", nil)
+	c1.Index(context.Background(), "default", nil)
 	select {
 	case <-indexReceived:
 	case <-time.After(time.Second):

+ 4 - 4
lib/protocol/wireformat.go

@@ -13,7 +13,7 @@ type wireFormatConnection struct {
 	Connection
 }
 
-func (c wireFormatConnection) Index(folder string, fs []FileInfo) error {
+func (c wireFormatConnection) Index(ctx context.Context, folder string, fs []FileInfo) error {
 	var myFs = make([]FileInfo, len(fs))
 	copy(myFs, fs)
 
@@ -21,10 +21,10 @@ func (c wireFormatConnection) Index(folder string, fs []FileInfo) error {
 		myFs[i].Name = norm.NFC.String(filepath.ToSlash(myFs[i].Name))
 	}
 
-	return c.Connection.Index(folder, myFs)
+	return c.Connection.Index(ctx, folder, myFs)
 }
 
-func (c wireFormatConnection) IndexUpdate(folder string, fs []FileInfo) error {
+func (c wireFormatConnection) IndexUpdate(ctx context.Context, folder string, fs []FileInfo) error {
 	var myFs = make([]FileInfo, len(fs))
 	copy(myFs, fs)
 
@@ -32,7 +32,7 @@ func (c wireFormatConnection) IndexUpdate(folder string, fs []FileInfo) error {
 		myFs[i].Name = norm.NFC.String(filepath.ToSlash(myFs[i].Name))
 	}
 
-	return c.Connection.IndexUpdate(folder, myFs)
+	return c.Connection.IndexUpdate(ctx, folder, myFs)
 }
 
 func (c wireFormatConnection) Request(ctx context.Context, folder string, name string, offset int64, size int, hash []byte, weakHash uint32, fromTemporary bool) ([]byte, error) {