Add scan percentages (fixes #1030)
This commit is contained in:
@@ -19,24 +19,27 @@ import (
|
||||
// workers are used in parallel. The outbox will become closed when the inbox
|
||||
// is closed and all items handled.
|
||||
|
||||
func newParallelHasher(dir string, blockSize, workers int, outbox, inbox chan protocol.FileInfo) {
|
||||
func newParallelHasher(dir string, blockSize, workers int, outbox, inbox chan protocol.FileInfo, counter *uint64, done chan struct{}) {
|
||||
wg := sync.NewWaitGroup()
|
||||
wg.Add(workers)
|
||||
|
||||
for i := 0; i < workers; i++ {
|
||||
go func() {
|
||||
hashFiles(dir, blockSize, outbox, inbox)
|
||||
hashFiles(dir, blockSize, outbox, inbox, counter)
|
||||
wg.Done()
|
||||
}()
|
||||
}
|
||||
|
||||
go func() {
|
||||
wg.Wait()
|
||||
if done != nil {
|
||||
close(done)
|
||||
}
|
||||
close(outbox)
|
||||
}()
|
||||
}
|
||||
|
||||
func HashFile(path string, blockSize int) ([]protocol.BlockInfo, error) {
|
||||
func HashFile(path string, blockSize int, sizeHint int64, counter *uint64) ([]protocol.BlockInfo, error) {
|
||||
fd, err := os.Open(path)
|
||||
if err != nil {
|
||||
if debug {
|
||||
@@ -44,27 +47,29 @@ func HashFile(path string, blockSize int) ([]protocol.BlockInfo, error) {
|
||||
}
|
||||
return []protocol.BlockInfo{}, err
|
||||
}
|
||||
|
||||
fi, err := fd.Stat()
|
||||
if err != nil {
|
||||
fd.Close()
|
||||
if debug {
|
||||
l.Debugln("stat:", err)
|
||||
}
|
||||
return []protocol.BlockInfo{}, err
|
||||
}
|
||||
defer fd.Close()
|
||||
return Blocks(fd, blockSize, fi.Size())
|
||||
|
||||
if sizeHint == 0 {
|
||||
fi, err := fd.Stat()
|
||||
if err != nil {
|
||||
if debug {
|
||||
l.Debugln("stat:", err)
|
||||
}
|
||||
return []protocol.BlockInfo{}, err
|
||||
}
|
||||
sizeHint = fi.Size()
|
||||
}
|
||||
|
||||
return Blocks(fd, blockSize, sizeHint, counter)
|
||||
}
|
||||
|
||||
func hashFiles(dir string, blockSize int, outbox, inbox chan protocol.FileInfo) {
|
||||
func hashFiles(dir string, blockSize int, outbox, inbox chan protocol.FileInfo, counter *uint64) {
|
||||
for f := range inbox {
|
||||
if f.IsDirectory() || f.IsDeleted() || f.IsSymlink() {
|
||||
outbox <- f
|
||||
continue
|
||||
if f.IsDirectory() || f.IsDeleted() {
|
||||
panic("Bug. Asked to hash a directory or a deleted file.")
|
||||
}
|
||||
|
||||
blocks, err := HashFile(filepath.Join(dir, f.Name), blockSize)
|
||||
blocks, err := HashFile(filepath.Join(dir, f.Name), blockSize, f.CachedSize, counter)
|
||||
if err != nil {
|
||||
if debug {
|
||||
l.Debugln("hash error:", f.Name, err)
|
||||
|
||||
@@ -11,6 +11,7 @@ import (
|
||||
"crypto/sha256"
|
||||
"fmt"
|
||||
"io"
|
||||
"sync/atomic"
|
||||
|
||||
"github.com/syncthing/protocol"
|
||||
)
|
||||
@@ -18,7 +19,7 @@ import (
|
||||
var SHA256OfNothing = []uint8{0xe3, 0xb0, 0xc4, 0x42, 0x98, 0xfc, 0x1c, 0x14, 0x9a, 0xfb, 0xf4, 0xc8, 0x99, 0x6f, 0xb9, 0x24, 0x27, 0xae, 0x41, 0xe4, 0x64, 0x9b, 0x93, 0x4c, 0xa4, 0x95, 0x99, 0x1b, 0x78, 0x52, 0xb8, 0x55}
|
||||
|
||||
// Blocks returns the blockwise hash of the reader.
|
||||
func Blocks(r io.Reader, blocksize int, sizehint int64) ([]protocol.BlockInfo, error) {
|
||||
func Blocks(r io.Reader, blocksize int, sizehint int64, counter *uint64) ([]protocol.BlockInfo, error) {
|
||||
var blocks []protocol.BlockInfo
|
||||
if sizehint > 0 {
|
||||
blocks = make([]protocol.BlockInfo, 0, int(sizehint/int64(blocksize)))
|
||||
@@ -36,6 +37,10 @@ func Blocks(r io.Reader, blocksize int, sizehint int64) ([]protocol.BlockInfo, e
|
||||
break
|
||||
}
|
||||
|
||||
if counter != nil {
|
||||
atomic.AddUint64(counter, uint64(n))
|
||||
}
|
||||
|
||||
b := protocol.BlockInfo{
|
||||
Size: int32(n),
|
||||
Offset: offset,
|
||||
|
||||
@@ -51,7 +51,7 @@ var blocksTestData = []struct {
|
||||
func TestBlocks(t *testing.T) {
|
||||
for _, test := range blocksTestData {
|
||||
buf := bytes.NewBuffer(test.data)
|
||||
blocks, err := Blocks(buf, test.blocksize, 0)
|
||||
blocks, err := Blocks(buf, test.blocksize, 0, nil)
|
||||
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
@@ -105,8 +105,8 @@ var diffTestData = []struct {
|
||||
|
||||
func TestDiff(t *testing.T) {
|
||||
for i, test := range diffTestData {
|
||||
a, _ := Blocks(bytes.NewBufferString(test.a), test.s, 0)
|
||||
b, _ := Blocks(bytes.NewBufferString(test.b), test.s, 0)
|
||||
a, _ := Blocks(bytes.NewBufferString(test.a), test.s, 0, nil)
|
||||
b, _ := Blocks(bytes.NewBufferString(test.b), test.s, 0, nil)
|
||||
_, d := BlockDiff(a, b)
|
||||
if len(d) != len(test.d) {
|
||||
t.Fatalf("Incorrect length for diff %d; %d != %d", i, len(d), len(test.d))
|
||||
|
||||
@@ -12,11 +12,13 @@ import (
|
||||
"path/filepath"
|
||||
"runtime"
|
||||
"strings"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
"unicode/utf8"
|
||||
|
||||
"github.com/syncthing/protocol"
|
||||
"github.com/syncthing/syncthing/lib/db"
|
||||
"github.com/syncthing/syncthing/lib/events"
|
||||
"github.com/syncthing/syncthing/lib/ignore"
|
||||
"github.com/syncthing/syncthing/lib/osutil"
|
||||
"github.com/syncthing/syncthing/lib/symlinks"
|
||||
@@ -39,6 +41,8 @@ func init() {
|
||||
}
|
||||
|
||||
type Walker struct {
|
||||
// Folder for which the walker has been created
|
||||
Folder string
|
||||
// Dir is the base directory for the walk
|
||||
Dir string
|
||||
// Limit walking to these paths within Dir, or no limit if Sub is empty
|
||||
@@ -66,6 +70,9 @@ type Walker struct {
|
||||
Hashers int
|
||||
// Our vector clock id
|
||||
ShortID uint64
|
||||
// Optional progress tick interval which defines how often FolderScanProgress
|
||||
// events are emitted, in seconds. Zero means the default of 2 seconds; a negative number means disabled.
|
||||
ProgressTickIntervalS int
|
||||
}
|
||||
|
||||
type TempNamer interface {
|
||||
@@ -92,12 +99,13 @@ func (w *Walker) Walk() (chan protocol.FileInfo, error) {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
files := make(chan protocol.FileInfo)
|
||||
hashedFiles := make(chan protocol.FileInfo)
|
||||
newParallelHasher(w.Dir, w.BlockSize, w.Hashers, hashedFiles, files)
|
||||
toHashChan := make(chan protocol.FileInfo)
|
||||
finishedChan := make(chan protocol.FileInfo)
|
||||
|
||||
// A routine which walks the filesystem tree, and sends files which have
|
||||
// been modified to the counter routine (or directly to the hashers when progress events are disabled).
|
||||
go func() {
|
||||
hashFiles := w.walkAndHashFiles(files, hashedFiles)
|
||||
hashFiles := w.walkAndHashFiles(toHashChan, finishedChan)
|
||||
if len(w.Subs) == 0 {
|
||||
filepath.Walk(w.Dir, hashFiles)
|
||||
} else {
|
||||
@@ -105,10 +113,77 @@ func (w *Walker) Walk() (chan protocol.FileInfo, error) {
|
||||
filepath.Walk(filepath.Join(w.Dir, sub), hashFiles)
|
||||
}
|
||||
}
|
||||
close(files)
|
||||
close(toHashChan)
|
||||
}()
|
||||
|
||||
return hashedFiles, nil
|
||||
// We're not required to emit scan progress events, just kick off hashers,
|
||||
// and feed inputs directly from the walker.
|
||||
if w.ProgressTickIntervalS < 0 {
|
||||
newParallelHasher(w.Dir, w.BlockSize, w.Hashers, finishedChan, toHashChan, nil, nil)
|
||||
return finishedChan, nil
|
||||
}
|
||||
|
||||
// Defaults to every 2 seconds.
|
||||
if w.ProgressTickIntervalS == 0 {
|
||||
w.ProgressTickIntervalS = 2
|
||||
}
|
||||
|
||||
ticker := time.NewTicker(time.Duration(w.ProgressTickIntervalS) * time.Second)
|
||||
|
||||
// We need to emit progress events, hence we create a routine which buffers
|
||||
// the list of files to be hashed, counts the total number of
|
||||
// bytes to hash, and once no more files need to be hashed (chan gets closed),
|
||||
// start a routine which periodically emits FolderScanProgress events,
|
||||
// until a stop signal is sent by the parallel hasher.
|
||||
// Parallel hasher is stopped by this routine when we close the channel over
|
||||
// which it receives the files we ask it to hash.
|
||||
go func() {
|
||||
var filesToHash []protocol.FileInfo
|
||||
var total, progress uint64
|
||||
for file := range toHashChan {
|
||||
filesToHash = append(filesToHash, file)
|
||||
total += uint64(file.CachedSize)
|
||||
}
|
||||
|
||||
realToHashChan := make(chan protocol.FileInfo)
|
||||
done := make(chan struct{})
|
||||
newParallelHasher(w.Dir, w.BlockSize, w.Hashers, finishedChan, realToHashChan, &progress, done)
|
||||
|
||||
// A routine which actually emits the FolderScanProgress events
|
||||
// every w.ProgressTickIntervalS seconds, until the hasher routines terminate.
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case <-done:
|
||||
if debug {
|
||||
l.Debugln("Walk progress done", w.Dir, w.Subs, w.BlockSize, w.Matcher)
|
||||
}
|
||||
ticker.Stop()
|
||||
return
|
||||
case <-ticker.C:
|
||||
current := atomic.LoadUint64(&progress)
|
||||
if debug {
|
||||
l.Debugf("Walk %s %s current progress %d/%d (%d%%)", w.Dir, w.Subs, current, total, current*100/total)
|
||||
}
|
||||
events.Default.Log(events.FolderScanProgress, map[string]interface{}{
|
||||
"folder": w.Folder,
|
||||
"current": current,
|
||||
"total": total,
|
||||
})
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
for _, file := range filesToHash {
|
||||
if debug {
|
||||
l.Debugln("real to hash:", file.Name)
|
||||
}
|
||||
realToHashChan <- file
|
||||
}
|
||||
close(realToHashChan)
|
||||
}()
|
||||
|
||||
return finishedChan, nil
|
||||
}
|
||||
|
||||
func (w *Walker) walkAndHashFiles(fchan, dchan chan protocol.FileInfo) filepath.WalkFunc {
|
||||
@@ -241,7 +316,7 @@ func (w *Walker) walkAndHashFiles(fchan, dchan chan protocol.FileInfo) filepath.
|
||||
return skip
|
||||
}
|
||||
|
||||
blocks, err := Blocks(strings.NewReader(target), w.BlockSize, 0)
|
||||
blocks, err := Blocks(strings.NewReader(target), w.BlockSize, 0, nil)
|
||||
if err != nil {
|
||||
if debug {
|
||||
l.Debugln("hash link error:", p, err)
|
||||
@@ -272,10 +347,10 @@ func (w *Walker) walkAndHashFiles(fchan, dchan chan protocol.FileInfo) filepath.
|
||||
}
|
||||
|
||||
if debug {
|
||||
l.Debugln("symlink to hash:", p, f)
|
||||
l.Debugln("symlink changedb:", p, f)
|
||||
}
|
||||
|
||||
fchan <- f
|
||||
dchan <- f
|
||||
|
||||
return skip
|
||||
}
|
||||
@@ -349,10 +424,11 @@ func (w *Walker) walkAndHashFiles(fchan, dchan chan protocol.FileInfo) filepath.
|
||||
}
|
||||
|
||||
f := protocol.FileInfo{
|
||||
Name: rn,
|
||||
Version: cf.Version.Update(w.ShortID),
|
||||
Flags: flags,
|
||||
Modified: mtime.Unix(),
|
||||
Name: rn,
|
||||
Version: cf.Version.Update(w.ShortID),
|
||||
Flags: flags,
|
||||
Modified: mtime.Unix(),
|
||||
CachedSize: info.Size(),
|
||||
}
|
||||
if debug {
|
||||
l.Debugln("to hash:", p, f)
|
||||
|
||||
@@ -149,8 +149,9 @@ func TestVerify(t *testing.T) {
|
||||
// data should be an even multiple of blocksize long
|
||||
data := []byte("Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut e")
|
||||
buf := bytes.NewBuffer(data)
|
||||
var progress uint64
|
||||
|
||||
blocks, err := Blocks(buf, blocksize, 0)
|
||||
blocks, err := Blocks(buf, blocksize, 0, &progress)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@@ -158,6 +159,10 @@ func TestVerify(t *testing.T) {
|
||||
t.Fatalf("Incorrect number of blocks %d != %d", len(blocks), exp)
|
||||
}
|
||||
|
||||
if uint64(len(data)) != progress {
|
||||
t.Fatalf("Incorrect counter value %d != %d", len(data), progress)
|
||||
}
|
||||
|
||||
buf = bytes.NewBuffer(data)
|
||||
err = Verify(buf, blocksize, blocks)
|
||||
t.Log(err)
|
||||
|
||||
Reference in New Issue
Block a user