From dba40eefb1900db524cf3bb23b1621d9fddcb6c0 Mon Sep 17 00:00:00 2001 From: Jakob Borg Date: Mon, 19 May 2014 22:31:28 +0200 Subject: [PATCH] Fix handling of changed/deleted directories (fixes #231) --- cmd/stpidx/main.go | 4 +- cmd/syncthing/gui.go | 11 ++++++ files/set.go | 8 +--- integration/.gitignore | 1 + integration/folders.sh | 11 ++++-- model/blockqueue.go | 19 +++++++--- model/model.go | 28 +++++++++++--- model/puller.go | 86 +++++++++++++++++++++++++++++++----------- 8 files changed, 123 insertions(+), 45 deletions(-) diff --git a/cmd/stpidx/main.go b/cmd/stpidx/main.go index b9907d64..132a894a 100644 --- a/cmd/stpidx/main.go +++ b/cmd/stpidx/main.go @@ -41,8 +41,8 @@ func main() { inv := file.Flags&protocol.FlagInvalid != 0 dir := file.Flags&protocol.FlagDirectory != 0 prm := file.Flags & 0777 - log.Printf("File: %q, Del: %v, Inv: %v, Dir: %v, Perm: 0%03o, Modified: %d, Blocks: %d", - file.Name, del, inv, dir, prm, file.Modified, len(file.Blocks)) + log.Printf("File: %q, Ver:%d, Del: %v, Inv: %v, Dir: %v, Perm: 0%03o, Modified: %d, Blocks: %d", + file.Name, file.Version, del, inv, dir, prm, file.Modified, len(file.Blocks)) if *showBlocks { for _, block := range file.Blocks { log.Printf(" Size: %6d, Hash: %x", block.Size, block.Hash) diff --git a/cmd/syncthing/gui.go b/cmd/syncthing/gui.go index 90160f1e..e0950002 100644 --- a/cmd/syncthing/gui.go +++ b/cmd/syncthing/gui.go @@ -51,6 +51,7 @@ func startGUI(cfg config.GUIConfiguration, m *model.Model) error { router.Get("/", getRoot) router.Get("/rest/version", restGetVersion) router.Get("/rest/model", restGetModel) + router.Get("/rest/need", restGetNeed) router.Get("/rest/connections", restGetConnections) router.Get("/rest/config", restGetConfig) router.Get("/rest/config/sync", restGetConfigInSync) @@ -125,6 +126,16 @@ func restGetModel(m *model.Model, w http.ResponseWriter, r *http.Request) { json.NewEncoder(w).Encode(res) } +func restGetNeed(m *model.Model, w http.ResponseWriter, r *http.Request) { + var qs = r.URL.Query() + var repo = qs.Get("repo") + + files := m.NeedFilesRepo(repo) + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(files) +} + func restGetConnections(m *model.Model, w http.ResponseWriter) { var res = m.ConnectionStats() w.Header().Set("Content-Type", "application/json") diff --git a/files/set.go b/files/set.go index 5c70d79c..20925572 100644 --- a/files/set.go +++ b/files/set.go @@ -116,12 +116,8 @@ func (m *Set) Need(id uint) []scanner.File { continue } - file := gf.File - switch { - case file.Flags&protocol.FlagDirectory == 0 && gk.newerThan(rkID[gk.Name]): - fs = append(fs, file) - case file.Flags&(protocol.FlagDirectory|protocol.FlagDeleted) == protocol.FlagDirectory && gk.newerThan(rkID[gk.Name]): - fs = append(fs, file) + if gk.newerThan(rkID[gk.Name]) { + fs = append(fs, gf.File) } } m.Unlock() diff --git a/integration/.gitignore b/integration/.gitignore index a6937582..4bf84b0c 100644 --- a/integration/.gitignore +++ b/integration/.gitignore @@ -11,3 +11,4 @@ md5r json *.idx.gz dirs-* +*.out diff --git a/integration/folders.sh b/integration/folders.sh index 92bbb934..2608a133 100755 --- a/integration/folders.sh +++ b/integration/folders.sh @@ -4,15 +4,13 @@ iterations=${1:-5} id1=I6KAH7666SLLL5PFXSOAUFJCDZYAOMLEKCP2GB3BV5RQST3PSROA id2=JMFJCXBGZDE4BOCJE3VF65GYZNAIVJRET3J6HMRAUQIGJOFKNHMQ -id3=373HSRPQLPNLIJYKZVQFP4PKZ6R2ZE6K3YD442UJHBGBQGWWXAHA go build json.go start() { echo "Starting..." - for i in 1 2 ; do - STTRACE=linenumbers STPROFILER=":909$i" syncthing -home "f$i" & - done + STTRACE=model,scanner STPROFILER=":9091" syncthing -home "f1" > 1.out 2>&1 & + STTRACE=model,scanner STPROFILER=":9092" syncthing -home "f2" > 2.out 2>&1 & } stop() { @@ -44,6 +42,11 @@ testConvergence() { tot=$(($s1comp + $s2comp)) echo $tot / 200 if [[ $tot == 200 ]] ; then + # when fixing up directories, a node will announce completion + # slightly before it's actually complete. this is arguably a bug, + # but we let it slide for the moment as long as it gets there + # eventually. + sleep 5 break fi done diff --git a/model/blockqueue.go b/model/blockqueue.go index 8b3b4d66..e8848238 100644 --- a/model/blockqueue.go +++ b/model/blockqueue.go @@ -1,7 +1,7 @@ package model import ( - "sync/atomic" + "sync" "github.com/calmh/syncthing/scanner" ) @@ -24,7 +24,8 @@ type blockQueue struct { outbox chan bqBlock queued []bqBlock - qlen uint32 + + mut sync.Mutex } func newBlockQueue() *blockQueue { @@ -37,6 +38,9 @@ func newBlockQueue() *blockQueue { } func (q *blockQueue) addBlock(a bqAdd) { + q.mut.Lock() + defer q.mut.Unlock() + // If we already have it queued, return for _, b := range q.queued { if b.file.Name == a.file.Name { @@ -74,15 +78,18 @@ func (q *blockQueue) run() { if len(q.queued) == 0 { q.addBlock(<-q.inbox) } else { + q.mut.Lock() next := q.queued[0] + q.mut.Unlock() select { case a := <-q.inbox: q.addBlock(a) case q.outbox <- next: + q.mut.Lock() q.queued = q.queued[1:] + q.mut.Unlock() } } - atomic.StoreUint32(&q.qlen, uint32(len(q.queued))) } } @@ -95,7 +102,7 @@ func (q *blockQueue) get() bqBlock { } func (q *blockQueue) empty() bool { - var l uint32 - atomic.LoadUint32(&l) - return l == 0 + q.mut.Lock() + defer q.mut.Unlock() + return len(q.queued) == 0 } diff --git a/model/model.go b/model/model.go index 3e0096e8..4ff8c7b7 100644 --- a/model/model.go +++ b/model/model.go @@ -221,7 +221,7 @@ func (m *Model) LocalSize(repo string) (files, deleted int, bytes int64) { return 0, 0, 0 } -// NeedFiles returns the list of currently needed files and the total size. +// NeedSize returns the number and total size of currently needed files. func (m *Model) NeedSize(repo string) (files int, bytes int64) { f, d, b := sizeOf(m.NeedFilesRepo(repo)) return f + d, b @@ -241,13 +241,21 @@ func (m *Model) NeedFilesRepo(repo string) []scanner.File { // Implements the protocol.Model interface. func (m *Model) Index(nodeID string, repo string, fs []protocol.FileInfo) { if debug { - l.Debugf("IDX(in): %s / %q: %d files", nodeID, repo, len(fs)) + l.Debugf("IDX(in): %s %q: %d files", nodeID, repo, len(fs)) } var files = make([]scanner.File, len(fs)) for i := range fs { - lamport.Default.Tick(fs[i].Version) - files[i] = fileFromFileInfo(fs[i]) + f := fs[i] + lamport.Default.Tick(f.Version) + if debug { + var flagComment string + if f.Flags&protocol.FlagDeleted != 0 { + flagComment = " (deleted)" + } + l.Debugf("IDX(in): %s %q/%q m=%d f=%o%s v=%d (%d blocks)", nodeID, repo, f.Name, f.Modified, f.Flags, flagComment, f.Version, len(f.Blocks)) + } + files[i] = fileFromFileInfo(f) } id := m.cm.Get(nodeID) @@ -269,8 +277,16 @@ func (m *Model) IndexUpdate(nodeID string, repo string, fs []protocol.FileInfo) var files = make([]scanner.File, len(fs)) for i := range fs { - lamport.Default.Tick(fs[i].Version) - files[i] = fileFromFileInfo(fs[i]) + f := fs[i] + lamport.Default.Tick(f.Version) + if debug { + var flagComment string + if f.Flags&protocol.FlagDeleted != 0 { + flagComment = " (deleted)" + } + l.Debugf("IDXUP(in): %s %q/%q m=%d f=%o%s v=%d (%d blocks)", nodeID, repo, f.Name, f.Modified, f.Flags, flagComment, f.Version, len(f.Blocks)) + } + files[i] = fileFromFileInfo(f) } id := m.cm.Get(nodeID) diff --git a/model/puller.go b/model/puller.go index fa593a1c..9636f1f4 100644 --- a/model/puller.go +++ b/model/puller.go @@ -207,7 +207,9 @@ func (p *puller) runRO() { func (p *puller) fixupDirectories() { var deleteDirs []string - filepath.Walk(p.dir, func(path string, info os.FileInfo, err error) error { + var changed = 0 + + var walkFn = func(path string, info os.FileInfo, err error) error { if !info.IsDir() { return nil } @@ -221,9 +223,12 @@ func (p *puller) fixupDirectories() { return nil } - cur := p.model.CurrentGlobalFile(p.repo, rn) + cur := p.model.CurrentRepoFile(p.repo, rn) if cur.Name != rn { // No matching dir in current list; weird + if debug { + l.Debugf("missing dir: %s; %v", rn, cur) + } return nil } @@ -241,31 +246,59 @@ func (p *puller) fixupDirectories() { } if cur.Flags&uint32(os.ModePerm) != uint32(info.Mode()&os.ModePerm) { - os.Chmod(path, os.FileMode(cur.Flags)&os.ModePerm) - if debug { - l.Debugf("restored dir flags: %o -> %v", info.Mode()&os.ModePerm, cur) + err := os.Chmod(path, os.FileMode(cur.Flags)&os.ModePerm) + if err != nil { + l.Warnln("Restoring folder flags: %q: %v", path, err) + } else { + changed++ + if debug { + l.Debugf("restored dir flags: %o -> %v", info.Mode()&os.ModePerm, cur) + } } } if cur.Modified != info.ModTime().Unix() { t := time.Unix(cur.Modified, 0) - os.Chtimes(path, t, t) - if debug { - l.Debugf("restored dir modtime: %d -> %v", info.ModTime().Unix(), cur) + err := os.Chtimes(path, t, t) + if err != nil { + l.Warnln("Restoring folder modtime: %q: %v", path, err) + } else { + changed++ + if debug { + l.Debugf("restored dir modtime: %d -> %v", info.ModTime().Unix(), cur) + } } } return nil - }) + } - // Delete any queued directories - for i := len(deleteDirs) - 1; i >= 0; i-- { - if debug { - l.Debugln("delete dir:", deleteDirs[i]) + for { + deleteDirs = nil + changed = 0 + filepath.Walk(p.dir, walkFn) + + var deleted = 0 + // Delete any queued directories + for i := len(deleteDirs) - 1; i >= 0; i-- { + dir := deleteDirs[i] + if debug { + l.Debugln("delete dir:", dir) + } + err := os.Remove(dir) + if err != nil { + l.Warnln(err) + } else { + deleted++ + } } - err := os.Remove(deleteDirs[i]) - if err != nil { - l.Warnln(err) + + if debug { + l.Debugf("changed %d, deleted %d dirs", changed, deleted) + } + + if changed+deleted == 0 { + return } } } @@ -301,12 +334,23 @@ func (p *puller) handleRequestResult(res requestResult) { func (p *puller) handleBlock(b bqBlock) bool { f := b.file - // For directories, simply making sure they exist is enough + // For directories, making sure they exist is enough. + // Deleted directories we mark as handled and delete later. if f.Flags&protocol.FlagDirectory != 0 { - path := filepath.Join(p.dir, f.Name) - _, err := os.Stat(path) - if err != nil && os.IsNotExist(err) { - os.MkdirAll(path, 0777) + if f.Flags&protocol.FlagDeleted == 0 { + path := filepath.Join(p.dir, f.Name) + _, err := os.Stat(path) + if err != nil && os.IsNotExist(err) { + if debug { + l.Debugf("create dir: %v", f) + } + err = os.MkdirAll(path, 0777) + if err != nil { + l.Warnf("Create folder: %q: %v", path, err) + } + } + } else if debug { + l.Debugf("ignore delete dir: %v", f) } p.model.updateLocal(p.repo, f) return true