From 7943902d73cb9d01f941fd9c0d480d82ac825a3c Mon Sep 17 00:00:00 2001 From: Jakob Borg Date: Tue, 15 Jul 2014 17:54:00 +0200 Subject: [PATCH] Handle needed files in batches --- files/leveldb.go | 2 +- model/model.go | 73 +++++++++++++++++++++++++++++++----------------- model/puller.go | 11 +++++--- 3 files changed, 55 insertions(+), 31 deletions(-) diff --git a/files/leveldb.go b/files/leveldb.go index ce4b0e40..6b465e19 100644 --- a/files/leveldb.go +++ b/files/leveldb.go @@ -207,8 +207,8 @@ func ldbReplace(db *leveldb.DB, repo, node []byte, fs []protocol.FileInfo) uint6 if debug { l.Debugf("delete; repo=%q node=%x name=%q", repo, node, name) } - batch.Delete(dbi.Key()) ldbRemoveFromGlobal(db, batch, repo, node, name) + batch.Delete(dbi.Key()) return 0 }) } diff --git a/model/model.go b/model/model.go index 28502c46..550589a9 100644 --- a/model/model.go +++ b/model/model.go @@ -278,8 +278,17 @@ func (m *Model) LocalSize(repo string) (files, deleted int, bytes int64) { // NeedSize returns the number and total size of currently needed files. func (m *Model) NeedSize(repo string) (files int, bytes int64) { - f, d, b := sizeOf(m.NeedFilesRepo(repo)) - return f + d, b + m.rmut.RLock() + defer m.rmut.RUnlock() + if rf, ok := m.repoFiles[repo]; ok { + rf.WithNeed(protocol.LocalNodeID, func(f protocol.FileInfo) bool { + fs, de, by := sizeOfFile(f) + files += fs + de + bytes += by + return true + }) + } + return } // NeedFiles returns the list of currently needed files @@ -287,10 +296,10 @@ func (m *Model) NeedFilesRepo(repo string) []protocol.FileInfo { m.rmut.RLock() defer m.rmut.RUnlock() if rf, ok := m.repoFiles[repo]; ok { - var fs []protocol.FileInfo + fs := make([]protocol.FileInfo, 0, indexBatchSize) rf.WithNeed(protocol.LocalNodeID, func(f protocol.FileInfo) bool { fs = append(fs, f) - return true + return len(fs) < indexBatchSize }) if r := m.repoCfgs[repo].FileRanker(); r != nil { files.SortBy(r).Sort(fs) @@ -721,9 +730,10 @@ func (m *Model) ScanRepo(repo string) error { if err != nil { return err } - batch := make([]protocol.FileInfo, 0, indexBatchSize) + batchSize := 100 + batch := make([]protocol.FileInfo, 0, 00) for f := range fchan { - if len(batch) == indexBatchSize { + if len(batch) == batchSize { fs.Update(protocol.LocalNodeID, batch) batch = batch[:0] } @@ -736,7 +746,7 @@ func (m *Model) ScanRepo(repo string) error { batch = batch[:0] fs.WithHave(protocol.LocalNodeID, func(f protocol.FileInfo) bool { if !protocol.IsDeleted(f.Flags) { - if len(batch) == indexBatchSize { + if len(batch) == batchSize { fs.Update(protocol.LocalNodeID, batch) batch = batch[:0] } @@ -810,41 +820,52 @@ func (m *Model) State(repo string) string { } func (m *Model) Override(repo string) { - fs := m.NeedFilesRepo(repo) - m.rmut.RLock() - r := m.repoFiles[repo] + fs := m.repoFiles[repo] m.rmut.RUnlock() - for i := range fs { - f := &fs[i] - h := r.Get(protocol.LocalNodeID, f.Name) - if h.Name != f.Name { + batch := make([]protocol.FileInfo, 0, indexBatchSize) + fs.WithNeed(protocol.LocalNodeID, func(need protocol.FileInfo) bool { + if len(batch) == indexBatchSize { + fs.Update(protocol.LocalNodeID, batch) + batch = batch[:0] + } + + have := fs.Get(protocol.LocalNodeID, need.Name) + if have.Name != need.Name { // We are missing the file - f.Flags |= protocol.FlagDeleted - f.Blocks = nil + need.Flags |= protocol.FlagDeleted + need.Blocks = nil } else { // We have the file, replace with our version - *f = h + need = have } - f.Version = lamport.Default.Tick(f.Version) - f.LocalVersion = 0 + need.Version = lamport.Default.Tick(need.Version) + need.LocalVersion = 0 + batch = append(batch, need) + return true + }) + if len(batch) > 0 { + fs.Update(protocol.LocalNodeID, batch) } - - r.Update(protocol.LocalNodeID, fs) } // Version returns the change version for the given repository. This is // guaranteed to increment if the contents of the local or global repository // has changed. func (m *Model) LocalVersion(repo string) uint64 { - var ver uint64 - m.rmut.Lock() - for _, n := range m.repoNodes[repo] { - ver += m.repoFiles[repo].LocalVersion(n) + defer m.rmut.Unlock() + + fs, ok := m.repoFiles[repo] + if !ok { + return 0 + } + + ver := fs.LocalVersion(protocol.LocalNodeID) + for _, n := range m.repoNodes[repo] { + ver += fs.LocalVersion(n) } - m.rmut.Unlock() return ver } diff --git a/model/puller.go b/model/puller.go index 8cdfb25d..8f6d23d4 100644 --- a/model/puller.go +++ b/model/puller.go @@ -195,8 +195,10 @@ func (p *puller) run() { if v := p.model.LocalVersion(p.repoCfg.ID); v > prevVer { // Queue more blocks to fetch, if any - p.queueNeededBlocks() - prevVer = v + if p.queueNeededBlocks() == 0 { + // We've fetched all blocks we need + prevVer = v + } } } } @@ -618,7 +620,7 @@ func (p *puller) handleEmptyBlock(b bqBlock) { delete(p.openFiles, f.Name) } -func (p *puller) queueNeededBlocks() { +func (p *puller) queueNeededBlocks() int { queued := 0 for _, f := range p.model.NeedFilesRepo(p.repoCfg.ID) { lf := p.model.CurrentRepoFile(p.repoCfg.ID, f.Name) @@ -634,8 +636,9 @@ func (p *puller) queueNeededBlocks() { }) } if debug && queued > 0 { - l.Debugf("%q: queued %d blocks", p.repoCfg.ID, queued) + l.Debugf("%q: queued %d items", p.repoCfg.ID, queued) } + return queued } func (p *puller) closeFile(f protocol.FileInfo) {