
Re-add inadvertently ignored files

Jakob Borg 11 years ago
parent
commit
9743386166

+ 1 - 0
cmd/.gitignore

@@ -0,0 +1 @@
+!syncthing

+ 74 - 0
cmd/syncthing/blocks.go

@@ -0,0 +1,74 @@
+package main
+
+import (
+	"bytes"
+	"crypto/sha256"
+	"io"
+)
+
+type Block struct {
+	Offset int64
+	Size   uint32
+	Hash   []byte
+}
+
+// Blocks returns the blockwise hash of the reader.
+func Blocks(r io.Reader, blocksize int) ([]Block, error) {
+	var blocks []Block
+	var offset int64
+	for {
+		lr := &io.LimitedReader{R: r, N: int64(blocksize)}
+		hf := sha256.New()
+		n, err := io.Copy(hf, lr)
+		if err != nil {
+			return nil, err
+		}
+
+		if n == 0 {
+			break
+		}
+
+		b := Block{
+			Offset: offset,
+			Size:   uint32(n),
+			Hash:   hf.Sum(nil),
+		}
+		blocks = append(blocks, b)
+		offset += int64(n)
+	}
+
+	if len(blocks) == 0 {
+		// Empty file
+		blocks = append(blocks, Block{
+			Offset: 0,
+			Size:   0,
+			Hash:   []uint8{0xe3, 0xb0, 0xc4, 0x42, 0x98, 0xfc, 0x1c, 0x14, 0x9a, 0xfb, 0xf4, 0xc8, 0x99, 0x6f, 0xb9, 0x24, 0x27, 0xae, 0x41, 0xe4, 0x64, 0x9b, 0x93, 0x4c, 0xa4, 0x95, 0x99, 0x1b, 0x78, 0x52, 0xb8, 0x55},
+		})
+	}
+
+	return blocks, nil
+}
+
+// BlockDiff returns the lists of common and missing blocks (those needed to
+// transform src into tgt). Both block lists must have been created with the
+// same block size.
+func BlockDiff(src, tgt []Block) (have, need []Block) {
+	if len(tgt) == 0 && len(src) != 0 {
+		return nil, nil
+	}
+
+	if len(tgt) != 0 && len(src) == 0 {
+		// Copy the entire file
+		return nil, tgt
+	}
+
+	for i := range tgt {
+		if i >= len(src) || !bytes.Equal(tgt[i].Hash, src[i].Hash) {
+			// Copy differing block
+			need = append(need, tgt[i])
+		} else {
+			have = append(have, tgt[i])
+		}
+	}
+
+	return have, need
+}
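
A minimal usage sketch (assuming the same package, with "bytes" and "fmt" imported): hash two versions of the same data with one block size, then ask BlockDiff which blocks must be fetched.

	func exampleBlockDiff() {
		// Both lists use the same block size, as BlockDiff requires.
		src, _ := Blocks(bytes.NewBufferString("contents"), 3)
		tgt, _ := Blocks(bytes.NewBufferString("contants"), 3)

		// need holds the blocks to fetch to transform src into tgt;
		// here only the middle block ("ten" vs "tan") differs.
		_, need := BlockDiff(src, tgt)
		for _, b := range need {
			fmt.Printf("fetch offset=%d size=%d\n", b.Offset, b.Size)
		}
	}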

+ 116 - 0
cmd/syncthing/blocks_test.go

@@ -0,0 +1,116 @@
+package main
+
+import (
+	"bytes"
+	"fmt"
+	"testing"
+)
+
+var blocksTestData = []struct {
+	data      []byte
+	blocksize int
+	hash      []string
+}{
+	{[]byte(""), 1024, []string{
+		"e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"}},
+	{[]byte("contents"), 1024, []string{
+		"d1b2a59fbea7e20077af9f91b27e95e865061b270be03ff539ab3b73587882e8"}},
+	{[]byte("contents"), 9, []string{
+		"d1b2a59fbea7e20077af9f91b27e95e865061b270be03ff539ab3b73587882e8"}},
+	{[]byte("contents"), 8, []string{
+		"d1b2a59fbea7e20077af9f91b27e95e865061b270be03ff539ab3b73587882e8"}},
+	{[]byte("contents"), 7, []string{
+		"ed7002b439e9ac845f22357d822bac1444730fbdb6016d3ec9432297b9ec9f73",
+		"043a718774c572bd8a25adbeb1bfcd5c0256ae11cecf9f9c3f925d0e52beaf89"},
+	},
+	{[]byte("contents"), 3, []string{
+		"1143da2bc54c495c4be31d3868785d39ffdfd56df5668f0645d8f14d47647952",
+		"e4432baa90819aaef51d2a7f8e148bf7e679610f3173752fabb4dcb2d0f418d3",
+		"44ad63f60af0f6db6fdde6d5186ef78176367df261fa06be3079b6c80c8adba4"},
+	},
+	{[]byte("conconts"), 3, []string{
+		"1143da2bc54c495c4be31d3868785d39ffdfd56df5668f0645d8f14d47647952",
+		"1143da2bc54c495c4be31d3868785d39ffdfd56df5668f0645d8f14d47647952",
+		"44ad63f60af0f6db6fdde6d5186ef78176367df261fa06be3079b6c80c8adba4"},
+	},
+	{[]byte("contenten"), 3, []string{
+		"1143da2bc54c495c4be31d3868785d39ffdfd56df5668f0645d8f14d47647952",
+		"e4432baa90819aaef51d2a7f8e148bf7e679610f3173752fabb4dcb2d0f418d3",
+		"e4432baa90819aaef51d2a7f8e148bf7e679610f3173752fabb4dcb2d0f418d3"},
+	},
+}
+
+func TestBlocks(t *testing.T) {
+	for _, test := range blocksTestData {
+		buf := bytes.NewBuffer(test.data)
+		blocks, err := Blocks(buf, test.blocksize)
+
+		if err != nil {
+			t.Fatal(err)
+		}
+
+		if l := len(blocks); l != len(test.hash) {
+			t.Fatalf("Incorrect number of blocks %d != %d", l, len(test.hash))
+		} else {
+			i := 0
+			for off := int64(0); off < int64(len(test.data)); off += int64(test.blocksize) {
+				if blocks[i].Offset != off {
+					t.Errorf("Incorrect offset for block %d: %d != %d", i, blocks[i].Offset, off)
+				}
+
+				bs := test.blocksize
+				if rem := len(test.data) - int(off); bs > rem {
+					bs = rem
+				}
+				if int(blocks[i].Size) != bs {
+					t.Errorf("Incorrect length for block %d: %d != %d", i, blocks[i].Size, bs)
+				}
+				if h := fmt.Sprintf("%x", blocks[i].Hash); h != test.hash[i] {
+					t.Errorf("Incorrect block hash %q != %q", h, test.hash[i])
+				}
+
+				i++
+			}
+		}
+	}
+}
+
+var diffTestData = []struct {
+	a string
+	b string
+	s int
+	d []Block
+}{
+	{"contents", "contents", 1024, []Block{}},
+	{"", "", 1024, []Block{}},
+	{"contents", "contents", 3, []Block{}},
+	{"contents", "cantents", 3, []Block{{0, 3, nil}}},
+	{"contents", "contants", 3, []Block{{3, 3, nil}}},
+	{"contents", "cantants", 3, []Block{{0, 3, nil}, {3, 3, nil}}},
+	{"contents", "", 3, []Block{{0, 0, nil}}},
+	{"", "contents", 3, []Block{{0, 3, nil}, {3, 3, nil}, {6, 2, nil}}},
+	{"con", "contents", 3, []Block{{3, 3, nil}, {6, 2, nil}}},
+	{"contents", "con", 3, nil},
+	{"contents", "cont", 3, []Block{{3, 1, nil}}},
+	{"cont", "contents", 3, []Block{{3, 3, nil}, {6, 2, nil}}},
+}
+
+func TestDiff(t *testing.T) {
+	for i, test := range diffTestData {
+		a, _ := Blocks(bytes.NewBufferString(test.a), test.s)
+		b, _ := Blocks(bytes.NewBufferString(test.b), test.s)
+		_, d := BlockDiff(a, b)
+		if len(d) != len(test.d) {
+			t.Fatalf("Incorrect length for diff %d; %d != %d", i, len(d), len(test.d))
+		} else {
+			for j := range test.d {
+				if d[j].Offset != test.d[j].Offset {
+					t.Errorf("Incorrect offset for diff %d block %d; %d != %d", i, j, d[j].Offset, test.d[j].Offset)
+				}
+				if d[j].Size != test.d[j].Size {
+					t.Errorf("Incorrect length for diff %d block %d; %d != %d", i, j, d[j].Size, test.d[j].Size)
+				}
+			}
+		}
+	}
+}

+ 202 - 0
cmd/syncthing/config.go

@@ -0,0 +1,202 @@
+package main
+
+import (
+	"crypto/sha256"
+	"encoding/xml"
+	"fmt"
+	"io"
+	"reflect"
+	"sort"
+	"strconv"
+	"strings"
+)
+
+type Configuration struct {
+	Version      int                       `xml:"version,attr" default:"1"`
+	Repositories []RepositoryConfiguration `xml:"repository"`
+	Options      OptionsConfiguration      `xml:"options"`
+	XMLName      xml.Name                  `xml:"configuration" json:"-"`
+}
+
+type RepositoryConfiguration struct {
+	Directory string              `xml:"directory,attr"`
+	Nodes     []NodeConfiguration `xml:"node"`
+}
+
+type NodeConfiguration struct {
+	NodeID    string   `xml:"id,attr"`
+	Name      string   `xml:"name,attr"`
+	Addresses []string `xml:"address"`
+}
+
+type OptionsConfiguration struct {
+	ListenAddress      []string `xml:"listenAddress" default:":22000" ini:"listen-address"`
+	ReadOnly           bool     `xml:"readOnly" ini:"read-only"`
+	AllowDelete        bool     `xml:"allowDelete" default:"true" ini:"allow-delete"`
+	FollowSymlinks     bool     `xml:"followSymlinks" default:"true" ini:"follow-symlinks"`
+	GUIEnabled         bool     `xml:"guiEnabled" default:"true" ini:"gui-enabled"`
+	GUIAddress         string   `xml:"guiAddress" default:"127.0.0.1:8080" ini:"gui-address"`
+	GlobalAnnServer    string   `xml:"globalAnnounceServer" default:"announce.syncthing.net:22025" ini:"global-announce-server"`
+	GlobalAnnEnabled   bool     `xml:"globalAnnounceEnabled" default:"true" ini:"global-announce-enabled"`
+	LocalAnnEnabled    bool     `xml:"localAnnounceEnabled" default:"true" ini:"local-announce-enabled"`
+	ParallelRequests   int      `xml:"parallelRequests" default:"16" ini:"parallel-requests"`
+	MaxSendKbps        int      `xml:"maxSendKbps" ini:"max-send-kbps"`
+	RescanIntervalS    int      `xml:"rescanIntervalS" default:"60" ini:"rescan-interval"`
+	ReconnectIntervalS int      `xml:"reconnectionIntervalS" default:"60" ini:"reconnection-interval"`
+	MaxChangeKbps      int      `xml:"maxChangeKbps" default:"1000" ini:"max-change-bw"`
+}
+
+func setDefaults(data interface{}) error {
+	s := reflect.ValueOf(data).Elem()
+	t := s.Type()
+
+	for i := 0; i < s.NumField(); i++ {
+		f := s.Field(i)
+		tag := t.Field(i).Tag
+
+		v := tag.Get("default")
+		if len(v) > 0 {
+			switch f.Interface().(type) {
+			case string:
+				f.SetString(v)
+
+			case []string:
+				rv := reflect.MakeSlice(reflect.TypeOf([]string{}), 1, 1)
+				rv.Index(0).SetString(v)
+				f.Set(rv)
+
+			case int:
+				i, err := strconv.ParseInt(v, 10, 64)
+				if err != nil {
+					return err
+				}
+				f.SetInt(i)
+
+			case bool:
+				f.SetBool(v == "true")
+
+			default:
+				panic(f.Type())
+			}
+		}
+	}
+	return nil
+}
+
+func readConfigINI(m map[string]string, data interface{}) error {
+	s := reflect.ValueOf(data).Elem()
+	t := s.Type()
+
+	for i := 0; i < s.NumField(); i++ {
+		f := s.Field(i)
+		tag := t.Field(i).Tag
+
+		name := tag.Get("ini")
+		if len(name) == 0 {
+			name = strings.ToLower(t.Field(i).Name)
+		}
+
+		if v, ok := m[name]; ok {
+			switch f.Interface().(type) {
+			case string:
+				f.SetString(v)
+
+			case int:
+				i, err := strconv.ParseInt(v, 10, 64)
+				if err == nil {
+					f.SetInt(i)
+				}
+
+			case bool:
+				f.SetBool(v == "true")
+
+			default:
+				panic(f.Type())
+			}
+		}
+	}
+	return nil
+}
+
+func writeConfigXML(wr io.Writer, cfg Configuration) error {
+	e := xml.NewEncoder(wr)
+	e.Indent("", "    ")
+	err := e.Encode(cfg)
+	if err != nil {
+		return err
+	}
+	_, err = wr.Write([]byte("\n"))
+	return err
+}
+
+func uniqueStrings(ss []string) []string {
+	var m = make(map[string]bool, len(ss))
+	for _, s := range ss {
+		m[s] = true
+	}
+
+	var us = make([]string, 0, len(m))
+	for k := range m {
+		us = append(us, k)
+	}
+
+	return us
+}
+
+func readConfigXML(rd io.Reader) (Configuration, error) {
+	var cfg Configuration
+
+	setDefaults(&cfg)
+	setDefaults(&cfg.Options)
+
+	var err error
+	if rd != nil {
+		err = xml.NewDecoder(rd).Decode(&cfg)
+	}
+
+	cfg.Options.ListenAddress = uniqueStrings(cfg.Options.ListenAddress)
+	return cfg, err
+}
+
+type NodeConfigurationList []NodeConfiguration
+
+func (l NodeConfigurationList) Less(a, b int) bool {
+	return l[a].NodeID < l[b].NodeID
+}
+func (l NodeConfigurationList) Swap(a, b int) {
+	l[a], l[b] = l[b], l[a]
+}
+func (l NodeConfigurationList) Len() int {
+	return len(l)
+}
+
+func clusterHash(nodes []NodeConfiguration) string {
+	sort.Sort(NodeConfigurationList(nodes))
+	h := sha256.New()
+	for _, n := range nodes {
+		h.Write([]byte(n.NodeID))
+	}
+	return fmt.Sprintf("%x", h.Sum(nil))
+}
+
+func cleanNodeList(nodes []NodeConfiguration, myID string) []NodeConfiguration {
+	var myIDExists bool
+	for _, node := range nodes {
+		if node.NodeID == myID {
+			myIDExists = true
+			break
+		}
+	}
+
+	if !myIDExists {
+		nodes = append(nodes, NodeConfiguration{
+			NodeID:    myID,
+			Addresses: []string{"dynamic"},
+			Name:      "",
+		})
+	}
+
+	sort.Sort(NodeConfigurationList(nodes))
+
+	return nodes
+}
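
A short sketch (assuming the same package with a "bytes" import added; error handling elided) of how the `default:` tags and the XML round-trip interact:

	func exampleConfig() {
		// A nil reader yields a Configuration populated entirely from
		// the `default:"..."` struct tags via setDefaults.
		cfg, _ := readConfigXML(nil)
		fmt.Println(cfg.Options.GUIAddress) // 127.0.0.1:8080, per its tag

		// Serialize it back out as indented XML.
		var buf bytes.Buffer
		_ = writeConfigXML(&buf, cfg)
	}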

+ 173 - 0
cmd/syncthing/filemonitor.go

@@ -0,0 +1,173 @@
+package main
+
+import (
+	"bytes"
+	"errors"
+	"fmt"
+	"log"
+	"os"
+	"path"
+	"sync"
+	"time"
+
+	"github.com/calmh/syncthing/buffers"
+)
+
+type fileMonitor struct {
+	name        string // in-repo name
+	path        string // full path
+	writeDone   sync.WaitGroup
+	model       *Model
+	global      File
+	localBlocks []Block
+	copyError   error
+	writeError  error
+}
+
+func (m *fileMonitor) FileBegins(cc <-chan content) error {
+	if m.model.trace["file"] {
+		log.Printf("FILE: FileBegins: " + m.name)
+	}
+
+	tmp := tempName(m.path, m.global.Modified)
+
+	dir := path.Dir(tmp)
+	_, err := os.Stat(dir)
+	if err != nil && os.IsNotExist(err) {
+		err = os.MkdirAll(dir, 0777)
+		if err != nil {
+			return err
+		}
+	}
+
+	outFile, err := os.Create(tmp)
+	if err != nil {
+		return err
+	}
+
+	m.writeDone.Add(1)
+
+	var writeWg sync.WaitGroup
+	if len(m.localBlocks) > 0 {
+		writeWg.Add(1)
+		inFile, err := os.Open(m.path)
+		if err != nil {
+			// Don't leak the open temp file or leave writeDone hanging.
+			outFile.Close()
+			m.writeDone.Done()
+			return err
+		}
+
+		// Copy local blocks, close infile when done
+		go m.copyLocalBlocks(inFile, outFile, &writeWg)
+	}
+
+	// Write remote blocks.
+	writeWg.Add(1)
+	go m.copyRemoteBlocks(cc, outFile, &writeWg)
+
+	// Wait for both writing routines, then close the outfile
+	go func() {
+		writeWg.Wait()
+		outFile.Close()
+		m.writeDone.Done()
+	}()
+
+	return nil
+}
+
+func (m *fileMonitor) copyLocalBlocks(inFile, outFile *os.File, writeWg *sync.WaitGroup) {
+	defer inFile.Close()
+	defer writeWg.Done()
+
+	var buf = buffers.Get(BlockSize)
+	defer buffers.Put(buf)
+
+	for _, lb := range m.localBlocks {
+		buf = buf[:lb.Size]
+		_, err := inFile.ReadAt(buf, lb.Offset)
+		if err != nil {
+			m.copyError = err
+			return
+		}
+		_, err = outFile.WriteAt(buf, lb.Offset)
+		if err != nil {
+			m.copyError = err
+			return
+		}
+	}
+}
+
+func (m *fileMonitor) copyRemoteBlocks(cc <-chan content, outFile *os.File, writeWg *sync.WaitGroup) {
+	defer writeWg.Done()
+
+	for content := range cc {
+		_, err := outFile.WriteAt(content.data, content.offset)
+		buffers.Put(content.data)
+		if err != nil {
+			m.writeError = err
+			return
+		}
+	}
+}
+
+func (m *fileMonitor) FileDone() error {
+	if m.model.trace["file"] {
+		log.Printf("FILE: FileDone: " + m.name)
+	}
+
+	m.writeDone.Wait()
+
+	tmp := tempName(m.path, m.global.Modified)
+	defer os.Remove(tmp)
+
+	if m.copyError != nil {
+		return m.copyError
+	}
+	if m.writeError != nil {
+		return m.writeError
+	}
+
+	err := hashCheck(tmp, m.global.Blocks)
+	if err != nil {
+		return err
+	}
+
+	err = os.Chtimes(tmp, time.Unix(m.global.Modified, 0), time.Unix(m.global.Modified, 0))
+	if err != nil {
+		return err
+	}
+
+	err = os.Chmod(tmp, os.FileMode(m.global.Flags&0777))
+	if err != nil {
+		return err
+	}
+
+	err = os.Rename(tmp, m.path)
+	if err != nil {
+		return err
+	}
+
+	m.model.updateLocal(m.global)
+	return nil
+}
+
+func hashCheck(name string, correct []Block) error {
+	rf, err := os.Open(name)
+	if err != nil {
+		return err
+	}
+	defer rf.Close()
+
+	current, err := Blocks(rf, BlockSize)
+	if err != nil {
+		return err
+	}
+	if len(current) != len(correct) {
+		return errors.New("incorrect number of blocks")
+	}
+	for i := range current {
+		if !bytes.Equal(current[i].Hash, correct[i].Hash) {
+			return fmt.Errorf("hash mismatch: %x != %x", current[i].Hash, correct[i].Hash)
+		}
+	}
+
+	return nil
+}
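
The temp-then-rename pattern above is the crux: content is verified before it ever replaces the destination. A sketch of the same ordering (assuming the same package; paths and block list hypothetical):

	func finalize(tmp, final string, expected []Block) error {
		// Verify the assembled file blockwise, exactly as FileDone does,
		// and only then rename it over the destination.
		if err := hashCheck(tmp, expected); err != nil {
			return err
		}
		return os.Rename(tmp, final)
	}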

+ 239 - 0
cmd/syncthing/filequeue.go

@@ -0,0 +1,239 @@
+package main
+
+import (
+	"log"
+	"sort"
+	"sync"
+	"time"
+)
+
+type Monitor interface {
+	FileBegins(<-chan content) error
+	FileDone() error
+}
+
+type FileQueue struct {
+	files        queuedFileList
+	sorted       bool
+	fmut         sync.Mutex // protects files and sorted
+	availability map[string][]string
+	amut         sync.Mutex // protects availability
+	queued       map[string]bool
+}
+
+type queuedFile struct {
+	name         string
+	blocks       []Block
+	activeBlocks []bool
+	given        int
+	remaining    int
+	channel      chan content
+	nodes        []string
+	nodesChecked time.Time
+	monitor      Monitor
+}
+
+type content struct {
+	offset int64
+	data   []byte
+}
+
+type queuedFileList []queuedFile
+
+func (l queuedFileList) Len() int { return len(l) }
+
+func (l queuedFileList) Swap(a, b int) { l[a], l[b] = l[b], l[a] }
+
+func (l queuedFileList) Less(a, b int) bool {
+	// Sort by most blocks already given out, then alphabetically
+	if l[a].given != l[b].given {
+		return l[a].given > l[b].given
+	}
+	return l[a].name < l[b].name
+}
+
+type queuedBlock struct {
+	name  string
+	block Block
+	index int
+}
+
+func NewFileQueue() *FileQueue {
+	return &FileQueue{
+		availability: make(map[string][]string),
+		queued:       make(map[string]bool),
+	}
+}
+
+func (q *FileQueue) Add(name string, blocks []Block, monitor Monitor) {
+	q.fmut.Lock()
+	defer q.fmut.Unlock()
+
+	if q.queued[name] {
+		return
+	}
+
+	q.files = append(q.files, queuedFile{
+		name:         name,
+		blocks:       blocks,
+		activeBlocks: make([]bool, len(blocks)),
+		remaining:    len(blocks),
+		channel:      make(chan content),
+		monitor:      monitor,
+	})
+	q.queued[name] = true
+	q.sorted = false
+}
+
+func (q *FileQueue) Len() int {
+	q.fmut.Lock()
+	defer q.fmut.Unlock()
+
+	return len(q.files)
+}
+
+func (q *FileQueue) Get(nodeID string) (queuedBlock, bool) {
+	q.fmut.Lock()
+	defer q.fmut.Unlock()
+
+	if !q.sorted {
+		sort.Sort(q.files)
+		q.sorted = true
+	}
+
+	for i := range q.files {
+		qf := &q.files[i]
+
+		q.amut.Lock()
+		av := q.availability[qf.name]
+		q.amut.Unlock()
+
+		if len(av) == 0 {
+			// No one has the file we want; abort.
+			if qf.remaining != len(qf.blocks) {
+				// We have already started on this file; close it down
+				close(qf.channel)
+				if mon := qf.monitor; mon != nil {
+					mon.FileDone()
+				}
+			}
+			delete(q.queued, qf.name)
+			q.deleteAt(i)
+			return queuedBlock{}, false
+		}
+
+		for _, ni := range av {
+			// Find and return the next block in the queue
+			if ni == nodeID {
+				for j, b := range qf.blocks {
+					if !qf.activeBlocks[j] {
+						qf.activeBlocks[j] = true
+						qf.given++
+						return queuedBlock{
+							name:  qf.name,
+							block: b,
+							index: j,
+						}, true
+					}
+				}
+				break
+			}
+		}
+	}
+
+	// We found nothing to do
+	return queuedBlock{}, false
+}
+
+func (q *FileQueue) Done(file string, offset int64, data []byte) {
+	q.fmut.Lock()
+	defer q.fmut.Unlock()
+
+	c := content{
+		offset: offset,
+		data:   data,
+	}
+	for i := range q.files {
+		qf := &q.files[i]
+
+		if qf.name == file {
+			if qf.monitor != nil && qf.remaining == len(qf.blocks) {
+				err := qf.monitor.FileBegins(qf.channel)
+				if err != nil {
+					log.Printf("WARNING: %s: %v (not synced)", qf.name, err)
+					delete(q.queued, qf.name)
+					q.deleteAt(i)
+					return
+				}
+			}
+
+			qf.channel <- c
+			qf.remaining--
+
+			if qf.remaining == 0 {
+				close(qf.channel)
+				if qf.monitor != nil {
+					err := qf.monitor.FileDone()
+					if err != nil {
+						log.Printf("WARNING: %s: %v", qf.name, err)
+					}
+				}
+				delete(q.queued, qf.name)
+				q.deleteAt(i)
+			}
+			return
+		}
+	}
+
+	// Nothing found; the file may already have been removed after an error.
+}
+
+func (q *FileQueue) QueuedFiles() (files []string) {
+	q.fmut.Lock()
+	defer q.fmut.Unlock()
+
+	for _, qf := range q.files {
+		files = append(files, qf.name)
+	}
+	return
+}
+
+func (q *FileQueue) deleteAt(i int) {
+	q.files = append(q.files[:i], q.files[i+1:]...)
+}
+
+func (q *FileQueue) deleteFile(n string) {
+	for i, file := range q.files {
+		if n == file.name {
+			q.deleteAt(i)
+			delete(q.queued, file.name)
+			return
+		}
+	}
+}
+
+func (q *FileQueue) SetAvailable(file string, nodes []string) {
+	q.amut.Lock()
+	defer q.amut.Unlock()
+
+	q.availability[file] = nodes
+}
+
+func (q *FileQueue) RemoveAvailable(toRemove string) {
+	q.fmut.Lock()
+	q.amut.Lock()
+	defer q.amut.Unlock()
+	defer q.fmut.Unlock()
+
+	for file, nodes := range q.availability {
+		for i, node := range nodes {
+			if node == toRemove {
+				q.availability[file] = nodes[:i+copy(nodes[i:], nodes[i+1:])]
+				if len(q.availability[file]) == 0 {
+					q.deleteFile(file)
+				}
+				break
+			}
+		}
+	}
+}
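
A minimal sketch of the intended call order (assuming the same package; node and file names hypothetical): register availability, queue the file, then let each puller ask for work.

	func exampleQueue() {
		q := NewFileQueue()

		// Get only dispenses blocks for files some node advertises, so
		// availability is set before (or alongside) Add.
		q.SetAvailable("docs/readme.txt", []string{"nodeA"})
		q.Add("docs/readme.txt", []Block{{Offset: 0, Size: 128}}, nil)

		// A puller serving nodeA asks for the next block; the fetched
		// data is later handed back through q.Done.
		if qb, ok := q.Get("nodeA"); ok {
			_ = qb // qb.name, qb.block and qb.index identify the request
		}
	}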

+ 295 - 0
cmd/syncthing/filequeue_test.go

@@ -0,0 +1,295 @@
+package main
+
+import (
+	"reflect"
+	"sync"
+	"sync/atomic"
+	"testing"
+)
+
+func TestFileQueueAdd(t *testing.T) {
+	q := NewFileQueue()
+	q.Add("foo", nil, nil)
+}
+
+func TestFileQueueAddSorting(t *testing.T) {
+	q := NewFileQueue()
+	q.SetAvailable("zzz", []string{"nodeID"})
+	q.SetAvailable("aaa", []string{"nodeID"})
+
+	q.Add("zzz", []Block{{Offset: 0, Size: 128}, {Offset: 128, Size: 128}}, nil)
+	q.Add("aaa", []Block{{Offset: 0, Size: 128}, {Offset: 128, Size: 128}}, nil)
+	b, _ := q.Get("nodeID")
+	if b.name != "aaa" {
+		t.Errorf("Incorrectly sorted get: %+v", b)
+	}
+
+	q = NewFileQueue()
+	q.SetAvailable("zzz", []string{"nodeID"})
+	q.SetAvailable("aaa", []string{"nodeID"})
+
+	q.Add("zzz", []Block{{Offset: 0, Size: 128}, {Offset: 128, Size: 128}}, nil)
+	b, _ = q.Get("nodeID") // Start on zzz
+	if b.name != "zzz" {
+		t.Errorf("Incorrectly sorted get: %+v", b)
+	}
+	q.Add("aaa", []Block{{Offset: 0, Size: 128}, {Offset: 128, Size: 128}}, nil)
+	b, _ = q.Get("nodeID")
+	if b.name != "zzz" {
+		// Continue rather than starting a new file
+		t.Errorf("Incorrectly sorted get: %+v", b)
+	}
+}
+
+func TestFileQueueLen(t *testing.T) {
+	q := NewFileQueue()
+	q.Add("foo", nil, nil)
+	q.Add("bar", nil, nil)
+
+	if l := q.Len(); l != 2 {
+		t.Errorf("Incorrect len %d != 2 after adds", l)
+	}
+}
+
+func TestFileQueueGet(t *testing.T) {
+	q := NewFileQueue()
+	q.SetAvailable("foo", []string{"nodeID"})
+	q.SetAvailable("bar", []string{"nodeID"})
+
+	q.Add("foo", []Block{
+		{Offset: 0, Size: 128, Hash: []byte("some foo hash bytes")},
+		{Offset: 128, Size: 128, Hash: []byte("some other foo hash bytes")},
+		{Offset: 256, Size: 128, Hash: []byte("more foo hash bytes")},
+	}, nil)
+	q.Add("bar", []Block{
+		{Offset: 0, Size: 128, Hash: []byte("some bar hash bytes")},
+		{Offset: 128, Size: 128, Hash: []byte("some other bar hash bytes")},
+	}, nil)
+
+	// First get should return the first block of the first file
+
+	expected := queuedBlock{
+		name: "bar",
+		block: Block{
+			Offset: 0,
+			Size:   128,
+			Hash:   []byte("some bar hash bytes"),
+		},
+	}
+	actual, ok := q.Get("nodeID")
+
+	if !ok {
+		t.Error("Unexpected non-OK Get()")
+	}
+	if !reflect.DeepEqual(expected, actual) {
+		t.Errorf("Incorrect block returned (first)\n  E: %+v\n  A: %+v", expected, actual)
+	}
+
+	// Second get should return the next block of the first file
+
+	expected = queuedBlock{
+		name: "bar",
+		block: Block{
+			Offset: 128,
+			Size:   128,
+			Hash:   []byte("some other bar hash bytes"),
+		},
+		index: 1,
+	}
+	actual, ok = q.Get("nodeID")
+
+	if !ok {
+		t.Error("Unexpected non-OK Get()")
+	}
+	if !reflect.DeepEqual(expected, actual) {
+		t.Errorf("Incorrect block returned (second)\n  E: %+v\n  A: %+v", expected, actual)
+	}
+
+	// Third get should return the first block of the second file
+
+	expected = queuedBlock{
+		name: "foo",
+		block: Block{
+			Offset: 0,
+			Size:   128,
+			Hash:   []byte("some foo hash bytes"),
+		},
+	}
+	actual, ok = q.Get("nodeID")
+
+	if !ok {
+		t.Error("Unexpected non-OK Get()")
+	}
+	if !reflect.DeepEqual(expected, actual) {
+		t.Errorf("Incorrect block returned (third)\n  E: %+v\n  A: %+v", expected, actual)
+	}
+}
+
+/*
+func TestFileQueueDone(t *testing.T) {
+	ch := make(chan content)
+	var recv sync.WaitGroup
+	recv.Add(1)
+	go func() {
+		content := <-ch
+		if bytes.Compare(content.data, []byte("first block bytes")) != 0 {
+			t.Error("Incorrect data in first content block")
+		}
+
+		content = <-ch
+		if bytes.Compare(content.data, []byte("second block bytes")) != 0 {
+			t.Error("Incorrect data in second content block")
+		}
+
+		_, ok := <-ch
+		if ok {
+			t.Error("Content channel not closed")
+		}
+
+		recv.Done()
+	}()
+
+	q := FileQueue{resolver: fakeResolver{}}
+	q.Add("foo", []Block{
+		{Offset: 0, Length: 128, Hash: []byte("some foo hash bytes")},
+		{Offset: 128, Length: 128, Hash: []byte("some other foo hash bytes")},
+	}, ch)
+
+	b0, _ := q.Get("nodeID")
+	b1, _ := q.Get("nodeID")
+
+	q.Done(b0.name, b0.block.Offset, []byte("first block bytes"))
+	q.Done(b1.name, b1.block.Offset, []byte("second block bytes"))
+
+	recv.Wait()
+
+	// Queue should now have one file less
+
+	if l := q.Len(); l != 0 {
+		t.Error("Queue not empty")
+	}
+
+	_, ok := q.Get("nodeID")
+	if ok {
+		t.Error("Unexpected OK Get()")
+	}
+}
+*/
+
+func TestFileQueueGetNodeIDs(t *testing.T) {
+	q := NewFileQueue()
+	q.SetAvailable("a-foo", []string{"nodeID", "a"})
+	q.SetAvailable("b-bar", []string{"nodeID", "b"})
+
+	q.Add("a-foo", []Block{
+		{Offset: 0, Size: 128, Hash: []byte("some foo hash bytes")},
+		{Offset: 128, Size: 128, Hash: []byte("some other foo hash bytes")},
+		{Offset: 256, Size: 128, Hash: []byte("more foo hash bytes")},
+	}, nil)
+	q.Add("b-bar", []Block{
+		{Offset: 0, Size: 128, Hash: []byte("some bar hash bytes")},
+		{Offset: 128, Size: 128, Hash: []byte("some other bar hash bytes")},
+	}, nil)
+
+	expected := queuedBlock{
+		name: "b-bar",
+		block: Block{
+			Offset: 0,
+			Size:   128,
+			Hash:   []byte("some bar hash bytes"),
+		},
+	}
+	actual, ok := q.Get("b")
+	if !ok {
+		t.Error("Unexpected non-OK Get()")
+	}
+	if !reflect.DeepEqual(expected, actual) {
+		t.Errorf("Incorrect block returned\n  E: %+v\n  A: %+v", expected, actual)
+	}
+
+	expected = queuedBlock{
+		name: "a-foo",
+		block: Block{
+			Offset: 0,
+			Size:   128,
+			Hash:   []byte("some foo hash bytes"),
+		},
+	}
+	actual, ok = q.Get("a")
+	if !ok {
+		t.Error("Unexpected non-OK Get()")
+	}
+	if !reflect.DeepEqual(expected, actual) {
+		t.Errorf("Incorrect block returned\n  E: %+v\n  A: %+v", expected, actual)
+	}
+
+	expected = queuedBlock{
+		name: "a-foo",
+		block: Block{
+			Offset: 128,
+			Size:   128,
+			Hash:   []byte("some other foo hash bytes"),
+		},
+		index: 1,
+	}
+	actual, ok = q.Get("nodeID")
+	if !ok {
+		t.Error("Unexpected non-OK Get()")
+	}
+	if !reflect.DeepEqual(expected, actual) {
+		t.Errorf("Incorrect block returned\n  E: %+v\n  A: %+v", expected, actual)
+	}
+}
+
+func TestFileQueueThreadHandling(t *testing.T) {
+	// This should pass with go test -race
+
+	const n = 100
+	var total int
+	var blocks []Block
+	for i := 1; i <= n; i++ {
+		blocks = append(blocks, Block{Offset: int64(i), Size: 1})
+		total += i
+	}
+
+	q := NewFileQueue()
+	q.Add("foo", blocks, nil)
+	q.SetAvailable("foo", []string{"nodeID"})
+
+	var start = make(chan bool)
+	var gotTot uint32
+	var wg sync.WaitGroup
+	wg.Add(n)
+	for i := 1; i <= n; i++ {
+		go func() {
+			<-start
+			b, _ := q.Get("nodeID")
+			atomic.AddUint32(&gotTot, uint32(b.block.Offset))
+			wg.Done()
+		}()
+	}
+
+	close(start)
+	wg.Wait()
+	if int(gotTot) != total {
+		t.Errorf("Total mismatch; %d != %d", gotTot, total)
+	}
+}
+
+func TestDeleteAt(t *testing.T) {
+	q := FileQueue{}
+
+	for i := 0; i < 4; i++ {
+		q.files = queuedFileList{{name: "a"}, {name: "b"}, {name: "c"}, {name: "d"}}
+		q.deleteAt(i)
+		if l := len(q.files); l != 3 {
+			t.Fatalf("deleteAt(%d) failed; %d != 3", i, l)
+		}
+	}
+
+	q.files = queuedFileList{{name: "a"}}
+	q.deleteAt(0)
+	if l := len(q.files); l != 0 {
+		t.Fatalf("deleteAt(only) failed; %d != 0", l)
+	}
+}

+ 172 - 0
cmd/syncthing/gui.go

@@ -0,0 +1,172 @@
+package main
+
+import (
+	"encoding/json"
+	"io/ioutil"
+	"log"
+	"net/http"
+	"runtime"
+	"sync"
+	"time"
+
+	"github.com/codegangsta/martini"
+)
+
+type guiError struct {
+	Time  time.Time
+	Error string
+}
+
+var (
+	configInSync = true
+	guiErrors    = []guiError{}
+	guiErrorsMut sync.Mutex
+)
+
+func startGUI(addr string, m *Model) {
+	router := martini.NewRouter()
+	router.Get("/", getRoot)
+	router.Get("/rest/version", restGetVersion)
+	router.Get("/rest/model", restGetModel)
+	router.Get("/rest/connections", restGetConnections)
+	router.Get("/rest/config", restGetConfig)
+	router.Get("/rest/config/sync", restGetConfigInSync)
+	router.Get("/rest/need", restGetNeed)
+	router.Get("/rest/system", restGetSystem)
+	router.Get("/rest/errors", restGetErrors)
+
+	router.Post("/rest/config", restPostConfig)
+	router.Post("/rest/restart", restPostRestart)
+	router.Post("/rest/error", restPostError)
+
+	go func() {
+		mr := martini.New()
+		mr.Use(embeddedStatic())
+		mr.Use(martini.Recovery())
+		mr.Action(router.Handle)
+		mr.Map(m)
+		err := http.ListenAndServe(addr, mr)
+		if err != nil {
+			warnln("GUI not possible:", err)
+		}
+	}()
+}
+
+func getRoot(w http.ResponseWriter, r *http.Request) {
+	http.Redirect(w, r, "/index.html", 302)
+}
+
+func restGetVersion() string {
+	return Version
+}
+
+func restGetModel(m *Model, w http.ResponseWriter) {
+	var res = make(map[string]interface{})
+
+	globalFiles, globalDeleted, globalBytes := m.GlobalSize()
+	res["globalFiles"], res["globalDeleted"], res["globalBytes"] = globalFiles, globalDeleted, globalBytes
+
+	localFiles, localDeleted, localBytes := m.LocalSize()
+	res["localFiles"], res["localDeleted"], res["localBytes"] = localFiles, localDeleted, localBytes
+
+	inSyncFiles, inSyncBytes := m.InSyncSize()
+	res["inSyncFiles"], res["inSyncBytes"] = inSyncFiles, inSyncBytes
+
+	files, total := m.NeedFiles()
+	res["needFiles"], res["needBytes"] = len(files), total
+
+	w.Header().Set("Content-Type", "application/json")
+	json.NewEncoder(w).Encode(res)
+}
+
+func restGetConnections(m *Model, w http.ResponseWriter) {
+	var res = m.ConnectionStats()
+	w.Header().Set("Content-Type", "application/json")
+	json.NewEncoder(w).Encode(res)
+}
+
+func restGetConfig(w http.ResponseWriter) {
+	json.NewEncoder(w).Encode(cfg)
+}
+
+func restPostConfig(req *http.Request) {
+	err := json.NewDecoder(req.Body).Decode(&cfg)
+	if err != nil {
+		log.Println(err)
+	} else {
+		saveConfig()
+		configInSync = false
+	}
+}
+
+func restGetConfigInSync(w http.ResponseWriter) {
+	json.NewEncoder(w).Encode(map[string]bool{"configInSync": configInSync})
+}
+
+func restPostRestart(req *http.Request) {
+	restart()
+}
+
+type guiFile File
+
+func (f guiFile) MarshalJSON() ([]byte, error) {
+	type t struct {
+		Name string
+		Size int64
+	}
+	return json.Marshal(t{
+		Name: f.Name,
+		Size: File(f).Size,
+	})
+}
+
+func restGetNeed(m *Model, w http.ResponseWriter) {
+	files, _ := m.NeedFiles()
+	gfs := make([]guiFile, len(files))
+	for i, f := range files {
+		gfs[i] = guiFile(f)
+	}
+	w.Header().Set("Content-Type", "application/json")
+	json.NewEncoder(w).Encode(gfs)
+}
+
+var cpuUsagePercent float64
+var cpuUsageLock sync.RWMutex
+
+func restGetSystem(w http.ResponseWriter) {
+	var m runtime.MemStats
+	runtime.ReadMemStats(&m)
+
+	res := make(map[string]interface{})
+	res["myID"] = myID
+	res["goroutines"] = runtime.NumGoroutine()
+	res["alloc"] = m.Alloc
+	res["sys"] = m.Sys
+	cpuUsageLock.RLock()
+	res["cpuPercent"] = cpuUsagePercent
+	cpuUsageLock.RUnlock()
+
+	w.Header().Set("Content-Type", "application/json")
+	json.NewEncoder(w).Encode(res)
+}
+
+func restGetErrors(w http.ResponseWriter) {
+	guiErrorsMut.Lock()
+	json.NewEncoder(w).Encode(guiErrors)
+	guiErrorsMut.Unlock()
+}
+
+func restPostError(req *http.Request) {
+	bs, _ := ioutil.ReadAll(req.Body)
+	req.Body.Close()
+	showGuiError(string(bs))
+}
+
+func showGuiError(err string) {
+	guiErrorsMut.Lock()
+	guiErrors = append(guiErrors, guiError{time.Now(), err})
+	if len(guiErrors) > 5 {
+		guiErrors = guiErrors[len(guiErrors)-5:]
+	}
+	guiErrorsMut.Unlock()
+}
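
The REST endpoints registered above can be exercised with any HTTP client; a sketch querying the version endpoint (assuming the default guiAddress from config.go and an added "fmt" import):

	func exampleRest() {
		resp, err := http.Get("http://127.0.0.1:8080/rest/version")
		if err != nil {
			return
		}
		defer resp.Body.Close()
		bs, _ := ioutil.ReadAll(resp.Body)
		fmt.Printf("version: %s\n", bs)
	}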

+ 9 - 0
cmd/syncthing/gui_development.go

@@ -0,0 +1,9 @@
+//+build guidev
+
+package main
+
+import "github.com/codegangsta/martini"
+
+func embeddedStatic() interface{} {
+	return martini.Static("gui")
+}

+ 40 - 0
cmd/syncthing/gui_embedded.go

@@ -0,0 +1,40 @@
+//+build !guidev
+
+package main
+
+import (
+	"fmt"
+	"log"
+	"mime"
+	"net/http"
+	"path/filepath"
+	"time"
+
+	"github.com/calmh/syncthing/auto"
+)
+
+func embeddedStatic() interface{} {
+	var modt = time.Now().UTC().Format(http.TimeFormat)
+
+	return func(res http.ResponseWriter, req *http.Request, log *log.Logger) {
+		file := req.URL.Path
+
+		if file[0] == '/' {
+			file = file[1:]
+		}
+
+		bs, ok := auto.Assets[file]
+		if !ok {
+			return
+		}
+
+		mtype := mime.TypeByExtension(filepath.Ext(req.URL.Path))
+		if len(mtype) != 0 {
+			res.Header().Set("Content-Type", mtype)
+		}
+		res.Header().Set("Content-Size", fmt.Sprintf("%d", len(bs)))
+		res.Header().Set("Last-Modified", modt)
+
+		res.Write(bs)
+	}
+}

+ 31 - 0
cmd/syncthing/gui_unix.go

@@ -0,0 +1,31 @@
+//+build !windows,!solaris
+
+package main
+
+import (
+	"syscall"
+	"time"
+)
+
+func init() {
+	go trackCPUUsage()
+}
+
+func trackCPUUsage() {
+	var prevUsage int64
+	var prevTime = time.Now().UnixNano()
+	var rusage syscall.Rusage
+	for {
+		time.Sleep(10 * time.Second)
+		syscall.Getrusage(syscall.RUSAGE_SELF, &rusage)
+		curTime := time.Now().UnixNano()
+		timeDiff := curTime - prevTime
+		curUsage := rusage.Utime.Nano() + rusage.Stime.Nano()
+		usageDiff := curUsage - prevUsage
+		cpuUsageLock.Lock()
+		cpuUsagePercent = 100 * float64(usageDiff) / float64(timeDiff)
+		cpuUsageLock.Unlock()
+		prevTime = curTime
+		prevUsage = curUsage
+	}
+}
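
For intuition, the percentage above is CPU time consumed divided by wall-clock time elapsed over each 10-second window; with hypothetical numbers:

	func exampleCPUPercent() float64 {
		usageDiff := int64(2e9)  // 2 s of user+system CPU time, in ns
		timeDiff := int64(10e9)  // over a 10 s wall-clock window, in ns
		return 100 * float64(usageDiff) / float64(timeDiff) // 20.0
	}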

+ 43 - 0
cmd/syncthing/locktrace.go

@@ -0,0 +1,43 @@
+//+build locktrace
+
+package main
+
+import (
+	"log"
+	"path"
+	"runtime"
+	"time"
+)
+
+var (
+	lockTime time.Time
+)
+
+func (m *Model) Lock() {
+	_, file, line, _ := runtime.Caller(1)
+	log.Printf("%s:%d: Lock()...", path.Base(file), line)
+	blockTime := time.Now()
+	m.RWMutex.Lock()
+	lockTime = time.Now()
+	log.Printf("%s:%d: ...Lock() [%.04f ms]", path.Base(file), line, time.Since(blockTime).Seconds()*1000)
+}
+
+func (m *Model) Unlock() {
+	_, file, line, _ := runtime.Caller(1)
+	m.RWMutex.Unlock()
+	log.Printf("%s:%d: Unlock() [%.04f ms]", path.Base(file), line, time.Since(lockTime).Seconds()*1000)
+}
+
+func (m *Model) RLock() {
+	_, file, line, _ := runtime.Caller(1)
+	log.Printf("%s:%d: RLock()...", path.Base(file), line)
+	blockTime := time.Now()
+	m.RWMutex.RLock()
+	log.Printf("%s:%d: ...RLock() [%.04f ms]", path.Base(file), line, time.Since(blockTime).Seconds()*1000)
+}
+
+func (m *Model) RUnlock() {
+	_, file, line, _ := runtime.Caller(1)
+	m.RWMutex.RUnlock()
+	log.Printf("%s:%d: RUnlock()", path.Base(file), line)
+}

+ 74 - 0
cmd/syncthing/logger.go

@@ -0,0 +1,74 @@
+package main
+
+import (
+	"fmt"
+	"log"
+	"os"
+)
+
+var logger *log.Logger
+
+func init() {
+	log.SetOutput(os.Stderr)
+	logger = log.New(os.Stderr, "", log.Flags())
+}
+
+func debugln(vals ...interface{}) {
+	s := fmt.Sprintln(vals...)
+	logger.Output(2, "DEBUG: "+s)
+}
+
+func debugf(format string, vals ...interface{}) {
+	s := fmt.Sprintf(format, vals...)
+	logger.Output(2, "DEBUG: "+s)
+}
+
+func infoln(vals ...interface{}) {
+	s := fmt.Sprintln(vals...)
+	logger.Output(2, "INFO: "+s)
+}
+
+func infof(format string, vals ...interface{}) {
+	s := fmt.Sprintf(format, vals...)
+	logger.Output(2, "INFO: "+s)
+}
+
+func okln(vals ...interface{}) {
+	s := fmt.Sprintln(vals...)
+	logger.Output(2, "OK: "+s)
+}
+
+func okf(format string, vals ...interface{}) {
+	s := fmt.Sprintf(format, vals...)
+	logger.Output(2, "OK: "+s)
+}
+
+func warnln(vals ...interface{}) {
+	s := fmt.Sprintln(vals...)
+	showGuiError(s)
+	logger.Output(2, "WARNING: "+s)
+}
+
+func warnf(format string, vals ...interface{}) {
+	s := fmt.Sprintf(format, vals...)
+	showGuiError(s)
+	logger.Output(2, "WARNING: "+s)
+}
+
+func fatalln(vals ...interface{}) {
+	s := fmt.Sprintln(vals...)
+	logger.Output(2, "FATAL: "+s)
+	os.Exit(3)
+}
+
+func fatalf(format string, vals ...interface{}) {
+	s := fmt.Sprintf(format, vals...)
+	logger.Output(2, "FATAL: "+s)
+	os.Exit(3)
+}
+
+func fatalErr(err error) {
+	if err != nil {
+		fatalf("%s", err)
+	}
+}

+ 569 - 0
cmd/syncthing/main.go

@@ -0,0 +1,569 @@
+package main
+
+import (
+	"compress/gzip"
+	"crypto/tls"
+	"flag"
+	"fmt"
+	"log"
+	"net"
+	"net/http"
+	_ "net/http/pprof"
+	"os"
+	"os/exec"
+	"path"
+	"runtime"
+	"runtime/debug"
+	"strconv"
+	"strings"
+	"time"
+
+	"github.com/calmh/ini"
+	"github.com/calmh/syncthing/discover"
+	"github.com/calmh/syncthing/protocol"
+)
+
+var cfg Configuration
+var Version = "unknown-dev"
+
+var (
+	myID string
+)
+
+var (
+	showVersion  bool
+	confDir      string
+	trace        string
+	profiler     string
+	verbose      bool
+	startupDelay int
+)
+
+func main() {
+	flag.StringVar(&confDir, "home", "~/.syncthing", "Set configuration directory")
+	flag.StringVar(&trace, "debug.trace", "", "(connect,net,idx,file,pull)")
+	flag.StringVar(&profiler, "debug.profiler", "", "(addr)")
+	flag.BoolVar(&showVersion, "version", false, "Show version")
+	flag.BoolVar(&verbose, "v", false, "Be more verbose")
+	flag.IntVar(&startupDelay, "delay", 0, "Startup delay (s)")
+	flag.Usage = usageFor(flag.CommandLine, "syncthing [options]")
+	flag.Parse()
+
+	if startupDelay > 0 {
+		time.Sleep(time.Duration(startupDelay) * time.Second)
+	}
+
+	if showVersion {
+		fmt.Println(Version)
+		os.Exit(0)
+	}
+
+	if len(os.Getenv("GOGC")) == 0 {
+		debug.SetGCPercent(25)
+	}
+
+	if len(os.Getenv("GOMAXPROCS")) == 0 {
+		runtime.GOMAXPROCS(runtime.NumCPU())
+	}
+
+	if len(trace) > 0 {
+		log.SetFlags(log.Lshortfile | log.Ldate | log.Ltime | log.Lmicroseconds)
+		logger.SetFlags(log.Lshortfile | log.Ldate | log.Ltime | log.Lmicroseconds)
+	}
+	confDir = expandTilde(confDir)
+
+	// Ensure that our home directory exists and that we have a certificate and key.
+
+	ensureDir(confDir, 0700)
+	cert, err := loadCert(confDir)
+	if err != nil {
+		newCertificate(confDir)
+		cert, err = loadCert(confDir)
+		fatalErr(err)
+	}
+
+	myID = string(certID(cert.Certificate[0]))
+	log.SetPrefix("[" + myID[0:5] + "] ")
+	logger.SetPrefix("[" + myID[0:5] + "] ")
+
+	infoln("Version", Version)
+	infoln("My ID:", myID)
+
+	// Prepare to be able to save configuration
+
+	cfgFile := path.Join(confDir, "config.xml")
+	go saveConfigLoop(cfgFile)
+
+	// Load the configuration file, if it exists.
+	// If it does not, create a template.
+
+	cf, err := os.Open(cfgFile)
+	if err == nil {
+		// Read config.xml
+		cfg, err = readConfigXML(cf)
+		if err != nil {
+			fatalln(err)
+		}
+		cf.Close()
+	} else {
+		// No config.xml, let's try the old syncthing.ini
+		iniFile := path.Join(confDir, "syncthing.ini")
+		cf, err := os.Open(iniFile)
+		if err == nil {
+			infoln("Migrating syncthing.ini to config.xml")
+			iniCfg := ini.Parse(cf)
+			cf.Close()
+			os.Rename(iniFile, path.Join(confDir, "migrated_syncthing.ini"))
+
+			cfg, _ = readConfigXML(nil)
+			cfg.Repositories = []RepositoryConfiguration{
+				{Directory: iniCfg.Get("repository", "dir")},
+			}
+			readConfigINI(iniCfg.OptionMap("settings"), &cfg.Options)
+			for name, addrs := range iniCfg.OptionMap("nodes") {
+				n := NodeConfiguration{
+					NodeID:    name,
+					Addresses: strings.Fields(addrs),
+				}
+				cfg.Repositories[0].Nodes = append(cfg.Repositories[0].Nodes, n)
+			}
+
+			saveConfig()
+		}
+	}
+
+	if len(cfg.Repositories) == 0 {
+		infoln("No config file; starting with empty defaults")
+
+		cfg, err = readConfigXML(nil)
+		cfg.Repositories = []RepositoryConfiguration{
+			{
+				Directory: "~/Sync",
+				Nodes: []NodeConfiguration{
+					{NodeID: myID, Addresses: []string{"dynamic"}},
+				},
+			},
+		}
+
+		saveConfig()
+		infof("Edit %s to taste or use the GUI\n", cfgFile)
+	}
+
+	// Make sure the local node is in the node list.
+	cfg.Repositories[0].Nodes = cleanNodeList(cfg.Repositories[0].Nodes, myID)
+
+	var dir = expandTilde(cfg.Repositories[0].Directory)
+
+	if len(profiler) > 0 {
+		go func() {
+			err := http.ListenAndServe(profiler, nil)
+			if err != nil {
+				warnln(err)
+			}
+		}()
+	}
+
+	// The TLS configuration is used for both the listening socket and outgoing
+	// connections.
+
+	tlsCfg := &tls.Config{
+		Certificates:           []tls.Certificate{cert},
+		NextProtos:             []string{"bep/1.0"},
+		ServerName:             myID,
+		ClientAuth:             tls.RequestClientCert,
+		SessionTicketsDisabled: true,
+		InsecureSkipVerify:     true,
+		MinVersion:             tls.VersionTLS12,
+	}
+
+	ensureDir(dir, -1)
+	m := NewModel(dir, cfg.Options.MaxChangeKbps*1000)
+	for _, t := range strings.Split(trace, ",") {
+		m.Trace(t)
+	}
+	if cfg.Options.MaxSendKbps > 0 {
+		m.LimitRate(cfg.Options.MaxSendKbps)
+	}
+
+	// GUI
+	if cfg.Options.GUIEnabled && cfg.Options.GUIAddress != "" {
+		addr, err := net.ResolveTCPAddr("tcp", cfg.Options.GUIAddress)
+		if err != nil {
+			warnf("Cannot start GUI on %q: %v", cfg.Options.GUIAddress, err)
+		} else {
+			var hostOpen, hostShow string
+			switch {
+			case addr.IP == nil:
+				hostOpen = "localhost"
+				hostShow = "0.0.0.0"
+			case addr.IP.IsUnspecified():
+				hostOpen = "localhost"
+				hostShow = addr.IP.String()
+			default:
+				hostOpen = addr.IP.String()
+				hostShow = hostOpen
+			}
+
+			infof("Starting web GUI on http://%s:%d/", hostShow, addr.Port)
+			startGUI(cfg.Options.GUIAddress, m)
+			openURL(fmt.Sprintf("http://%s:%d", hostOpen, addr.Port))
+		}
+	}
+
+	// Walk the repository and update the local model before establishing any
+	// connections to other nodes.
+
+	if verbose {
+		infoln("Populating repository index")
+	}
+	loadIndex(m)
+	updateLocalModel(m)
+
+	connOpts := map[string]string{
+		"clientId":      "syncthing",
+		"clientVersion": Version,
+		"clusterHash":   clusterHash(cfg.Repositories[0].Nodes),
+	}
+
+	// Routine to listen for incoming connections
+	if verbose {
+		infoln("Listening for incoming connections")
+	}
+	for _, addr := range cfg.Options.ListenAddress {
+		go listen(myID, addr, m, tlsCfg, connOpts)
+	}
+
+	// Routine to connect out to configured nodes
+	if verbose {
+		infoln("Attempting to connect to other nodes")
+	}
+	disc := discovery(cfg.Options.ListenAddress[0])
+	go connect(myID, disc, m, tlsCfg, connOpts)
+
+	// Routine to pull blocks from other nodes to synchronize the local
+	// repository. Does not run when we are in read only (publish only) mode.
+	if !cfg.Options.ReadOnly {
+		if verbose {
+			if cfg.Options.AllowDelete {
+				infoln("Deletes from peer nodes are allowed")
+			} else {
+				infoln("Deletes from peer nodes will be ignored")
+			}
+			okln("Ready to synchronize (read-write)")
+		}
+		m.StartRW(cfg.Options.AllowDelete, cfg.Options.ParallelRequests)
+	} else if verbose {
+		okln("Ready to synchronize (read only; no external updates accepted)")
+	}
+
+	// Periodically scan the repository and update the local
+	// XXX: Should use some fsnotify mechanism.
+	go func() {
+		td := time.Duration(cfg.Options.RescanIntervalS) * time.Second
+		for {
+			time.Sleep(td)
+			if m.LocalAge() > (td / 2).Seconds() {
+				updateLocalModel(m)
+			}
+		}
+	}()
+
+	if verbose {
+		// Periodically print statistics
+		go printStatsLoop(m)
+	}
+
+	select {}
+}
+
+func restart() {
+	infoln("Restarting")
+	args := os.Args
+	doAppend := true
+	for _, arg := range args {
+		if arg == "-delay" {
+			doAppend = false
+			break
+		}
+	}
+	if doAppend {
+		args = append(args, "-delay", "2")
+	}
+	pgm, err := exec.LookPath(os.Args[0])
+	if err != nil {
+		warnln(err)
+		return
+	}
+	proc, err := os.StartProcess(pgm, args, &os.ProcAttr{
+		Env:   os.Environ(),
+		Files: []*os.File{os.Stdin, os.Stdout, os.Stderr},
+	})
+	if err != nil {
+		fatalln(err)
+	}
+	proc.Release()
+	os.Exit(0)
+}
+
+var saveConfigCh = make(chan struct{})
+
+func saveConfigLoop(cfgFile string) {
+	for range saveConfigCh {
+		fd, err := os.Create(cfgFile + ".tmp")
+		if err != nil {
+			warnln(err)
+			continue
+		}
+
+		err = writeConfigXML(fd, cfg)
+		if err != nil {
+			warnln(err)
+			fd.Close()
+			continue
+		}
+
+		err = fd.Close()
+		if err != nil {
+			warnln(err)
+			continue
+		}
+
+		err = os.Rename(cfgFile+".tmp", cfgFile)
+		if err != nil {
+			warnln(err)
+		}
+	}
+}
+
+func saveConfig() {
+	saveConfigCh <- struct{}{}
+}
+
+func printStatsLoop(m *Model) {
+	var lastUpdated int64
+	var lastStats = make(map[string]ConnectionInfo)
+
+	for {
+		time.Sleep(60 * time.Second)
+
+		for node, stats := range m.ConnectionStats() {
+			secs := time.Since(lastStats[node].At).Seconds()
+			inbps := 8 * int(float64(stats.InBytesTotal-lastStats[node].InBytesTotal)/secs)
+			outbps := 8 * int(float64(stats.OutBytesTotal-lastStats[node].OutBytesTotal)/secs)
+
+			if inbps+outbps > 0 {
+				infof("%s: %sb/s in, %sb/s out", node[0:5], MetricPrefix(int64(inbps)), MetricPrefix(int64(outbps)))
+			}
+
+			lastStats[node] = stats
+		}
+
+		if lu := m.Generation(); lu > lastUpdated {
+			lastUpdated = lu
+			files, _, bytes := m.GlobalSize()
+			infof("%6d files, %9sB in cluster", files, BinaryPrefix(bytes))
+			files, _, bytes = m.LocalSize()
+			infof("%6d files, %9sB in local repo", files, BinaryPrefix(bytes))
+			needFiles, bytes := m.NeedFiles()
+			infof("%6d files, %9sB to synchronize", len(needFiles), BinaryPrefix(bytes))
+		}
+	}
+}
+
+func listen(myID string, addr string, m *Model, tlsCfg *tls.Config, connOpts map[string]string) {
+	if strings.Contains(trace, "connect") {
+		debugln("NET: Listening on", addr)
+	}
+	l, err := tls.Listen("tcp", addr, tlsCfg)
+	fatalErr(err)
+
+listen:
+	for {
+		conn, err := l.Accept()
+		if err != nil {
+			warnln(err)
+			continue
+		}
+
+		if strings.Contains(trace, "connect") {
+			debugln("NET: Connect from", conn.RemoteAddr())
+		}
+
+		tc := conn.(*tls.Conn)
+		err = tc.Handshake()
+		if err != nil {
+			warnln(err)
+			tc.Close()
+			continue
+		}
+
+		remoteID := certID(tc.ConnectionState().PeerCertificates[0].Raw)
+
+		if remoteID == myID {
+			warnf("Connect from myself (%s) - should not happen", remoteID)
+			conn.Close()
+			continue
+		}
+
+		if m.ConnectedTo(remoteID) {
+			warnf("Connect from connected node (%s)", remoteID)
+		}
+
+		for _, nodeCfg := range cfg.Repositories[0].Nodes {
+			if nodeCfg.NodeID == remoteID {
+				protoConn := protocol.NewConnection(remoteID, conn, conn, m, connOpts)
+				m.AddConnection(conn, protoConn)
+				continue listen
+			}
+		}
+		conn.Close()
+	}
+}
+
+func discovery(addr string) *discover.Discoverer {
+	_, portstr, err := net.SplitHostPort(addr)
+	fatalErr(err)
+	port, _ := strconv.Atoi(portstr)
+
+	if !cfg.Options.LocalAnnEnabled {
+		port = -1
+	} else if verbose {
+		infoln("Sending local discovery announcements")
+	}
+
+	if !cfg.Options.GlobalAnnEnabled {
+		cfg.Options.GlobalAnnServer = ""
+	} else if verbose {
+		infoln("Sending external discovery announcements")
+	}
+
+	disc, err := discover.NewDiscoverer(myID, port, cfg.Options.GlobalAnnServer)
+
+	if err != nil {
+		warnf("No discovery possible (%v)", err)
+	}
+
+	return disc
+}
+
+func connect(myID string, disc *discover.Discoverer, m *Model, tlsCfg *tls.Config, connOpts map[string]string) {
+	for {
+	nextNode:
+		for _, nodeCfg := range cfg.Repositories[0].Nodes {
+			if nodeCfg.NodeID == myID {
+				continue
+			}
+			if m.ConnectedTo(nodeCfg.NodeID) {
+				continue
+			}
+			for _, addr := range nodeCfg.Addresses {
+				if addr == "dynamic" {
+					if disc != nil {
+						t := disc.Lookup(nodeCfg.NodeID)
+						if len(t) == 0 {
+							continue
+						}
+						addr = t[0] //XXX: Handle all of them
+					}
+				}
+
+				if strings.Contains(trace, "connect") {
+					debugln("NET: Dial", nodeCfg.NodeID, addr)
+				}
+				conn, err := tls.Dial("tcp", addr, tlsCfg)
+				if err != nil {
+					if strings.Contains(trace, "connect") {
+						debugln("NET:", err)
+					}
+					continue
+				}
+
+				remoteID := certID(conn.ConnectionState().PeerCertificates[0].Raw)
+				if remoteID != nodeCfg.NodeID {
+					warnln("Unexpected nodeID", remoteID, "!=", nodeCfg.NodeID)
+					conn.Close()
+					continue
+				}
+
+				protoConn := protocol.NewConnection(remoteID, conn, conn, m, connOpts)
+				m.AddConnection(conn, protoConn)
+				continue nextNode
+			}
+		}
+
+		time.Sleep(time.Duration(cfg.Options.ReconnectIntervalS) * time.Second)
+	}
+}
+
+func updateLocalModel(m *Model) {
+	files, _ := m.Walk(cfg.Options.FollowSymlinks)
+	m.ReplaceLocal(files)
+	saveIndex(m)
+}
+
+func saveIndex(m *Model) {
+	name := m.RepoID() + ".idx.gz"
+	fullName := path.Join(confDir, name)
+	idxf, err := os.Create(fullName + ".tmp")
+	if err != nil {
+		return
+	}
+
+	gzw := gzip.NewWriter(idxf)
+
+	protocol.IndexMessage{
+		Repository: "local",
+		Files:      m.ProtocolIndex(),
+	}.EncodeXDR(gzw)
+	gzw.Close()
+	idxf.Close()
+	os.Rename(fullName+".tmp", fullName)
+}
+
+func loadIndex(m *Model) {
+	name := m.RepoID() + ".idx.gz"
+	idxf, err := os.Open(path.Join(confDir, name))
+	if err != nil {
+		return
+	}
+	defer idxf.Close()
+
+	gzr, err := gzip.NewReader(idxf)
+	if err != nil {
+		return
+	}
+	defer gzr.Close()
+
+	var im protocol.IndexMessage
+	err = im.DecodeXDR(gzr)
+	if err != nil || im.Repository != "local" {
+		return
+	}
+	m.SeedLocal(im.Files)
+}
+
+func ensureDir(dir string, mode int) {
+	fi, err := os.Stat(dir)
+	if os.IsNotExist(err) {
+		err := os.MkdirAll(dir, 0700)
+		fatalErr(err)
+	} else if mode >= 0 && err == nil && int(fi.Mode()&0777) != mode {
+		err := os.Chmod(dir, os.FileMode(mode))
+		fatalErr(err)
+	}
+}
+
+func expandTilde(p string) string {
+	if strings.HasPrefix(p, "~/") {
+		return strings.Replace(p, "~", getHomeDir(), 1)
+	}
+	return p
+}
+
+func getHomeDir() string {
+	home := os.Getenv("HOME")
+	if home == "" {
+		fatalln("No home directory?")
+	}
+	return home
+}
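
A sketch of the index cache round-trip used at startup (assuming the same package and an existing Model):

	func exampleIndexCache(m *Model) {
		// Persist the current local index as <repoid>.idx.gz, then load
		// it back; loadIndex silently ignores a missing or foreign cache.
		saveIndex(m)
		loadIndex(m)
	}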

+ 914 - 0
cmd/syncthing/model.go

@@ -0,0 +1,914 @@
+package main
+
+import (
+	"crypto/sha1"
+	"errors"
+	"fmt"
+	"io"
+	"net"
+	"os"
+	"path"
+	"sync"
+	"time"
+
+	"github.com/calmh/syncthing/buffers"
+	"github.com/calmh/syncthing/protocol"
+)
+
+type Model struct {
+	dir string
+
+	global    map[string]File // the latest version of each file as it exists in the cluster
+	gmut      sync.RWMutex    // protects global
+	local     map[string]File // the files we currently have locally on disk
+	lmut      sync.RWMutex    // protects local
+	remote    map[string]map[string]File
+	rmut      sync.RWMutex // protects remote
+	protoConn map[string]Connection
+	rawConn   map[string]io.Closer
+	pmut      sync.RWMutex // protects protoConn and rawConn
+
+	// Queue for files to fetch. fq can call back into the model, so we must
+	// not hold any locks when calling methods on fq.
+	fq *FileQueue
+	dq chan File // queue for files to delete
+
+	updatedLocal        int64 // timestamp of last update to local
+	updateGlobal        int64 // timestamp of last update to global
+	lastIdxBcast        time.Time
+	lastIdxBcastRequest time.Time
+	umut                sync.RWMutex // protects updated* and lastIdx*
+
+	rwRunning bool
+	delete    bool
+	initmut   sync.Mutex // protects rwRunning and delete
+
+	trace map[string]bool
+
+	sup suppressor
+
+	parallelRequests int
+	limitRequestRate chan struct{}
+
+	imut sync.Mutex // protects Index
+}
+
+type Connection interface {
+	ID() string
+	Index(string, []protocol.FileInfo)
+	Request(repo, name string, offset int64, size int) ([]byte, error)
+	Statistics() protocol.Statistics
+	Option(key string) string
+}
+
+const (
+	idxBcastHoldtime = 15 * time.Second  // Wait at least this long after the last index modification
+	idxBcastMaxDelay = 120 * time.Second // Unless we've already waited this long
+)
+
+var (
+	ErrNoSuchFile = errors.New("no such file")
+	ErrInvalid    = errors.New("file is invalid")
+)
+
+// NewModel creates and starts a new model. The model starts in read-only mode,
+// where it sends index information to connected peers and responds to requests
+// for file data without altering the local repository in any way.
+func NewModel(dir string, maxChangeBw int) *Model {
+	m := &Model{
+		dir:          dir,
+		global:       make(map[string]File),
+		local:        make(map[string]File),
+		remote:       make(map[string]map[string]File),
+		protoConn:    make(map[string]Connection),
+		rawConn:      make(map[string]io.Closer),
+		lastIdxBcast: time.Now(),
+		trace:        make(map[string]bool),
+		sup:          suppressor{threshold: int64(maxChangeBw)},
+		fq:           NewFileQueue(),
+		dq:           make(chan File),
+	}
+
+	go m.broadcastIndexLoop()
+	return m
+}
+
+func (m *Model) LimitRate(kbps int) {
+	m.limitRequestRate = make(chan struct{}, kbps)
+	n := kbps/10 + 1
+	go func() {
+		for {
+			time.Sleep(100 * time.Millisecond)
+			for i := 0; i < n; i++ {
+				m.limitRequestRate <- struct{}{}
+			}
+		}
+	}()
+}
+
+// Trace enables trace logging of the given facility. This is a debugging function; grep for m.trace.
+func (m *Model) Trace(t string) {
+	m.trace[t] = true
+}
+
+// StartRW starts read/write processing on the current model. When in
+// read/write mode the model will attempt to keep in sync with the cluster by
+// pulling needed files from peer nodes.
+func (m *Model) StartRW(del bool, threads int) {
+	m.initmut.Lock()
+	defer m.initmut.Unlock()
+
+	if m.rwRunning {
+		panic("starting started model")
+	}
+
+	m.rwRunning = true
+	m.delete = del
+	m.parallelRequests = threads
+
+	go m.cleanTempFiles()
+	if del {
+		go m.deleteLoop()
+	}
+}
+
+// Generation returns an opaque integer that is guaranteed to increment on
+// every change to the local repository or global model.
+func (m *Model) Generation() int64 {
+	m.umut.RLock()
+	defer m.umut.RUnlock()
+
+	return m.updatedLocal + m.updateGlobal
+}
+
+func (m *Model) LocalAge() float64 {
+	m.umut.RLock()
+	defer m.umut.RUnlock()
+
+	return time.Since(time.Unix(m.updatedLocal, 0)).Seconds()
+}
+
+type ConnectionInfo struct {
+	protocol.Statistics
+	Address       string
+	ClientID      string
+	ClientVersion string
+	Completion    int
+}
+
+// ConnectionStats returns a map with connection statistics for each connected node.
+func (m *Model) ConnectionStats() map[string]ConnectionInfo {
+	type remoteAddrer interface {
+		RemoteAddr() net.Addr
+	}
+
+	m.gmut.RLock()
+	m.pmut.RLock()
+	m.rmut.RLock()
+
+	var tot int64
+	for _, f := range m.global {
+		tot += f.Size
+	}
+
+	var res = make(map[string]ConnectionInfo)
+	for node, conn := range m.protoConn {
+		ci := ConnectionInfo{
+			Statistics:    conn.Statistics(),
+			ClientID:      conn.Option("clientId"),
+			ClientVersion: conn.Option("clientVersion"),
+		}
+		if nc, ok := m.rawConn[node].(remoteAddrer); ok {
+			ci.Address = nc.RemoteAddr().String()
+		}
+
+		var have int64
+		for _, f := range m.remote[node] {
+			if f.Equals(m.global[f.Name]) {
+				have += f.Size
+			}
+		}
+
+		if tot > 0 {
+			ci.Completion = int(100 * have / tot)
+		}
+
+		res[node] = ci
+	}
+
+	m.rmut.RUnlock()
+	m.pmut.RUnlock()
+	m.gmut.RUnlock()
+	return res
+}
+
+// GlobalSize returns the number of files, deleted files and total bytes for all
+// files in the global model.
+func (m *Model) GlobalSize() (files, deleted int, bytes int64) {
+	m.gmut.RLock()
+
+	for _, f := range m.global {
+		if f.Flags&protocol.FlagDeleted == 0 {
+			files++
+			bytes += f.Size
+		} else {
+			deleted++
+		}
+	}
+
+	m.gmut.RUnlock()
+	return
+}
+
+// LocalSize returns the number of files, deleted files and total bytes for all
+// files in the local repository.
+func (m *Model) LocalSize() (files, deleted int, bytes int64) {
+	m.lmut.RLock()
+
+	for _, f := range m.local {
+		if f.Flags&protocol.FlagDeleted == 0 {
+			files++
+			bytes += f.Size
+		} else {
+			deleted++
+		}
+	}
+
+	m.lmut.RUnlock()
+	return
+}
+
+// InSyncSize returns the number and total byte size of the local files that
+// are in sync with the global model.
+func (m *Model) InSyncSize() (files, bytes int64) {
+	m.gmut.RLock()
+	m.lmut.RLock()
+
+	for n, f := range m.local {
+		if gf, ok := m.global[n]; ok && f.Equals(gf) {
+			files++
+			bytes += f.Size
+		}
+	}
+
+	m.lmut.RUnlock()
+	m.gmut.RUnlock()
+	return
+}
+
+// NeedFiles returns the list of currently needed files and the total size.
+func (m *Model) NeedFiles() (files []File, bytes int64) {
+	qf := m.fq.QueuedFiles()
+
+	m.gmut.RLock()
+
+	for _, n := range qf {
+		f := m.global[n]
+		files = append(files, f)
+		bytes += f.Size
+	}
+
+	m.gmut.RUnlock()
+	return
+}
+
+// Index is called when a new node is connected and we receive their full index.
+// Implements the protocol.Model interface.
+func (m *Model) Index(nodeID string, fs []protocol.FileInfo) {
+	var files = make([]File, len(fs))
+	for i := range fs {
+		files[i] = fileFromFileInfo(fs[i])
+	}
+
+	m.imut.Lock()
+	defer m.imut.Unlock()
+
+	if m.trace["net"] {
+		debugf("NET IDX(in): %s: %d files", nodeID, len(fs))
+	}
+
+	repo := make(map[string]File)
+	for _, f := range files {
+		m.indexUpdate(repo, f)
+	}
+
+	m.rmut.Lock()
+	m.remote[nodeID] = repo
+	m.rmut.Unlock()
+
+	m.recomputeGlobal()
+	m.recomputeNeedForFiles(files)
+}
+
+// IndexUpdate is called for incremental updates to connected nodes' indexes.
+// Implements the protocol.Model interface.
+func (m *Model) IndexUpdate(nodeID string, fs []protocol.FileInfo) {
+	var files = make([]File, len(fs))
+	for i := range fs {
+		files[i] = fileFromFileInfo(fs[i])
+	}
+
+	m.imut.Lock()
+	defer m.imut.Unlock()
+
+	if m.trace["net"] {
+		debugf("NET IDXUP(in): %s: %d files", nodeID, len(files))
+	}
+
+	m.rmut.Lock()
+	repo, ok := m.remote[nodeID]
+	if !ok {
+		warnf("Index update from node %s that does not have an index", nodeID)
+		m.rmut.Unlock()
+		return
+	}
+
+	for _, f := range files {
+		m.indexUpdate(repo, f)
+	}
+	m.rmut.Unlock()
+
+	m.recomputeGlobal()
+	m.recomputeNeedForFiles(files)
+}
+
+func (m *Model) indexUpdate(repo map[string]File, f File) {
+	if m.trace["idx"] {
+		var flagComment string
+		if f.Flags&protocol.FlagDeleted != 0 {
+			flagComment = " (deleted)"
+		}
+		debugf("IDX(in): %q m=%d f=%o%s v=%d (%d blocks)", f.Name, f.Modified, f.Flags, flagComment, f.Version, len(f.Blocks))
+	}
+
+	if extraFlags := f.Flags &^ (protocol.FlagInvalid | protocol.FlagDeleted | 0xfff); extraFlags != 0 {
+		warnf("IDX(in): Unknown flags 0x%x in index record %+v", extraFlags, f)
+		return
+	}
+
+	repo[f.Name] = f
+}
+
+// Close removes the peer from the model and closes the underlying connection if possible.
+// Implements the protocol.Model interface.
+func (m *Model) Close(node string, err error) {
+	if m.trace["net"] {
+		debugf("NET: %s: %v", node, err)
+	}
+	if err == protocol.ErrClusterHash {
+		warnf("Connection to %s closed due to mismatched cluster hash. Ensure that the configured cluster members are identical on both nodes.", node)
+	} else if err != io.EOF {
+		warnf("Connection to %s closed: %v", node, err)
+	}
+
+	m.fq.RemoveAvailable(node)
+
+	m.pmut.Lock()
+	m.rmut.Lock()
+
+	conn, ok := m.rawConn[node]
+	if ok {
+		conn.Close()
+	}
+
+	delete(m.remote, node)
+	delete(m.protoConn, node)
+	delete(m.rawConn, node)
+
+	m.rmut.Unlock()
+	m.pmut.Unlock()
+
+	m.recomputeGlobal()
+	m.recomputeNeedForGlobal()
+}
+
+// Request returns the specified data segment by reading it from local disk.
+// Implements the protocol.Model interface.
+func (m *Model) Request(nodeID, repo, name string, offset int64, size int) ([]byte, error) {
+	// Verify that the requested file exists in the local and global model.
+	m.lmut.RLock()
+	lf, localOk := m.local[name]
+	m.lmut.RUnlock()
+
+	m.gmut.RLock()
+	_, globalOk := m.global[name]
+	m.gmut.RUnlock()
+
+	if !localOk || !globalOk {
+		warnf("SECURITY (nonexistent file) REQ(in): %s: %q o=%d s=%d", nodeID, name, offset, size)
+		return nil, ErrNoSuchFile
+	}
+	if lf.Flags&protocol.FlagInvalid != 0 {
+		return nil, ErrInvalid
+	}
+
+	if m.trace["net"] && nodeID != "<local>" {
+		debugf("NET REQ(in): %s: %q o=%d s=%d", nodeID, name, offset, size)
+	}
+	fn := path.Join(m.dir, name)
+	fd, err := os.Open(fn) // XXX: Inefficient, should cache fd?
+	if err != nil {
+		return nil, err
+	}
+	defer fd.Close()
+
+	buf := buffers.Get(size)
+	_, err = fd.ReadAt(buf, offset)
+	if err != nil {
+		return nil, err
+	}
+
+	if m.limitRequestRate != nil {
+		for s := 0; s < len(buf); s += 1024 {
+			<-m.limitRequestRate
+		}
+	}
+
+	return buf, nil
+}
+
+// ReplaceLocal replaces the local repository index with the given list of files.
+func (m *Model) ReplaceLocal(fs []File) {
+	var updated bool
+	var newLocal = make(map[string]File)
+
+	m.lmut.RLock()
+	for _, f := range fs {
+		newLocal[f.Name] = f
+		if ef := m.local[f.Name]; !ef.Equals(f) {
+			updated = true
+		}
+	}
+	m.lmut.RUnlock()
+
+	if m.markDeletedLocals(newLocal) {
+		updated = true
+	}
+
+	m.lmut.RLock()
+	if len(newLocal) != len(m.local) {
+		updated = true
+	}
+	m.lmut.RUnlock()
+
+	if updated {
+		m.lmut.Lock()
+		m.local = newLocal
+		m.lmut.Unlock()
+
+		m.recomputeGlobal()
+		m.recomputeNeedForGlobal()
+
+		m.umut.Lock()
+		m.updatedLocal = time.Now().Unix()
+		m.lastIdxBcastRequest = time.Now()
+		m.umut.Unlock()
+	}
+}
+
+// SeedLocal replaces the local repository index with the given list of files,
+// in protocol data types. It does not track deletes and should only be used
+// to seed the local index from a cache file at startup.
+func (m *Model) SeedLocal(fs []protocol.FileInfo) {
+	m.lmut.Lock()
+	m.local = make(map[string]File)
+	for _, f := range fs {
+		m.local[f.Name] = fileFromFileInfo(f)
+	}
+	m.lmut.Unlock()
+
+	m.recomputeGlobal()
+	m.recomputeNeedForGlobal()
+}
+
+// ConnectedTo returns true if we are connected to the named node.
+func (m *Model) ConnectedTo(nodeID string) bool {
+	m.pmut.RLock()
+	_, ok := m.protoConn[nodeID]
+	m.pmut.RUnlock()
+	return ok
+}
+
+// RepoID returns a unique ID representing the current repository location.
+func (m *Model) RepoID() string {
+	return fmt.Sprintf("%x", sha1.Sum([]byte(m.dir)))
+}
+
+// AddConnection adds a new peer connection to the model. An initial index is
+// sent to the connected peer; thereafter index updates are sent whenever the
+// local repository changes.
+func (m *Model) AddConnection(rawConn io.Closer, protoConn Connection) {
+	nodeID := protoConn.ID()
+	m.pmut.Lock()
+	m.protoConn[nodeID] = protoConn
+	m.rawConn[nodeID] = rawConn
+	m.pmut.Unlock()
+
+	go func() {
+		idx := m.ProtocolIndex()
+		protoConn.Index("default", idx)
+	}()
+
+	m.initmut.Lock()
+	rw := m.rwRunning
+	m.initmut.Unlock()
+	if !rw {
+		return
+	}
+
+	for i := 0; i < m.parallelRequests; i++ {
+		i := i
+		go func() {
+			if m.trace["pull"] {
+				debugln("PULL: Starting", nodeID, i)
+			}
+			for {
+				m.pmut.RLock()
+				if _, ok := m.protoConn[nodeID]; !ok {
+					if m.trace["pull"] {
+						debugln("PULL: Exiting", nodeID, i)
+					}
+					m.pmut.RUnlock()
+					return
+				}
+				m.pmut.RUnlock()
+
+				qb, ok := m.fq.Get(nodeID)
+				if ok {
+					if m.trace["pull"] {
+						debugln("PULL: Request", nodeID, i, qb.name, qb.block.Offset)
+					}
+					data, _ := protoConn.Request("default", qb.name, qb.block.Offset, int(qb.block.Size))
+					m.fq.Done(qb.name, qb.block.Offset, data)
+				} else {
+					time.Sleep(1 * time.Second)
+				}
+			}
+		}()
+	}
+}
+
+// ProtocolIndex returns the current local index in protocol data types.
+// It takes the lmut read lock itself, so callers need not hold any locks.
+func (m *Model) ProtocolIndex() []protocol.FileInfo {
+	var index []protocol.FileInfo
+
+	m.lmut.RLock()
+
+	for _, f := range m.local {
+		mf := fileInfoFromFile(f)
+		if m.trace["idx"] {
+			var flagComment string
+			if mf.Flags&protocol.FlagDeleted != 0 {
+				flagComment = " (deleted)"
+			}
+			debugf("IDX(out): %q m=%d f=%o%s v=%d (%d blocks)", mf.Name, mf.Modified, mf.Flags, flagComment, mf.Version, len(mf.Blocks))
+		}
+		index = append(index, mf)
+	}
+
+	m.lmut.RUnlock()
+	return index
+}
+
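+// requestGlobal sends a request for a block of the given file to the
+// specified node and returns the data, or an error if the node is not
+// connected.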
+func (m *Model) requestGlobal(nodeID, name string, offset int64, size int, hash []byte) ([]byte, error) {
+	m.pmut.RLock()
+	nc, ok := m.protoConn[nodeID]
+	m.pmut.RUnlock()
+
+	if !ok {
+		return nil, fmt.Errorf("requestGlobal: no such node: %s", nodeID)
+	}
+
+	if m.trace["net"] {
+		debugf("NET REQ(out): %s: %q o=%d s=%d h=%x", nodeID, name, offset, size, hash)
+	}
+
+	return nc.Request("default", name, offset, size)
+}
+
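+// broadcastIndexLoop watches for index broadcast requests and, once the hold
+// time or the maximum delay has been exceeded, sends the full local index to
+// all connected nodes.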
+func (m *Model) broadcastIndexLoop() {
+	for {
+		m.umut.RLock()
+		bcastRequested := m.lastIdxBcastRequest.After(m.lastIdxBcast)
+		holdtimeExceeded := time.Since(m.lastIdxBcastRequest) > idxBcastHoldtime
+		maxDelayExceeded := time.Since(m.lastIdxBcast) > idxBcastMaxDelay
+		m.umut.RUnlock()
+
+		if bcastRequested && (holdtimeExceeded || maxDelayExceeded) {
+			idx := m.ProtocolIndex()
+
+			var indexWg sync.WaitGroup
+
+			m.umut.Lock()
+			m.lastIdxBcast = time.Now()
+			m.umut.Unlock()
+
+			m.pmut.RLock()
+			indexWg.Add(len(m.protoConn))
+			for _, node := range m.protoConn {
+				node := node
+				if m.trace["net"] {
+					debugf("NET IDX(out/loop): %s: %d files", node.ID(), len(idx))
+				}
+				go func() {
+					node.Index("default", idx)
+					indexWg.Done()
+				}()
+			}
+			m.pmut.RUnlock()
+
+			indexWg.Wait()
+		}
+		time.Sleep(idxBcastHoldtime)
+	}
+}
+
+// markDeletedLocals sets the deleted flag on files that have gone missing locally.
+func (m *Model) markDeletedLocals(newLocal map[string]File) bool {
+	// For every file in the existing local table, check whether it is also
+	// present in the new local table. If it is not, and we already had the
+	// newest version available according to the global table, mark the file
+	// as deleted.
+	var updated bool
+
+	m.gmut.RLock()
+	m.lmut.RLock()
+
+	for n, f := range m.local {
+		if _, ok := newLocal[n]; !ok {
+			if gf := m.global[n]; !gf.NewerThan(f) {
+				if f.Flags&protocol.FlagDeleted == 0 {
+					f.Flags = protocol.FlagDeleted
+					f.Version++
+					f.Blocks = nil
+					updated = true
+				}
+				newLocal[n] = f
+			}
+		}
+	}
+
+	m.lmut.RUnlock()
+	m.gmut.RUnlock()
+
+	return updated
+}
+
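+// updateLocal inserts or updates a single file in the local table. If the
+// file changed, the global table is recomputed and an index broadcast is
+// scheduled.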
+func (m *Model) updateLocal(f File) {
+	var updated bool
+
+	m.lmut.Lock()
+	if ef, ok := m.local[f.Name]; !ok || !ef.Equals(f) {
+		m.local[f.Name] = f
+		updated = true
+	}
+	m.lmut.Unlock()
+
+	if updated {
+		m.recomputeGlobal()
+		// We don't recomputeNeed here for two reasons:
+		// - a need shouldn't have arisen due to having a newer local file
+		// - recomputeNeed might call into fq.Add, but we might have been
+		//   called from fq, which would deadlock on fq
+
+		m.umut.Lock()
+		m.updatedLocal = time.Now().Unix()
+		m.lastIdxBcastRequest = time.Now()
+		m.umut.Unlock()
+	}
+}
+
+/*
+XXX: Not done, needs elegant handling of availability
+
+func (m *Model) recomputeGlobalFor(files []File) bool {
+	m.gmut.Lock()
+	defer m.gmut.Unlock()
+
+	var updated bool
+	for _, f := range files {
+		if gf, ok := m.global[f.Name]; !ok || f.NewerThan(gf) {
+			m.global[f.Name] = f
+			updated = true
+			// Fix availability
+		}
+	}
+	return updated
+}
+*/
+
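+// recomputeGlobal rebuilds the global table by taking the newest version of
+// each file from the local table and all remote indexes, and records on
+// which nodes each file is available.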
+func (m *Model) recomputeGlobal() {
+	var newGlobal = make(map[string]File)
+
+	m.lmut.RLock()
+	for n, f := range m.local {
+		newGlobal[n] = f
+	}
+	m.lmut.RUnlock()
+
+	var available = make(map[string][]string)
+
+	m.rmut.RLock()
+	var highestMod int64
+	for nodeID, fs := range m.remote {
+		for n, nf := range fs {
+			if lf, ok := newGlobal[n]; !ok || nf.NewerThan(lf) {
+				newGlobal[n] = nf
+				available[n] = []string{nodeID}
+				if nf.Modified > highestMod {
+					highestMod = nf.Modified
+				}
+			} else if lf.Equals(nf) {
+				available[n] = append(available[n], nodeID)
+			}
+		}
+	}
+	m.rmut.RUnlock()
+
+	for f, ns := range available {
+		m.fq.SetAvailable(f, ns)
+	}
+
+	// Figure out if anything actually changed
+
+	m.gmut.RLock()
+	var updated bool
+	if highestMod > m.updateGlobal || len(newGlobal) != len(m.global) {
+		updated = true
+	} else {
+		for n, f0 := range newGlobal {
+			if f1, ok := m.global[n]; !ok || !f0.Equals(f1) {
+				updated = true
+				break
+			}
+		}
+	}
+	m.gmut.RUnlock()
+
+	if updated {
+		m.gmut.Lock()
+		m.umut.Lock()
+		m.global = newGlobal
+		m.updateGlobal = time.Now().Unix()
+		m.umut.Unlock()
+		m.gmut.Unlock()
+	}
+}
+
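+// addOrder describes a file to be queued for pulling: its name, the blocks
+// to fetch from remote nodes, and the fileMonitor tracking the transfer.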
+type addOrder struct {
+	n      string
+	remote []Block
+	fm     *fileMonitor
+}
+
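+// recomputeNeedForGlobal checks every file in the global table against the
+// local table, queueing fetches and deletes as needed.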
+func (m *Model) recomputeNeedForGlobal() {
+	var toDelete []File
+	var toAdd []addOrder
+
+	m.gmut.RLock()
+
+	for _, gf := range m.global {
+		toAdd, toDelete = m.recomputeNeedForFile(gf, toAdd, toDelete)
+	}
+
+	m.gmut.RUnlock()
+
+	for _, ao := range toAdd {
+		m.fq.Add(ao.n, ao.remote, ao.fm)
+	}
+	for _, gf := range toDelete {
+		m.dq <- gf
+	}
+}
+
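+// recomputeNeedForFiles is like recomputeNeedForGlobal but considers only the
+// given list of files.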
+func (m *Model) recomputeNeedForFiles(files []File) {
+	var toDelete []File
+	var toAdd []addOrder
+
+	m.gmut.RLock()
+
+	for _, gf := range files {
+		toAdd, toDelete = m.recomputeNeedForFile(gf, toAdd, toDelete)
+	}
+
+	m.gmut.RUnlock()
+
+	for _, ao := range toAdd {
+		m.fq.Add(ao.n, ao.remote, ao.fm)
+	}
+	for _, gf := range toDelete {
+		m.dq <- gf
+	}
+}
+
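+// recomputeNeedForFile determines whether a single global file needs to be
+// fetched or deleted locally and appends it to the corresponding list.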
+func (m *Model) recomputeNeedForFile(gf File, toAdd []addOrder, toDelete []File) ([]addOrder, []File) {
+	m.lmut.RLock()
+	lf, ok := m.local[gf.Name]
+	m.lmut.RUnlock()
+
+	if !ok || gf.NewerThan(lf) {
+		if gf.Flags&protocol.FlagInvalid != 0 {
+			// Never attempt to sync invalid files
+			return toAdd, toDelete
+		}
+		if gf.Flags&protocol.FlagDeleted != 0 && !m.delete {
+			// Don't want to delete files, so forget this need
+			return toAdd, toDelete
+		}
+		if gf.Flags&protocol.FlagDeleted != 0 && !ok {
+			// Don't have the file, so don't need to delete it
+			return toAdd, toDelete
+		}
+		if m.trace["need"] {
+			debugf("NEED: lf:%v gf:%v", lf, gf)
+		}
+
+		if gf.Flags&protocol.FlagDeleted != 0 {
+			toDelete = append(toDelete, gf)
+		} else {
+			local, remote := BlockDiff(lf.Blocks, gf.Blocks)
+			fm := fileMonitor{
+				name:        gf.Name,
+				path:        path.Clean(path.Join(m.dir, gf.Name)),
+				global:      gf,
+				model:       m,
+				localBlocks: local,
+			}
+			toAdd = append(toAdd, addOrder{gf.Name, remote, &fm})
+		}
+	}
+
+	return toAdd, toDelete
+}
+
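+// WhoHas returns the list of nodes that have the global (newest) version of
+// the named file.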
+func (m *Model) WhoHas(name string) []string {
+	var remote []string
+
+	m.gmut.RLock()
+	m.rmut.RLock()
+
+	gf := m.global[name]
+	for node, files := range m.remote {
+		if file, ok := files[name]; ok && file.Equals(gf) {
+			remote = append(remote, node)
+		}
+	}
+
+	m.rmut.RUnlock()
+	m.gmut.RUnlock()
+	return remote
+}
+
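+// deleteLoop consumes the delete queue, removing the files from disk and
+// recording the deletions in the local table.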
+func (m *Model) deleteLoop() {
+	for file := range m.dq {
+		if m.trace["file"] {
+			debugln("FILE: Delete", file.Name)
+		}
+		fn := path.Clean(path.Join(m.dir, file.Name))
+		err := os.Remove(fn)
+		if err != nil {
+			warnf("%s: %v", file.Name, err)
+		}
+
+		m.updateLocal(file)
+	}
+}
+
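+// fileFromFileInfo converts a protocol.FileInfo into the internal File type,
+// computing block offsets and the total size.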
+func fileFromFileInfo(f protocol.FileInfo) File {
+	var blocks = make([]Block, len(f.Blocks))
+	var offset int64
+	for i, b := range f.Blocks {
+		blocks[i] = Block{
+			Offset: offset,
+			Size:   b.Size,
+			Hash:   b.Hash,
+		}
+		offset += int64(b.Size)
+	}
+	return File{
+		Name:     f.Name,
+		Size:     offset,
+		Flags:    f.Flags,
+		Modified: f.Modified,
+		Version:  f.Version,
+		Blocks:   blocks,
+	}
+}
+
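+// fileInfoFromFile converts an internal File back into a protocol.FileInfo.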
+func fileInfoFromFile(f File) protocol.FileInfo {
+	var blocks = make([]protocol.BlockInfo, len(f.Blocks))
+	for i, b := range f.Blocks {
+		blocks[i] = protocol.BlockInfo{
+			Size: b.Size,
+			Hash: b.Hash,
+		}
+	}
+	return protocol.FileInfo{
+		Name:     f.Name,
+		Flags:    f.Flags,
+		Modified: f.Modified,
+		Version:  f.Version,
+		Blocks:   blocks,
+	}
+}

+ 540 - 0
cmd/syncthing/model_test.go

@@ -0,0 +1,540 @@
+package main
+
+import (
+	"bytes"
+	"fmt"
+	"os"
+	"reflect"
+	"testing"
+	"time"
+
+	"github.com/calmh/syncthing/protocol"
+)
+
+func TestNewModel(t *testing.T) {
+	m := NewModel("foo", 1e6)
+
+	if m == nil {
+		t.Fatalf("NewModel returned nil")
+	}
+
+	if fs, _ := m.NeedFiles(); len(fs) > 0 {
+		t.Errorf("New model should have no Need")
+	}
+
+	if len(m.local) > 0 {
+		t.Errorf("New model should have no Have")
+	}
+}
+
+var testDataExpected = map[string]File{
+	"foo": File{
+		Name:     "foo",
+		Flags:    0,
+		Modified: 0,
+		Size:     7,
+		Blocks:   []Block{{Offset: 0x0, Size: 0x7, Hash: []uint8{0xae, 0xc0, 0x70, 0x64, 0x5f, 0xe5, 0x3e, 0xe3, 0xb3, 0x76, 0x30, 0x59, 0x37, 0x61, 0x34, 0xf0, 0x58, 0xcc, 0x33, 0x72, 0x47, 0xc9, 0x78, 0xad, 0xd1, 0x78, 0xb6, 0xcc, 0xdf, 0xb0, 0x1, 0x9f}}},
+	},
+	"empty": File{
+		Name:     "empty",
+		Flags:    0,
+		Modified: 0,
+		Size:     0,
+		Blocks:   []Block{{Offset: 0x0, Size: 0x0, Hash: []uint8{0xe3, 0xb0, 0xc4, 0x42, 0x98, 0xfc, 0x1c, 0x14, 0x9a, 0xfb, 0xf4, 0xc8, 0x99, 0x6f, 0xb9, 0x24, 0x27, 0xae, 0x41, 0xe4, 0x64, 0x9b, 0x93, 0x4c, 0xa4, 0x95, 0x99, 0x1b, 0x78, 0x52, 0xb8, 0x55}}},
+	},
+	"bar": File{
+		Name:     "bar",
+		Flags:    0,
+		Modified: 0,
+		Size:     10,
+		Blocks:   []Block{{Offset: 0x0, Size: 0xa, Hash: []uint8{0x2f, 0x72, 0xcc, 0x11, 0xa6, 0xfc, 0xd0, 0x27, 0x1e, 0xce, 0xf8, 0xc6, 0x10, 0x56, 0xee, 0x1e, 0xb1, 0x24, 0x3b, 0xe3, 0x80, 0x5b, 0xf9, 0xa9, 0xdf, 0x98, 0xf9, 0x2f, 0x76, 0x36, 0xb0, 0x5c}}},
+	},
+}
+
+func init() {
+	// Fix expected test data to match reality
+	for n, f := range testDataExpected {
+		fi, _ := os.Stat("testdata/" + n)
+		f.Flags = uint32(fi.Mode())
+		f.Modified = fi.ModTime().Unix()
+		testDataExpected[n] = f
+	}
+}
+
+func TestUpdateLocal(t *testing.T) {
+	m := NewModel("testdata", 1e6)
+	fs, _ := m.Walk(false)
+	m.ReplaceLocal(fs)
+
+	if fs, _ := m.NeedFiles(); len(fs) > 0 {
+		t.Fatalf("Model with only local data should have no need")
+	}
+
+	if l1, l2 := len(m.local), len(testDataExpected); l1 != l2 {
+		t.Fatalf("Model len(local) incorrect, %d != %d", l1, l2)
+	}
+	if l1, l2 := len(m.global), len(testDataExpected); l1 != l2 {
+		t.Fatalf("Model len(global) incorrect, %d != %d", l1, l2)
+	}
+	for name, file := range testDataExpected {
+		if f, ok := m.local[name]; ok {
+			if !reflect.DeepEqual(f, file) {
+				t.Errorf("Incorrect local\n%v !=\n%v\nfor file %q", f, file, name)
+			}
+		} else {
+			t.Errorf("Missing file %q in local table", name)
+		}
+		if f, ok := m.global[name]; ok {
+			if !reflect.DeepEqual(f, file) {
+				t.Errorf("Incorrect global\n%v !=\n%v\nfor file %q", f, file, name)
+			}
+		} else {
+			t.Errorf("Missing file %q in global table", name)
+		}
+	}
+
+	for _, f := range fs {
+		if hf, ok := m.local[f.Name]; !ok || hf.Modified != f.Modified {
+			t.Fatalf("Incorrect local for %q", f.Name)
+		}
+		if cf, ok := m.global[f.Name]; !ok || cf.Modified != f.Modified {
+			t.Fatalf("Incorrect global for %q", f.Name)
+		}
+	}
+}
+
+func TestRemoteUpdateExisting(t *testing.T) {
+	m := NewModel("testdata", 1e6)
+	fs, _ := m.Walk(false)
+	m.ReplaceLocal(fs)
+
+	newFile := protocol.FileInfo{
+		Name:     "foo",
+		Modified: time.Now().Unix(),
+		Blocks:   []protocol.BlockInfo{{100, []byte("some hash bytes")}},
+	}
+	m.Index("42", []protocol.FileInfo{newFile})
+
+	if fs, _ := m.NeedFiles(); len(fs) != 1 {
+		t.Errorf("Model missing Need for one file (%d != 1)", len(fs))
+	}
+}
+
+func TestRemoteAddNew(t *testing.T) {
+	m := NewModel("testdata", 1e6)
+	fs, _ := m.Walk(false)
+	m.ReplaceLocal(fs)
+
+	newFile := protocol.FileInfo{
+		Name:     "a new file",
+		Modified: time.Now().Unix(),
+		Blocks:   []protocol.BlockInfo{{100, []byte("some hash bytes")}},
+	}
+	m.Index("42", []protocol.FileInfo{newFile})
+
+	if fs, _ := m.NeedFiles(); len(fs) != 1 {
+		t.Errorf("Model len(m.need) incorrect (%d != 1)", len(fs))
+	}
+}
+
+func TestRemoteUpdateOld(t *testing.T) {
+	m := NewModel("testdata", 1e6)
+	fs, _ := m.Walk(false)
+	m.ReplaceLocal(fs)
+
+	oldTimeStamp := int64(1234)
+	newFile := protocol.FileInfo{
+		Name:     "foo",
+		Modified: oldTimeStamp,
+		Blocks:   []protocol.BlockInfo{{100, []byte("some hash bytes")}},
+	}
+	m.Index("42", []protocol.FileInfo{newFile})
+
+	if fs, _ := m.NeedFiles(); len(fs) != 0 {
+		t.Errorf("Model len(need) incorrect (%d != 0)", len(fs))
+	}
+}
+
+func TestRemoteIndexUpdate(t *testing.T) {
+	m := NewModel("testdata", 1e6)
+	fs, _ := m.Walk(false)
+	m.ReplaceLocal(fs)
+
+	foo := protocol.FileInfo{
+		Name:     "foo",
+		Modified: time.Now().Unix(),
+		Blocks:   []protocol.BlockInfo{{100, []byte("some hash bytes")}},
+	}
+
+	bar := protocol.FileInfo{
+		Name:     "bar",
+		Modified: time.Now().Unix(),
+		Blocks:   []protocol.BlockInfo{{100, []byte("some hash bytes")}},
+	}
+
+	m.Index("42", []protocol.FileInfo{foo})
+
+	if fs, _ := m.NeedFiles(); fs[0].Name != "foo" {
+		t.Error("Model doesn't need 'foo'")
+	}
+
+	m.IndexUpdate("42", []protocol.FileInfo{bar})
+
+	if fs, _ := m.NeedFiles(); fs[0].Name != "foo" {
+		t.Error("Model doesn't need 'foo'")
+	}
+	if fs, _ := m.NeedFiles(); fs[1].Name != "bar" {
+		t.Error("Model doesn't need 'bar'")
+	}
+}
+
+func TestDelete(t *testing.T) {
+	m := NewModel("testdata", 1e6)
+	fs, _ := m.Walk(false)
+	m.ReplaceLocal(fs)
+
+	if l1, l2 := len(m.local), len(fs); l1 != l2 {
+		t.Errorf("Model len(local) incorrect (%d != %d)", l1, l2)
+	}
+	if l1, l2 := len(m.global), len(fs); l1 != l2 {
+		t.Errorf("Model len(global) incorrect (%d != %d)", l1, l2)
+	}
+
+	ot := time.Now().Unix()
+	newFile := File{
+		Name:     "a new file",
+		Modified: ot,
+		Blocks:   []Block{{0, 100, []byte("some hash bytes")}},
+	}
+	m.updateLocal(newFile)
+
+	if l1, l2 := len(m.local), len(fs)+1; l1 != l2 {
+		t.Errorf("Model len(local) incorrect (%d != %d)", l1, l2)
+	}
+	if l1, l2 := len(m.global), len(fs)+1; l1 != l2 {
+		t.Errorf("Model len(global) incorrect (%d != %d)", l1, l2)
+	}
+
+	// The deleted file is kept in the local and global tables and marked as deleted.
+
+	m.ReplaceLocal(fs)
+
+	if l1, l2 := len(m.local), len(fs)+1; l1 != l2 {
+		t.Errorf("Model len(local) incorrect (%d != %d)", l1, l2)
+	}
+	if l1, l2 := len(m.global), len(fs)+1; l1 != l2 {
+		t.Errorf("Model len(global) incorrect (%d != %d)", l1, l2)
+	}
+
+	if m.local["a new file"].Flags&(1<<12) == 0 {
+		t.Error("Unexpected deleted flag = 0 in local table")
+	}
+	if len(m.local["a new file"].Blocks) != 0 {
+		t.Error("Unexpected non-zero blocks for deleted file in local")
+	}
+	if ft := m.local["a new file"].Modified; ft != ot {
+		t.Errorf("Unexpected time %d != %d for deleted file in local", ft, ot)
+	}
+	if fv := m.local["a new file"].Version; fv != 1 {
+		t.Errorf("Unexpected version %d != 1 for deleted file in local", fv)
+	}
+
+	if m.global["a new file"].Flags&(1<<12) == 0 {
+		t.Error("Unexpected deleted flag = 0 in global table")
+	}
+	if len(m.global["a new file"].Blocks) != 0 {
+		t.Error("Unexpected non-zero blocks for deleted file in global")
+	}
+	if ft := m.global["a new file"].Modified; ft != ot {
+		t.Errorf("Unexpected time %d != %d for deleted file in global", ft, ot)
+	}
+	if fv := m.global["a new file"].Version; fv != 1 {
+		t.Errorf("Unexpected version %d != 1 for deleted file in global", fv)
+	}
+
+	// Another update should change nothing
+
+	m.ReplaceLocal(fs)
+
+	if l1, l2 := len(m.local), len(fs)+1; l1 != l2 {
+		t.Errorf("Model len(local) incorrect (%d != %d)", l1, l2)
+	}
+	if l1, l2 := len(m.global), len(fs)+1; l1 != l2 {
+		t.Errorf("Model len(global) incorrect (%d != %d)", l1, l2)
+	}
+
+	if m.local["a new file"].Flags&(1<<12) == 0 {
+		t.Error("Unexpected deleted flag = 0 in local table")
+	}
+	if len(m.local["a new file"].Blocks) != 0 {
+		t.Error("Unexpected non-zero blocks for deleted file in local")
+	}
+	if ft := m.local["a new file"].Modified; ft != ot {
+		t.Errorf("Unexpected time %d != %d for deleted file in local", ft, ot)
+	}
+	if fv := m.local["a new file"].Version; fv != 1 {
+		t.Errorf("Unexpected version %d != 1 for deleted file in local", fv)
+	}
+
+	if m.global["a new file"].Flags&(1<<12) == 0 {
+		t.Error("Unexpected deleted flag = 0 in global table")
+	}
+	if len(m.global["a new file"].Blocks) != 0 {
+		t.Error("Unexpected non-zero blocks for deleted file in global")
+	}
+	if ft := m.global["a new file"].Modified; ft != ot {
+		t.Errorf("Unexpected time %d != %d for deleted file in global", ft, ot)
+	}
+	if fv := m.global["a new file"].Version; fv != 1 {
+		t.Errorf("Unexpected version %d != 1 for deleted file in global", fv)
+	}
+}
+
+func TestForgetNode(t *testing.T) {
+	m := NewModel("testdata", 1e6)
+	fs, _ := m.Walk(false)
+	m.ReplaceLocal(fs)
+
+	if l1, l2 := len(m.local), len(fs); l1 != l2 {
+		t.Errorf("Model len(local) incorrect (%d != %d)", l1, l2)
+	}
+	if l1, l2 := len(m.global), len(fs); l1 != l2 {
+		t.Errorf("Model len(global) incorrect (%d != %d)", l1, l2)
+	}
+	if fs, _ := m.NeedFiles(); len(fs) != 0 {
+		t.Errorf("Model len(need) incorrect (%d != 0)", len(fs))
+	}
+
+	newFile := protocol.FileInfo{
+		Name:     "new file",
+		Modified: time.Now().Unix(),
+		Blocks:   []protocol.BlockInfo{{100, []byte("some hash bytes")}},
+	}
+	m.Index("42", []protocol.FileInfo{newFile})
+
+	newFile = protocol.FileInfo{
+		Name:     "new file 2",
+		Modified: time.Now().Unix(),
+		Blocks:   []protocol.BlockInfo{{100, []byte("some hash bytes")}},
+	}
+	m.Index("43", []protocol.FileInfo{newFile})
+
+	if l1, l2 := len(m.local), len(fs); l1 != l2 {
+		t.Errorf("Model len(local) incorrect (%d != %d)", l1, l2)
+	}
+	if l1, l2 := len(m.global), len(fs)+2; l1 != l2 {
+		t.Errorf("Model len(global) incorrect (%d != %d)", l1, l2)
+	}
+	if fs, _ := m.NeedFiles(); len(fs) != 2 {
+		t.Errorf("Model len(need) incorrect (%d != 2)", len(fs))
+	}
+
+	m.Close("42", nil)
+
+	if l1, l2 := len(m.local), len(fs); l1 != l2 {
+		t.Errorf("Model len(local) incorrect (%d != %d)", l1, l2)
+	}
+	if l1, l2 := len(m.global), len(fs)+1; l1 != l2 {
+		t.Errorf("Model len(global) incorrect (%d != %d)", l1, l2)
+	}
+
+	if fs, _ := m.NeedFiles(); len(fs) != 1 {
+		t.Errorf("Model len(need) incorrect (%d != 1)", len(fs))
+	}
+}
+
+func TestRequest(t *testing.T) {
+	m := NewModel("testdata", 1e6)
+	fs, _ := m.Walk(false)
+	m.ReplaceLocal(fs)
+
+	bs, err := m.Request("some node", "default", "foo", 0, 6)
+	if err != nil {
+		t.Fatal(err)
+	}
+	if bytes.Compare(bs, []byte("foobar")) != 0 {
+		t.Errorf("Incorrect data from request: %q", string(bs))
+	}
+
+	bs, err = m.Request("some node", "default", "../walk.go", 0, 6)
+	if err == nil {
+		t.Error("Unexpected nil error on insecure file read")
+	}
+	if bs != nil {
+		t.Errorf("Unexpected non nil data on insecure file read: %q", string(bs))
+	}
+}
+
+func TestIgnoreWithUnknownFlags(t *testing.T) {
+	m := NewModel("testdata", 1e6)
+	fs, _ := m.Walk(false)
+	m.ReplaceLocal(fs)
+
+	valid := protocol.FileInfo{
+		Name:     "valid",
+		Modified: time.Now().Unix(),
+		Blocks:   []protocol.BlockInfo{{100, []byte("some hash bytes")}},
+		Flags:    protocol.FlagDeleted | 0755,
+	}
+
+	invalid := protocol.FileInfo{
+		Name:     "invalid",
+		Modified: time.Now().Unix(),
+		Blocks:   []protocol.BlockInfo{{100, []byte("some hash bytes")}},
+		Flags:    1<<27 | protocol.FlagDeleted | 0755,
+	}
+
+	m.Index("42", []protocol.FileInfo{valid, invalid})
+
+	if _, ok := m.global[valid.Name]; !ok {
+		t.Error("Model should include", valid)
+	}
+	if _, ok := m.global[invalid.Name]; ok {
+		t.Error("Model should not include", invalid)
+	}
+}
+
+func genFiles(n int) []protocol.FileInfo {
+	files := make([]protocol.FileInfo, n)
+	t := time.Now().Unix()
+	for i := 0; i < n; i++ {
+		files[i] = protocol.FileInfo{
+			Name:     fmt.Sprintf("file%d", i),
+			Modified: t,
+			Blocks:   []protocol.BlockInfo{{100, []byte("some hash bytes")}},
+		}
+	}
+
+	return files
+}
+
+func BenchmarkIndex10000(b *testing.B) {
+	m := NewModel("testdata", 1e6)
+	fs, _ := m.Walk(false)
+	m.ReplaceLocal(fs)
+	files := genFiles(10000)
+
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		m.Index("42", files)
+	}
+}
+
+func BenchmarkIndex00100(b *testing.B) {
+	m := NewModel("testdata", 1e6)
+	fs, _ := m.Walk(false)
+	m.ReplaceLocal(fs)
+	files := genFiles(100)
+
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		m.Index("42", files)
+	}
+}
+
+func BenchmarkIndexUpdate10000f10000(b *testing.B) {
+	m := NewModel("testdata", 1e6)
+	fs, _ := m.Walk(false)
+	m.ReplaceLocal(fs)
+	files := genFiles(10000)
+	m.Index("42", files)
+
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		m.IndexUpdate("42", files)
+	}
+}
+
+func BenchmarkIndexUpdate10000f00100(b *testing.B) {
+	m := NewModel("testdata", 1e6)
+	fs, _ := m.Walk(false)
+	m.ReplaceLocal(fs)
+	files := genFiles(10000)
+	m.Index("42", files)
+
+	ufiles := genFiles(100)
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		m.IndexUpdate("42", ufiles)
+	}
+}
+
+func BenchmarkIndexUpdate10000f00001(b *testing.B) {
+	m := NewModel("testdata", 1e6)
+	fs, _ := m.Walk(false)
+	m.ReplaceLocal(fs)
+	files := genFiles(10000)
+	m.Index("42", files)
+
+	ufiles := genFiles(1)
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		m.IndexUpdate("42", ufiles)
+	}
+}
+
+type FakeConnection struct {
+	id          string
+	requestData []byte
+}
+
+func (FakeConnection) Close() error {
+	return nil
+}
+
+func (f FakeConnection) ID() string {
+	return f.id
+}
+
+func (f FakeConnection) Option(string) string {
+	return ""
+}
+
+func (FakeConnection) Index(string, []protocol.FileInfo) {}
+
+func (f FakeConnection) Request(repo, name string, offset int64, size int) ([]byte, error) {
+	return f.requestData, nil
+}
+
+func (FakeConnection) Ping() bool {
+	return true
+}
+
+func (FakeConnection) Statistics() protocol.Statistics {
+	return protocol.Statistics{}
+}
+
+func BenchmarkRequest(b *testing.B) {
+	m := NewModel("testdata", 1e6)
+	fs, _ := m.Walk(false)
+	m.ReplaceLocal(fs)
+
+	const n = 1000
+	files := make([]protocol.FileInfo, n)
+	t := time.Now().Unix()
+	for i := 0; i < n; i++ {
+		files[i] = protocol.FileInfo{
+			Name:     fmt.Sprintf("file%d", i),
+			Modified: t,
+			Blocks:   []protocol.BlockInfo{{100, []byte("some hash bytes")}},
+		}
+	}
+
+	fc := FakeConnection{
+		id:          "42",
+		requestData: []byte("some data to return"),
+	}
+	m.AddConnection(fc, fc)
+	m.Index("42", files)
+
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		data, err := m.requestGlobal("42", files[i%n].Name, 0, 32, nil)
+		if err != nil {
+			b.Error(err)
+		}
+		if data == nil {
+			b.Error("nil data")
+		}
+	}
+}

+ 34 - 0
cmd/syncthing/openurl.go

@@ -0,0 +1,34 @@
+/*
+Copyright 2011 Google Inc.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package main
+
+import (
+	"os/exec"
+	"runtime"
+)
+
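+// openURL opens the URL in the default browser of the current platform.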
+func openURL(url string) error {
+	if runtime.GOOS == "windows" {
+		return exec.Command("cmd.exe", "/C", "start "+url).Run()
+	}
+
+	if runtime.GOOS == "darwin" {
+		return exec.Command("open", url).Run()
+	}
+
+	return exec.Command("xdg-open", url).Run()
+}

+ 72 - 0
cmd/syncthing/suppressor.go

@@ -0,0 +1,72 @@
+package main
+
+import (
+	"sync"
+	"time"
+)
+
+const (
+	MaxChangeHistory = 4
+)
+
+type change struct {
+	size int64
+	when time.Time
+}
+
+type changeHistory struct {
+	changes []change
+	next    int64
+	prevSup bool
+}
+
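+// suppressor tracks the change rate of individual files and decides when a
+// file changes too fast to keep announcing it.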
+type suppressor struct {
+	sync.Mutex
+	changes   map[string]changeHistory
+	threshold int64 // bytes/s
+}
+
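+// bandwidth returns the average change rate in bytes per second over the
+// recorded history, measured from the first change up to time t.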
+func (h changeHistory) bandwidth(t time.Time) int64 {
+	if len(h.changes) == 0 {
+		return 0
+	}
+
+	var t0 = h.changes[0].when
+	if t == t0 {
+		return 0
+	}
+
+	var bw float64
+	for _, c := range h.changes {
+		bw += float64(c.size)
+	}
+	return int64(bw / t.Sub(t0).Seconds())
+}
+
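+// append records a change of the given size at time t, keeping at most
+// MaxChangeHistory entries.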
+func (h *changeHistory) append(size int64, t time.Time) {
+	c := change{size, t}
+	if len(h.changes) == MaxChangeHistory {
+		h.changes = h.changes[1:MaxChangeHistory]
+	}
+	h.changes = append(h.changes, c)
+}
+
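+// suppress reports whether changes to the named file should be suppressed at
+// time t, and whether they were suppressed at the previous check.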
+func (s *suppressor) suppress(name string, size int64, t time.Time) (bool, bool) {
+	s.Lock()
+
+	if s.changes == nil {
+		s.changes = make(map[string]changeHistory)
+	}
+	h := s.changes[name]
+	sup := h.bandwidth(t) > s.threshold
+	prevSup := h.prevSup
+	h.prevSup = sup
+	if !sup {
+		h.append(size, t)
+	}
+	s.changes[name] = h
+
+	s.Unlock()
+
+	return sup, prevSup
+}

+ 113 - 0
cmd/syncthing/suppressor_test.go

@@ -0,0 +1,113 @@
+package main
+
+import (
+	"testing"
+	"time"
+)
+
+func TestSuppressor(t *testing.T) {
+	s := suppressor{threshold: 10000}
+	t0 := time.Now()
+
+	t1 := t0
+	sup, prev := s.suppress("foo", 10000, t1)
+	if sup {
+		t.Fatal("Never suppress first change")
+	}
+	if prev {
+		t.Fatal("Incorrect prev status")
+	}
+
+	// bw is 10000 / 10 = 1000
+	t1 = t0.Add(10 * time.Second)
+	if bw := s.changes["foo"].bandwidth(t1); bw != 1000 {
+		t.Errorf("Incorrect bw %d", bw)
+	}
+	sup, prev = s.suppress("foo", 10000, t1)
+	if sup {
+		t.Fatal("Should still be fine")
+	}
+	if prev {
+		t.Fatal("Incorrect prev status")
+	}
+
+	// bw is (10000 + 10000) / 11 = 1818
+	t1 = t0.Add(11 * time.Second)
+	if bw := s.changes["foo"].bandwidth(t1); bw != 1818 {
+		t.Errorf("Incorrect bw %d", bw)
+	}
+	sup, prev = s.suppress("foo", 100500, t1)
+	if sup {
+		t.Fatal("Should still be fine")
+	}
+	if prev {
+		t.Fatal("Incorrect prev status")
+	}
+
+	// bw is (10000 + 10000 + 100500) / 12 = 10041
+	t1 = t0.Add(12 * time.Second)
+	if bw := s.changes["foo"].bandwidth(t1); bw != 10041 {
+		t.Errorf("Incorrect bw %d", bw)
+	}
+	sup, prev = s.suppress("foo", 10000000, t1) // value will be ignored
+	if !sup {
+		t.Fatal("Should be over threshold")
+	}
+	if prev {
+		t.Fatal("Incorrect prev status")
+	}
+
+	// bw is (10000 + 10000 + 100500) / 15 = 8033
+	t1 = t0.Add(15 * time.Second)
+	if bw := s.changes["foo"].bandwidth(t1); bw != 8033 {
+		t.Errorf("Incorrect bw %d", bw)
+	}
+	sup, prev = s.suppress("foo", 10000000, t1)
+	if sup {
+		t.Fatal("Should be Ok")
+	}
+	if !prev {
+		t.Fatal("Incorrect prev status")
+	}
+}
+
+func TestHistory(t *testing.T) {
+	h := changeHistory{}
+
+	t0 := time.Now()
+	h.append(40, t0)
+
+	if l := len(h.changes); l != 1 {
+		t.Errorf("Incorrect history length %d", l)
+	}
+	if s := h.changes[0].size; s != 40 {
+		t.Errorf("Incorrect first record size %d", s)
+	}
+
+	for i := 1; i < MaxChangeHistory; i++ {
+		h.append(int64(40+i), t0.Add(time.Duration(i)*time.Second))
+	}
+
+	if l := len(h.changes); l != MaxChangeHistory {
+		t.Errorf("Incorrect history length %d", l)
+	}
+	if s := h.changes[0].size; s != 40 {
+		t.Errorf("Incorrect first record size %d", s)
+	}
+	if s := h.changes[MaxChangeHistory-1].size; s != 40+MaxChangeHistory-1 {
+		t.Errorf("Incorrect last record size %d", s)
+	}
+
+	h.append(999, t0.Add(time.Duration(999)*time.Second))
+
+	if l := len(h.changes); l != MaxChangeHistory {
+		t.Errorf("Incorrect history length %d", l)
+	}
+	if s := h.changes[0].size; s != 41 {
+		t.Errorf("Incorrect first record size %d", s)
+	}
+	if s := h.changes[MaxChangeHistory-1].size; s != 999 {
+		t.Errorf("Incorrect last record size %d", s)
+	}
+}

+ 2 - 0
cmd/syncthing/testdata/.stignore

@@ -0,0 +1,2 @@
+.*
+quux

+ 1 - 0
cmd/syncthing/testdata/bar

@@ -0,0 +1 @@
+foobarbaz

+ 1 - 0
cmd/syncthing/testdata/baz/quux

@@ -0,0 +1 @@
+baazquux

+ 0 - 0
cmd/syncthing/testdata/empty


+ 1 - 0
cmd/syncthing/testdata/foo

@@ -0,0 +1 @@
+foobar

+ 71 - 0
cmd/syncthing/tls.go

@@ -0,0 +1,71 @@
+package main
+
+import (
+	"crypto/rand"
+	"crypto/rsa"
+	"crypto/sha256"
+	"crypto/tls"
+	"crypto/x509"
+	"crypto/x509/pkix"
+	"encoding/base32"
+	"encoding/pem"
+	"math/big"
+	"os"
+	"path"
+	"strings"
+	"time"
+)
+
+const (
+	tlsRSABits = 3072
+	tlsName    = "syncthing"
+)
+
+func loadCert(dir string) (tls.Certificate, error) {
+	return tls.LoadX509KeyPair(path.Join(dir, "cert.pem"), path.Join(dir, "key.pem"))
+}
+
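+// certID returns the node ID for a certificate: the base32 encoded SHA-256
+// hash of the certificate bytes, with padding stripped.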
+func certID(bs []byte) string {
+	hf := sha256.New()
+	hf.Write(bs)
+	id := hf.Sum(nil)
+	return strings.Trim(base32.StdEncoding.EncodeToString(id), "=")
+}
+
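+// newCertificate generates a self-signed RSA certificate and key pair and
+// writes them to cert.pem and key.pem in the given directory.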
+func newCertificate(dir string) {
+	infoln("Generating RSA certificate and key...")
+
+	priv, err := rsa.GenerateKey(rand.Reader, tlsRSABits)
+	fatalErr(err)
+
+	notBefore := time.Now()
+	notAfter := time.Date(2049, 12, 31, 23, 59, 59, 0, time.UTC)
+
+	template := x509.Certificate{
+		SerialNumber: new(big.Int).SetInt64(0),
+		Subject: pkix.Name{
+			CommonName: tlsName,
+		},
+		NotBefore: notBefore,
+		NotAfter:  notAfter,
+
+		KeyUsage:              x509.KeyUsageKeyEncipherment | x509.KeyUsageDigitalSignature,
+		ExtKeyUsage:           []x509.ExtKeyUsage{x509.ExtKeyUsageServerAuth, x509.ExtKeyUsageClientAuth},
+		BasicConstraintsValid: true,
+	}
+
+	derBytes, err := x509.CreateCertificate(rand.Reader, &template, &template, &priv.PublicKey, priv)
+	fatalErr(err)
+
+	certOut, err := os.Create(path.Join(dir, "cert.pem"))
+	fatalErr(err)
+	pem.Encode(certOut, &pem.Block{Type: "CERTIFICATE", Bytes: derBytes})
+	certOut.Close()
+	okln("Created RSA certificate file")
+
+	keyOut, err := os.OpenFile(path.Join(dir, "key.pem"), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0600)
+	fatalErr(err)
+	pem.Encode(keyOut, &pem.Block{Type: "RSA PRIVATE KEY", Bytes: x509.MarshalPKCS1PrivateKey(priv)})
+	keyOut.Close()
+	okln("Created RSA key file")
+}

+ 52 - 0
cmd/syncthing/usage.go

@@ -0,0 +1,52 @@
+package main
+
+import (
+	"bytes"
+	"flag"
+	"fmt"
+	"io"
+	"text/tabwriter"
+)
+
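+// optionTable writes the given rows as an aligned two-column table.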
+func optionTable(w io.Writer, rows [][]string) {
+	tw := tabwriter.NewWriter(w, 2, 4, 2, ' ', 0)
+	for _, row := range rows {
+		for i, cell := range row {
+			if i > 0 {
+				tw.Write([]byte("\t"))
+			}
+			tw.Write([]byte(cell))
+		}
+		tw.Write([]byte("\n"))
+	}
+	tw.Flush()
+}
+
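+// usageFor returns a usage function for the flag set, printing the given
+// usage line followed by a table of the available options.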
+func usageFor(fs *flag.FlagSet, usage string) func() {
+	return func() {
+		var b bytes.Buffer
+		b.WriteString("Usage:\n  " + usage + "\n")
+
+		var options [][]string
+		fs.VisitAll(func(f *flag.Flag) {
+			var dash = "-"
+			if len(f.Name) > 1 {
+				dash = "--"
+			}
+			var opt = "  " + dash + f.Name
+
+			if f.DefValue != "false" {
+				opt += "=" + f.DefValue
+			}
+
+			options = append(options, []string{opt, f.Usage})
+		})
+
+		if len(options) > 0 {
+			b.WriteString("\nOptions:\n")
+			optionTable(&b, options)
+		}
+
+		fmt.Println(b.String())
+	}
+}

+ 29 - 0
cmd/syncthing/util.go

@@ -0,0 +1,29 @@
+package main
+
+import "fmt"
+
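+// MetricPrefix formats n using an SI prefix (k, M or G).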
+func MetricPrefix(n int64) string {
+	if n > 1e9 {
+		return fmt.Sprintf("%.02f G", float64(n)/1e9)
+	}
+	if n > 1e6 {
+		return fmt.Sprintf("%.02f M", float64(n)/1e6)
+	}
+	if n > 1e3 {
+		return fmt.Sprintf("%.01f k", float64(n)/1e3)
+	}
+	return fmt.Sprintf("%d ", n)
+}
+
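+// BinaryPrefix formats n using a binary prefix (Ki, Mi or Gi).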
+func BinaryPrefix(n int64) string {
+	if n > 1<<30 {
+		return fmt.Sprintf("%.02f Gi", float64(n)/(1<<30))
+	}
+	if n > 1<<20 {
+		return fmt.Sprintf("%.02f Mi", float64(n)/(1<<20))
+	}
+	if n > 1<<10 {
+		return fmt.Sprintf("%.01f Ki", float64(n)/(1<<10))
+	}
+	return fmt.Sprintf("%d ", n)
+}

+ 238 - 0
cmd/syncthing/walk.go

@@ -0,0 +1,238 @@
+package main
+
+import (
+	"bytes"
+	"fmt"
+	"io/ioutil"
+	"log"
+	"os"
+	"path"
+	"path/filepath"
+	"strings"
+	"time"
+
+	"github.com/calmh/syncthing/protocol"
+)
+
+const BlockSize = 128 * 1024
+
+type File struct {
+	Name     string
+	Flags    uint32
+	Modified int64
+	Version  uint32
+	Size     int64
+	Blocks   []Block
+}
+
+func (f File) String() string {
+	return fmt.Sprintf("File{Name:%q, Flags:0x%x, Modified:%d, Version:%d, Size:%d, NumBlocks:%d}",
+		f.Name, f.Flags, f.Modified, f.Version, f.Size, len(f.Blocks))
+}
+
+func (f File) Equals(o File) bool {
+	return f.Modified == o.Modified && f.Version == o.Version
+}
+
+func (f File) NewerThan(o File) bool {
+	return f.Modified > o.Modified || (f.Modified == o.Modified && f.Version > o.Version)
+}
+
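+// isTempName returns true if the file name is a syncthing temporary file.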
+func isTempName(name string) bool {
+	return strings.HasPrefix(path.Base(name), ".syncthing.")
+}
+
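+// tempName returns the name of the temporary file used while downloading the
+// given file.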
+func tempName(name string, modified int64) string {
+	tdir := path.Dir(name)
+	tname := fmt.Sprintf(".syncthing.%s.%d", path.Base(name), modified)
+	return path.Join(tdir, tname)
+}
+
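+// loadIgnoreFiles returns a WalkFunc that collects the patterns of all
+// .stignore files into ign, keyed on directory relative to the repository
+// root.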
+func (m *Model) loadIgnoreFiles(ign map[string][]string) filepath.WalkFunc {
+	return func(p string, info os.FileInfo, err error) error {
+		if err != nil {
+			return nil
+		}
+
+		rn, err := filepath.Rel(m.dir, p)
+		if err != nil {
+			return nil
+		}
+
+		if pn, sn := path.Split(rn); sn == ".stignore" {
+			pn := strings.Trim(pn, "/")
+			bs, _ := ioutil.ReadFile(p)
+			lines := bytes.Split(bs, []byte("\n"))
+			var patterns []string
+			for _, line := range lines {
+				if len(line) > 0 {
+					patterns = append(patterns, string(line))
+				}
+			}
+			ign[pn] = patterns
+		}
+
+		return nil
+	}
+}
+
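+// walkAndHashFiles returns a WalkFunc that hashes new and changed regular
+// files and appends them to res, honoring ignore patterns and change
+// suppression.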
+func (m *Model) walkAndHashFiles(res *[]File, ign map[string][]string) filepath.WalkFunc {
+	return func(p string, info os.FileInfo, err error) error {
+		if err != nil {
+			if m.trace["file"] {
+				log.Printf("FILE: %q: %v", p, err)
+			}
+			return nil
+		}
+
+		if isTempName(p) {
+			return nil
+		}
+
+		rn, err := filepath.Rel(m.dir, p)
+		if err != nil {
+			return nil
+		}
+
+		if _, sn := path.Split(rn); sn == ".stignore" {
+			// We never sync the .stignore files
+			return nil
+		}
+
+		if ignoreFile(ign, rn) {
+			if m.trace["file"] {
+				log.Println("FILE: IGNORE:", rn)
+			}
+			return nil
+		}
+
+		if info.Mode()&os.ModeType == 0 {
+			modified := info.ModTime().Unix()
+
+			m.lmut.RLock()
+			lf, ok := m.local[rn]
+			m.lmut.RUnlock()
+
+			if ok && lf.Modified == modified {
+				if nf := uint32(info.Mode()); nf != lf.Flags {
+					lf.Flags = nf
+					lf.Version++
+				}
+				*res = append(*res, lf)
+			} else {
+				if cur, prev := m.sup.suppress(rn, info.Size(), time.Now()); cur {
+					if m.trace["file"] {
+						log.Printf("FILE: SUPPRESS: %q change bw over threshold", rn)
+					}
+					if !prev {
+						log.Printf("INFO: Changes to %q are being temporarily suppressed because the file changes too frequently.", rn)
+					}
+
+					if ok {
+						lf.Flags = protocol.FlagInvalid
+						lf.Version++
+						*res = append(*res, lf)
+					}
+					return nil
+				} else if prev {
+					log.Printf("INFO: Changes to %q are no longer suppressed.", rn)
+				}
+
+				if m.trace["file"] {
+					log.Printf("FILE: Hash %q", p)
+				}
+				fd, err := os.Open(p)
+				if err != nil {
+					if m.trace["file"] {
+						log.Printf("FILE: %q: %v", p, err)
+					}
+					return nil
+				}
+				defer fd.Close()
+
+				blocks, err := Blocks(fd, BlockSize)
+				if err != nil {
+					if m.trace["file"] {
+						log.Printf("FILE: %q: %v", p, err)
+					}
+					return nil
+				}
+				f := File{
+					Name:     rn,
+					Size:     info.Size(),
+					Flags:    uint32(info.Mode()),
+					Modified: modified,
+					Blocks:   blocks,
+				}
+				*res = append(*res, f)
+			}
+		}
+
+		return nil
+	}
+}
+
+// Walk returns the list of files found in the local repository by scanning the
+// file system. Files are blockwise hashed.
+func (m *Model) Walk(followSymlinks bool) (files []File, ignore map[string][]string) {
+	ignore = make(map[string][]string)
+
+	hashFiles := m.walkAndHashFiles(&files, ignore)
+
+	filepath.Walk(m.dir, m.loadIgnoreFiles(ignore))
+	filepath.Walk(m.dir, hashFiles)
+
+	if followSymlinks {
+		d, err := os.Open(m.dir)
+		if err != nil {
+			return
+		}
+		defer d.Close()
+
+		fis, err := d.Readdir(-1)
+		if err != nil {
+			return
+		}
+
+		for _, info := range fis {
+			if info.Mode()&os.ModeSymlink != 0 {
+				dir := path.Join(m.dir, info.Name()) + "/"
+				filepath.Walk(dir, m.loadIgnoreFiles(ignore))
+				filepath.Walk(dir, hashFiles)
+			}
+		}
+	}
+
+	return
+}
+
+func (m *Model) cleanTempFile(path string, info os.FileInfo, err error) error {
+	if err != nil {
+		return err
+	}
+	if info.Mode()&os.ModeType == 0 && isTempName(path) {
+		if m.trace["file"] {
+			log.Printf("FILE: Remove %q", path)
+		}
+		os.Remove(path)
+	}
+	return nil
+}
+
+func (m *Model) cleanTempFiles() {
+	filepath.Walk(m.dir, m.cleanTempFile)
+}
+
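+// ignoreFile returns true if the file matches any pattern registered for its
+// directory or one of its parent directories.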
+func ignoreFile(patterns map[string][]string, file string) bool {
+	first, last := path.Split(file)
+	for prefix, pats := range patterns {
+		if len(prefix) == 0 || prefix == first || strings.HasPrefix(first, prefix+"/") {
+			for _, pattern := range pats {
+				if match, _ := path.Match(pattern, last); match {
+					return true
+				}
+			}
+		}
+	}
+	return false
+}

+ 83 - 0
cmd/syncthing/walk_test.go

@@ -0,0 +1,83 @@
+package main
+
+import (
+	"fmt"
+	"reflect"
+	"testing"
+	"time"
+)
+
+var testdata = []struct {
+	name string
+	size int
+	hash string
+}{
+	{"bar", 10, "2f72cc11a6fcd0271ecef8c61056ee1eb1243be3805bf9a9df98f92f7636b05c"},
+	{"empty", 0, "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"},
+	{"foo", 7, "aec070645fe53ee3b3763059376134f058cc337247c978add178b6ccdfb0019f"},
+}
+
+var correctIgnores = map[string][]string{
+	"": {".*", "quux"},
+}
+
+func TestWalk(t *testing.T) {
+	m := NewModel("testdata", 1e6)
+	files, ignores := m.Walk(false)
+
+	if l1, l2 := len(files), len(testdata); l1 != l2 {
+		t.Fatalf("Incorrect number of walked files %d != %d", l1, l2)
+	}
+
+	for i := range testdata {
+		if n1, n2 := testdata[i].name, files[i].Name; n1 != n2 {
+			t.Errorf("Incorrect file name %q != %q for case #%d", n1, n2, i)
+		}
+
+		if h1, h2 := fmt.Sprintf("%x", files[i].Blocks[0].Hash), testdata[i].hash; h1 != h2 {
+			t.Errorf("Incorrect hash %q != %q for case #%d", h1, h2, i)
+		}
+
+		t0 := time.Date(2010, 1, 1, 0, 0, 0, 0, time.UTC).Unix()
+		t1 := time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC).Unix()
+		if mt := files[i].Modified; mt < t0 || mt > t1 {
+			t.Errorf("Unrealistic modtime %d for test %d", mt, i)
+		}
+	}
+
+	if !reflect.DeepEqual(ignores, correctIgnores) {
+		t.Errorf("Incorrect ignores\n  %v\n  %v", correctIgnores, ignores)
+	}
+}
+
+func TestIgnore(t *testing.T) {
+	var patterns = map[string][]string{
+		"":        {"t2"},
+		"foo":     {"bar", "z*"},
+		"foo/baz": {"quux", ".*"},
+	}
+	var tests = []struct {
+		f string
+		r bool
+	}{
+		{"foo/bar", true},
+		{"foo/quux", false},
+		{"foo/zuux", true},
+		{"foo/qzuux", false},
+		{"foo/baz/t1", false},
+		{"foo/baz/t2", true},
+		{"foo/baz/bar", true},
+		{"foo/baz/quuxa", false},
+		{"foo/baz/aquux", false},
+		{"foo/baz/.quux", true},
+		{"foo/baz/zquux", true},
+		{"foo/baz/quux", true},
+		{"foo/bazz/quux", false},
+	}
+
+	for i, tc := range tests {
+		if r := ignoreFile(patterns, tc.f); r != tc.r {
+			t.Errorf("Incorrect ignoreFile() #%d; E: %v, A: %v", i, tc.r, r)
+		}
+	}
+}