Browse Source

Hand incoming indexes on main goroutine (this should be fine now)

Jakob Borg 11 years ago
parent
commit
91cc84c4e6
1 changed files with 17 additions and 62 deletions
  1. 17 62
      protocol/protocol.go

+ 17 - 62
protocol/protocol.go

@@ -11,6 +11,7 @@ import (
 	"io"
 	"sync"
 	"time"
+
 	"github.com/calmh/syncthing/xdr"
 )
 
@@ -95,15 +96,6 @@ type rawConnection struct {
 	outbox chan []encodable
 	closed chan struct{}
 	once   sync.Once
-
-	incomingIndexes chan incomingIndex
-}
-
-type incomingIndex struct {
-	update bool
-	id     NodeID
-	repo   string
-	files  []FileInfo
 }
 
 type asyncResult struct {
@@ -124,23 +116,21 @@ func NewConnection(nodeID NodeID, reader io.Reader, writer io.Writer, receiver M
 	wb := bufio.NewWriterSize(cw, 65536)
 
 	c := rawConnection{
-		id:              nodeID,
-		name:            name,
-		receiver:        nativeModel{receiver},
-		state:           stateInitial,
-		cr:              cr,
-		xr:              xdr.NewReader(rb),
-		cw:              cw,
-		wb:              wb,
-		xw:              xdr.NewWriter(wb),
-		awaiting:        make([]chan asyncResult, 0x1000),
-		outbox:          make(chan []encodable),
-		nextID:          make(chan int),
-		closed:          make(chan struct{}),
-		incomingIndexes: make(chan incomingIndex, 100), // should be enough for anyone, right?
+		id:       nodeID,
+		name:     name,
+		receiver: nativeModel{receiver},
+		state:    stateInitial,
+		cr:       cr,
+		xr:       xdr.NewReader(rb),
+		cw:       cw,
+		wb:       wb,
+		xw:       xdr.NewWriter(wb),
+		awaiting: make([]chan asyncResult, 0x1000),
+		outbox:   make(chan []encodable),
+		nextID:   make(chan int),
+		closed:   make(chan struct{}),
 	}
 
-	go c.indexSerializerLoop()
 	go c.readerLoop()
 	go c.writerLoop()
 	go c.pingerLoop()
@@ -316,51 +306,16 @@ func (c *rawConnection) readerLoop() (err error) {
 	}
 }
 
-func (c *rawConnection) indexSerializerLoop() {
-	// We must avoid blocking the reader loop when processing large indexes.
-	// There is otherwise a potential deadlock where both sides has the model
-	// locked because it's sending a large index update and can't receive the
-	// large index update from the other side. But we must also ensure to
-	// process the indexes in the order they are received, hence the separate
-	// routine and buffered channel.
-	for {
-		select {
-		case ii := <-c.incomingIndexes:
-			if ii.update {
-				if debug {
-					l.Debugf("calling IndexUpdate(%v, %v, %d files)", ii.id, ii.repo, len(ii.files))
-				}
-				c.receiver.IndexUpdate(ii.id, ii.repo, ii.files)
-			} else {
-				if debug {
-					l.Debugf("calling Index(%v, %v, %d files)", ii.id, ii.repo, len(ii.files))
-				}
-				c.receiver.Index(ii.id, ii.repo, ii.files)
-			}
-		case <-c.closed:
-			return
-		}
-	}
-}
-
 func (c *rawConnection) handleIndex() error {
 	var im IndexMessage
 	im.decodeXDR(c.xr)
 	if err := c.xr.Error(); err != nil {
 		return err
 	} else {
-
-		// We run this (and the corresponding one for update, below)
-		// in a separate goroutine to avoid blocking the read loop.
-		// There is otherwise a potential deadlock where both sides
-		// has the model locked because it's sending a large index
-		// update and can't receive the large index update from the
-		// other side.
-
 		if debug {
-			l.Debugf("queueing Index(%v, %v, %d files)", c.id, im.Repository, len(im.Files))
+			l.Debugf("Index(%v, %v, %d files)", c.id, im.Repository, len(im.Files))
 		}
-		c.incomingIndexes <- incomingIndex{false, c.id, im.Repository, im.Files}
+		c.receiver.Index(c.id, im.Repository, im.Files)
 	}
 	return nil
 }
@@ -374,7 +329,7 @@ func (c *rawConnection) handleIndexUpdate() error {
 		if debug {
 			l.Debugf("queueing IndexUpdate(%v, %v, %d files)", c.id, im.Repository, len(im.Files))
 		}
-		c.incomingIndexes <- incomingIndex{true, c.id, im.Repository, im.Files}
+		c.receiver.IndexUpdate(c.id, im.Repository, im.Files)
 	}
 	return nil
 }