diff --git a/main.go b/main.go index d211ae00..a2e1ec03 100644 --- a/main.go +++ b/main.go @@ -286,8 +286,8 @@ func connect(myID string, addr string, nodeAddrs map[string][]string, m *Model, } nc := protocol.NewConnection(nodeID, conn, conn, m) - okln("Connected to node", remoteID, "(out)") m.AddNode(nc) + okln("Connected to node", remoteID, "(out)") continue nextNode } } diff --git a/model.go b/model.go index df699692..665a3cbe 100644 --- a/model.go +++ b/model.go @@ -25,13 +25,16 @@ import ( type Model struct { sync.RWMutex - dir string - updated int64 - global map[string]File // the latest version of each file as it exists in the cluster - local map[string]File // the files we currently have locally on disk - remote map[string]map[string]File - need map[string]bool // the files we need to update - nodes map[string]*protocol.Connection + dir string + + global map[string]File // the latest version of each file as it exists in the cluster + local map[string]File // the files we currently have locally on disk + remote map[string]map[string]File + need map[string]bool // the files we need to update + nodes map[string]*protocol.Connection + + updatedLocal int64 // timestamp of last update to local + updateGlobal int64 // timestamp of last update to remote lastIdxBcast time.Time lastIdxBcastRequest time.Time @@ -59,7 +62,7 @@ func NewModel(dir string) *Model { lastIdxBcast: time.Now(), } - go m.printStats() + go m.printStatsLoop() go m.broadcastIndexLoop() return m } @@ -68,22 +71,53 @@ func (m *Model) Start() { go m.puller() } -func (m *Model) printStats() { +func (m *Model) printStatsLoop() { + var lastUpdated int64 for { time.Sleep(60 * time.Second) m.RLock() - for node, conn := range m.nodes { - stats := conn.Statistics() - if (stats.InBytesPerSec > 0 || stats.OutBytesPerSec > 0) && stats.Latency > 0 { - infof("%s: %sB/s in, %sB/s out, %0.02f ms", node, toSI(stats.InBytesPerSec), toSI(stats.OutBytesPerSec), stats.Latency.Seconds()*1000) - } else if stats.InBytesPerSec > 0 || stats.OutBytesPerSec > 0 { - infof("%s: %sB/s in, %sB/s out", node, toSI(stats.InBytesPerSec), toSI(stats.OutBytesPerSec)) - } + m.printConnectionStats() + if m.updatedLocal+m.updateGlobal > lastUpdated { + m.printModelStats() + lastUpdated = m.updatedLocal + m.updateGlobal } m.RUnlock() } } +func (m *Model) printConnectionStats() { + for node, conn := range m.nodes { + stats := conn.Statistics() + if (stats.InBytesPerSec > 0 || stats.OutBytesPerSec > 0) && stats.Latency > 0 { + infof("%s: %sB/s in, %sB/s out, %0.02f ms", node, toSI(stats.InBytesPerSec), toSI(stats.OutBytesPerSec), stats.Latency.Seconds()*1000) + } else if stats.InBytesPerSec > 0 || stats.OutBytesPerSec > 0 { + infof("%s: %sB/s in, %sB/s out", node, toSI(stats.InBytesPerSec), toSI(stats.OutBytesPerSec)) + } + } +} + +func (m *Model) printModelStats() { + var tot int + for _, f := range m.global { + tot += f.Size() + } + infof("%6d files, %8sB in cluster", len(m.global), toSI(tot)) + + if len(m.need) > 0 { + tot = 0 + for _, f := range m.local { + tot += f.Size() + } + infof("%6d files, %8sB in local repo", len(m.local), toSI(tot)) + + tot = 0 + for n := range m.need { + tot += m.global[n].Size() + } + infof("%6d files, %8sB to synchronize", len(m.need), toSI(tot)) + } +} + func toSI(n int) string { if n > 1<<30 { return fmt.Sprintf("%.02f G", float64(n)/(1<<30)) @@ -97,6 +131,7 @@ func toSI(n int) string { return fmt.Sprintf("%d ", n) } +// Index is called when a new node is connected and we receive their full index. func (m *Model) Index(nodeID string, fs []protocol.FileInfo) { m.Lock() defer m.Unlock() @@ -116,8 +151,10 @@ func (m *Model) Index(nodeID string, fs []protocol.FileInfo) { m.recomputeGlobal() m.recomputeNeed() + m.printModelStats() } +// IndexUpdate is called for incremental updates to connected nodes' indexes. func (m *Model) IndexUpdate(nodeID string, fs []protocol.FileInfo) { m.Lock() defer m.Unlock() @@ -143,6 +180,7 @@ func (m *Model) IndexUpdate(nodeID string, fs []protocol.FileInfo) { m.recomputeNeed() } +// SeedIndex is called when our previously cached index is loaded from disk at startup. func (m *Model) SeedIndex(fs []protocol.FileInfo) { m.Lock() defer m.Unlock() @@ -154,6 +192,7 @@ func (m *Model) SeedIndex(fs []protocol.FileInfo) { m.recomputeGlobal() m.recomputeNeed() + m.printModelStats() } func (m *Model) Close(node string) { @@ -232,7 +271,7 @@ func (m *Model) ReplaceLocal(fs []File) { m.local = newLocal m.recomputeGlobal() m.recomputeNeed() - m.updated = time.Now().Unix() + m.updatedLocal = time.Now().Unix() m.lastIdxBcastRequest = time.Now() } } @@ -254,7 +293,7 @@ func (m *Model) broadcastIndexLoop() { for _, node := range m.nodes { node := node if opts.Debug.TraceNet { - debugf("NET IDX(out): %s: %d files", node.ID, len(idx)) + debugf("NET IDX(out/loop): %s: %d files", node.ID, len(idx)) } go func() { node.Index(idx) @@ -300,7 +339,7 @@ func (m *Model) UpdateLocal(f File) { m.local[f.Name] = f m.recomputeGlobal() m.recomputeNeed() - m.updated = time.Now().Unix() + m.updatedLocal = time.Now().Unix() m.lastIdxBcastRequest = time.Now() } } @@ -351,7 +390,24 @@ func (m *Model) recomputeGlobal() { } } - m.global = newGlobal + // Figure out if anything actually changed + + var updated bool + if len(newGlobal) != len(m.global) { + updated = true + } else { + for n, f0 := range newGlobal { + if f1, ok := m.global[n]; !ok || f0.Modified != f1.Modified { + updated = true + break + } + } + } + + if updated { + m.updateGlobal = time.Now().Unix() + m.global = newGlobal + } } // Must be called with the write lock held. @@ -418,7 +474,7 @@ func (m *Model) AddNode(node *protocol.Connection) { m.RUnlock() if opts.Debug.TraceNet { - debugf("NET IDX(out): %s: %d files", node.ID, len(idx)) + debugf("NET IDX(out/add): %s: %d files", node.ID, len(idx)) } // Sending the index might take a while if we have many files and a slow diff --git a/walk.go b/walk.go index c7594f81..7c8f127d 100644 --- a/walk.go +++ b/walk.go @@ -25,6 +25,13 @@ func (f File) Dump() { fmt.Println() } +func (f File) Size() (bytes int) { + for _, b := range f.Blocks { + bytes += int(b.Length) + } + return +} + func isTempName(name string) bool { return strings.HasPrefix(path.Base(name), ".syncthing.") }