Browse Source

Test and fix reconnects during pull

Jakob Borg 11 years ago
parent
commit
08ce9b09ec
5 changed files with 104 additions and 7 deletions
  1. 1 0
      integration/all.sh
  2. 2 2
      integration/f1/config.xml
  3. 2 2
      integration/f2/config.xml
  4. 85 0
      integration/test-reconnect.sh
  5. 14 3
      model/puller.go

+ 1 - 0
integration/all.sh

@@ -4,3 +4,4 @@
 ./test-merge.sh || exit
 ./test-delupd.sh || exit
 ./test-folders.sh || exit
+./test-reconnect.sh || exit

+ 2 - 2
integration/f1/config.xml

@@ -3,7 +3,6 @@
         <node id="I6KAH7666SLLL5PFXSOAUFJCDZYAOMLEKCP2GB3BV5RQST3PSROA"></node>
         <node id="JMFJCXBGZDE4BOCJE3VF65GYZNAIVJRET3J6HMRAUQIGJOFKNHMQ"></node>
         <versioning></versioning>
-        <syncorder></syncorder>
     </repository>
     <node id="I6KAH7666SLLL5PFXSOAUFJCDZYAOMLEKCP2GB3BV5RQST3PSROA" name="f1">
         <address>127.0.0.1:22001</address>
@@ -22,11 +21,12 @@
         <localAnnounceEnabled>true</localAnnounceEnabled>
         <localAnnouncePort>21025</localAnnouncePort>
         <parallelRequests>16</parallelRequests>
-        <maxSendKbps>0</maxSendKbps>
+        <maxSendKbps>1000</maxSendKbps>
         <rescanIntervalS>10</rescanIntervalS>
         <reconnectionIntervalS>5</reconnectionIntervalS>
         <maxChangeKbps>10000</maxChangeKbps>
         <startBrowser>false</startBrowser>
         <upnpEnabled>true</upnpEnabled>
+        <urAccepted>-1</urAccepted>
     </options>
 </configuration>

+ 2 - 2
integration/f2/config.xml

@@ -5,7 +5,6 @@
         <versioning type="simple">
             <param key="keep" val="5"></param>
         </versioning>
-        <syncorder></syncorder>
     </repository>
     <node id="I6KAH76-66SLLLB-5PFXSOA-UFJCDZC-YAOMLEK-CP2GB32-BV5RQST-3PSROAU" name="f1">
         <address>127.0.0.1:22001</address>
@@ -19,7 +18,7 @@
     </gui>
     <options>
         <listenAddress>127.0.0.1:22002</listenAddress>
-        <globalAnnounceServer>announce.syncthing.net:22025</globalAnnounceServer>
+        <globalAnnounceServer>announce.syncthing.net:22026</globalAnnounceServer>
         <globalAnnounceEnabled>false</globalAnnounceEnabled>
         <localAnnounceEnabled>true</localAnnounceEnabled>
         <localAnnouncePort>21025</localAnnouncePort>
@@ -30,5 +29,6 @@
         <maxChangeKbps>10000</maxChangeKbps>
         <startBrowser>false</startBrowser>
         <upnpEnabled>true</upnpEnabled>
+        <urAccepted>-1</urAccepted>
     </options>
 </configuration>

+ 85 - 0
integration/test-reconnect.sh

