diff --git a/config.go b/config.go index 34aba60a..047eb8a8 100644 --- a/config.go +++ b/config.go @@ -1,9 +1,12 @@ package main import ( + "crypto/sha256" "encoding/xml" + "fmt" "io" "reflect" + "sort" "strconv" "strings" ) @@ -154,3 +157,24 @@ func readConfigXML(rd io.Reader) (Configuration, error) { cfg.Options.ListenAddress = uniqueStrings(cfg.Options.ListenAddress) return cfg, err } + +type NodeConfigurationList []NodeConfiguration + +func (l NodeConfigurationList) Less(a, b int) bool { + return l[a].NodeID < l[b].NodeID +} +func (l NodeConfigurationList) Swap(a, b int) { + l[a], l[b] = l[b], l[a] +} +func (l NodeConfigurationList) Len() int { + return len(l) +} + +func clusterHash(nodes []NodeConfiguration) string { + sort.Sort(NodeConfigurationList(nodes)) + h := sha256.New() + for _, n := range nodes { + h.Write([]byte(n.NodeID)) + } + return fmt.Sprintf("%x", h.Sum(nil)) +} diff --git a/main.go b/main.go index 6b6dbbfb..dcd0b4b4 100644 --- a/main.go +++ b/main.go @@ -204,12 +204,18 @@ func main() { loadIndex(m) updateLocalModel(m) + connOpts := map[string]string{ + "clientId": "syncthing", + "clientVersion": Version, + "clusterHash": clusterHash(cfg.Repositories[0].Nodes), + } + // Routine to listen for incoming connections if verbose { infoln("Listening for incoming connections") } for _, addr := range cfg.Options.ListenAddress { - go listen(myID, addr, m, tlsCfg) + go listen(myID, addr, m, tlsCfg, connOpts) } // Routine to connect out to configured nodes @@ -217,7 +223,7 @@ func main() { infoln("Attempting to connect to other nodes") } disc := discovery(cfg.Options.ListenAddress[0]) - go connect(myID, disc, m, tlsCfg) + go connect(myID, disc, m, tlsCfg, connOpts) // Routine to pull blocks from other nodes to synchronize the local // repository. Does not run when we are in read only (publish only) mode. @@ -320,18 +326,13 @@ func printStatsLoop(m *model.Model) { } } -func listen(myID string, addr string, m *model.Model, tlsCfg *tls.Config) { +func listen(myID string, addr string, m *model.Model, tlsCfg *tls.Config, connOpts map[string]string) { if strings.Contains(trace, "connect") { debugln("NET: Listening on", addr) } l, err := tls.Listen("tcp", addr, tlsCfg) fatalErr(err) - connOpts := map[string]string{ - "clientId": "syncthing", - "clientVersion": Version, - } - listen: for { conn, err := l.Accept() @@ -401,12 +402,7 @@ func discovery(addr string) *discover.Discoverer { return disc } -func connect(myID string, disc *discover.Discoverer, m *model.Model, tlsCfg *tls.Config) { - connOpts := map[string]string{ - "clientId": "syncthing", - "clientVersion": Version, - } - +func connect(myID string, disc *discover.Discoverer, m *model.Model, tlsCfg *tls.Config, connOpts map[string]string) { for { nextNode: for _, nodeCfg := range cfg.Repositories[0].Nodes { diff --git a/model/model.go b/model/model.go index 815776b1..65bbb7c1 100644 --- a/model/model.go +++ b/model/model.go @@ -268,7 +268,7 @@ func (m *Model) Index(nodeID string, fs []protocol.FileInfo) { defer m.imut.Unlock() if m.trace["net"] { - log.Printf("NET IDX(in): %s: %d files", nodeID, len(fs)) + log.Printf("DEBUG: NET IDX(in): %s: %d files", nodeID, len(fs)) } repo := make(map[string]File) @@ -296,7 +296,7 @@ func (m *Model) IndexUpdate(nodeID string, fs []protocol.FileInfo) { defer m.imut.Unlock() if m.trace["net"] { - log.Printf("NET IDXUP(in): %s: %d files", nodeID, len(files)) + log.Printf("DEBUG: NET IDXUP(in): %s: %d files", nodeID, len(files)) } m.rmut.Lock() @@ -322,7 +322,7 @@ func (m *Model) indexUpdate(repo map[string]File, f File) { if f.Flags&protocol.FlagDeleted != 0 { flagComment = " (deleted)" } - log.Printf("IDX(in): %q m=%d f=%o%s v=%d (%d blocks)", f.Name, f.Modified, f.Flags, flagComment, f.Version, len(f.Blocks)) + log.Printf("DEBUG: IDX(in): %q m=%d f=%o%s v=%d (%d blocks)", f.Name, f.Modified, f.Flags, flagComment, f.Version, len(f.Blocks)) } if extraFlags := f.Flags &^ (protocol.FlagInvalid | protocol.FlagDeleted | 0xfff); extraFlags != 0 { @@ -336,6 +336,13 @@ func (m *Model) indexUpdate(repo map[string]File, f File) { // Close removes the peer from the model and closes the underlying connection if possible. // Implements the protocol.Model interface. func (m *Model) Close(node string, err error) { + if m.trace["net"] { + log.Printf("DEBUG: NET: %s: %v", node, err) + } + if err == protocol.ErrClusterHash { + log.Printf("WARNING: Connection to %s closed due to mismatched cluster hash. Ensure that the configured cluster members are identical on both nodes.", node) + } + m.fq.RemoveAvailable(node) m.pmut.Lock() @@ -378,7 +385,7 @@ func (m *Model) Request(nodeID, name string, offset int64, size uint32, hash []b } if m.trace["net"] && nodeID != "" { - log.Printf("NET REQ(in): %s: %q o=%d s=%d h=%x", nodeID, name, offset, size, hash) + log.Printf("DEBUG: NET REQ(in): %s: %q o=%d s=%d h=%x", nodeID, name, offset, size, hash) } fn := path.Join(m.dir, name) fd, err := os.Open(fn) // XXX: Inefficient, should cache fd? @@ -495,13 +502,13 @@ func (m *Model) AddConnection(rawConn io.Closer, protoConn Connection) { i := i go func() { if m.trace["pull"] { - log.Println("PULL: Starting", nodeID, i) + log.Println("DEBUG: PULL: Starting", nodeID, i) } for { m.pmut.RLock() if _, ok := m.protoConn[nodeID]; !ok { if m.trace["pull"] { - log.Println("PULL: Exiting", nodeID, i) + log.Println("DEBUG: PULL: Exiting", nodeID, i) } m.pmut.RUnlock() return @@ -511,7 +518,7 @@ func (m *Model) AddConnection(rawConn io.Closer, protoConn Connection) { qb, ok := m.fq.Get(nodeID) if ok { if m.trace["pull"] { - log.Println("PULL: Request", nodeID, i, qb.name, qb.block.Offset) + log.Println("DEBUG: PULL: Request", nodeID, i, qb.name, qb.block.Offset) } data, _ := protoConn.Request(qb.name, qb.block.Offset, qb.block.Size, qb.block.Hash) m.fq.Done(qb.name, qb.block.Offset, data) @@ -537,7 +544,7 @@ func (m *Model) ProtocolIndex() []protocol.FileInfo { if mf.Flags&protocol.FlagDeleted != 0 { flagComment = " (deleted)" } - log.Printf("IDX(out): %q m=%d f=%o%s v=%d (%d blocks)", mf.Name, mf.Modified, mf.Flags, flagComment, mf.Version, len(mf.Blocks)) + log.Printf("DEBUG: IDX(out): %q m=%d f=%o%s v=%d (%d blocks)", mf.Name, mf.Modified, mf.Flags, flagComment, mf.Version, len(mf.Blocks)) } index = append(index, mf) } @@ -556,7 +563,7 @@ func (m *Model) requestGlobal(nodeID, name string, offset int64, size uint32, ha } if m.trace["net"] { - log.Printf("NET REQ(out): %s: %q o=%d s=%d h=%x", nodeID, name, offset, size, hash) + log.Printf("DEBUG: NET REQ(out): %s: %q o=%d s=%d h=%x", nodeID, name, offset, size, hash) } return nc.Request(name, offset, size, hash) @@ -584,7 +591,7 @@ func (m *Model) broadcastIndexLoop() { for _, node := range m.protoConn { node := node if m.trace["net"] { - log.Printf("NET IDX(out/loop): %s: %d files", node.ID(), len(idx)) + log.Printf("DEBUG: NET IDX(out/loop): %s: %d files", node.ID(), len(idx)) } go func() { node.Index(idx) @@ -796,7 +803,7 @@ func (m *Model) recomputeNeedForFile(gf File, toAdd []addOrder, toDelete []File) return toAdd, toDelete } if m.trace["need"] { - log.Printf("NEED: lf:%v gf:%v", lf, gf) + log.Printf("DEBUG: NEED: lf:%v gf:%v", lf, gf) } if gf.Flags&protocol.FlagDeleted != 0 { @@ -838,7 +845,7 @@ func (m *Model) WhoHas(name string) []string { func (m *Model) deleteLoop() { for file := range m.dq { if m.trace["file"] { - log.Println("FILE: Delete", file.Name) + log.Println("DEBUG: FILE: Delete", file.Name) } path := path.Clean(path.Join(m.dir, file.Name)) err := os.Remove(path) diff --git a/protocol/protocol.go b/protocol/protocol.go index 717332e0..77d97f5d 100644 --- a/protocol/protocol.go +++ b/protocol/protocol.go @@ -27,6 +27,10 @@ const ( FlagInvalid = 1 << 13 ) +var ( + ErrClusterHash = fmt.Errorf("Configuration error: mismatched cluster hash") +) + type FileInfo struct { Name string Flags uint32 @@ -64,7 +68,8 @@ type Connection struct { awaiting map[int]chan asyncResult nextId int indexSent map[string][2]int64 - options map[string]string + peerOptions map[string]string + myOptions map[string]string optionsLock sync.Mutex hasSentIndex bool @@ -106,6 +111,7 @@ func NewConnection(nodeID string, reader io.Reader, writer io.Writer, receiver M go c.pingerLoop() if options != nil { + c.myOptions = options go func() { c.Lock() c.mwriter.writeHeader(header{0, c.nextId, messageTypeOptions}) @@ -348,9 +354,14 @@ loop: case messageTypeOptions: c.optionsLock.Lock() - c.options = c.mreader.readOptions() + c.peerOptions = c.mreader.readOptions() c.optionsLock.Unlock() + if mh, rh := c.myOptions["clusterHash"], c.peerOptions["clusterHash"]; len(mh) > 0 && len(rh) > 0 && mh != rh { + c.close(ErrClusterHash) + break loop + } + default: c.close(fmt.Errorf("Protocol error: %s: unknown message type %#x", c.ID, hdr.msgType)) break loop @@ -423,5 +434,5 @@ func (c *Connection) Statistics() Statistics { func (c *Connection) Option(key string) string { c.optionsLock.Lock() defer c.optionsLock.Unlock() - return c.options[key] + return c.peerOptions[key] }