Showing detailed sync progress (fixes #476)
based on commit by Audrius Butkevicius <audrius.butkevicius@gmail.com>
This commit is contained in:
@@ -82,9 +82,10 @@ type service interface {
|
||||
}
|
||||
|
||||
type Model struct {
|
||||
cfg *config.ConfigWrapper
|
||||
db *leveldb.DB
|
||||
finder *files.BlockFinder
|
||||
cfg *config.ConfigWrapper
|
||||
db *leveldb.DB
|
||||
finder *files.BlockFinder
|
||||
progressEmitter *ProgressEmitter
|
||||
|
||||
deviceName string
|
||||
clientName string
|
||||
@@ -142,7 +143,9 @@ func NewModel(cfg *config.ConfigWrapper, deviceName, clientName, clientVersion s
|
||||
rawConn: make(map[protocol.DeviceID]io.Closer),
|
||||
deviceVer: make(map[protocol.DeviceID]string),
|
||||
finder: files.NewBlockFinder(db, cfg),
|
||||
progressEmitter: NewProgressEmitter(cfg),
|
||||
}
|
||||
go m.progressEmitter.Serve()
|
||||
|
||||
var timeout = 20 * 60 // seconds
|
||||
if t := os.Getenv("STDEADLOCKTIMEOUT"); len(t) > 0 {
|
||||
@@ -172,15 +175,16 @@ func (m *Model) StartFolderRW(folder string) {
|
||||
panic("cannot start already running folder " + folder)
|
||||
}
|
||||
p := &Puller{
|
||||
folder: folder,
|
||||
dir: cfg.Path,
|
||||
scanIntv: time.Duration(cfg.RescanIntervalS) * time.Second,
|
||||
model: m,
|
||||
ignorePerms: cfg.IgnorePerms,
|
||||
lenientMtimes: cfg.LenientMtimes,
|
||||
copiers: cfg.Copiers,
|
||||
pullers: cfg.Pullers,
|
||||
finishers: cfg.Finishers,
|
||||
folder: folder,
|
||||
dir: cfg.Path,
|
||||
scanIntv: time.Duration(cfg.RescanIntervalS) * time.Second,
|
||||
model: m,
|
||||
ignorePerms: cfg.IgnorePerms,
|
||||
lenientMtimes: cfg.LenientMtimes,
|
||||
progressEmitter: m.progressEmitter,
|
||||
copiers: cfg.Copiers,
|
||||
pullers: cfg.Pullers,
|
||||
finishers: cfg.Finishers,
|
||||
}
|
||||
m.folderRunners[folder] = p
|
||||
m.fmut.Unlock()
|
||||
@@ -392,6 +396,7 @@ func (m *Model) NeedSize(folder string) (files int, bytes int64) {
|
||||
return true
|
||||
})
|
||||
}
|
||||
bytes -= m.progressEmitter.BytesCompleted(folder)
|
||||
if debug {
|
||||
l.Debugf("%v NeedSize(%q): %d %d", m, folder, files, bytes)
|
||||
}
|
||||
|
||||
133
internal/model/progressemitter.go
Executable file
133
internal/model/progressemitter.go
Executable file
@@ -0,0 +1,133 @@
|
||||
package model
|
||||
|
||||
import (
|
||||
"path/filepath"
|
||||
"reflect"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/syncthing/syncthing/internal/config"
|
||||
"github.com/syncthing/syncthing/internal/events"
|
||||
)
|
||||
|
||||
type ProgressEmitter struct {
|
||||
registry map[string]*sharedPullerState
|
||||
interval time.Duration
|
||||
last map[string]map[string]*pullerProgress
|
||||
mut sync.Mutex
|
||||
|
||||
timer *time.Timer
|
||||
|
||||
stop chan struct{}
|
||||
}
|
||||
|
||||
// Creates a new progress emitter which emits DownloadProgress events every
|
||||
// interval.
|
||||
func NewProgressEmitter(cfg *config.ConfigWrapper) *ProgressEmitter {
|
||||
t := &ProgressEmitter{
|
||||
stop: make(chan struct{}),
|
||||
registry: make(map[string]*sharedPullerState),
|
||||
last: make(map[string]map[string]*pullerProgress),
|
||||
timer: time.NewTimer(time.Millisecond),
|
||||
}
|
||||
t.Changed(cfg.Raw())
|
||||
cfg.Subscribe(t)
|
||||
return t
|
||||
}
|
||||
|
||||
// Starts progress emitter which starts emitting DownloadProgress events as
|
||||
// the progress happens.
|
||||
func (t *ProgressEmitter) Serve() {
|
||||
for {
|
||||
select {
|
||||
case <-t.stop:
|
||||
if debug {
|
||||
l.Debugln("progress emitter: stopping")
|
||||
}
|
||||
return
|
||||
case <-t.timer.C:
|
||||
if debug {
|
||||
l.Debugln("progress emitter: timer - looking after", len(t.registry))
|
||||
}
|
||||
output := make(map[string]map[string]*pullerProgress)
|
||||
t.mut.Lock()
|
||||
for _, puller := range t.registry {
|
||||
if output[puller.folder] == nil {
|
||||
output[puller.folder] = make(map[string]*pullerProgress)
|
||||
}
|
||||
output[puller.folder][puller.file.Name] = puller.Progress()
|
||||
}
|
||||
if !reflect.DeepEqual(t.last, output) {
|
||||
events.Default.Log(events.DownloadProgress, output)
|
||||
t.last = output
|
||||
if debug {
|
||||
l.Debugf("progress emitter: emitting %#v", output)
|
||||
}
|
||||
} else if debug {
|
||||
l.Debugln("progress emitter: nothing new")
|
||||
}
|
||||
if len(t.registry) != 0 {
|
||||
t.timer.Reset(t.interval)
|
||||
}
|
||||
t.mut.Unlock()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Interface method to handle configuration changes
|
||||
func (t *ProgressEmitter) Changed(cfg config.Configuration) error {
|
||||
t.mut.Lock()
|
||||
defer t.mut.Unlock()
|
||||
|
||||
t.interval = time.Duration(cfg.Options.ProgressUpdateIntervalS) * time.Second
|
||||
if debug {
|
||||
l.Debugln("progress emitter: updated interval", t.interval)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Stops the emitter.
|
||||
func (t *ProgressEmitter) Stop() {
|
||||
t.stop <- struct{}{}
|
||||
}
|
||||
|
||||
// Register a puller with the emitter which will start broadcasting pullers
|
||||
// progress.
|
||||
func (t *ProgressEmitter) Register(s *sharedPullerState) {
|
||||
t.mut.Lock()
|
||||
defer t.mut.Unlock()
|
||||
if debug {
|
||||
l.Debugln("progress emitter: registering", s.folder, s.file.Name)
|
||||
}
|
||||
if len(t.registry) == 0 {
|
||||
t.timer.Reset(t.interval)
|
||||
}
|
||||
t.registry[filepath.Join(s.folder, s.file.Name)] = s
|
||||
}
|
||||
|
||||
// Deregister a puller which will stop boardcasting pullers state.
|
||||
func (t *ProgressEmitter) Deregister(s *sharedPullerState) {
|
||||
t.mut.Lock()
|
||||
defer t.mut.Unlock()
|
||||
if debug {
|
||||
l.Debugln("progress emitter: deregistering", s.folder, s.file.Name)
|
||||
}
|
||||
delete(t.registry, filepath.Join(s.folder, s.file.Name))
|
||||
}
|
||||
|
||||
// Returns number of bytes completed in the given folder.
|
||||
func (t *ProgressEmitter) BytesCompleted(folder string) (bytes int64) {
|
||||
t.mut.Lock()
|
||||
defer t.mut.Unlock()
|
||||
|
||||
files, ok := t.last[folder]
|
||||
if ok {
|
||||
for _, s := range files {
|
||||
bytes += s.BytesDone
|
||||
}
|
||||
}
|
||||
if debug {
|
||||
l.Debugf("progress emitter: bytes completed for %s: %d", folder, bytes)
|
||||
}
|
||||
return
|
||||
}
|
||||
95
internal/model/progressemitter_test.go
Normal file
95
internal/model/progressemitter_test.go
Normal file
@@ -0,0 +1,95 @@
|
||||
// Copyright (C) 2014 The Syncthing Authors.
|
||||
//
|
||||
// This program is free software: you can redistribute it and/or modify it
|
||||
// under the terms of the GNU General Public License as published by the Free
|
||||
// Software Foundation, either version 3 of the License, or (at your option)
|
||||
// any later version.
|
||||
//
|
||||
// This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
// ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
// FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
|
||||
// more details.
|
||||
//
|
||||
// You should have received a copy of the GNU General Public License along
|
||||
// with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
package model
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/syncthing/syncthing/internal/config"
|
||||
"github.com/syncthing/syncthing/internal/events"
|
||||
)
|
||||
|
||||
var timeout = 10 * time.Millisecond
|
||||
|
||||
func expectEvent(w *events.Subscription, t *testing.T, size int) {
|
||||
event, err := w.Poll(timeout)
|
||||
if err != nil {
|
||||
t.Fatal("Unexpected error:", err)
|
||||
}
|
||||
if event.Type != events.DownloadProgress {
|
||||
t.Fatal("Unexpected event:", event)
|
||||
}
|
||||
data := event.Data.(map[string]map[string]*pullerProgress)
|
||||
if len(data) != size {
|
||||
t.Fatal("Unexpected event data size:", data)
|
||||
}
|
||||
}
|
||||
|
||||
func expectTimeout(w *events.Subscription, t *testing.T){
|
||||
_, err := w.Poll(timeout)
|
||||
if err != events.ErrTimeout {
|
||||
t.Fatal("Unexpected non-Timeout error:", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestProgressEmitter(t *testing.T) {
|
||||
l.Debugln("test progress emitter")
|
||||
|
||||
w := events.Default.Subscribe(events.DownloadProgress)
|
||||
|
||||
c := config.Wrap("/tmp/test", config.Configuration{})
|
||||
c.SetOptions(config.OptionsConfiguration{
|
||||
ProgressUpdateIntervalS: 0,
|
||||
})
|
||||
|
||||
p := NewProgressEmitter(c)
|
||||
go p.Serve()
|
||||
|
||||
expectTimeout(w, t)
|
||||
|
||||
s := sharedPullerState{}
|
||||
p.Register(&s)
|
||||
|
||||
expectEvent(w, t, 1)
|
||||
expectTimeout(w, t)
|
||||
|
||||
s.copyDone()
|
||||
|
||||
expectEvent(w, t, 1)
|
||||
expectTimeout(w, t)
|
||||
|
||||
s.copiedFromOrigin()
|
||||
|
||||
expectEvent(w, t, 1)
|
||||
expectTimeout(w, t)
|
||||
|
||||
s.pullStarted()
|
||||
|
||||
expectEvent(w, t, 1)
|
||||
expectTimeout(w, t)
|
||||
|
||||
s.pullDone()
|
||||
|
||||
expectEvent(w, t, 1)
|
||||
expectTimeout(w, t)
|
||||
|
||||
p.Deregister(&s)
|
||||
|
||||
expectEvent(w, t, 0)
|
||||
expectTimeout(w, t)
|
||||
|
||||
}
|
||||
@@ -65,17 +65,18 @@ var (
|
||||
)
|
||||
|
||||
type Puller struct {
|
||||
folder string
|
||||
dir string
|
||||
scanIntv time.Duration
|
||||
model *Model
|
||||
stop chan struct{}
|
||||
versioner versioner.Versioner
|
||||
ignorePerms bool
|
||||
lenientMtimes bool
|
||||
copiers int
|
||||
pullers int
|
||||
finishers int
|
||||
folder string
|
||||
dir string
|
||||
scanIntv time.Duration
|
||||
model *Model
|
||||
stop chan struct{}
|
||||
versioner versioner.Versioner
|
||||
ignorePerms bool
|
||||
lenientMtimes bool
|
||||
progressEmitter *ProgressEmitter
|
||||
copiers int
|
||||
pullers int
|
||||
finishers int
|
||||
}
|
||||
|
||||
// Serve will run scans and pulls. It will return when Stop()ed or on a
|
||||
@@ -527,9 +528,9 @@ func (p *Puller) handleFile(file protocol.FileInfo, copyChan chan<- copyBlocksSt
|
||||
folder: p.folder,
|
||||
tempName: tempName,
|
||||
realName: realName,
|
||||
copyTotal: len(blocks),
|
||||
copyNeeded: len(blocks),
|
||||
reused: reused,
|
||||
copyTotal: uint32(len(blocks)),
|
||||
copyNeeded: uint32(len(blocks)),
|
||||
reused: uint32(reused),
|
||||
}
|
||||
|
||||
if debug {
|
||||
@@ -598,6 +599,10 @@ nextFile:
|
||||
continue nextFile
|
||||
}
|
||||
|
||||
if p.progressEmitter != nil {
|
||||
p.progressEmitter.Register(state.sharedPullerState)
|
||||
}
|
||||
|
||||
evictionChan := make(chan lfu.Eviction)
|
||||
|
||||
fdCache := lfu.New()
|
||||
@@ -737,101 +742,109 @@ nextBlock:
|
||||
}
|
||||
}
|
||||
|
||||
func (p *Puller) performFinish(state *sharedPullerState) {
|
||||
if closed, err := state.finalClose(); closed {
|
||||
if debug {
|
||||
l.Debugln(p, "closing", state.file.Name)
|
||||
}
|
||||
if err != nil {
|
||||
l.Warnln("puller: final:", err)
|
||||
return
|
||||
}
|
||||
|
||||
// Verify the file against expected hashes
|
||||
fd, err := os.Open(state.tempName)
|
||||
if err != nil {
|
||||
l.Warnln("puller: final:", err)
|
||||
return
|
||||
}
|
||||
err = scanner.Verify(fd, protocol.BlockSize, state.file.Blocks)
|
||||
fd.Close()
|
||||
if err != nil {
|
||||
l.Infoln("puller:", state.file.Name, err, "(file changed during pull?)")
|
||||
return
|
||||
}
|
||||
|
||||
// Set the correct permission bits on the new file
|
||||
if !p.ignorePerms {
|
||||
err = os.Chmod(state.tempName, os.FileMode(state.file.Flags&0777))
|
||||
if err != nil {
|
||||
l.Warnln("puller: final:", err)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// Set the correct timestamp on the new file
|
||||
t := time.Unix(state.file.Modified, 0)
|
||||
err = os.Chtimes(state.tempName, t, t)
|
||||
if err != nil {
|
||||
if p.lenientMtimes {
|
||||
// We accept the failure with a warning here and allow the sync to
|
||||
// continue. We'll sync the new mtime back to the other devices later.
|
||||
// If they have the same problem & setting, we might never get in
|
||||
// sync.
|
||||
l.Infof("Puller (folder %q, file %q): final: %v (continuing anyway as requested)", p.folder, state.file.Name, err)
|
||||
} else {
|
||||
l.Warnln("puller: final:", err)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// If we should use versioning, let the versioner archive the old
|
||||
// file before we replace it. Archiving a non-existent file is not
|
||||
// an error.
|
||||
if p.versioner != nil {
|
||||
err = p.versioner.Archive(state.realName)
|
||||
if err != nil {
|
||||
l.Warnln("puller: final:", err)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// If the target path is a symlink or a directory, we cannot copy
|
||||
// over it, hence remove it before proceeding.
|
||||
stat, err := os.Lstat(state.realName)
|
||||
isLink, _ := symlinks.IsSymlink(state.realName)
|
||||
if isLink || (err == nil && stat.IsDir()) {
|
||||
osutil.InWritableDir(os.Remove, state.realName)
|
||||
}
|
||||
// Replace the original content with the new one
|
||||
err = osutil.Rename(state.tempName, state.realName)
|
||||
if err != nil {
|
||||
l.Warnln("puller: final:", err)
|
||||
return
|
||||
}
|
||||
|
||||
// If it's a symlink, the target of the symlink is inside the file.
|
||||
if state.file.IsSymlink() {
|
||||
content, err := ioutil.ReadFile(state.realName)
|
||||
if err != nil {
|
||||
l.Warnln("puller: final: reading symlink:", err)
|
||||
return
|
||||
}
|
||||
|
||||
// Remove the file, and replace it with a symlink.
|
||||
err = osutil.InWritableDir(func(path string) error {
|
||||
os.Remove(path)
|
||||
return symlinks.Create(path, string(content), state.file.Flags)
|
||||
}, state.realName)
|
||||
if err != nil {
|
||||
l.Warnln("puller: final: creating symlink:", err)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// Record the updated file in the index
|
||||
p.model.updateLocal(p.folder, state.file)
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
func (p *Puller) finisherRoutine(in <-chan *sharedPullerState) {
|
||||
for state := range in {
|
||||
if closed, err := state.finalClose(); closed {
|
||||
if debug {
|
||||
l.Debugln(p, "closing", state.file.Name)
|
||||
}
|
||||
if err != nil {
|
||||
l.Warnln("puller: final:", err)
|
||||
continue
|
||||
}
|
||||
|
||||
// Verify the file against expected hashes
|
||||
fd, err := os.Open(state.tempName)
|
||||
if err != nil {
|
||||
l.Warnln("puller: final:", err)
|
||||
continue
|
||||
}
|
||||
err = scanner.Verify(fd, protocol.BlockSize, state.file.Blocks)
|
||||
fd.Close()
|
||||
if err != nil {
|
||||
l.Infoln("puller:", state.file.Name, err, "(file changed during pull?)")
|
||||
continue
|
||||
}
|
||||
|
||||
// Set the correct permission bits on the new file
|
||||
if !p.ignorePerms {
|
||||
err = os.Chmod(state.tempName, os.FileMode(state.file.Flags&0777))
|
||||
if err != nil {
|
||||
l.Warnln("puller: final:", err)
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
// Set the correct timestamp on the new file
|
||||
t := time.Unix(state.file.Modified, 0)
|
||||
err = os.Chtimes(state.tempName, t, t)
|
||||
if err != nil {
|
||||
if p.lenientMtimes {
|
||||
// We accept the failure with a warning here and allow the sync to
|
||||
// continue. We'll sync the new mtime back to the other devices later.
|
||||
// If they have the same problem & setting, we might never get in
|
||||
// sync.
|
||||
l.Infof("Puller (folder %q, file %q): final: %v (continuing anyway as requested)", p.folder, state.file.Name, err)
|
||||
} else {
|
||||
l.Warnln("puller: final:", err)
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
// If we should use versioning, let the versioner archive the old
|
||||
// file before we replace it. Archiving a non-existent file is not
|
||||
// an error.
|
||||
if p.versioner != nil {
|
||||
err = p.versioner.Archive(state.realName)
|
||||
if err != nil {
|
||||
l.Warnln("puller: final:", err)
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
// If the target path is a symlink or a directory, we cannot copy
|
||||
// over it, hence remove it before proceeding.
|
||||
stat, err := os.Lstat(state.realName)
|
||||
isLink, _ := symlinks.IsSymlink(state.realName)
|
||||
if isLink || (err == nil && stat.IsDir()) {
|
||||
osutil.InWritableDir(os.Remove, state.realName)
|
||||
}
|
||||
// Replace the original content with the new one
|
||||
err = osutil.Rename(state.tempName, state.realName)
|
||||
if err != nil {
|
||||
l.Warnln("puller: final:", err)
|
||||
continue
|
||||
}
|
||||
|
||||
// If it's a symlink, the target of the symlink is inside the file.
|
||||
if state.file.IsSymlink() {
|
||||
content, err := ioutil.ReadFile(state.realName)
|
||||
if err != nil {
|
||||
l.Warnln("puller: final: reading symlink:", err)
|
||||
continue
|
||||
}
|
||||
|
||||
// Remove the file, and replace it with a symlink.
|
||||
err = osutil.InWritableDir(func(path string) error {
|
||||
os.Remove(path)
|
||||
return symlinks.Create(path, string(content), state.file.Flags)
|
||||
}, state.realName)
|
||||
if err != nil {
|
||||
l.Warnln("puller: final: creating symlink:", err)
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
// Record the updated file in the index
|
||||
p.model.updateLocal(p.folder, state.file)
|
||||
p.performFinish(state)
|
||||
if state.closed && p.progressEmitter != nil {
|
||||
p.progressEmitter.Deregister(state)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -31,20 +31,32 @@ type sharedPullerState struct {
|
||||
folder string
|
||||
tempName string
|
||||
realName string
|
||||
reused int // Number of blocks reused from temporary file
|
||||
reused uint32 // Number of blocks reused from temporary file
|
||||
|
||||
// Mutable, must be locked for access
|
||||
err error // The first error we hit
|
||||
fd *os.File // The fd of the temp file
|
||||
copyTotal int // Total number of copy actions for the whole job
|
||||
pullTotal int // Total number of pull actions for the whole job
|
||||
copyNeeded int // Number of copy actions still pending
|
||||
pullNeeded int // Number of block pulls still pending
|
||||
copyOrigin int // Number of blocks copied from the original file
|
||||
copyTotal uint32 // Total number of copy actions for the whole job
|
||||
pullTotal uint32 // Total number of pull actions for the whole job
|
||||
copyOrigin uint32 // Number of blocks copied from the original file
|
||||
copyNeeded uint32 // Number of copy actions still pending
|
||||
pullNeeded uint32 // Number of block pulls still pending
|
||||
closed bool // Set when the file has been closed
|
||||
mut sync.Mutex // Protects the above
|
||||
}
|
||||
|
||||
// A momentary state representing the progress of the puller
|
||||
type pullerProgress struct {
|
||||
Total uint32
|
||||
Reused uint32
|
||||
CopiedFromOrigin uint32
|
||||
CopiedFromElsewhere uint32
|
||||
Pulled uint32
|
||||
Pulling uint32
|
||||
BytesDone int64
|
||||
BytesTotal int64
|
||||
}
|
||||
|
||||
// tempFile returns the fd for the temporary file, reusing an open fd
|
||||
// or creating the file as necessary.
|
||||
func (s *sharedPullerState) tempFile() (*os.File, error) {
|
||||
@@ -208,3 +220,21 @@ func (s *sharedPullerState) finalClose() (bool, error) {
|
||||
}
|
||||
return true, nil
|
||||
}
|
||||
|
||||
// Returns the momentarily progress for the puller
|
||||
func (s *sharedPullerState) Progress() *pullerProgress {
|
||||
s.mut.Lock()
|
||||
defer s.mut.Unlock()
|
||||
total := s.reused + s.copyTotal + s.pullTotal
|
||||
done := total - s.copyNeeded - s.pullNeeded
|
||||
return &pullerProgress{
|
||||
Total: total,
|
||||
Reused: s.reused,
|
||||
CopiedFromOrigin: s.copyOrigin,
|
||||
CopiedFromElsewhere: s.copyTotal - s.copyNeeded - s.copyOrigin,
|
||||
Pulled: s.pullTotal - s.pullNeeded,
|
||||
Pulling: s.pullNeeded,
|
||||
BytesTotal: protocol.BlocksToSize(total),
|
||||
BytesDone: protocol.BlocksToSize(done),
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user