Browse Source

Enforce identical member configuration among nodes (fixes #63)

Jakob Borg 11 years ago
parent
commit
fc6eabea28
4 changed files with 67 additions and 29 deletions
  1. 24 0
      config.go
  2. 10 14
      main.go
  3. 19 12
      model/model.go
  4. 14 3
      protocol/protocol.go

+ 24 - 0
config.go

@@ -1,9 +1,12 @@
 package main
 
 import (
+	"crypto/sha256"
 	"encoding/xml"
+	"fmt"
 	"io"
 	"reflect"
+	"sort"
 	"strconv"
 	"strings"
 )
@@ -154,3 +157,24 @@ func readConfigXML(rd io.Reader) (Configuration, error) {
 	cfg.Options.ListenAddress = uniqueStrings(cfg.Options.ListenAddress)
 	return cfg, err
 }
+
+type NodeConfigurationList []NodeConfiguration
+
+func (l NodeConfigurationList) Less(a, b int) bool {
+	return l[a].NodeID < l[b].NodeID
+}
+func (l NodeConfigurationList) Swap(a, b int) {
+	l[a], l[b] = l[b], l[a]
+}
+func (l NodeConfigurationList) Len() int {
+	return len(l)
+}
+
+func clusterHash(nodes []NodeConfiguration) string {
+	sort.Sort(NodeConfigurationList(nodes))
+	h := sha256.New()
+	for _, n := range nodes {
+		h.Write([]byte(n.NodeID))
+	}
+	return fmt.Sprintf("%x", h.Sum(nil))
+}

+ 10 - 14
main.go

@@ -204,12 +204,18 @@ func main() {
 	loadIndex(m)
 	updateLocalModel(m)
 
+	connOpts := map[string]string{
+		"clientId":      "syncthing",
+		"clientVersion": Version,
+		"clusterHash":   clusterHash(cfg.Repositories[0].Nodes),
+	}
+
 	// Routine to listen for incoming connections
 	if verbose {
 		infoln("Listening for incoming connections")
 	}
 	for _, addr := range cfg.Options.ListenAddress {
-		go listen(myID, addr, m, tlsCfg)
+		go listen(myID, addr, m, tlsCfg, connOpts)
 	}
 
 	// Routine to connect out to configured nodes
@@ -217,7 +223,7 @@ func main() {
 		infoln("Attempting to connect to other nodes")
 	}
 	disc := discovery(cfg.Options.ListenAddress[0])
-	go connect(myID, disc, m, tlsCfg)
+	go connect(myID, disc, m, tlsCfg, connOpts)
 
 	// Routine to pull blocks from other nodes to synchronize the local
 	// repository. Does not run when we are in read only (publish only) mode.
@@ -320,18 +326,13 @@ func printStatsLoop(m *model.Model) {
 	}
 }
 
-func listen(myID string, addr string, m *model.Model, tlsCfg *tls.Config) {
+func listen(myID string, addr string, m *model.Model, tlsCfg *tls.Config, connOpts map[string]string) {
 	if strings.Contains(trace, "connect") {
 		debugln("NET: Listening on", addr)
 	}
 	l, err := tls.Listen("tcp", addr, tlsCfg)
 	fatalErr(err)
 
-	connOpts := map[string]string{
-		"clientId":      "syncthing",
-		"clientVersion": Version,
-	}
-
 listen:
 	for {
 		conn, err := l.Accept()
@@ -401,12 +402,7 @@ func discovery(addr string) *discover.Discoverer {
 	return disc
 }
 
-func connect(myID string, disc *discover.Discoverer, m *model.Model, tlsCfg *tls.Config) {
-	connOpts := map[string]string{
-		"clientId":      "syncthing",
-		"clientVersion": Version,
-	}
-
+func connect(myID string, disc *discover.Discoverer, m *model.Model, tlsCfg *tls.Config, connOpts map[string]string) {
 	for {
 	nextNode:
 		for _, nodeCfg := range cfg.Repositories[0].Nodes {

+ 19 - 12
model/model.go

@@ -268,7 +268,7 @@ func (m *Model) Index(nodeID string, fs []protocol.FileInfo) {
 	defer m.imut.Unlock()
 
 	if m.trace["net"] {
-		log.Printf("NET IDX(in): %s: %d files", nodeID, len(fs))
+		log.Printf("DEBUG: NET IDX(in): %s: %d files", nodeID, len(fs))
 	}
 
 	repo := make(map[string]File)
@@ -296,7 +296,7 @@ func (m *Model) IndexUpdate(nodeID string, fs []protocol.FileInfo) {
 	defer m.imut.Unlock()
 
 	if m.trace["net"] {
-		log.Printf("NET IDXUP(in): %s: %d files", nodeID, len(files))
+		log.Printf("DEBUG: NET IDXUP(in): %s: %d files", nodeID, len(files))
 	}
 
 	m.rmut.Lock()
@@ -322,7 +322,7 @@ func (m *Model) indexUpdate(repo map[string]File, f File) {
 		if f.Flags&protocol.FlagDeleted != 0 {
 			flagComment = " (deleted)"
 		}
-		log.Printf("IDX(in): %q m=%d f=%o%s v=%d (%d blocks)", f.Name, f.Modified, f.Flags, flagComment, f.Version, len(f.Blocks))
+		log.Printf("DEBUG: IDX(in): %q m=%d f=%o%s v=%d (%d blocks)", f.Name, f.Modified, f.Flags, flagComment, f.Version, len(f.Blocks))
 	}
 
 	if extraFlags := f.Flags &^ (protocol.FlagInvalid | protocol.FlagDeleted | 0xfff); extraFlags != 0 {
@@ -336,6 +336,13 @@ func (m *Model) indexUpdate(repo map[string]File, f File) {
 // Close removes the peer from the model and closes the underlying connection if possible.
 // Implements the protocol.Model interface.
 func (m *Model) Close(node string, err error) {
+	if m.trace["net"] {
+		log.Printf("DEBUG: NET: %s: %v", node, err)
+	}
+	if err == protocol.ErrClusterHash {
+		log.Printf("WARNING: Connection to %s closed due to mismatched cluster hash. Ensure that the configured cluster members are identical on both nodes.", node)
+	}
+
 	m.fq.RemoveAvailable(node)
 
 	m.pmut.Lock()
@@ -378,7 +385,7 @@ func (m *Model) Request(nodeID, name string, offset int64, size uint32, hash []b
 	}
 
 	if m.trace["net"] && nodeID != "<local>" {
-		log.Printf("NET REQ(in): %s: %q o=%d s=%d h=%x", nodeID, name, offset, size, hash)
+		log.Printf("DEBUG: NET REQ(in): %s: %q o=%d s=%d h=%x", nodeID, name, offset, size, hash)
 	}
 	fn := path.Join(m.dir, name)
 	fd, err := os.Open(fn) // XXX: Inefficient, should cache fd?
@@ -495,13 +502,13 @@ func (m *Model) AddConnection(rawConn io.Closer, protoConn Connection) {
 		i := i
 		go func() {
 			if m.trace["pull"] {
-				log.Println("PULL: Starting", nodeID, i)
+				log.Println("DEBUG: PULL: Starting", nodeID, i)
 			}
 			for {
 				m.pmut.RLock()
 				if _, ok := m.protoConn[nodeID]; !ok {
 					if m.trace["pull"] {
-						log.Println("PULL: Exiting", nodeID, i)
+						log.Println("DEBUG: PULL: Exiting", nodeID, i)
 					}
 					m.pmut.RUnlock()
 					return
@@ -511,7 +518,7 @@ func (m *Model) AddConnection(rawConn io.Closer, protoConn Connection) {
 				qb, ok := m.fq.Get(nodeID)
 				if ok {
 					if m.trace["pull"] {
-						log.Println("PULL: Request", nodeID, i, qb.name, qb.block.Offset)
+						log.Println("DEBUG: PULL: Request", nodeID, i, qb.name, qb.block.Offset)
 					}
 					data, _ := protoConn.Request(qb.name, qb.block.Offset, qb.block.Size, qb.block.Hash)
 					m.fq.Done(qb.name, qb.block.Offset, data)
@@ -537,7 +544,7 @@ func (m *Model) ProtocolIndex() []protocol.FileInfo {
 			if mf.Flags&protocol.FlagDeleted != 0 {
 				flagComment = " (deleted)"
 			}
-			log.Printf("IDX(out): %q m=%d f=%o%s v=%d (%d blocks)", mf.Name, mf.Modified, mf.Flags, flagComment, mf.Version, len(mf.Blocks))
+			log.Printf("DEBUG: IDX(out): %q m=%d f=%o%s v=%d (%d blocks)", mf.Name, mf.Modified, mf.Flags, flagComment, mf.Version, len(mf.Blocks))
 		}
 		index = append(index, mf)
 	}
@@ -556,7 +563,7 @@ func (m *Model) requestGlobal(nodeID, name string, offset int64, size uint32, ha
 	}
 
 	if m.trace["net"] {
-		log.Printf("NET REQ(out): %s: %q o=%d s=%d h=%x", nodeID, name, offset, size, hash)
+		log.Printf("DEBUG: NET REQ(out): %s: %q o=%d s=%d h=%x", nodeID, name, offset, size, hash)
 	}
 
 	return nc.Request(name, offset, size, hash)
@@ -584,7 +591,7 @@ func (m *Model) broadcastIndexLoop() {
 			for _, node := range m.protoConn {
 				node := node
 				if m.trace["net"] {
-					log.Printf("NET IDX(out/loop): %s: %d files", node.ID(), len(idx))
+					log.Printf("DEBUG: NET IDX(out/loop): %s: %d files", node.ID(), len(idx))
 				}
 				go func() {
 					node.Index(idx)
@@ -796,7 +803,7 @@ func (m *Model) recomputeNeedForFile(gf File, toAdd []addOrder, toDelete []File)
 			return toAdd, toDelete
 		}
 		if m.trace["need"] {
-			log.Printf("NEED: lf:%v gf:%v", lf, gf)
+			log.Printf("DEBUG: NEED: lf:%v gf:%v", lf, gf)
 		}
 
 		if gf.Flags&protocol.FlagDeleted != 0 {
@@ -838,7 +845,7 @@ func (m *Model) WhoHas(name string) []string {
 func (m *Model) deleteLoop() {
 	for file := range m.dq {
 		if m.trace["file"] {
-			log.Println("FILE: Delete", file.Name)
+			log.Println("DEBUG: FILE: Delete", file.Name)
 		}
 		path := path.Clean(path.Join(m.dir, file.Name))
 		err := os.Remove(path)

+ 14 - 3
protocol/protocol.go

@@ -27,6 +27,10 @@ const (
 	FlagInvalid = 1 << 13
 )
 
+var (
+	ErrClusterHash = fmt.Errorf("Configuration error: mismatched cluster hash")
+)
+
 type FileInfo struct {
 	Name     string
 	Flags    uint32
@@ -64,7 +68,8 @@ type Connection struct {
 	awaiting    map[int]chan asyncResult
 	nextId      int
 	indexSent   map[string][2]int64
-	options     map[string]string
+	peerOptions map[string]string
+	myOptions   map[string]string
 	optionsLock sync.Mutex
 
 	hasSentIndex  bool
@@ -106,6 +111,7 @@ func NewConnection(nodeID string, reader io.Reader, writer io.Writer, receiver M
 	go c.pingerLoop()
 
 	if options != nil {
+		c.myOptions = options
 		go func() {
 			c.Lock()
 			c.mwriter.writeHeader(header{0, c.nextId, messageTypeOptions})
@@ -348,9 +354,14 @@ loop:
 
 		case messageTypeOptions:
 			c.optionsLock.Lock()
-			c.options = c.mreader.readOptions()
+			c.peerOptions = c.mreader.readOptions()
 			c.optionsLock.Unlock()
 
+			if mh, rh := c.myOptions["clusterHash"], c.peerOptions["clusterHash"]; len(mh) > 0 && len(rh) > 0 && mh != rh {
+				c.close(ErrClusterHash)
+				break loop
+			}
+
 		default:
 			c.close(fmt.Errorf("Protocol error: %s: unknown message type %#x", c.ID, hdr.msgType))
 			break loop
@@ -423,5 +434,5 @@ func (c *Connection) Statistics() Statistics {
 func (c *Connection) Option(key string) string {
 	c.optionsLock.Lock()
 	defer c.optionsLock.Unlock()
-	return c.options[key]
+	return c.peerOptions[key]
 }