@@ -0,0 +1,85 @@
+#!/bin/bash
+
+# Copyright (C) 2014 Jakob Borg and other contributors. All rights reserved.
+# Use of this source code is governed by an MIT-style license that can be
+# found in the LICENSE file.
+
+id1=I6KAH76-66SLLLB-5PFXSOA-UFJCDZC-YAOMLEK-CP2GB32-BV5RQST-3PSROAU
+id2=JMFJCXB-GZDE4BN-OCJE3VF-65GYZNU-AIVJRET-3J6HMRQ-AUQIGJO-FKNHMQU
+
+go build json.go
+go build md5r.go
+go build genfiles.go
+
+start() {
+	echo "Starting..."
+	STTRACE=model,scanner STPROFILER=":9091" syncthing -home "f1" > 1.out 2>&1 &
+	STTRACE=model,scanner STPROFILER=":9092" syncthing -home "f2" > 2.out 2>&1 &
+	sleep 1
+}
+
+stop() {
+	echo "Stopping..."
+	for i in 1 2 ; do
+		curl -HX-API-Key:abc123 -X POST "http://localhost:808$i/rest/shutdown"
+	done
+	sleep 1
+}
+
+setup() {
+	echo "Setting up..."
+	rm -rf s? s??-?
+	rm -rf f?/*.idx.gz f?/index
+	mkdir -p s1
+	pushd s1 >/dev/null
+	../genfiles
+	../md5r > ../md5-1
+	popd >/dev/null
+}
+
+testConvergence() {
+	torestart="$1"
+	prevcomp=0
+
+	while true ; do
+		sleep 5
+		comp=$(curl -HX-API-Key:abc123 -s "http://localhost:8081/rest/connections" | ./json "$id2/Completion")
+		comp=${comp:-0}
+		echo $comp / 100
+
+		if [[ $comp == 100 ]] ; then
+			echo Done
+			break
+		fi
+
+		# Restart if the destination has made some progress
+		if [[ $comp -gt $prevcomp ]] ; then
+			prevcomp=$comp
+			curl -HX-API-Key:abc123 -X POST "http://localhost:$torestart/rest/restart"
+		fi
+	done
+
+	echo "Verifying..."
+
+	pushd s2 >/dev/null
+	../md5r | grep -v .stversions > ../md5-2
+	popd >/dev/null
+
+	if ! cmp md5-1 md5-2 ; then
+		echo Repos differ
+		stop
+		exit 1
+	fi
+}
+
+echo Testing reconnects during pull where the source node restarts
+setup
+start
+testConvergence 8081
+stop
+
+echo Testing reconnects during pull where the destination node restarts
+setup
+start
+testConvergence 8082
+stop

+ 14 - 3
model/puller.go

@@ -315,17 +315,19 @@ func (p *puller) handleRequestResult(res requestResult) {
 	f := res.file
 
 	of, ok := p.openFiles[f.Name]
-	if !ok || of.err != nil {
+	if !ok {
 		// no entry in openFiles means there was an error and we've cancelled the operation
 		return
 	}
 
 	if res.err != nil {
+		// This request resulted in an error
 		of.err = res.err
 		if debug {
-			l.Debugf("pull: not writing %q / %q offset %d: %v", p.repoCfg.ID, f.Name, res.offset, res.err)
+			l.Debugf("pull: not writing %q / %q offset %d: %v; (done=%v, outstanding=%d)", p.repoCfg.ID, f.Name, res.offset, res.err, of.done, of.outstanding)
 		}
-	} else {
+	} else if of.err == nil {
+		// This request was sucessfull and nothing has failed previously either
 		_, of.err = of.file.WriteAt(res.data, res.offset)
 		if debug {
 			l.Debugf("pull: wrote %q / %q offset %d len %d outstanding %d done %v", p.repoCfg.ID, f.Name, res.offset, len(res.data), of.outstanding, of.done)
@@ -523,10 +525,19 @@ func (p *puller) handleRequestBlock(b bqBlock) bool {
 			of.file.Close()
 			of.file = nil
 			os.Remove(of.temp)
+			if debug {
+				l.Debugf("pull: no source for %q / %q; closed", p.repoCfg.ID, f.Name)
+			}
 		}
 		if b.last {
+			if debug {
+				l.Debugf("pull: no source for %q / %q; deleting", p.repoCfg.ID, f.Name)
+			}
 			delete(p.openFiles, f.Name)
 		} else {
+			if debug {
+				l.Debugf("pull: no source for %q / %q; await more blocks", p.repoCfg.ID, f.Name)
+			}
 			p.openFiles[f.Name] = of
 		}
 		return true