Browse Source

Don't leak request slots (fixes #483)

Jakob Borg 11 years ago
parent
commit
e1f1ae041f
3 changed files with 17 additions and 13 deletions
  1. 1 4
      integration/common_test.go
  2. 9 8
      integration/reconnect_test.go
  3. 7 1
      model/puller.go

+ 1 - 4
integration/common_test.go

@@ -152,10 +152,7 @@ func (i *inifiteReader) Read(bs []byte) (int, error) {
 // rm -rf
 func removeAll(dirs ...string) error {
 	for _, dir := range dirs {
-		err := os.RemoveAll(dir)
-		if err != nil {
-			return err
-		}
+		os.RemoveAll(dir)
 	}
 	return nil
 }

+ 9 - 8
integration/reconnect_test.go

@@ -7,6 +7,7 @@
 package integration_test
 
 import (
+	"log"
 	"sync"
 	"testing"
 	"time"
@@ -39,19 +40,19 @@ func TestRestartSenderDuringTransfer(t *testing.T) {
 }
 
 func testRestartDuringTransfer(t *testing.T, restartSender, restartReceiver bool, senderDelay, receiverDelay time.Duration) {
-	t.Log("Cleaning...")
+	log.Println("Cleaning...")
 	err := removeAll("s1", "s2", "f1/index", "f2/index")
 	if err != nil {
 		t.Fatal(err)
 	}
 
-	t.Log("Generating files...")
+	log.Println("Generating files...")
 	err = generateFiles("s1", 1000, 20, "../bin/syncthing")
 	if err != nil {
 		t.Fatal(err)
 	}
 
-	t.Log("Starting up...")
+	log.Println("Starting up...")
 	sender := syncthingProcess{ // id1
 		log:  "1.out",
 		argv: []string{"-home", "f1"},
@@ -94,12 +95,12 @@ func testRestartDuringTransfer(t *testing.T, restartSender, restartReceiver bool
 
 		if curComp > prevComp {
 			if restartReceiver {
-				t.Logf("Stopping receiver...")
+				log.Printf("Stopping receiver...")
 				receiver.stop()
 			}
 
 			if restartSender {
-				t.Logf("Stopping sender...")
+				log.Printf("Stopping sender...")
 				sender.stop()
 			}
 
@@ -109,7 +110,7 @@ func testRestartDuringTransfer(t *testing.T, restartSender, restartReceiver bool
 				wg.Add(1)
 				go func() {
 					time.Sleep(receiverDelay)
-					t.Logf("Starting receiver...")
+					log.Printf("Starting receiver...")
 					receiver.start()
 					wg.Done()
 				}()
@@ -119,7 +120,7 @@ func testRestartDuringTransfer(t *testing.T, restartSender, restartReceiver bool
 				wg.Add(1)
 				go func() {
 					time.Sleep(senderDelay)
-					t.Logf("Starting sender...")
+					log.Printf("Starting sender...")
 					sender.start()
 					wg.Done()
 				}()
@@ -133,7 +134,7 @@ func testRestartDuringTransfer(t *testing.T, restartSender, restartReceiver bool
 		time.Sleep(1 * time.Second)
 	}
 
-	t.Log("Comparing directories...")
+	log.Println("Comparing directories...")
 	err = compareDirectories("s1", "s2")
 	if err != nil {
 		t.Fatal(err)

+ 7 - 1
model/puller.go

@@ -26,6 +26,7 @@ package model
 import (
 	"bytes"
 	"errors"
+	"fmt"
 	"math/rand"
 	"os"
 	"path/filepath"
@@ -145,8 +146,11 @@ func (p *puller) run() {
 	}
 
 	for {
-		// Run the pulling loop as long as there are blocks to fetch
+		if sc, sl := cap(p.requestSlots), len(p.requestSlots); sl != sc {
+			panic(fmt.Sprintf("Incorrect number of slots; %d != %d", sl, sc))
+		}
 
+		// Run the pulling loop as long as there are blocks to fetch
 		prevVer, queued = p.queueNeededBlocks(prevVer)
 		if queued > 0 {
 			p.errors = 0
@@ -169,6 +173,7 @@ func (p *puller) run() {
 						}
 
 						if p.errors > 0 && p.errors >= queued {
+							p.requestSlots <- true
 							break pull
 						}
 
@@ -181,6 +186,7 @@ func (p *puller) run() {
 						if debug {
 							l.Debugf("%q: pulling loop done", p.repoCfg.ID)
 						}
+						p.requestSlots <- true
 						break pull
 					}