From 435f9113e8bde6f5fb1a3b8c2ff079a620cdb708 Mon Sep 17 00:00:00 2001 From: Audrius Butkevicius Date: Mon, 6 Oct 2014 21:57:33 +0100 Subject: [PATCH 1/7] Implement BlockMap --- internal/files/blockmap.go | 201 +++++++++++++++++++++++++++ internal/files/blockmap_test.go | 235 ++++++++++++++++++++++++++++++++ internal/files/leveldb.go | 1 + internal/protocol/message.go | 4 + 4 files changed, 441 insertions(+) create mode 100644 internal/files/blockmap.go create mode 100644 internal/files/blockmap_test.go diff --git a/internal/files/blockmap.go b/internal/files/blockmap.go new file mode 100644 index 00000000..c24f1f03 --- /dev/null +++ b/internal/files/blockmap.go @@ -0,0 +1,201 @@ +// Copyright (C) 2014 Jakob Borg and Contributors (see the CONTRIBUTORS file). +// +// This program is free software: you can redistribute it and/or modify it +// under the terms of the GNU General Public License as published by the Free +// Software Foundation, either version 3 of the License, or (at your option) +// any later version. +// +// This program is distributed in the hope that it will be useful, but WITHOUT +// ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or +// FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for +// more details. +// +// You should have received a copy of the GNU General Public License along +// with this program. If not, see . + +// Package files provides a set type to track local/remote files with newness +// checks. We must do a certain amount of normalization in here. We will get +// fed paths with either native or wire-format separators and encodings +// depending on who calls us. We transform paths to wire-format (NFC and +// slashes) on the way to the database, and transform to native format +// (varying separator and encoding) on the way back out. 
+ +package files + +import ( + "bytes" + "encoding/binary" + "sort" + "sync" + + "github.com/syncthing/syncthing/internal/config" + "github.com/syncthing/syncthing/internal/protocol" + + "github.com/syndtr/goleveldb/leveldb" + "github.com/syndtr/goleveldb/leveldb/util" +) + +var blockFinder *BlockFinder + +type BlockMap struct { + db *leveldb.DB + folder string +} + +func NewBlockMap(db *leveldb.DB, folder string) *BlockMap { + return &BlockMap{ + db: db, + folder: folder, + } +} + +// Add files to the block map, ignoring any deleted or invalid files. +func (m *BlockMap) Add(files []protocol.FileInfo) error { + batch := new(leveldb.Batch) + buf := make([]byte, 4) + for _, file := range files { + if file.IsDirectory() || file.IsDeleted() || file.IsInvalid() { + continue + } + + for i, block := range file.Blocks { + binary.BigEndian.PutUint32(buf, uint32(i)) + batch.Put(m.blockKey(block.Hash, file.Name), buf) + } + } + return m.db.Write(batch, nil) +} + +// Update block map state, removing any deleted or invalid files. +func (m *BlockMap) Update(files []protocol.FileInfo) error { + batch := new(leveldb.Batch) + buf := make([]byte, 4) + for _, file := range files { + if file.IsDirectory() { + continue + } + + if file.IsDeleted() || file.IsInvalid() { + for _, block := range file.Blocks { + batch.Delete(m.blockKey(block.Hash, file.Name)) + } + continue + } + + for i, block := range file.Blocks { + binary.BigEndian.PutUint32(buf, uint32(i)) + batch.Put(m.blockKey(block.Hash, file.Name), buf) + } + } + return m.db.Write(batch, nil) +} + +// Drop block map, removing all entries related to this block map from the db. 
+func (m *BlockMap) Drop() error { + batch := new(leveldb.Batch) + iter := m.db.NewIterator(util.BytesPrefix(m.blockKey(nil, "")[:1+64]), nil) + defer iter.Release() + for iter.Next() { + batch.Delete(iter.Key()) + } + if iter.Error() != nil { + return iter.Error() + } + return m.db.Write(batch, nil) +} + +func (m *BlockMap) blockKey(hash []byte, file string) []byte { + return toBlockKey(hash, m.folder, file) +} + +type BlockFinder struct { + db *leveldb.DB + folders []string + mut sync.RWMutex +} + +func NewBlockFinder(db *leveldb.DB, cfg *config.ConfigWrapper) *BlockFinder { + if blockFinder != nil { + return blockFinder + } + + f := &BlockFinder{ + db: db, + } + f.Changed(cfg.Raw()) + cfg.Subscribe(f) + return f +} + +// Implements config.Handler interface +func (f *BlockFinder) Changed(cfg config.Configuration) error { + folders := make([]string, len(cfg.Folders)) + for i, folder := range cfg.Folders { + folders[i] = folder.ID + } + + sort.Strings(folders) + + f.mut.Lock() + f.folders = folders + f.mut.Unlock() + + return nil +} + +// An iterator function which iterates over all matching blocks for the given +// hash. The iterator function has to return either true (if they are happy with +// the block) or false to continue iterating for whatever reason. +// The iterator finally returns the result, whether or not a satisfying block +// was eventually found. 
+func (f *BlockFinder) Iterate(hash []byte, iterFn func(string, string, uint32) bool) bool { + f.mut.RLock() + folders := f.folders + f.mut.RUnlock() + for _, folder := range folders { + key := toBlockKey(hash, folder, "") + iter := f.db.NewIterator(util.BytesPrefix(key), nil) + defer iter.Release() + + for iter.Next() && iter.Error() == nil { + folder, file := fromBlockKey(iter.Key()) + index := binary.BigEndian.Uint32(iter.Value()) + if iterFn(folder, nativeFilename(file), index) { + return true + } + } + } + return false +} + +// m.blockKey returns a byte slice encoding the following information: +// keyTypeBlock (1 byte) +// folder (64 bytes) +// block hash (64 bytes) +// file name (variable size) +func toBlockKey(hash []byte, folder, file string) []byte { + o := make([]byte, 1+64+64+len(file)) + o[0] = keyTypeBlock + copy(o[1:], []byte(folder)) + copy(o[1+64:], []byte(hash)) + copy(o[1+64+64:], []byte(file)) + return o +} + +func fromBlockKey(data []byte) (string, string) { + if len(data) < 1+64+64+1 { + panic("Incorrect key length") + } + if data[0] != keyTypeBlock { + panic("Incorrect key type") + } + + file := string(data[1+64+64:]) + + slice := data[1 : 1+64] + izero := bytes.IndexByte(slice, 0) + if izero > -1 { + return string(slice[:izero]), file + } + return string(slice), file +} diff --git a/internal/files/blockmap_test.go b/internal/files/blockmap_test.go new file mode 100644 index 00000000..ea195261 --- /dev/null +++ b/internal/files/blockmap_test.go @@ -0,0 +1,235 @@ +// Copyright (C) 2014 Jakob Borg and Contributors (see the CONTRIBUTORS file). +// +// This program is free software: you can redistribute it and/or modify it +// under the terms of the GNU General Public License as published by the Free +// Software Foundation, either version 3 of the License, or (at your option) +// any later version. 
+// +// This program is distributed in the hope that it will be useful, but WITHOUT +// ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or +// FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for +// more details. +// +// You should have received a copy of the GNU General Public License along +// with this program. If not, see . + +package files + +import ( + "testing" + + "github.com/syncthing/syncthing/internal/config" + "github.com/syncthing/syncthing/internal/protocol" + + "github.com/syndtr/goleveldb/leveldb" + "github.com/syndtr/goleveldb/leveldb/storage" +) + +func genBlocks(n int) []protocol.BlockInfo { + b := make([]protocol.BlockInfo, n) + for i := range b { + h := make([]byte, 32) + for j := range h { + h[j] = byte(i + j) + } + b[i].Size = uint32(i) + b[i].Hash = h + } + return b +} + +var f1, f2, f3 protocol.FileInfo + +func init() { + blocks := genBlocks(30) + + f1 = protocol.FileInfo{ + Name: "f1", + Blocks: blocks[:10], + } + + f2 = protocol.FileInfo{ + Name: "f2", + Blocks: blocks[10:20], + } + + f3 = protocol.FileInfo{ + Name: "f3", + Blocks: blocks[20:], + } +} + +func setup() (*leveldb.DB, *BlockFinder) { + // Setup + + db, err := leveldb.Open(storage.NewMemStorage(), nil) + if err != nil { + panic(err) + } + + wrapper := config.Wrap("", config.Configuration{}) + wrapper.SetFolder(config.FolderConfiguration{ + ID: "folder1", + }) + wrapper.SetFolder(config.FolderConfiguration{ + ID: "folder2", + }) + + return db, NewBlockFinder(db, wrapper) +} + +func dbEmpty(db *leveldb.DB) bool { + iter := db.NewIterator(nil, nil) + defer iter.Release() + if iter.Next() { + return false + } + return true +} + +func TestBlockMapAddUpdateWipe(t *testing.T) { + db, f := setup() + + if !dbEmpty(db) { + t.Fatal("db not empty") + } + + m := NewBlockMap(db, "folder1") + + f3.Flags |= protocol.FlagDirectory + + err := m.Add([]protocol.FileInfo{f1, f2, f3}) + if err != nil { + t.Fatal(err) + } + + f.Iterate(f1.Blocks[0].Hash, 
func(folder, file string, index uint32) bool { + if folder != "folder1" || file != "f1" || index != 0 { + t.Fatal("Mismatch") + } + return true + }) + + f.Iterate(f2.Blocks[0].Hash, func(folder, file string, index uint32) bool { + if folder != "folder1" || file != "f2" || index != 0 { + t.Fatal("Mismatch") + } + return true + }) + + f.Iterate(f3.Blocks[0].Hash, func(folder, file string, index uint32) bool { + t.Fatal("Unexpected block") + return true + }) + + f3.Flags = f1.Flags + f1.Flags |= protocol.FlagDeleted + f2.Flags |= protocol.FlagInvalid + + // Should remove + err = m.Update([]protocol.FileInfo{f1, f2, f3}) + if err != nil { + t.Fatal(err) + } + + f.Iterate(f1.Blocks[0].Hash, func(folder, file string, index uint32) bool { + t.Fatal("Unexpected block") + return false + }) + + f.Iterate(f2.Blocks[0].Hash, func(folder, file string, index uint32) bool { + t.Fatal("Unexpected block") + return false + }) + + f.Iterate(f3.Blocks[0].Hash, func(folder, file string, index uint32) bool { + if folder != "folder1" || file != "f3" || index != 0 { + t.Fatal("Mismatch") + } + return true + }) + + err = m.Drop() + if err != nil { + t.Fatal(err) + } + + if !dbEmpty(db) { + t.Fatal("db not empty") + } + + // Should not add + err = m.Add([]protocol.FileInfo{f1, f2}) + if err != nil { + t.Fatal(err) + } + + if !dbEmpty(db) { + t.Fatal("db not empty") + } + + f1.Flags = 0 + f2.Flags = 0 + f3.Flags = 0 +} + +func TestBlockMapFinderLookup(t *testing.T) { + db, f := setup() + + m1 := NewBlockMap(db, "folder1") + m2 := NewBlockMap(db, "folder2") + + err := m1.Add([]protocol.FileInfo{f1}) + if err != nil { + t.Fatal(err) + } + err = m2.Add([]protocol.FileInfo{f1}) + if err != nil { + t.Fatal(err) + } + + counter := 0 + f.Iterate(f1.Blocks[0].Hash, func(folder, file string, index uint32) bool { + counter++ + switch counter { + case 1: + if folder != "folder1" || file != "f1" || index != 0 { + t.Fatal("Mismatch") + } + case 2: + if folder != "folder2" || file != "f1" || index != 0 { 
+ t.Fatal("Mismatch") + } + default: + t.Fatal("Unexpected block") + } + return false + }) + if counter != 2 { + t.Fatal("Incorrect count", counter) + } + + f1.Flags |= protocol.FlagDeleted + + err = m1.Update([]protocol.FileInfo{f1}) + if err != nil { + t.Fatal(err) + } + + counter = 0 + f.Iterate(f1.Blocks[0].Hash, func(folder, file string, index uint32) bool { + counter++ + switch counter { + case 1: + if folder != "folder2" || file != "f1" || index != 0 { + t.Fatal("Mismatch") + } + default: + t.Fatal("Unexpected block") + } + return false + }) + if counter != 1 { + t.Fatal("Incorrect count") + } +} diff --git a/internal/files/leveldb.go b/internal/files/leveldb.go index 0ac1b833..06c8bba6 100644 --- a/internal/files/leveldb.go +++ b/internal/files/leveldb.go @@ -49,6 +49,7 @@ func clock(v uint64) uint64 { const ( keyTypeDevice = iota keyTypeGlobal + keyTypeBlock ) type fileVersion struct { diff --git a/internal/protocol/message.go b/internal/protocol/message.go index 800787b9..b4714505 100644 --- a/internal/protocol/message.go +++ b/internal/protocol/message.go @@ -54,6 +54,10 @@ func (f FileInfo) IsInvalid() bool { return IsInvalid(f.Flags) } +func (f FileInfo) IsDirectory() bool { + return IsDirectory(f.Flags) +} + // Used for unmarshalling a FileInfo structure but skipping the actual block list type FileInfoTruncated struct { Name string // max:8192 From 0bc50f7284049b5704fa0b19383a2e171c5359e2 Mon Sep 17 00:00:00 2001 From: Audrius Butkevicius Date: Tue, 7 Oct 2014 22:15:01 +0100 Subject: [PATCH 2/7] Populate BlockMap --- internal/files/set.go | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/internal/files/set.go b/internal/files/set.go index 11c28480..112c7386 100644 --- a/internal/files/set.go +++ b/internal/files/set.go @@ -42,6 +42,7 @@ type Set struct { mutex sync.Mutex folder string db *leveldb.DB + blockmap *BlockMap } func NewSet(folder string, db *leveldb.DB) *Set { @@ -49,6 +50,7 @@ func NewSet(folder string, db *leveldb.DB) 
*Set { localVersion: make(map[protocol.DeviceID]uint64), folder: folder, db: db, + blockmap: NewBlockMap(db, folder), } var deviceID protocol.DeviceID @@ -80,6 +82,10 @@ func (s *Set) Replace(device protocol.DeviceID, fs []protocol.FileInfo) { // Reset the local version if all files were removed. s.localVersion[device] = 0 } + if device == protocol.LocalDeviceID { + s.blockmap.Drop() + s.blockmap.Add(fs) + } } func (s *Set) ReplaceWithDelete(device protocol.DeviceID, fs []protocol.FileInfo) { @@ -92,6 +98,10 @@ func (s *Set) ReplaceWithDelete(device protocol.DeviceID, fs []protocol.FileInfo if lv := ldbReplaceWithDelete(s.db, []byte(s.folder), device[:], fs); lv > s.localVersion[device] { s.localVersion[device] = lv } + if device == protocol.LocalDeviceID { + s.blockmap.Drop() + s.blockmap.Add(fs) + } } func (s *Set) Update(device protocol.DeviceID, fs []protocol.FileInfo) { @@ -104,6 +114,9 @@ func (s *Set) Update(device protocol.DeviceID, fs []protocol.FileInfo) { if lv := ldbUpdate(s.db, []byte(s.folder), device[:], fs); lv > s.localVersion[device] { s.localVersion[device] = lv } + if device == protocol.LocalDeviceID { + s.blockmap.Update(fs) + } } func (s *Set) WithNeed(device protocol.DeviceID, fn fileIterator) { @@ -179,6 +192,11 @@ func ListFolders(db *leveldb.DB) []string { // database. 
func DropFolder(db *leveldb.DB, folder string) { ldbDropFolder(db, []byte(folder)) + bm := &BlockMap{ + db: db, + folder: folder, + } + bm.Drop() } func normalizeFilenames(fs []protocol.FileInfo) { From 1e15b1e0be66808e1f0691de574be0b6cf65de88 Mon Sep 17 00:00:00 2001 From: Audrius Butkevicius Date: Wed, 8 Oct 2014 23:41:23 +0100 Subject: [PATCH 3/7] Implement block fetcher (fixes #781, fixes #3) --- internal/model/model.go | 6 +- internal/model/puller.go | 177 +++++++++++++++------------- internal/model/sharedpullerstate.go | 13 +- 3 files changed, 108 insertions(+), 88 deletions(-) diff --git a/internal/model/model.go b/internal/model/model.go index 527f0a55..a9cd6b15 100644 --- a/internal/model/model.go +++ b/internal/model/model.go @@ -81,8 +81,9 @@ type service interface { } type Model struct { - cfg *config.ConfigWrapper - db *leveldb.DB + cfg *config.ConfigWrapper + db *leveldb.DB + finder *files.BlockFinder deviceName string clientName string @@ -137,6 +138,7 @@ func NewModel(cfg *config.ConfigWrapper, deviceName, clientName, clientVersion s protoConn: make(map[protocol.DeviceID]protocol.Connection), rawConn: make(map[protocol.DeviceID]io.Closer), deviceVer: make(map[protocol.DeviceID]string), + finder: files.NewBlockFinder(db, cfg), } var timeout = 20 * 60 // seconds diff --git a/internal/model/puller.go b/internal/model/puller.go index dc216093..0ebe5882 100644 --- a/internal/model/puller.go +++ b/internal/model/puller.go @@ -16,6 +16,7 @@ package model import ( + "bytes" "errors" "fmt" "os" @@ -23,6 +24,8 @@ import ( "sync" "time" + "github.com/AudriusButkevicius/lfu-go" + "github.com/syncthing/syncthing/internal/config" "github.com/syncthing/syncthing/internal/events" "github.com/syncthing/syncthing/internal/osutil" @@ -50,7 +53,7 @@ type pullBlockState struct { } // A copyBlocksState is passed to copy routine if the file has blocks to be -// copied from the original. +// copied. 
type copyBlocksState struct { *sharedPullerState blocks []protocol.BlockInfo @@ -236,24 +239,25 @@ func (p *Puller) pullerIteration(ncopiers, npullers, nfinishers int) int { copyChan := make(chan copyBlocksState) finisherChan := make(chan *sharedPullerState) - var wg sync.WaitGroup + var copyWg sync.WaitGroup + var pullWg sync.WaitGroup var doneWg sync.WaitGroup for i := 0; i < ncopiers; i++ { - wg.Add(1) + copyWg.Add(1) go func() { // copierRoutine finishes when copyChan is closed - p.copierRoutine(copyChan, finisherChan) - wg.Done() + p.copierRoutine(copyChan, pullChan, finisherChan) + copyWg.Done() }() } for i := 0; i < npullers; i++ { - wg.Add(1) + pullWg.Add(1) go func() { // pullerRoutine finishes when pullChan is closed p.pullerRoutine(pullChan, finisherChan) - wg.Done() + pullWg.Done() }() } @@ -310,7 +314,7 @@ func (p *Puller) pullerIteration(ncopiers, npullers, nfinishers int) int { default: // A new or changed file. This is the only case where we do stuff // in the background; the other three are done synchronously. - p.handleFile(file, copyChan, pullChan, finisherChan) + p.handleFile(file, copyChan, finisherChan) } changed++ @@ -318,13 +322,13 @@ func (p *Puller) pullerIteration(ncopiers, npullers, nfinishers int) int { }) // Signal copy and puller routines that we are done with the in data for - // this iteration + // this iteration. Wait for them to finish. close(copyChan) + copyWg.Wait() close(pullChan) + pullWg.Wait() - // Wait for them to finish, then signal the finisher chan that there will - // be no more input. - wg.Wait() + // Signal the finisher chan that there will be no more input. close(finisherChan) // Wait for the finisherChan to finish. @@ -419,11 +423,15 @@ func (p *Puller) deleteFile(file protocol.FileInfo) { // handleFile queues the copies and pulls as necessary for a single new or // changed file. 
-func (p *Puller) handleFile(file protocol.FileInfo, copyChan chan<- copyBlocksState, pullChan chan<- pullBlockState, finisherChan chan<- *sharedPullerState) { +func (p *Puller) handleFile(file protocol.FileInfo, copyChan chan<- copyBlocksState, finisherChan chan<- *sharedPullerState) { curFile := p.model.CurrentFolderFile(p.folder, file.Name) - copyBlocks, pullBlocks := scanner.BlockDiff(curFile.Blocks, file.Blocks) - if len(copyBlocks) == len(curFile.Blocks) && len(pullBlocks) == 0 { + if len(curFile.Blocks) == len(file.Blocks) { + for i := range file.Blocks { + if !bytes.Equal(curFile.Blocks[i].Hash, file.Blocks[i].Hash) { + goto FilesAreDifferent + } + } // We are supposed to copy the entire file, and then fetch nothing. We // are only updating metadata, so we don't actually *need* to make the // copy. @@ -434,11 +442,14 @@ func (p *Puller) handleFile(file protocol.FileInfo, copyChan chan<- copyBlocksSt return } +FilesAreDifferent: + // Figure out the absolute filenames we need once and for all tempName := filepath.Join(p.dir, defTempNamer.TempName(file.Name)) realName := filepath.Join(p.dir, file.Name) var reuse bool + var blocks []protocol.BlockInfo // Check for an old temporary file which might have some blocks we could // reuse. @@ -453,38 +464,26 @@ func (p *Puller) handleFile(file protocol.FileInfo, copyChan chan<- copyBlocksSt existingBlocks[block.String()] = true } - // Since the blocks are already there, we don't need to copy them - // nor we need to pull them, hence discard blocks which are already - // there, if they are exactly the same... - var newCopyBlocks []protocol.BlockInfo - for _, block := range copyBlocks { + // Since the blocks are already there, we don't need to get them. 
+ for _, block := range file.Blocks { _, ok := existingBlocks[block.String()] if !ok { - newCopyBlocks = append(newCopyBlocks, block) - } - } - - var newPullBlocks []protocol.BlockInfo - for _, block := range pullBlocks { - _, ok := existingBlocks[block.String()] - if !ok { - newPullBlocks = append(newPullBlocks, block) + blocks = append(blocks, block) } } // If any blocks could be reused, let the sharedpullerstate know // which flags it is expected to set on the file. - // Also update the list of work for the routines. - if len(copyBlocks) != len(newCopyBlocks) || len(pullBlocks) != len(newPullBlocks) { + if len(blocks) != len(file.Blocks) { reuse = true - copyBlocks = newCopyBlocks - pullBlocks = newPullBlocks } else { // Otherwise, discard the file ourselves in order for the // sharedpuller not to panic when it fails to exlusively create a // file which already exists os.Remove(tempName) } + } else { + blocks = file.Blocks } s := sharedPullerState{ @@ -492,43 +491,19 @@ func (p *Puller) handleFile(file protocol.FileInfo, copyChan chan<- copyBlocksSt folder: p.folder, tempName: tempName, realName: realName, - pullNeeded: len(pullBlocks), + copyNeeded: len(blocks), reuse: reuse, } - if len(copyBlocks) > 0 { - s.copyNeeded = 1 - } if debug { - l.Debugf("%v need file %s; copy %d, pull %d, reuse %v", p, file.Name, len(copyBlocks), len(pullBlocks), reuse) + l.Debugf("%v need file %s; copy %d, reuse %v", p, file.Name, len(blocks), reuse) } - if len(copyBlocks) > 0 { - cs := copyBlocksState{ - sharedPullerState: &s, - blocks: copyBlocks, - } - copyChan <- cs - } - - if len(pullBlocks) > 0 { - for _, block := range pullBlocks { - ps := pullBlockState{ - sharedPullerState: &s, - block: block, - } - pullChan <- ps - } - } - - if len(pullBlocks) == 0 && len(copyBlocks) == 0 { - if !reuse { - panic("bug: nothing to do with file?") - } - // We have a temp file that we can reuse totally. Jump directly to the - // finisher stage. 
- finisherChan <- &s + cs := copyBlocksState{ + sharedPullerState: &s, + blocks: blocks, } + copyChan <- cs } // shortcutFile sets file mode and modification time, when that's the only @@ -561,9 +536,9 @@ func (p *Puller) shortcutFile(file protocol.FileInfo) { p.model.updateLocal(p.folder, file) } -// copierRoutine reads pullerStates until the in channel closes and performs -// the relevant copy. -func (p *Puller) copierRoutine(in <-chan copyBlocksState, out chan<- *sharedPullerState) { +// copierRoutine reads copierStates until the in channel closes and performs +// the relevant copies when possible, or passes it to the puller routine. +func (p *Puller) copierRoutine(in <-chan copyBlocksState, pullChan chan<- pullBlockState, out chan<- *sharedPullerState) { buf := make([]byte, protocol.BlockSize) nextFile: @@ -575,32 +550,66 @@ nextFile: continue nextFile } - srcFd, err := state.sourceFile() - if err != nil { - // As above - continue nextFile - } + evictionChan := make(chan lfu.Eviction) + + fdCache := lfu.New() + fdCache.UpperBound = 50 + fdCache.LowerBound = 20 + fdCache.EvictionChannel = evictionChan + + go func() { + for item := range evictionChan { + item.Value.(*os.File).Close() + } + }() for _, block := range state.blocks { buf = buf[:int(block.Size)] - _, err = srcFd.ReadAt(buf, block.Offset) - if err != nil { - state.earlyClose("src read", err) - srcFd.Close() - continue nextFile + success := p.model.finder.Iterate(block.Hash, func(folder, file string, index uint32) bool { + path := filepath.Join(p.model.folderCfgs[folder].Path, file) + + var fd *os.File + + fdi := fdCache.Get(path) + if fdi != nil { + fd = fdi.(*os.File) + } else { + fd, err = os.Open(path) + if err != nil { + return false + } + fdCache.Set(path, fd) + } + + _, err = fd.ReadAt(buf, protocol.BlockSize*int64(index)) + if err != nil { + return false + } + + _, err = dstFd.WriteAt(buf, block.Offset) + if err != nil { + state.earlyClose("dst write", err) + } + return true + }) + + if 
state.failed() != nil { + break } - _, err = dstFd.WriteAt(buf, block.Offset) - if err != nil { - state.earlyClose("dst write", err) - srcFd.Close() - continue nextFile + if !success { + state.pullStarted() + ps := pullBlockState{ + sharedPullerState: state.sharedPullerState, + block: block, + } + pullChan <- ps } + state.copyDone() } - - srcFd.Close() - state.copyDone() + fdCache.Evict(fdCache.Len()) + close(evictionChan) out <- state.sharedPullerState } } diff --git a/internal/model/sharedpullerstate.go b/internal/model/sharedpullerstate.go index 1347e627..bba67b14 100644 --- a/internal/model/sharedpullerstate.go +++ b/internal/model/sharedpullerstate.go @@ -149,7 +149,16 @@ func (s *sharedPullerState) copyDone() { s.mut.Lock() s.copyNeeded-- if debug { - l.Debugln("sharedPullerState", s.folder, s.file.Name, "copyNeeded ->", s.pullNeeded) + l.Debugln("sharedPullerState", s.folder, s.file.Name, "copyNeeded ->", s.copyNeeded) + } + s.mut.Unlock() +} + +func (s *sharedPullerState) pullStarted() { + s.mut.Lock() + s.pullNeeded++ + if debug { + l.Debugln("sharedPullerState", s.folder, s.file.Name, "pullNeeded start ->", s.pullNeeded) } s.mut.Unlock() } @@ -158,7 +167,7 @@ func (s *sharedPullerState) pullDone() { s.mut.Lock() s.pullNeeded-- if debug { - l.Debugln("sharedPullerState", s.folder, s.file.Name, "pullNeeded ->", s.pullNeeded) + l.Debugln("sharedPullerState", s.folder, s.file.Name, "pullNeeded done ->", s.pullNeeded) } s.mut.Unlock() } From 4360b2c815d51c48e3cf096ee96c7585222afae9 Mon Sep 17 00:00:00 2001 From: Audrius Butkevicius Date: Wed, 8 Oct 2014 23:41:23 +0100 Subject: [PATCH 4/7] Fix tests --- internal/model/model_test.go | 8 ++++-- internal/model/puller_test.go | 52 ++++++++--------------------------- 2 files changed, 18 insertions(+), 42 deletions(-) diff --git a/internal/model/model_test.go b/internal/model/model_test.go index b37dd0bd..a55f0029 100644 --- a/internal/model/model_test.go +++ b/internal/model/model_test.go @@ -390,8 +390,12 @@ func 
TestIgnores(t *testing.T) { } db, _ := leveldb.Open(storage.NewMemStorage(), nil) - m := NewModel(config.Wrap("", config.Configuration{}), "device", "syncthing", "dev", db) - m.AddFolder(config.FolderConfiguration{ID: "default", Path: "testdata"}) + fcfg := config.FolderConfiguration{ID: "default", Path: "testdata"} + cfg := config.Wrap("/tmp", config.Configuration{ + Folders: []config.FolderConfiguration{fcfg}, + }) + m := NewModel(cfg, "device", "syncthing", "dev", db) + m.AddFolder(fcfg) expected := []string{ ".*", diff --git a/internal/model/puller_test.go b/internal/model/puller_test.go index b878e6da..6634d7e7 100644 --- a/internal/model/puller_test.go +++ b/internal/model/puller_test.go @@ -73,34 +73,20 @@ func TestHandleFile(t *testing.T) { model: m, } - copyChan := make(chan copyBlocksState, 1) // Copy chan gets all blocks needed to copy in a wrapper struct - pullChan := make(chan pullBlockState, 5) // Pull chan gets blocks one by one + copyChan := make(chan copyBlocksState, 1) - p.handleFile(requiredFile, copyChan, pullChan, nil) + p.handleFile(requiredFile, copyChan, nil) // Receive the results toCopy := <-copyChan - toPull := []pullBlockState{<-pullChan, <-pullChan, <-pullChan, <-pullChan, <-pullChan} - select { - case <-pullChan: - t.Error("Channel not empty!") - default: + if len(toCopy.blocks) != 8 { + t.Errorf("Unexpected count of copy blocks: %d != 8", len(toCopy.blocks)) } - if len(toCopy.blocks) != 3 { - t.Errorf("Unexpected count of copy blocks: %d != 3", len(toCopy.blocks)) - } - - for i, eq := range []int{2, 5, 8} { - if string(toCopy.blocks[i].Hash) != string(blocks[eq].Hash) { - t.Errorf("Block mismatch: %s != %s", toCopy.blocks[i].String(), blocks[eq].String()) - } - } - - for i, eq := range []int{1, 3, 4, 6, 7} { - if string(toPull[i].block.Hash) != string(blocks[eq].Hash) { - t.Errorf("Block mismatch: %s != %s", toPull[i].block.String(), blocks[eq].String()) + for i, block := range toCopy.blocks { + if string(block.Hash) != 
string(blocks[i+1].Hash) { + t.Errorf("Block mismatch: %s != %s", block.String(), blocks[i+1].String()) } } } @@ -140,34 +126,20 @@ func TestHandleFileWithTemp(t *testing.T) { model: m, } - copyChan := make(chan copyBlocksState, 1) // Copy chan gets all blocks needed to copy in a wrapper struct - pullChan := make(chan pullBlockState, 2) // Pull chan gets blocks one by one + copyChan := make(chan copyBlocksState, 1) - p.handleFile(requiredFile, copyChan, pullChan, nil) + p.handleFile(requiredFile, copyChan, nil) // Receive the results toCopy := <-copyChan - toPull := []pullBlockState{<-pullChan, <-pullChan} - select { - case <-pullChan: - t.Error("Channel not empty!") - default: + if len(toCopy.blocks) != 4 { + t.Errorf("Unexpected count of copy blocks: %d != 4", len(toCopy.blocks)) } - if len(toCopy.blocks) != 2 { - t.Errorf("Unexpected count of copy blocks: %d != 2", len(toCopy.blocks)) - } - - for i, eq := range []int{5, 8} { + for i, eq := range []int{1, 5, 6, 8} { if string(toCopy.blocks[i].Hash) != string(blocks[eq].Hash) { t.Errorf("Block mismatch: %s != %s", toCopy.blocks[i].String(), blocks[eq].String()) } } - - for i, eq := range []int{1, 6} { - if string(toPull[i].block.Hash) != string(blocks[eq].Hash) { - t.Errorf("Block mismatch: %s != %s", toPull[i].block.String(), blocks[eq].String()) - } - } } From 53da778506545beed158262ddb7a2b8948ecce78 Mon Sep 17 00:00:00 2001 From: Audrius Butkevicius Date: Sun, 12 Oct 2014 21:38:22 +0100 Subject: [PATCH 5/7] Track total block counts, count copier blocks Will eventually allow us to track progress per file --- internal/model/puller.go | 22 +++++++++++++--------- internal/model/sharedpullerstate.go | 20 ++++++++++++++++---- 2 files changed, 29 insertions(+), 13 deletions(-) diff --git a/internal/model/puller.go b/internal/model/puller.go index 0ebe5882..43efd323 100644 --- a/internal/model/puller.go +++ b/internal/model/puller.go @@ -448,7 +448,7 @@ FilesAreDifferent: tempName := filepath.Join(p.dir, 
defTempNamer.TempName(file.Name)) realName := filepath.Join(p.dir, file.Name) - var reuse bool + reused := 0 var blocks []protocol.BlockInfo // Check for an old temporary file which might have some blocks we could @@ -472,11 +472,10 @@ FilesAreDifferent: } } - // If any blocks could be reused, let the sharedpullerstate know - // which flags it is expected to set on the file. - if len(blocks) != len(file.Blocks) { - reuse = true - } else { + // The sharedpullerstate will know which flags to use when opening the + // temp file depending if we are reusing any blocks or not. + reused = len(file.Blocks) - len(blocks) + if reused == 0 { // Otherwise, discard the file ourselves in order for the // sharedpuller not to panic when it fails to exlusively create a // file which already exists @@ -491,12 +490,13 @@ FilesAreDifferent: folder: p.folder, tempName: tempName, realName: realName, + copyTotal: len(blocks), copyNeeded: len(blocks), - reuse: reuse, + reused: reused, } if debug { - l.Debugf("%v need file %s; copy %d, reuse %v", p, file.Name, len(blocks), reuse) + l.Debugf("%v need file %s; copy %d, reused %v", p, file.Name, len(blocks), reused) } cs := copyBlocksState{ @@ -591,6 +591,9 @@ nextFile: if err != nil { state.earlyClose("dst write", err) } + if file == state.file.Name { + state.copiedFromOrigin() + } return true }) @@ -605,8 +608,9 @@ nextFile: block: block, } pullChan <- ps + } else { + state.copyDone() } - state.copyDone() } fdCache.Evict(fdCache.Len()) close(evictionChan) diff --git a/internal/model/sharedpullerstate.go b/internal/model/sharedpullerstate.go index bba67b14..9557b0b6 100644 --- a/internal/model/sharedpullerstate.go +++ b/internal/model/sharedpullerstate.go @@ -31,13 +31,16 @@ type sharedPullerState struct { folder string tempName string realName string - reuse bool + reused int // Number of blocks reused from temporary file // Mutable, must be locked for access err error // The first error we hit fd *os.File // The fd of the temp file - 
copyNeeded int // Number of copy actions we expect to happen - pullNeeded int // Number of block pulls we expect to happen + copyTotal int // Total number of copy actions for the whole job + pullTotal int // Total number of pull actions for the whole job + copyNeeded int // Number of copy actions still pending + pullNeeded int // Number of block pulls still pending + copyOrigin int // Number of blocks copied from the original file closed bool // Set when the file has been closed mut sync.Mutex // Protects the above } @@ -79,7 +82,7 @@ func (s *sharedPullerState) tempFile() (*os.File, error) { // Attempt to create the temp file flags := os.O_WRONLY - if !s.reuse { + if s.reused == 0 { flags |= os.O_CREATE | os.O_EXCL } fd, err := os.OpenFile(s.tempName, flags, 0644) @@ -154,8 +157,17 @@ func (s *sharedPullerState) copyDone() { s.mut.Unlock() } +func (s *sharedPullerState) copiedFromOrigin() { + s.mut.Lock() + s.copyOrigin++ + s.mut.Unlock() +} + func (s *sharedPullerState) pullStarted() { s.mut.Lock() + s.copyTotal-- + s.copyNeeded-- + s.pullTotal++ s.pullNeeded++ if debug { l.Debugln("sharedPullerState", s.folder, s.file.Name, "pullNeeded start ->", s.pullNeeded) From e62b9c60092320804c2b32bb8e7b38630af01ce4 Mon Sep 17 00:00:00 2001 From: Audrius Butkevicius Date: Sat, 11 Oct 2014 00:27:17 +0100 Subject: [PATCH 6/7] Add fetcher tests --- internal/model/puller_test.go | 111 +++++++++++++++++++++++++++++++++- 1 file changed, 109 insertions(+), 2 deletions(-) diff --git a/internal/model/puller_test.go b/internal/model/puller_test.go index 6634d7e7..8c76c30d 100644 --- a/internal/model/puller_test.go +++ b/internal/model/puller_test.go @@ -16,10 +16,13 @@ package model import ( + "os" + "path/filepath" "testing" "github.com/syncthing/syncthing/internal/config" "github.com/syncthing/syncthing/internal/protocol" + "github.com/syncthing/syncthing/internal/scanner" "github.com/syndtr/goleveldb/leveldb" "github.com/syndtr/goleveldb/leveldb/storage" @@ -47,7 +50,7 @@ func 
TestHandleFile(t *testing.T) { // Copy: 2, 5, 8 // Pull: 1, 3, 4, 6, 7 - // Create existing file, and update local index + // Create existing file existingFile := protocol.FileInfo{ Name: "filex", Flags: 0, @@ -65,6 +68,7 @@ func TestHandleFile(t *testing.T) { db, _ := leveldb.Open(storage.NewMemStorage(), nil) m := NewModel(config.Wrap("/tmp/test", config.Configuration{}), "device", "syncthing", "dev", db) m.AddFolder(config.FolderConfiguration{ID: "default", Path: "testdata"}) + // Update index m.updateLocal("default", existingFile) p := Puller{ @@ -100,7 +104,7 @@ func TestHandleFileWithTemp(t *testing.T) { // Copy: 5, 8 // Pull: 1, 6 - // Create existing file, and update local index + // Create existing file existingFile := protocol.FileInfo{ Name: "file", Flags: 0, @@ -118,6 +122,7 @@ func TestHandleFileWithTemp(t *testing.T) { db, _ := leveldb.Open(storage.NewMemStorage(), nil) m := NewModel(config.Wrap("/tmp/test", config.Configuration{}), "device", "syncthing", "dev", db) m.AddFolder(config.FolderConfiguration{ID: "default", Path: "testdata"}) + // Update index m.updateLocal("default", existingFile) p := Puller{ @@ -143,3 +148,105 @@ func TestHandleFileWithTemp(t *testing.T) { } } } + +func TestCopierFinder(t *testing.T) { + // After diff between required and existing we should: + // Copy: 1, 2, 3, 4, 6, 7, 8 + // Since there is no existing file, nor a temp file + + // After dropping out blocks found locally: + // Pull: 1, 5, 6, 8 + + tempFile := filepath.Join("testdata", defTempNamer.TempName("file2")) + err := os.Remove(tempFile) + if err != nil && !os.IsNotExist(err) { + t.Error(err) + } + + // Create existing file + existingFile := protocol.FileInfo{ + Name: defTempNamer.TempName("file"), + Flags: 0, + Modified: 0, + Blocks: []protocol.BlockInfo{ + blocks[0], blocks[2], blocks[3], blocks[4], + blocks[0], blocks[0], blocks[7], blocks[0], + }, + } + + // Create target file + requiredFile := existingFile + requiredFile.Blocks = blocks[1:] + 
requiredFile.Name = "file2" + + fcfg := config.FolderConfiguration{ID: "default", Path: "testdata"} + cfg := config.Configuration{Folders: []config.FolderConfiguration{fcfg}} + + db, _ := leveldb.Open(storage.NewMemStorage(), nil) + m := NewModel(config.Wrap("/tmp/test", cfg), "device", "syncthing", "dev", db) + m.AddFolder(fcfg) + // Update index + m.updateLocal("default", existingFile) + + iterFn := func(folder, file string, index uint32) bool { + return true + } + + // Verify that the blocks we say exist on file, really exist in the db. + for _, idx := range []int{2, 3, 4, 7} { + if m.finder.Iterate(blocks[idx].Hash, iterFn) == false { + t.Error("Didn't find block") + } + } + + p := Puller{ + folder: "default", + dir: "testdata", + model: m, + } + + copyChan := make(chan copyBlocksState) + pullChan := make(chan pullBlockState, 4) + finisherChan := make(chan *sharedPullerState, 1) + + // Run a single copier routine + go p.copierRoutine(copyChan, pullChan, finisherChan) + + p.handleFile(requiredFile, copyChan, finisherChan) + + pulls := []pullBlockState{<-pullChan, <-pullChan, <-pullChan, <-pullChan} + finish := <-finisherChan + + select { + case <-pullChan: + t.Fatal("Pull channel has data to be read") + case <-finisherChan: + t.Fatal("Finisher channel has data to be read") + default: + } + + // Verify that the right blocks went into the pull list + for i, eq := range []int{1, 5, 6, 8} { + if string(pulls[i].block.Hash) != string(blocks[eq].Hash) { + t.Errorf("Block %d mismatch: %s != %s", eq, pulls[i].block.String(), blocks[eq].String()) + } + if string(finish.file.Blocks[eq-1].Hash) != string(blocks[eq].Hash) { + t.Errorf("Block %d mismatch: %s != %s", eq, finish.file.Blocks[eq-1].String(), blocks[eq].String()) + } + } + + // Verify that the fetched blocks have actually been written to the temp file + blks, err := scanner.HashFile(tempFile, protocol.BlockSize) + if err != nil { + t.Log(err) + } + + for _, eq := range []int{2, 3, 4, 7} { + if 
string(blks[eq-1].Hash) != string(blocks[eq].Hash) { + t.Errorf("Block %d mismatch: %s != %s", eq, blks[eq-1].String(), blocks[eq].String()) + } + } + finish.fd.Close() + + os.Remove(tempFile) +} From dedf835aa6d877009b84cbc1b8043344f941d72c Mon Sep 17 00:00:00 2001 From: Audrius Butkevicius Date: Sun, 12 Oct 2014 22:01:57 +0100 Subject: [PATCH 7/7] Delete files and directories after pulling --- internal/model/puller.go | 21 +++++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/internal/model/puller.go b/internal/model/puller.go index 43efd323..2420cb9e 100644 --- a/internal/model/puller.go +++ b/internal/model/puller.go @@ -281,6 +281,9 @@ func (p *Puller) pullerIteration(ncopiers, npullers, nfinishers int) int { // !!! changed := 0 + + var deletions []protocol.FileInfo + files.WithNeed(protocol.LocalDeviceID, func(intf protocol.FileIntf) bool { // Needed items are delivered sorted lexicographically. This isn't @@ -302,15 +305,12 @@ func (p *Puller) pullerIteration(ncopiers, npullers, nfinishers int) int { } switch { - case protocol.IsDirectory(file.Flags) && protocol.IsDeleted(file.Flags): - // A deleted directory - p.deleteDir(file) + case protocol.IsDeleted(file.Flags): + // A deleted file or directory + deletions = append(deletions, file) case protocol.IsDirectory(file.Flags): // A new or changed directory p.handleDir(file) - case protocol.IsDeleted(file.Flags): - // A deleted file - p.deleteFile(file) default: // A new or changed file. This is the only case where we do stuff // in the background; the other three are done synchronously. @@ -334,6 +334,15 @@ func (p *Puller) pullerIteration(ncopiers, npullers, nfinishers int) int { // Wait for the finisherChan to finish. doneWg.Wait() + for i := range deletions { + deletion := deletions[len(deletions)-i-1] + if deletion.IsDirectory() { + p.deleteDir(deletion) + } else { + p.deleteFile(deletion) + } + } + return changed }