lib/model, lib/scanner: Efficient inserts/deletes in the middle of the file
GitHub-Pull-Request: https://github.com/syncthing/syncthing/pull/3527
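
A sketch (not part of the commit): the change teaches the puller to reuse blocks from the old copy of a file even when they have moved to a different offset, which is what an insert or delete in the middle of a file does to every following block. A strong hash only matches blocks at fixed block boundaries, so the commit records a per-block weak hash (block.WeakHash in the hunks below) that can be slid over the existing file one byte at a time. The following minimal, self-contained Go program illustrates that idea only; weakHash, roll and findOffsets are illustrative names, not the lib/weakhash API, and the real code additionally verifies every candidate with the strong hash (scanner.VerifyBuffer) before copying it.

package main

import "fmt"

const mod = 65521 // largest prime below 2^16, as in Adler-32

// weakHash computes an Adler-32-style hash of buf.
func weakHash(buf []byte) uint32 {
    var a, b uint32
    for _, c := range buf {
        a = (a + uint32(c)) % mod
        b = (b + a) % mod
    }
    return b<<16 | a
}

// roll slides the window one byte: drop `out` from the front, append `in`.
// The O(1) update per byte is what makes scanning a whole file feasible.
func roll(hash uint32, out, in byte, window int) uint32 {
    a := hash & 0xffff
    b := hash >> 16
    a = (a + mod - uint32(out) + uint32(in)) % mod
    b = (b + mod - uint32(window)*uint32(out)%mod + a) % mod
    return b<<16 | a
}

// findOffsets returns, for every wanted weak hash, the offsets in `existing`
// where a window of `window` bytes has that hash. These are candidates only;
// the caller still has to verify them with the strong hash.
func findOffsets(existing []byte, window int, wanted map[uint32]bool) map[uint32][]int64 {
    found := make(map[uint32][]int64)
    if window <= 0 || len(existing) < window {
        return found
    }
    h := weakHash(existing[:window])
    for i := 0; ; i++ {
        if wanted[h] {
            found[h] = append(found[h], int64(i))
        }
        if i+window >= len(existing) {
            break
        }
        h = roll(h, existing[i], existing[i+window], window)
    }
    return found
}

func main() {
    existing := []byte("0123456789abcdefghijklmnopqrstuvwxyz") // old file content on disk
    desired := append([]byte("+++"), existing...)              // new version: 3 bytes inserted up front

    const window = 8
    // Weak hash of the new file's second block (bytes 8..15). Its data is
    // unchanged, but it now sits 3 bytes earlier in the old file.
    want := weakHash(desired[window : 2*window])

    // Prints the matching offset, 5, so the block can be copied locally
    // instead of being pulled over the network.
    fmt.Println(findOffsets(existing, window, map[uint32]bool{want: true}))
}
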
@@ -28,6 +28,7 @@ import (
"github.com/syncthing/syncthing/lib/symlinks"
"github.com/syncthing/syncthing/lib/sync"
"github.com/syncthing/syncthing/lib/versioner"
"github.com/syncthing/syncthing/lib/weakhash"
)

func init() {
@@ -92,6 +93,7 @@ type rwFolder struct
checkFreeSpace bool
ignoreDelete bool
fsync bool
useWeakHash bool

copiers int
pullers int
@@ -128,6 +130,7 @@ func newRWFolder(model *Model, cfg config.FolderConfiguration, ver versioner.Ver
checkFreeSpace: cfg.MinDiskFreePct != 0,
ignoreDelete: cfg.IgnoreDelete,
fsync: cfg.Fsync,
useWeakHash: !cfg.DisableWeakHash,

queue: newJobQueue(),
pullTimer: time.NewTimer(time.Second),
@@ -1169,7 +1172,7 @@ func (f *rwFolder) handleFile(file protocol.FileInfo, copyChan chan<- copyBlocks
created: time.Now(),
}

l.Debugf("%v need file %s; copy %d, reused %v", f, file.Name, len(blocks), reused)
l.Debugf("%v need file %s; copy %d, reused %v", f, file.Name, len(blocks), len(reused))

cs := copyBlocksState{
sharedPullerState: &s,
@@ -1231,6 +1234,21 @@ func (f *rwFolder) copierRoutine(in <-chan copyBlocksState, pullChan chan<- pull
}
f.model.fmut.RUnlock()

var weakHashFinder *weakhash.Finder
if f.useWeakHash {
hashesToFind := make([]uint32, 0, len(state.blocks))
for _, block := range state.blocks {
if block.WeakHash != 0 {
hashesToFind = append(hashesToFind, block.WeakHash)
}
}

weakHashFinder, err = weakhash.NewFinder(state.realName, protocol.BlockSize, hashesToFind)
if err != nil {
l.Debugln("weak hasher", err)
}
}

for _, block := range state.blocks {
if f.allowSparse && state.reused == 0 && block.IsEmpty() {
// The block is a block of all zeroes, and we are not reusing
@@ -1245,45 +1263,70 @@ func (f *rwFolder) copierRoutine(in <-chan copyBlocksState, pullChan chan<- pull
}

buf = buf[:int(block.Size)]
found := f.model.finder.Iterate(folders, block.Hash, func(folder, file string, index int32) bool {
inFile, err := rootedJoinedPath(folderRoots[folder], file)
if err != nil {
return false
}
fd, err := os.Open(inFile)
if err != nil {
return false
}

_, err = fd.ReadAt(buf, protocol.BlockSize*int64(index))
fd.Close()
if err != nil {
return false
}

hash, err := scanner.VerifyBuffer(buf, block)
if err != nil {
if hash != nil {
l.Debugf("Finder block mismatch in %s:%s:%d expected %q got %q", folder, file, index, block.Hash, hash)
err = f.model.finder.Fix(folder, file, index, block.Hash, hash)
if err != nil {
l.Warnln("finder fix:", err)
}
} else {
l.Debugln("Finder failed to verify buffer", err)
}
return false
found, err := weakHashFinder.Iterate(block.WeakHash, buf, func(offset int64) bool {
if _, err := scanner.VerifyBuffer(buf, block); err != nil {
return true
}

_, err = dstFd.WriteAt(buf, block.Offset)
if err != nil {
state.fail("dst write", err)

}
if file == state.file.Name {
if offset == block.Offset {
state.copiedFromOrigin()
} else {
state.copiedFromOriginShifted()
}
return true

return false
})
if err != nil {
l.Debugln("weak hasher iter", err)
}

if !found {
found = f.model.finder.Iterate(folders, block.Hash, func(folder, file string, index int32) bool {
inFile, err := rootedJoinedPath(folderRoots[folder], file)
if err != nil {
return false
}
fd, err := os.Open(inFile)
if err != nil {
return false
}

_, err = fd.ReadAt(buf, protocol.BlockSize*int64(index))
fd.Close()
if err != nil {
return false
}

hash, err := scanner.VerifyBuffer(buf, block)
if err != nil {
if hash != nil {
l.Debugf("Finder block mismatch in %s:%s:%d expected %q got %q", folder, file, index, block.Hash, hash)
err = f.model.finder.Fix(folder, file, index, block.Hash, hash)
if err != nil {
l.Warnln("finder fix:", err)
}
} else {
l.Debugln("Finder failed to verify buffer", err)
}
return false
}

_, err = dstFd.WriteAt(buf, block.Offset)
if err != nil {
state.fail("dst write", err)
}
if file == state.file.Name {
state.copiedFromOrigin()
}
return true
})
}

if state.failed() != nil {
break
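
For orientation (this is not code from the commit): condensed, the lookup order introduced above is to try the weak-hash finder against the old copy of this very file first, verify each candidate with the strong hash, and only fall back to the block map and, ultimately, a network pull when nothing matches. The sketch below restates that flow with the signatures exactly as they are used in the hunk (weakhash.NewFinder, Finder.Iterate, Finder.Close, scanner.VerifyBuffer). The callback convention, returning true to keep trying other offsets and false to accept the current one, and the fact that Iterate fills buf before invoking the callback, are inferred from the usage above; error handling is trimmed, and copyFromOldFile itself is an illustrative name, not a function in the tree.

package model

import (
    "io"

    "github.com/syncthing/syncthing/lib/protocol"
    "github.com/syncthing/syncthing/lib/scanner"
    "github.com/syncthing/syncthing/lib/weakhash"
)

// copyFromOldFile copies as many of the wanted blocks as possible out of the
// old version of the file at realName into dst, using weak-hash candidates
// verified by the strong hash.
func copyFromOldFile(realName string, blocks []protocol.BlockInfo, dst io.WriterAt) error {
    hashes := make([]uint32, 0, len(blocks))
    for _, b := range blocks {
        if b.WeakHash != 0 {
            hashes = append(hashes, b.WeakHash)
        }
    }
    finder, err := weakhash.NewFinder(realName, protocol.BlockSize, hashes)
    if err != nil {
        return err // the real code only logs this and keeps going
    }
    defer finder.Close()

    buf := make([]byte, protocol.BlockSize)
    for _, block := range blocks {
        buf = buf[:int(block.Size)]
        // Iterate reads each candidate window into buf before calling us.
        found, err := finder.Iterate(block.WeakHash, buf, func(offset int64) bool {
            if _, err := scanner.VerifyBuffer(buf, block); err != nil {
                return true // weak-hash collision, try the next offset
            }
            if _, err := dst.WriteAt(buf, block.Offset); err != nil {
                return true // could not use this match
            }
            return false // match accepted, stop iterating
        })
        if err != nil {
            return err
        }
        if !found {
            // Fall back: identical blocks in other files via f.model.finder,
            // and finally a network pull, as in the existing code paths.
        }
    }
    return nil
}
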
@@ -1300,6 +1343,7 @@ func (f *rwFolder) copierRoutine(in <-chan copyBlocksState, pullChan chan<- pull
state.copyDone(block)
}
}
weakHashFinder.Close()
out <- state.sharedPullerState
}
}
@@ -7,12 +7,15 @@
package model

import (
"crypto/rand"
"io"
"os"
"path/filepath"
"testing"
"time"

"github.com/syncthing/syncthing/lib/db"
"github.com/syncthing/syncthing/lib/fs"
"github.com/syncthing/syncthing/lib/protocol"
"github.com/syncthing/syncthing/lib/scanner"
"github.com/syncthing/syncthing/lib/sync"
@@ -72,6 +75,8 @@ func setUpRwFolder(model *Model) rwFolder {
stateTracker: newStateTracker("default"),
model: model,
},

mtimeFS: fs.NewMtimeFS(db.NewNamespacedKV(model.db, "mtime")),
dir: "testdata",
queue: newJobQueue(),
errors: make(map[string]string),
@@ -199,7 +204,7 @@ func TestCopierFinder(t *testing.T) {

select {
case <-pullChan:
t.Fatal("Finisher channel has data to be read")
t.Fatal("Pull channel has data to be read")
case <-finisherChan:
t.Fatal("Finisher channel has data to be read")
default:
@@ -240,6 +245,132 @@ func TestCopierFinder(t *testing.T) {
os.Remove(tempFile)
}

func TestWeakHash(t *testing.T) {
tempFile := filepath.Join("testdata", defTempNamer.TempName("weakhash"))
var shift int64 = 10
var size int64 = 1 << 20
expectBlocks := int(size / protocol.BlockSize)
expectPulls := int(shift / protocol.BlockSize)
if shift > 0 {
expectPulls++
}

cleanup := func() {
for _, path := range []string{tempFile, "testdata/weakhash"} {
os.Remove(path)
}
}

cleanup()
defer cleanup()

f, err := os.Create("testdata/weakhash")
if err != nil {
t.Error(err)
}
defer f.Close()
_, err = io.CopyN(f, rand.Reader, size)
if err != nil {
t.Error(err)
}
info, err := f.Stat()
if err != nil {
t.Error(err)
}

// Create two files, second file has `shifted` bytes random prefix, yet
// both are of the same length, for example:
// File 1: abcdefgh
// File 2: xyabcdef
f.Seek(0, os.SEEK_SET)
existing, err := scanner.Blocks(f, protocol.BlockSize, size, nil)
if err != nil {
t.Error(err)
}

f.Seek(0, os.SEEK_SET)
remainder := io.LimitReader(f, size-shift)
prefix := io.LimitReader(rand.Reader, shift)
nf := io.MultiReader(prefix, remainder)
desired, err := scanner.Blocks(nf, protocol.BlockSize, size, nil)
if err != nil {
t.Error(err)
}

existingFile := protocol.FileInfo{
Name: "weakhash",
Blocks: existing,
Size: size,
ModifiedS: info.ModTime().Unix(),
ModifiedNs: int32(info.ModTime().Nanosecond()),
}
desiredFile := protocol.FileInfo{
Name: "weakhash",
Size: size,
Blocks: desired,
ModifiedS: info.ModTime().Unix() + 1,
}

// Setup the model/pull environment
m := setUpModel(existingFile)
fo := setUpRwFolder(m)
copyChan := make(chan copyBlocksState)
pullChan := make(chan pullBlockState, expectBlocks)
finisherChan := make(chan *sharedPullerState, 1)

// Run a single fetcher routine
go fo.copierRoutine(copyChan, pullChan, finisherChan)

// Test 1 - no weak hashing, file gets fully repulled (`expectBlocks` pulls).
fo.handleFile(desiredFile, copyChan, finisherChan)

var pulls []pullBlockState
for len(pulls) < expectBlocks {
select {
case pull := <-pullChan:
pulls = append(pulls, pull)
case <-time.After(time.Second):
t.Error("timed out")
}
}
finish := <-finisherChan

select {
case <-pullChan:
t.Fatal("Pull channel has data to be read")
case <-finisherChan:
t.Fatal("Finisher channel has data to be read")
default:
}

finish.fd.Close()
if err := os.Remove(tempFile); err != nil && !os.IsNotExist(err) {
t.Error(err)
}

// Test 2 - using weak hash, expectPulls blocks pulled.
fo.useWeakHash = true
fo.handleFile(desiredFile, copyChan, finisherChan)

pulls = pulls[:0]
for len(pulls) < expectPulls {
select {
case pull := <-pullChan:
pulls = append(pulls, pull)
case <-time.After(time.Second):
t.Error("timed out")
}
}

finish = <-finisherChan
finish.fd.Close()

expectShifted := expectBlocks - expectPulls
if finish.copyOriginShifted != expectShifted {
t.Errorf("did not copy %d shifted", expectShifted)
}
}

// Test that updating a file removes it's old blocks from the blockmap
func TestCopierCleanup(t *testing.T) {
iterFn := func(folder, file string, index int32) bool {
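
Worked numbers for the TestWeakHash expectations above, assuming the 128 KiB protocol.BlockSize Syncthing used at the time (the test itself only relies on the symbolic constant):

// size = 1 << 20 bytes, shift = 10 bytes, BlockSize = 128 KiB = 131072 bytes (assumed)
//
// expectBlocks  = size/BlockSize             = 8  // test 1: every block is pulled
// expectPulls   = shift/BlockSize + 1        = 1  // test 2: only the block containing the new prefix
// expectShifted = expectBlocks - expectPulls = 7  // the rest are found in the old file, 10 bytes earlier
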
@@ -31,30 +31,32 @@ type sharedPullerState struct {
created time.Time

// Mutable, must be locked for access
err error // The first error we hit
fd *os.File // The fd of the temp file
copyTotal int // Total number of copy actions for the whole job
pullTotal int // Total number of pull actions for the whole job
copyOrigin int // Number of blocks copied from the original file
copyNeeded int // Number of copy actions still pending
pullNeeded int // Number of block pulls still pending
updated time.Time // Time when any of the counters above were last updated
closed bool // True if the file has been finalClosed.
available []int32 // Indexes of the blocks that are available in the temporary file
availableUpdated time.Time // Time when list of available blocks was last updated
mut sync.RWMutex // Protects the above
err error // The first error we hit
fd *os.File // The fd of the temp file
copyTotal int // Total number of copy actions for the whole job
pullTotal int // Total number of pull actions for the whole job
copyOrigin int // Number of blocks copied from the original file
copyOriginShifted int // Number of blocks copied from the original file but shifted
copyNeeded int // Number of copy actions still pending
pullNeeded int // Number of block pulls still pending
updated time.Time // Time when any of the counters above were last updated
closed bool // True if the file has been finalClosed.
available []int32 // Indexes of the blocks that are available in the temporary file
availableUpdated time.Time // Time when list of available blocks was last updated
mut sync.RWMutex // Protects the above
}

// A momentary state representing the progress of the puller
type pullerProgress struct {
Total int `json:"total"`
Reused int `json:"reused"`
CopiedFromOrigin int `json:"copiedFromOrigin"`
CopiedFromElsewhere int `json:"copiedFromElsewhere"`
Pulled int `json:"pulled"`
Pulling int `json:"pulling"`
BytesDone int64 `json:"bytesDone"`
BytesTotal int64 `json:"bytesTotal"`
Total int `json:"total"`
Reused int `json:"reused"`
CopiedFromOrigin int `json:"copiedFromOrigin"`
CopiedFromOriginShifted int `json:"copiedFromOriginShifted"`
CopiedFromElsewhere int `json:"copiedFromElsewhere"`
Pulled int `json:"pulled"`
Pulling int `json:"pulling"`
BytesDone int64 `json:"bytesDone"`
BytesTotal int64 `json:"bytesTotal"`
}

// A lockedWriterAt synchronizes WriteAt calls with an external mutex.
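
As an illustration only: for the TestWeakHash scenario earlier (7 of 8 blocks found in the old file at a 10-byte shift, 1 block pulled), a progress snapshot could plausibly serialize as shown below. The mapping of the sharedPullerState counters onto these fields is an assumption here; only the struct and its JSON tags come from the hunk above.

p := pullerProgress{
    Total:                   8,
    Reused:                  0,
    CopiedFromOrigin:        7, // includes the shifted copies
    CopiedFromOriginShifted: 7, // the subset found at a different offset
    CopiedFromElsewhere:     0,
    Pulled:                  1,
}
// json.Marshal(p) would yield, with the new field:
// {"total":8,"reused":0,"copiedFromOrigin":7,"copiedFromOriginShifted":7,
//  "copiedFromElsewhere":0,"pulled":1,"pulling":0,"bytesDone":0,"bytesTotal":0}
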
@@ -241,6 +243,14 @@ func (s *sharedPullerState) copiedFromOrigin() {
s.mut.Unlock()
}

func (s *sharedPullerState) copiedFromOriginShifted() {
s.mut.Lock()
s.copyOrigin++
s.copyOriginShifted++
s.updated = time.Now()
s.mut.Unlock()
}

func (s *sharedPullerState) pullStarted() {
s.mut.Lock()
s.copyTotal--
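
Note that copiedFromOriginShifted above increments both counters, so copyOrigin counts every block copied from the old version of the file regardless of offset, while copyOriginShifted is the subset that moved. A consumer that wants only the unshifted reuse count would derive it, for example:

unshifted := s.copyOrigin - s.copyOriginShifted // blocks reused at their original offset
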