diff --git a/internal/files/blockmap.go b/internal/files/blockmap.go
new file mode 100644
index 00000000..c24f1f03
--- /dev/null
+++ b/internal/files/blockmap.go
@@ -0,0 +1,201 @@
+// Copyright (C) 2014 Jakob Borg and Contributors (see the CONTRIBUTORS file).
+//
+// This program is free software: you can redistribute it and/or modify it
+// under the terms of the GNU General Public License as published by the Free
+// Software Foundation, either version 3 of the License, or (at your option)
+// any later version.
+//
+// This program is distributed in the hope that it will be useful, but WITHOUT
+// ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+// FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
+// more details.
+//
+// You should have received a copy of the GNU General Public License along
+// with this program. If not, see <http://www.gnu.org/licenses/>.
+
+// Package files provides a set type to track local/remote files with newness
+// checks. We must do a certain amount of normalization in here. We will get
+// fed paths with either native or wire-format separators and encodings
+// depending on who calls us. We transform paths to wire-format (NFC and
+// slashes) on the way to the database, and transform to native format
+// (varying separator and encoding) on the way back out.
+
+package files
+
+import (
+ "bytes"
+ "encoding/binary"
+ "sort"
+ "sync"
+
+ "github.com/syncthing/syncthing/internal/config"
+ "github.com/syncthing/syncthing/internal/protocol"
+
+ "github.com/syndtr/goleveldb/leveldb"
+ "github.com/syndtr/goleveldb/leveldb/util"
+)
+
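+// blockFinder is a package-level singleton, created by the first call to
+// NewBlockFinder and returned by subsequent calls.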
+var blockFinder *BlockFinder
+
+type BlockMap struct {
+ db *leveldb.DB
+ folder string
+}
+
+func NewBlockMap(db *leveldb.DB, folder string) *BlockMap {
+ return &BlockMap{
+ db: db,
+ folder: folder,
+ }
+}
+
+// Add adds files to the block map, ignoring any deleted or invalid files.
+func (m *BlockMap) Add(files []protocol.FileInfo) error {
+ batch := new(leveldb.Batch)
+ buf := make([]byte, 4)
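+ // Each key encodes (hash, folder, file name); the value is the block's
+ // index within the file, as a big-endian uint32.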
+ for _, file := range files {
+ if file.IsDirectory() || file.IsDeleted() || file.IsInvalid() {
+ continue
+ }
+
+ for i, block := range file.Blocks {
+ binary.BigEndian.PutUint32(buf, uint32(i))
+ batch.Put(m.blockKey(block.Hash, file.Name), buf)
+ }
+ }
+ return m.db.Write(batch, nil)
+}
+
+// Update updates the block map state, removing any deleted or invalid files.
+func (m *BlockMap) Update(files []protocol.FileInfo) error {
+ batch := new(leveldb.Batch)
+ buf := make([]byte, 4)
+ for _, file := range files {
+ if file.IsDirectory() {
+ continue
+ }
+
+ if file.IsDeleted() || file.IsInvalid() {
+ for _, block := range file.Blocks {
+ batch.Delete(m.blockKey(block.Hash, file.Name))
+ }
+ continue
+ }
+
+ for i, block := range file.Blocks {
+ binary.BigEndian.PutUint32(buf, uint32(i))
+ batch.Put(m.blockKey(block.Hash, file.Name), buf)
+ }
+ }
+ return m.db.Write(batch, nil)
+}
+
+// Drop removes all entries related to this block map from the db.
+func (m *BlockMap) Drop() error {
+ batch := new(leveldb.Batch)
+ iter := m.db.NewIterator(util.BytesPrefix(m.blockKey(nil, "")[:1+64]), nil)
+ defer iter.Release()
+ for iter.Next() {
+ batch.Delete(iter.Key())
+ }
+ if iter.Error() != nil {
+ return iter.Error()
+ }
+ return m.db.Write(batch, nil)
+}
+
+func (m *BlockMap) blockKey(hash []byte, file string) []byte {
+ return toBlockKey(hash, m.folder, file)
+}
+
+type BlockFinder struct {
+ db *leveldb.DB
+ folders []string
+ mut sync.RWMutex
+}
+
+func NewBlockFinder(db *leveldb.DB, cfg *config.ConfigWrapper) *BlockFinder {
+ if blockFinder != nil {
+ return blockFinder
+ }
+
+ f := &BlockFinder{
+ db: db,
+ }
+ f.Changed(cfg.Raw())
+ cfg.Subscribe(f)
+
+ blockFinder = f
+ return f
+}
+
+// Changed implements the config.Handler interface.
+func (f *BlockFinder) Changed(cfg config.Configuration) error {
+ folders := make([]string, len(cfg.Folders))
+ for i, folder := range cfg.Folders {
+ folders[i] = folder.ID
+ }
+
+ sort.Strings(folders)
+
+ f.mut.Lock()
+ f.folders = folders
+ f.mut.Unlock()
+
+ return nil
+}
+
+// Iterate calls the given iterator function for every block that matches
+// the given hash, across all folders. The iterator function returns true
+// when it is satisfied with a block, which stops the iteration, or false
+// to keep iterating. Iterate returns whether a satisfying block was
+// eventually found.
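+//
+// A hypothetical caller might look like this (readBlock being a
+// placeholder for whatever the caller does with a match):
+//
+//	found := finder.Iterate(hash, func(folder, file string, index uint32) bool {
+//		// Attempt to read block number `index` of `file` in `folder`;
+//		// returning true stops the search.
+//		return readBlock(folder, file, index)
+//	})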
+func (f *BlockFinder) Iterate(hash []byte, iterFn func(string, string, uint32) bool) bool {
+ f.mut.RLock()
+ folders := f.folders
+ f.mut.RUnlock()
+ for _, folder := range folders {
+ key := toBlockKey(hash, folder, "")
+ iter := f.db.NewIterator(util.BytesPrefix(key), nil)
+ defer iter.Release()
+
+ for iter.Next() && iter.Error() == nil {
+ folder, file := fromBlockKey(iter.Key())
+ index := binary.BigEndian.Uint32(iter.Value())
+ if iterFn(folder, nativeFilename(file), index) {
+ return true
+ }
+ }
+ }
+ return false
+}
+
+// toBlockKey returns a byte slice encoding the following information:
+// keyTypeBlock (1 byte)
+// folder (64 bytes)
+// block hash (64 bytes)
+// file name (variable size)
+func toBlockKey(hash []byte, folder, file string) []byte {
+ o := make([]byte, 1+64+64+len(file))
+ o[0] = keyTypeBlock
+ copy(o[1:], folder)
+ copy(o[1+64:], hash)
+ copy(o[1+64+64:], file)
+ return o
+}
+
+func fromBlockKey(data []byte) (string, string) {
+ if len(data) < 1+64+64+1 {
+ panic("Incorrect key length")
+ }
+ if data[0] != keyTypeBlock {
+ panic("Incorrect key type")
+ }
+
+ file := string(data[1+64+64:])
+
+ slice := data[1 : 1+64]
+ izero := bytes.IndexByte(slice, 0)
+ if izero > -1 {
+ return string(slice[:izero]), file
+ }
+ return string(slice), file
+}
diff --git a/internal/files/blockmap_test.go b/internal/files/blockmap_test.go
new file mode 100644
index 00000000..ea195261
--- /dev/null
+++ b/internal/files/blockmap_test.go
@@ -0,0 +1,235 @@
+// Copyright (C) 2014 Jakob Borg and Contributors (see the CONTRIBUTORS file).
+//
+// This program is free software: you can redistribute it and/or modify it
+// under the terms of the GNU General Public License as published by the Free
+// Software Foundation, either version 3 of the License, or (at your option)
+// any later version.
+//
+// This program is distributed in the hope that it will be useful, but WITHOUT
+// ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+// FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
+// more details.
+//
+// You should have received a copy of the GNU General Public License along
+// with this program. If not, see <http://www.gnu.org/licenses/>.
+
+package files
+
+import (
+ "testing"
+
+ "github.com/syncthing/syncthing/internal/config"
+ "github.com/syncthing/syncthing/internal/protocol"
+
+ "github.com/syndtr/goleveldb/leveldb"
+ "github.com/syndtr/goleveldb/leveldb/storage"
+)
+
+func genBlocks(n int) []protocol.BlockInfo {
+ b := make([]protocol.BlockInfo, n)
+ for i := range b {
+ h := make([]byte, 32)
+ for j := range h {
+ h[j] = byte(i + j)
+ }
+ b[i].Size = uint32(i)
+ b[i].Hash = h
+ }
+ return b
+}
+
+var f1, f2, f3 protocol.FileInfo
+
+func init() {
+ blocks := genBlocks(30)
+
+ f1 = protocol.FileInfo{
+ Name: "f1",
+ Blocks: blocks[:10],
+ }
+
+ f2 = protocol.FileInfo{
+ Name: "f2",
+ Blocks: blocks[10:20],
+ }
+
+ f3 = protocol.FileInfo{
+ Name: "f3",
+ Blocks: blocks[20:],
+ }
+}
+
+func setup() (*leveldb.DB, *BlockFinder) {
+ // Setup
+
+ db, err := leveldb.Open(storage.NewMemStorage(), nil)
+ if err != nil {
+ panic(err)
+ }
+
+ wrapper := config.Wrap("", config.Configuration{})
+ wrapper.SetFolder(config.FolderConfiguration{
+ ID: "folder1",
+ })
+ wrapper.SetFolder(config.FolderConfiguration{
+ ID: "folder2",
+ })
+
+ return db, NewBlockFinder(db, wrapper)
+}
+
+func dbEmpty(db *leveldb.DB) bool {
+ iter := db.NewIterator(nil, nil)
+ defer iter.Release()
+ if iter.Next() {
+ return false
+ }
+ return true
+}
+
+func TestBlockMapAddUpdateWipe(t *testing.T) {
+ db, f := setup()
+
+ if !dbEmpty(db) {
+ t.Fatal("db not empty")
+ }
+
+ m := NewBlockMap(db, "folder1")
+
+ f3.Flags |= protocol.FlagDirectory
+
+ err := m.Add([]protocol.FileInfo{f1, f2, f3})
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ f.Iterate(f1.Blocks[0].Hash, func(folder, file string, index uint32) bool {
+ if folder != "folder1" || file != "f1" || index != 0 {
+ t.Fatal("Mismatch")
+ }
+ return true
+ })
+
+ f.Iterate(f2.Blocks[0].Hash, func(folder, file string, index uint32) bool {
+ if folder != "folder1" || file != "f2" || index != 0 {
+ t.Fatal("Mismatch")
+ }
+ return true
+ })
+
+ f.Iterate(f3.Blocks[0].Hash, func(folder, file string, index uint32) bool {
+ t.Fatal("Unexpected block")
+ return true
+ })
+
+ f3.Flags = f1.Flags
+ f1.Flags |= protocol.FlagDeleted
+ f2.Flags |= protocol.FlagInvalid
+
+ // Should remove
+ err = m.Update([]protocol.FileInfo{f1, f2, f3})
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ f.Iterate(f1.Blocks[0].Hash, func(folder, file string, index uint32) bool {
+ t.Fatal("Unexpected block")
+ return false
+ })
+
+ f.Iterate(f2.Blocks[0].Hash, func(folder, file string, index uint32) bool {
+ t.Fatal("Unexpected block")
+ return false
+ })
+
+ f.Iterate(f3.Blocks[0].Hash, func(folder, file string, index uint32) bool {
+ if folder != "folder1" || file != "f3" || index != 0 {
+ t.Fatal("Mismatch")
+ }
+ return true
+ })
+
+ err = m.Drop()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ if !dbEmpty(db) {
+ t.Fatal("db not empty")
+ }
+
+ // Should not add
+ err = m.Add([]protocol.FileInfo{f1, f2})
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ if !dbEmpty(db) {
+ t.Fatal("db not empty")
+ }
+
+ f1.Flags = 0
+ f2.Flags = 0
+ f3.Flags = 0
+}
+
+func TestBlockMapFinderLookup(t *testing.T) {
+ db, f := setup()
+
+ m1 := NewBlockMap(db, "folder1")
+ m2 := NewBlockMap(db, "folder2")
+
+ err := m1.Add([]protocol.FileInfo{f1})
+ if err != nil {
+ t.Fatal(err)
+ }
+ err = m2.Add([]protocol.FileInfo{f1})
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ counter := 0
+ f.Iterate(f1.Blocks[0].Hash, func(folder, file string, index uint32) bool {
+ counter++
+ switch counter {
+ case 1:
+ if folder != "folder1" || file != "f1" || index != 0 {
+ t.Fatal("Mismatch")
+ }
+ case 2:
+ if folder != "folder2" || file != "f1" || index != 0 {
+ t.Fatal("Mismatch")
+ }
+ default:
+ t.Fatal("Unexpected block")
+ }
+ return false
+ })
+ if counter != 2 {
+ t.Fatal("Incorrect count", counter)
+ }
+
+ f1.Flags |= protocol.FlagDeleted
+
+ err = m1.Update([]protocol.FileInfo{f1})
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ counter = 0
+ f.Iterate(f1.Blocks[0].Hash, func(folder, file string, index uint32) bool {
+ counter++
+ switch counter {
+ case 1:
+ if folder != "folder2" || file != "f1" || index != 0 {
+ t.Fatal("Mismatch")
+ }
+ default:
+ t.Fatal("Unexpected block")
+ }
+ return false
+ })
+ if counter != 1 {
+ t.Fatal("Incorrect count")
+ }
+}
diff --git a/internal/files/leveldb.go b/internal/files/leveldb.go
index 0ac1b833..06c8bba6 100644
--- a/internal/files/leveldb.go
+++ b/internal/files/leveldb.go
@@ -49,6 +49,7 @@ func clock(v uint64) uint64 {
const (
keyTypeDevice = iota
keyTypeGlobal
+ keyTypeBlock
)
type fileVersion struct {
diff --git a/internal/files/set.go b/internal/files/set.go
index 11c28480..112c7386 100644
--- a/internal/files/set.go
+++ b/internal/files/set.go
@@ -42,6 +42,7 @@ type Set struct {
mutex sync.Mutex
folder string
db *leveldb.DB
+ blockmap *BlockMap
}
func NewSet(folder string, db *leveldb.DB) *Set {
@@ -49,6 +50,7 @@ func NewSet(folder string, db *leveldb.DB) *Set {
localVersion: make(map[protocol.DeviceID]uint64),
folder: folder,
db: db,
+ blockmap: NewBlockMap(db, folder),
}
var deviceID protocol.DeviceID
@@ -80,6 +82,10 @@ func (s *Set) Replace(device protocol.DeviceID, fs []protocol.FileInfo) {
// Reset the local version if all files were removed.
s.localVersion[device] = 0
}
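+ // The block map only tracks local files; rebuild it to match the
+ // replacement set.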
+ if device == protocol.LocalDeviceID {
+ s.blockmap.Drop()
+ s.blockmap.Add(fs)
+ }
}
func (s *Set) ReplaceWithDelete(device protocol.DeviceID, fs []protocol.FileInfo) {
@@ -92,6 +98,10 @@ func (s *Set) ReplaceWithDelete(device protocol.DeviceID, fs []protocol.FileInfo
if lv := ldbReplaceWithDelete(s.db, []byte(s.folder), device[:], fs); lv > s.localVersion[device] {
s.localVersion[device] = lv
}
+ if device == protocol.LocalDeviceID {
+ s.blockmap.Drop()
+ s.blockmap.Add(fs)
+ }
}
func (s *Set) Update(device protocol.DeviceID, fs []protocol.FileInfo) {
@@ -104,6 +114,9 @@ func (s *Set) Update(device protocol.DeviceID, fs []protocol.FileInfo) {
if lv := ldbUpdate(s.db, []byte(s.folder), device[:], fs); lv > s.localVersion[device] {
s.localVersion[device] = lv
}
+ if device == protocol.LocalDeviceID {
+ s.blockmap.Update(fs)
+ }
}
func (s *Set) WithNeed(device protocol.DeviceID, fn fileIterator) {
@@ -179,6 +192,11 @@ func ListFolders(db *leveldb.DB) []string {
// database.
func DropFolder(db *leveldb.DB, folder string) {
ldbDropFolder(db, []byte(folder))
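+ // Also remove the folder's entries from the block map.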
+ bm := &BlockMap{
+ db: db,
+ folder: folder,
+ }
+ bm.Drop()
}
func normalizeFilenames(fs []protocol.FileInfo) {
diff --git a/internal/model/model.go b/internal/model/model.go
index 527f0a55..a9cd6b15 100644
--- a/internal/model/model.go
+++ b/internal/model/model.go
@@ -81,8 +81,9 @@ type service interface {
}
type Model struct {
- cfg *config.ConfigWrapper
- db *leveldb.DB
+ cfg *config.ConfigWrapper
+ db *leveldb.DB
+ finder *files.BlockFinder
deviceName string
clientName string
@@ -137,6 +138,7 @@ func NewModel(cfg *config.ConfigWrapper, deviceName, clientName, clientVersion s
protoConn: make(map[protocol.DeviceID]protocol.Connection),
rawConn: make(map[protocol.DeviceID]io.Closer),
deviceVer: make(map[protocol.DeviceID]string),
+ finder: files.NewBlockFinder(db, cfg),
}
var timeout = 20 * 60 // seconds
diff --git a/internal/model/model_test.go b/internal/model/model_test.go
index b37dd0bd..a55f0029 100644
--- a/internal/model/model_test.go
+++ b/internal/model/model_test.go
@@ -390,8 +390,12 @@ func TestIgnores(t *testing.T) {
}
db, _ := leveldb.Open(storage.NewMemStorage(), nil)
- m := NewModel(config.Wrap("", config.Configuration{}), "device", "syncthing", "dev", db)
- m.AddFolder(config.FolderConfiguration{ID: "default", Path: "testdata"})
+ fcfg := config.FolderConfiguration{ID: "default", Path: "testdata"}
+ cfg := config.Wrap("/tmp", config.Configuration{
+ Folders: []config.FolderConfiguration{fcfg},
+ })
+ m := NewModel(cfg, "device", "syncthing", "dev", db)
+ m.AddFolder(fcfg)
expected := []string{
".*",
diff --git a/internal/model/puller.go b/internal/model/puller.go
index dc216093..2420cb9e 100644
--- a/internal/model/puller.go
+++ b/internal/model/puller.go
@@ -16,6 +16,7 @@
package model
import (
+ "bytes"
"errors"
"fmt"
"os"
@@ -23,6 +24,8 @@ import (
"sync"
"time"
+ "github.com/AudriusButkevicius/lfu-go"
+
"github.com/syncthing/syncthing/internal/config"
"github.com/syncthing/syncthing/internal/events"
"github.com/syncthing/syncthing/internal/osutil"
@@ -50,7 +53,7 @@ type pullBlockState struct {
}
// A copyBlocksState is passed to copy routine if the file has blocks to be
-// copied from the original.
+// copied.
type copyBlocksState struct {
*sharedPullerState
blocks []protocol.BlockInfo
@@ -236,24 +239,25 @@ func (p *Puller) pullerIteration(ncopiers, npullers, nfinishers int) int {
copyChan := make(chan copyBlocksState)
finisherChan := make(chan *sharedPullerState)
- var wg sync.WaitGroup
+ var copyWg sync.WaitGroup
+ var pullWg sync.WaitGroup
var doneWg sync.WaitGroup
for i := 0; i < ncopiers; i++ {
- wg.Add(1)
+ copyWg.Add(1)
go func() {
// copierRoutine finishes when copyChan is closed
- p.copierRoutine(copyChan, finisherChan)
- wg.Done()
+ p.copierRoutine(copyChan, pullChan, finisherChan)
+ copyWg.Done()
}()
}
for i := 0; i < npullers; i++ {
- wg.Add(1)
+ pullWg.Add(1)
go func() {
// pullerRoutine finishes when pullChan is closed
p.pullerRoutine(pullChan, finisherChan)
- wg.Done()
+ pullWg.Done()
}()
}
@@ -277,6 +281,9 @@ func (p *Puller) pullerIteration(ncopiers, npullers, nfinishers int) int {
// !!!
changed := 0
+
+ var deletions []protocol.FileInfo
+
files.WithNeed(protocol.LocalDeviceID, func(intf protocol.FileIntf) bool {
// Needed items are delivered sorted lexicographically. This isn't
@@ -298,19 +305,16 @@ func (p *Puller) pullerIteration(ncopiers, npullers, nfinishers int) int {
}
switch {
- case protocol.IsDirectory(file.Flags) && protocol.IsDeleted(file.Flags):
- // A deleted directory
- p.deleteDir(file)
+ case protocol.IsDeleted(file.Flags):
+ // A deleted file or directory
+ deletions = append(deletions, file)
case protocol.IsDirectory(file.Flags):
// A new or changed directory
p.handleDir(file)
- case protocol.IsDeleted(file.Flags):
- // A deleted file
- p.deleteFile(file)
default:
// A new or changed file. This is the only case where we do stuff
// in the background; the other three are done synchronously.
- p.handleFile(file, copyChan, pullChan, finisherChan)
+ p.handleFile(file, copyChan, finisherChan)
}
changed++
@@ -318,18 +322,27 @@ func (p *Puller) pullerIteration(ncopiers, npullers, nfinishers int) int {
})
// Signal copy and puller routines that we are done with the in data for
- // this iteration
+ // this iteration. Wait for them to finish.
close(copyChan)
+ copyWg.Wait()
close(pullChan)
+ pullWg.Wait()
- // Wait for them to finish, then signal the finisher chan that there will
- // be no more input.
- wg.Wait()
+ // Signal the finisher chan that there will be no more input.
close(finisherChan)
// Wait for the finisherChan to finish.
doneWg.Wait()
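+ // Process deletions in reverse order. Needed items arrive sorted
+ // lexicographically, so walking backwards deletes files before the
+ // directories that contain them.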
+ for i := range deletions {
+ deletion := deletions[len(deletions)-i-1]
+ if deletion.IsDirectory() {
+ p.deleteDir(deletion)
+ } else {
+ p.deleteFile(deletion)
+ }
+ }
+
return changed
}
@@ -419,11 +432,15 @@ func (p *Puller) deleteFile(file protocol.FileInfo) {
// handleFile queues the copies and pulls as necessary for a single new or
// changed file.
-func (p *Puller) handleFile(file protocol.FileInfo, copyChan chan<- copyBlocksState, pullChan chan<- pullBlockState, finisherChan chan<- *sharedPullerState) {
+func (p *Puller) handleFile(file protocol.FileInfo, copyChan chan<- copyBlocksState, finisherChan chan<- *sharedPullerState) {
curFile := p.model.CurrentFolderFile(p.folder, file.Name)
- copyBlocks, pullBlocks := scanner.BlockDiff(curFile.Blocks, file.Blocks)
- if len(copyBlocks) == len(curFile.Blocks) && len(pullBlocks) == 0 {
+ if len(curFile.Blocks) == len(file.Blocks) {
+ for i := range file.Blocks {
+ if !bytes.Equal(curFile.Blocks[i].Hash, file.Blocks[i].Hash) {
+ goto FilesAreDifferent
+ }
+ }
// We are supposed to copy the entire file, and then fetch nothing. We
// are only updating metadata, so we don't actually *need* to make the
// copy.
@@ -434,11 +451,14 @@ func (p *Puller) handleFile(file protocol.FileInfo, copyChan chan<- copyBlocksSt
return
}
+FilesAreDifferent:
+
// Figure out the absolute filenames we need once and for all
tempName := filepath.Join(p.dir, defTempNamer.TempName(file.Name))
realName := filepath.Join(p.dir, file.Name)
- var reuse bool
+ reused := 0
+ var blocks []protocol.BlockInfo
// Check for an old temporary file which might have some blocks we could
// reuse.
@@ -453,38 +473,25 @@ func (p *Puller) handleFile(file protocol.FileInfo, copyChan chan<- copyBlocksSt
existingBlocks[block.String()] = true
}
- // Since the blocks are already there, we don't need to copy them
- // nor we need to pull them, hence discard blocks which are already
- // there, if they are exactly the same...
- var newCopyBlocks []protocol.BlockInfo
- for _, block := range copyBlocks {
+ // Since the blocks are already there, we don't need to get them.
+ for _, block := range file.Blocks {
_, ok := existingBlocks[block.String()]
if !ok {
- newCopyBlocks = append(newCopyBlocks, block)
+ blocks = append(blocks, block)
}
}
- var newPullBlocks []protocol.BlockInfo
- for _, block := range pullBlocks {
- _, ok := existingBlocks[block.String()]
- if !ok {
- newPullBlocks = append(newPullBlocks, block)
- }
- }
-
- // If any blocks could be reused, let the sharedpullerstate know
- // which flags it is expected to set on the file.
- // Also update the list of work for the routines.
- if len(copyBlocks) != len(newCopyBlocks) || len(pullBlocks) != len(newPullBlocks) {
- reuse = true
- copyBlocks = newCopyBlocks
- pullBlocks = newPullBlocks
- } else {
+ // The sharedpullerstate will know which flags to use when opening the
+ // temp file, depending on whether we are reusing any blocks or not.
+ reused = len(file.Blocks) - len(blocks)
+ if reused == 0 {
+ // Otherwise, discard the file ourselves so that the sharedpuller
+ // does not panic when it fails to exclusively create a file that
+ // already exists.
os.Remove(tempName)
}
+ } else {
+ blocks = file.Blocks
}
s := sharedPullerState{
@@ -492,43 +499,20 @@ func (p *Puller) handleFile(file protocol.FileInfo, copyChan chan<- copyBlocksSt
folder: p.folder,
tempName: tempName,
realName: realName,
- pullNeeded: len(pullBlocks),
- reuse: reuse,
- }
- if len(copyBlocks) > 0 {
- s.copyNeeded = 1
+ copyTotal: len(blocks),
+ copyNeeded: len(blocks),
+ reused: reused,
}
if debug {
- l.Debugf("%v need file %s; copy %d, pull %d, reuse %v", p, file.Name, len(copyBlocks), len(pullBlocks), reuse)
+ l.Debugf("%v need file %s; copy %d, reused %v", p, file.Name, len(blocks), reused)
}
- if len(copyBlocks) > 0 {
- cs := copyBlocksState{
- sharedPullerState: &s,
- blocks: copyBlocks,
- }
- copyChan <- cs
- }
-
- if len(pullBlocks) > 0 {
- for _, block := range pullBlocks {
- ps := pullBlockState{
- sharedPullerState: &s,
- block: block,
- }
- pullChan <- ps
- }
- }
-
- if len(pullBlocks) == 0 && len(copyBlocks) == 0 {
- if !reuse {
- panic("bug: nothing to do with file?")
- }
- // We have a temp file that we can reuse totally. Jump directly to the
- // finisher stage.
- finisherChan <- &s
+ cs := copyBlocksState{
+ sharedPullerState: &s,
+ blocks: blocks,
}
+ copyChan <- cs
}
// shortcutFile sets file mode and modification time, when that's the only
@@ -561,9 +545,9 @@ func (p *Puller) shortcutFile(file protocol.FileInfo) {
p.model.updateLocal(p.folder, file)
}
-// copierRoutine reads pullerStates until the in channel closes and performs
-// the relevant copy.
-func (p *Puller) copierRoutine(in <-chan copyBlocksState, out chan<- *sharedPullerState) {
+// copierRoutine reads copyBlocksStates until the in channel closes and
+// performs the relevant copies when possible, handing blocks it cannot
+// find locally over to the puller routine.
+func (p *Puller) copierRoutine(in <-chan copyBlocksState, pullChan chan<- pullBlockState, out chan<- *sharedPullerState) {
buf := make([]byte, protocol.BlockSize)
nextFile:
@@ -575,32 +559,70 @@ nextFile:
continue nextFile
}
- srcFd, err := state.sourceFile()
- if err != nil {
- // As above
- continue nextFile
- }
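+ // Cache open source-file descriptors while copying this file. Evicted
+ // descriptors are handed back on evictionChan so they can be closed.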
+ evictionChan := make(chan lfu.Eviction)
+
+ fdCache := lfu.New()
+ fdCache.UpperBound = 50
+ fdCache.LowerBound = 20
+ fdCache.EvictionChannel = evictionChan
+
+ go func() {
+ for item := range evictionChan {
+ item.Value.(*os.File).Close()
+ }
+ }()
for _, block := range state.blocks {
buf = buf[:int(block.Size)]
- _, err = srcFd.ReadAt(buf, block.Offset)
- if err != nil {
- state.earlyClose("src read", err)
- srcFd.Close()
- continue nextFile
+ success := p.model.finder.Iterate(block.Hash, func(folder, file string, index uint32) bool {
+ path := filepath.Join(p.model.folderCfgs[folder].Path, file)
+
+ var fd *os.File
+
+ fdi := fdCache.Get(path)
+ if fdi != nil {
+ fd = fdi.(*os.File)
+ } else {
+ fd, err = os.Open(path)
+ if err != nil {
+ return false
+ }
+ fdCache.Set(path, fd)
+ }
+
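+ // The stored index locates the block inside the source file.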
+ _, err = fd.ReadAt(buf, protocol.BlockSize*int64(index))
+ if err != nil {
+ return false
+ }
+
+ _, err = dstFd.WriteAt(buf, block.Offset)
+ if err != nil {
+ state.earlyClose("dst write", err)
+ }
+ if file == state.file.Name {
+ state.copiedFromOrigin()
+ }
+ return true
+ })
+
+ if state.failed() != nil {
+ break
}
- _, err = dstFd.WriteAt(buf, block.Offset)
- if err != nil {
- state.earlyClose("dst write", err)
- srcFd.Close()
- continue nextFile
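+ // No local copy of this block was found; move it from the copy
+ // accounting to the pull accounting and hand it to the puller.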
+ if !success {
+ state.pullStarted()
+ ps := pullBlockState{
+ sharedPullerState: state.sharedPullerState,
+ block: block,
+ }
+ pullChan <- ps
+ } else {
+ state.copyDone()
}
}
-
- srcFd.Close()
- state.copyDone()
+ fdCache.Evict(fdCache.Len())
+ close(evictionChan)
out <- state.sharedPullerState
}
}
diff --git a/internal/model/puller_test.go b/internal/model/puller_test.go
index b878e6da..8c76c30d 100644
--- a/internal/model/puller_test.go
+++ b/internal/model/puller_test.go
@@ -16,10 +16,13 @@
package model
import (
+ "os"
+ "path/filepath"
"testing"
"github.com/syncthing/syncthing/internal/config"
"github.com/syncthing/syncthing/internal/protocol"
+ "github.com/syncthing/syncthing/internal/scanner"
"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/storage"
@@ -47,7 +50,7 @@ func TestHandleFile(t *testing.T) {
// Copy: 2, 5, 8
// Pull: 1, 3, 4, 6, 7
- // Create existing file, and update local index
+ // Create existing file
existingFile := protocol.FileInfo{
Name: "filex",
Flags: 0,
@@ -65,6 +68,7 @@ func TestHandleFile(t *testing.T) {
db, _ := leveldb.Open(storage.NewMemStorage(), nil)
m := NewModel(config.Wrap("/tmp/test", config.Configuration{}), "device", "syncthing", "dev", db)
m.AddFolder(config.FolderConfiguration{ID: "default", Path: "testdata"})
+ // Update index
m.updateLocal("default", existingFile)
p := Puller{
@@ -73,34 +77,20 @@ func TestHandleFile(t *testing.T) {
model: m,
}
- copyChan := make(chan copyBlocksState, 1) // Copy chan gets all blocks needed to copy in a wrapper struct
- pullChan := make(chan pullBlockState, 5) // Pull chan gets blocks one by one
+ copyChan := make(chan copyBlocksState, 1)
- p.handleFile(requiredFile, copyChan, pullChan, nil)
+ p.handleFile(requiredFile, copyChan, nil)
// Receive the results
toCopy := <-copyChan
- toPull := []pullBlockState{<-pullChan, <-pullChan, <-pullChan, <-pullChan, <-pullChan}
- select {
- case <-pullChan:
- t.Error("Channel not empty!")
- default:
+ if len(toCopy.blocks) != 8 {
+ t.Errorf("Unexpected count of copy blocks: %d != 8", len(toCopy.blocks))
}
- if len(toCopy.blocks) != 3 {
- t.Errorf("Unexpected count of copy blocks: %d != 3", len(toCopy.blocks))
- }
-
- for i, eq := range []int{2, 5, 8} {
- if string(toCopy.blocks[i].Hash) != string(blocks[eq].Hash) {
- t.Errorf("Block mismatch: %s != %s", toCopy.blocks[i].String(), blocks[eq].String())
- }
- }
-
- for i, eq := range []int{1, 3, 4, 6, 7} {
- if string(toPull[i].block.Hash) != string(blocks[eq].Hash) {
- t.Errorf("Block mismatch: %s != %s", toPull[i].block.String(), blocks[eq].String())
+ for i, block := range toCopy.blocks {
+ if string(block.Hash) != string(blocks[i+1].Hash) {
+ t.Errorf("Block mismatch: %s != %s", block.String(), blocks[i+1].String())
}
}
}
@@ -114,7 +104,7 @@ func TestHandleFileWithTemp(t *testing.T) {
// Copy: 5, 8
// Pull: 1, 6
- // Create existing file, and update local index
+ // Create existing file
existingFile := protocol.FileInfo{
Name: "file",
Flags: 0,
@@ -132,6 +122,7 @@ func TestHandleFileWithTemp(t *testing.T) {
db, _ := leveldb.Open(storage.NewMemStorage(), nil)
m := NewModel(config.Wrap("/tmp/test", config.Configuration{}), "device", "syncthing", "dev", db)
m.AddFolder(config.FolderConfiguration{ID: "default", Path: "testdata"})
+ // Update index
m.updateLocal("default", existingFile)
p := Puller{
@@ -140,34 +131,122 @@ func TestHandleFileWithTemp(t *testing.T) {
model: m,
}
- copyChan := make(chan copyBlocksState, 1) // Copy chan gets all blocks needed to copy in a wrapper struct
- pullChan := make(chan pullBlockState, 2) // Pull chan gets blocks one by one
+ copyChan := make(chan copyBlocksState, 1)
- p.handleFile(requiredFile, copyChan, pullChan, nil)
+ p.handleFile(requiredFile, copyChan, nil)
// Receive the results
toCopy := <-copyChan
- toPull := []pullBlockState{<-pullChan, <-pullChan}
- select {
- case <-pullChan:
- t.Error("Channel not empty!")
- default:
+ if len(toCopy.blocks) != 4 {
+ t.Errorf("Unexpected count of copy blocks: %d != 4", len(toCopy.blocks))
}
- if len(toCopy.blocks) != 2 {
- t.Errorf("Unexpected count of copy blocks: %d != 2", len(toCopy.blocks))
- }
-
- for i, eq := range []int{5, 8} {
+ for i, eq := range []int{1, 5, 6, 8} {
if string(toCopy.blocks[i].Hash) != string(blocks[eq].Hash) {
t.Errorf("Block mismatch: %s != %s", toCopy.blocks[i].String(), blocks[eq].String())
}
}
+}
- for i, eq := range []int{1, 6} {
- if string(toPull[i].block.Hash) != string(blocks[eq].Hash) {
- t.Errorf("Block mismatch: %s != %s", toPull[i].block.String(), blocks[eq].String())
+func TestCopierFinder(t *testing.T) {
+ // Since there is no existing target file nor a temp file, all blocks
+ // should initially be scheduled for copying:
+ // Copy: 1, 2, 3, 4, 5, 6, 7, 8
+
+ // After dropping out the blocks found locally (2, 3, 4, 7), the rest
+ // should be pulled:
+ // Pull: 1, 5, 6, 8
+
+ tempFile := filepath.Join("testdata", defTempNamer.TempName("file2"))
+ err := os.Remove(tempFile)
+ if err != nil && !os.IsNotExist(err) {
+ t.Error(err)
+ }
+
+ // Create existing file
+ existingFile := protocol.FileInfo{
+ Name: defTempNamer.TempName("file"),
+ Flags: 0,
+ Modified: 0,
+ Blocks: []protocol.BlockInfo{
+ blocks[0], blocks[2], blocks[3], blocks[4],
+ blocks[0], blocks[0], blocks[7], blocks[0],
+ },
+ }
+
+ // Create target file
+ requiredFile := existingFile
+ requiredFile.Blocks = blocks[1:]
+ requiredFile.Name = "file2"
+
+ fcfg := config.FolderConfiguration{ID: "default", Path: "testdata"}
+ cfg := config.Configuration{Folders: []config.FolderConfiguration{fcfg}}
+
+ db, _ := leveldb.Open(storage.NewMemStorage(), nil)
+ m := NewModel(config.Wrap("/tmp/test", cfg), "device", "syncthing", "dev", db)
+ m.AddFolder(fcfg)
+ // Update index
+ m.updateLocal("default", existingFile)
+
+ iterFn := func(folder, file string, index uint32) bool {
+ return true
+ }
+
+ // Verify that the blocks we say exist on file, really exist in the db.
+ for _, idx := range []int{2, 3, 4, 7} {
+ if m.finder.Iterate(blocks[idx].Hash, iterFn) == false {
+ t.Error("Didn't find block")
}
}
+
+ p := Puller{
+ folder: "default",
+ dir: "testdata",
+ model: m,
+ }
+
+ copyChan := make(chan copyBlocksState)
+ pullChan := make(chan pullBlockState, 4)
+ finisherChan := make(chan *sharedPullerState, 1)
+
+ // Run a single fetcher routine
+ go p.copierRoutine(copyChan, pullChan, finisherChan)
+
+ p.handleFile(requiredFile, copyChan, finisherChan)
+
+ pulls := []pullBlockState{<-pullChan, <-pullChan, <-pullChan, <-pullChan}
+ finish := <-finisherChan
+
+ select {
+ case <-pullChan:
+ t.Fatal("Finisher channel has data to be read")
+ case <-finisherChan:
+ t.Fatal("Finisher channel has data to be read")
+ default:
+ }
+
+ // Verify that the right blocks went into the pull list
+ for i, eq := range []int{1, 5, 6, 8} {
+ if string(pulls[i].block.Hash) != string(blocks[eq].Hash) {
+ t.Errorf("Block %d mismatch: %s != %s", eq, pulls[i].block.String(), blocks[eq].String())
+ }
+ if string(finish.file.Blocks[eq-1].Hash) != string(blocks[eq].Hash) {
+ t.Errorf("Block %d mismatch: %s != %s", eq, finish.file.Blocks[eq-1].String(), blocks[eq].String())
+ }
+ }
+
+ // Verify that the fetched blocks have actually been written to the temp file
+ blks, err := scanner.HashFile(tempFile, protocol.BlockSize)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ for _, eq := range []int{2, 3, 4, 7} {
+ if string(blks[eq-1].Hash) != string(blocks[eq].Hash) {
+ t.Errorf("Block %d mismatch: %s != %s", eq, blks[eq-1].String(), blocks[eq].String())
+ }
+ }
+ finish.fd.Close()
+
+ os.Remove(tempFile)
}
diff --git a/internal/model/sharedpullerstate.go b/internal/model/sharedpullerstate.go
index 1347e627..9557b0b6 100644
--- a/internal/model/sharedpullerstate.go
+++ b/internal/model/sharedpullerstate.go
@@ -31,13 +31,16 @@ type sharedPullerState struct {
folder string
tempName string
realName string
- reuse bool
+ reused int // Number of blocks reused from temporary file
// Mutable, must be locked for access
err error // The first error we hit
fd *os.File // The fd of the temp file
- copyNeeded int // Number of copy actions we expect to happen
- pullNeeded int // Number of block pulls we expect to happen
+ copyTotal int // Total number of copy actions for the whole job
+ pullTotal int // Total number of pull actions for the whole job
+ copyNeeded int // Number of copy actions still pending
+ pullNeeded int // Number of block pulls still pending
+ copyOrigin int // Number of blocks copied from the original file
closed bool // Set when the file has been closed
mut sync.Mutex // Protects the above
}
@@ -79,7 +82,7 @@ func (s *sharedPullerState) tempFile() (*os.File, error) {
// Attempt to create the temp file
flags := os.O_WRONLY
- if !s.reuse {
+ if s.reused == 0 {
flags |= os.O_CREATE | os.O_EXCL
}
fd, err := os.OpenFile(s.tempName, flags, 0644)
@@ -149,7 +152,25 @@ func (s *sharedPullerState) copyDone() {
s.mut.Lock()
s.copyNeeded--
if debug {
- l.Debugln("sharedPullerState", s.folder, s.file.Name, "copyNeeded ->", s.pullNeeded)
+ l.Debugln("sharedPullerState", s.folder, s.file.Name, "copyNeeded ->", s.copyNeeded)
+ }
+ s.mut.Unlock()
+}
+
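+// copiedFromOrigin records that a block was copied from the existing
+// version of the same file.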
+func (s *sharedPullerState) copiedFromOrigin() {
+ s.mut.Lock()
+ s.copyOrigin++
+ s.mut.Unlock()
+}
+
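+// pullStarted moves a block from the copy accounting to the pull
+// accounting, for when the copier fails to find the block locally.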
+func (s *sharedPullerState) pullStarted() {
+ s.mut.Lock()
+ s.copyTotal--
+ s.copyNeeded--
+ s.pullTotal++
+ s.pullNeeded++
+ if debug {
+ l.Debugln("sharedPullerState", s.folder, s.file.Name, "pullNeeded start ->", s.pullNeeded)
}
s.mut.Unlock()
}
@@ -158,7 +179,7 @@ func (s *sharedPullerState) pullDone() {
s.mut.Lock()
s.pullNeeded--
if debug {
- l.Debugln("sharedPullerState", s.folder, s.file.Name, "pullNeeded ->", s.pullNeeded)
+ l.Debugln("sharedPullerState", s.folder, s.file.Name, "pullNeeded done ->", s.pullNeeded)
}
s.mut.Unlock()
}
diff --git a/internal/protocol/message.go b/internal/protocol/message.go
index 800787b9..b4714505 100644
--- a/internal/protocol/message.go
+++ b/internal/protocol/message.go
@@ -54,6 +54,10 @@ func (f FileInfo) IsInvalid() bool {
return IsInvalid(f.Flags)
}
+func (f FileInfo) IsDirectory() bool {
+ return IsDirectory(f.Flags)
+}
+
// Used for unmarshalling a FileInfo structure but skipping the actual block list
type FileInfoTruncated struct {
Name string // max:8192