From ea5ef28c5af6ba47bd0bf9c2d10b1547ee503f15 Mon Sep 17 00:00:00 2001 From: Jakob Borg Date: Thu, 23 Jan 2014 22:20:15 +0100 Subject: [PATCH] Performance: improve need computation --- model/model.go | 159 +++++++++++++++++++++++++++++++++---------------- 1 file changed, 107 insertions(+), 52 deletions(-) diff --git a/model/model.go b/model/model.go index 6493aa3f..1ee83731 100644 --- a/model/model.go +++ b/model/model.go @@ -259,6 +259,11 @@ func (m *Model) NeedFiles() (files []File, bytes int) { // Index is called when a new node is connected and we receive their full index. // Implements the protocol.Model interface. func (m *Model) Index(nodeID string, fs []protocol.FileInfo) { + var files = make([]File, len(fs)) + for i := range fs { + files[i] = fileFromFileInfo(fs[i]) + } + m.imut.Lock() defer m.imut.Unlock() @@ -267,7 +272,7 @@ func (m *Model) Index(nodeID string, fs []protocol.FileInfo) { } repo := make(map[string]File) - for _, f := range fs { + for _, f := range files { m.indexUpdate(repo, f) } @@ -276,17 +281,22 @@ func (m *Model) Index(nodeID string, fs []protocol.FileInfo) { m.rmut.Unlock() m.recomputeGlobal() - m.recomputeNeed() + m.recomputeNeedForFiles(files) } // IndexUpdate is called for incremental updates to connected nodes' indexes. // Implements the protocol.Model interface. func (m *Model) IndexUpdate(nodeID string, fs []protocol.FileInfo) { + var files = make([]File, len(fs)) + for i := range fs { + files[i] = fileFromFileInfo(fs[i]) + } + m.imut.Lock() defer m.imut.Unlock() if m.trace["net"] { - log.Printf("NET IDXUP(in): %s: %d files", nodeID, len(fs)) + log.Printf("NET IDXUP(in): %s: %d files", nodeID, len(files)) } m.rmut.Lock() @@ -297,16 +307,16 @@ func (m *Model) IndexUpdate(nodeID string, fs []protocol.FileInfo) { return } - for _, f := range fs { + for _, f := range files { m.indexUpdate(repo, f) } m.rmut.Unlock() m.recomputeGlobal() - m.recomputeNeed() + m.recomputeNeedForFiles(files) } -func (m *Model) indexUpdate(repo map[string]File, f protocol.FileInfo) { +func (m *Model) indexUpdate(repo map[string]File, f File) { if m.trace["idx"] { var flagComment string if f.Flags&protocol.FlagDeleted != 0 { @@ -320,7 +330,7 @@ func (m *Model) indexUpdate(repo map[string]File, f protocol.FileInfo) { return } - repo[f.Name] = fileFromFileInfo(f) + repo[f.Name] = f } // Close removes the peer from the model and closes the underlying connection if possible. @@ -344,7 +354,7 @@ func (m *Model) Close(node string, err error) { m.pmut.Unlock() m.recomputeGlobal() - m.recomputeNeed() + m.recomputeNeedForGlobal() } // Request returns the specified data segment by reading it from local disk. @@ -422,7 +432,7 @@ func (m *Model) ReplaceLocal(fs []File) { m.lmut.Unlock() m.recomputeGlobal() - m.recomputeNeed() + m.recomputeNeedForGlobal() m.umut.Lock() m.updatedLocal = time.Now().Unix() @@ -443,7 +453,7 @@ func (m *Model) SeedLocal(fs []protocol.FileInfo) { m.lmut.Unlock() m.recomputeGlobal() - m.recomputeNeed() + m.recomputeNeedForGlobal() } // ConnectedTo returns true if we are connected to the named node. @@ -644,6 +654,25 @@ func (m *Model) updateLocal(f File) { } } +/* +XXX: Not done, needs elegant handling of availability + +func (m *Model) recomputeGlobalFor(files []File) bool { + m.gmut.Lock() + defer m.gmut.Unlock() + + var updated bool + for _, f := range files { + if gf, ok := m.global[f.Name]; !ok || f.NewerThan(gf) { + m.global[f.Name] = f + updated = true + // Fix availability + } + } + return updated +} +*/ + func (m *Model) recomputeGlobal() { var newGlobal = make(map[string]File) @@ -702,54 +731,20 @@ func (m *Model) recomputeGlobal() { } } -func (m *Model) recomputeNeed() { - type addOrder struct { - n string - remote []Block - fm *fileMonitor - } +type addOrder struct { + n string + remote []Block + fm *fileMonitor +} +func (m *Model) recomputeNeedForGlobal() { var toDelete []File var toAdd []addOrder m.gmut.RLock() - for n, gf := range m.global { - m.lmut.RLock() - lf, ok := m.local[n] - m.lmut.RUnlock() - - if !ok || gf.NewerThan(lf) { - if gf.Flags&protocol.FlagInvalid != 0 { - // Never attempt to sync invalid files - continue - } - if gf.Flags&protocol.FlagDeleted != 0 && !m.delete { - // Don't want to delete files, so forget this need - continue - } - if gf.Flags&protocol.FlagDeleted != 0 && !ok { - // Don't have the file, so don't need to delete it - continue - } - if m.trace["need"] { - log.Printf("NEED: lf:%v gf:%v", lf, gf) - } - - if gf.Flags&protocol.FlagDeleted != 0 { - toDelete = append(toDelete, gf) - } else { - local, remote := BlockDiff(lf.Blocks, gf.Blocks) - fm := fileMonitor{ - name: n, - path: path.Clean(path.Join(m.dir, n)), - global: gf, - model: m, - localBlocks: local, - } - toAdd = append(toAdd, addOrder{n, remote, &fm}) - } - } + for _, gf := range m.global { + toAdd, toDelete = m.recomputeNeedForFile(gf, toAdd, toDelete) } m.gmut.RUnlock() @@ -762,6 +757,66 @@ func (m *Model) recomputeNeed() { } } +func (m *Model) recomputeNeedForFiles(files []File) { + var toDelete []File + var toAdd []addOrder + + m.gmut.RLock() + + for _, gf := range files { + toAdd, toDelete = m.recomputeNeedForFile(gf, toAdd, toDelete) + } + + m.gmut.RUnlock() + + for _, ao := range toAdd { + m.fq.Add(ao.n, ao.remote, ao.fm) + } + for _, gf := range toDelete { + m.dq <- gf + } +} + +func (m *Model) recomputeNeedForFile(gf File, toAdd []addOrder, toDelete []File) ([]addOrder, []File) { + m.lmut.RLock() + lf, ok := m.local[gf.Name] + m.lmut.RUnlock() + + if !ok || gf.NewerThan(lf) { + if gf.Flags&protocol.FlagInvalid != 0 { + // Never attempt to sync invalid files + return toAdd, toDelete + } + if gf.Flags&protocol.FlagDeleted != 0 && !m.delete { + // Don't want to delete files, so forget this need + return toAdd, toDelete + } + if gf.Flags&protocol.FlagDeleted != 0 && !ok { + // Don't have the file, so don't need to delete it + return toAdd, toDelete + } + if m.trace["need"] { + log.Printf("NEED: lf:%v gf:%v", lf, gf) + } + + if gf.Flags&protocol.FlagDeleted != 0 { + toDelete = append(toDelete, gf) + } else { + local, remote := BlockDiff(lf.Blocks, gf.Blocks) + fm := fileMonitor{ + name: gf.Name, + path: path.Clean(path.Join(m.dir, gf.Name)), + global: gf, + model: m, + localBlocks: local, + } + toAdd = append(toAdd, addOrder{gf.Name, remote, &fm}) + } + } + + return toAdd, toDelete +} + func (m *Model) WhoHas(name string) []string { var remote []string