diff --git a/lib/model/folder_sendrecv.go b/lib/model/folder_sendrecv.go index a2ad531f..883ce921 100644 --- a/lib/model/folder_sendrecv.go +++ b/lib/model/folder_sendrecv.go @@ -1201,9 +1201,7 @@ func (f *sendReceiveFolder) copierRoutine(in <-chan copyBlocksState, pullChan ch continue } - if f.model.progressEmitter != nil { - f.model.progressEmitter.Register(state.sharedPullerState) - } + f.model.progressEmitter.Register(state.sharedPullerState) folderFilesystems := make(map[string]fs.Filesystem) var folders []string @@ -1550,9 +1548,7 @@ func (f *sendReceiveFolder) finisherRoutine(in <-chan *sharedPullerState, dbUpda blockStatsMut.Unlock() } - if f.model.progressEmitter != nil { - f.model.progressEmitter.Deregister(state) - } + f.model.progressEmitter.Deregister(state) events.Default.Log(events.ItemFinished, map[string]interface{}{ "folder": f.folderID, diff --git a/lib/model/model.go b/lib/model/model.go index 70fee0ed..f06ad61b 100644 --- a/lib/model/model.go +++ b/lib/model/model.go @@ -218,9 +218,7 @@ func NewModel(cfg config.Wrapper, id protocol.DeviceID, clientName, clientVersio fmut: sync.NewRWMutex(), pmut: sync.NewRWMutex(), } - if cfg.Options().ProgressUpdateIntervalS > -1 { - m.Add(m.progressEmitter) - } + m.Add(m.progressEmitter) scanLimiter.setCapacity(cfg.Options().MaxConcurrentScans) cfg.Subscribe(m) diff --git a/lib/model/progressemitter.go b/lib/model/progressemitter.go index d2b77d97..d905c6a6 100644 --- a/lib/model/progressemitter.go +++ b/lib/model/progressemitter.go @@ -17,11 +17,13 @@ import ( ) type ProgressEmitter struct { - registry map[string]*sharedPullerState + registry map[string]map[string]*sharedPullerState // folder: name: puller interval time.Duration minBlocks int sentDownloadStates map[protocol.DeviceID]*sentDownloadState // States representing what we've sent to the other peer via DownloadProgress messages. - connections map[string][]protocol.Connection + connections map[protocol.DeviceID]protocol.Connection + foldersByConns map[protocol.DeviceID][]string + disabled bool mut sync.Mutex timer *time.Timer @@ -34,10 +36,11 @@ type ProgressEmitter struct { func NewProgressEmitter(cfg config.Wrapper) *ProgressEmitter { t := &ProgressEmitter{ stop: make(chan struct{}), - registry: make(map[string]*sharedPullerState), + registry: make(map[string]map[string]*sharedPullerState), timer: time.NewTimer(time.Millisecond), sentDownloadStates: make(map[protocol.DeviceID]*sentDownloadState), - connections: make(map[string][]protocol.Connection), + connections: make(map[protocol.DeviceID]protocol.Connection), + foldersByConns: make(map[protocol.DeviceID][]string), mut: sync.NewMutex(), } @@ -62,20 +65,21 @@ func (t *ProgressEmitter) Serve() { l.Debugln("progress emitter: timer - looking after", len(t.registry)) newLastUpdated := lastUpdate - newCount = len(t.registry) - for _, puller := range t.registry { - updated := puller.Updated() - if updated.After(newLastUpdated) { - newLastUpdated = updated + newCount = t.lenRegistryLocked() + for _, pullers := range t.registry { + for _, puller := range pullers { + if updated := puller.Updated(); updated.After(newLastUpdated) { + newLastUpdated = updated + } } } if !newLastUpdated.Equal(lastUpdate) || newCount != lastCount { lastUpdate = newLastUpdated lastCount = newCount - t.sendDownloadProgressEvent() + t.sendDownloadProgressEventLocked() if len(t.connections) > 0 { - t.sendDownloadProgressMessages() + t.sendDownloadProgressMessagesLocked() } } else { l.Debugln("progress emitter: nothing new") @@ -89,30 +93,29 @@ func (t *ProgressEmitter) Serve() { } } -func (t *ProgressEmitter) sendDownloadProgressEvent() { - // registry lock already held +func (t *ProgressEmitter) sendDownloadProgressEventLocked() { output := make(map[string]map[string]*pullerProgress) - for _, puller := range t.registry { - if output[puller.folder] == nil { - output[puller.folder] = make(map[string]*pullerProgress) + for folder, pullers := range t.registry { + if len(pullers) == 0 { + continue + } + output[folder] = make(map[string]*pullerProgress) + for name, puller := range pullers { + output[folder][name] = puller.Progress() } - output[puller.folder][puller.file.Name] = puller.Progress() } events.Default.Log(events.DownloadProgress, output) l.Debugf("progress emitter: emitting %#v", output) } -func (t *ProgressEmitter) sendDownloadProgressMessages() { - // registry lock already held - sharedFolders := make(map[protocol.DeviceID][]string) - deviceConns := make(map[protocol.DeviceID]protocol.Connection) - subscribers := t.connections - for folder, conns := range subscribers { - for _, conn := range conns { - id := conn.ID() - - deviceConns[id] = conn - sharedFolders[id] = append(sharedFolders[id], folder) +func (t *ProgressEmitter) sendDownloadProgressMessagesLocked() { + for id, conn := range t.connections { + for _, folder := range t.foldersByConns[id] { + pullers, ok := t.registry[folder] + if !ok { + // There's never been any puller registered for this folder yet + continue + } state, ok := t.sentDownloadStates[id] if !ok { @@ -122,8 +125,8 @@ func (t *ProgressEmitter) sendDownloadProgressMessages() { t.sentDownloadStates[id] = state } - var activePullers []*sharedPullerState - for _, puller := range t.registry { + activePullers := make([]*sharedPullerState, 0, len(pullers)) + for _, puller := range pullers { if puller.folder != folder || puller.file.IsSymlink() || puller.file.IsDirectory() || len(puller.file.Blocks) <= t.minBlocks { continue } @@ -143,7 +146,7 @@ func (t *ProgressEmitter) sendDownloadProgressMessages() { // Clean up sentDownloadStates for devices which we are no longer connected to. for id := range t.sentDownloadStates { - _, ok := deviceConns[id] + _, ok := t.connections[id] if !ok { // Null out outstanding entries for device delete(t.sentDownloadStates, id) @@ -152,13 +155,12 @@ func (t *ProgressEmitter) sendDownloadProgressMessages() { // If a folder was unshared from some device, tell it that all temp files // are now gone. - for id, sharedDeviceFolders := range sharedFolders { - state := t.sentDownloadStates[id] - nextFolder: + for id, state := range t.sentDownloadStates { // For each of the folders that the state is aware of, // try to match it with a shared folder we've discovered above, + nextFolder: for _, folder := range state.folders() { - for _, existingFolder := range sharedDeviceFolders { + for _, existingFolder := range t.foldersByConns[id] { if existingFolder == folder { continue nextFolder } @@ -189,12 +191,23 @@ func (t *ProgressEmitter) CommitConfiguration(from, to config.Configuration) boo t.mut.Lock() defer t.mut.Unlock() - t.interval = time.Duration(to.Options.ProgressUpdateIntervalS) * time.Second - if t.interval < time.Second { - t.interval = time.Second + switch { + case t.disabled && to.Options.ProgressUpdateIntervalS >= 0: + t.disabled = false + l.Debugln("progress emitter: enabled") + fallthrough + case !t.disabled && from.Options.ProgressUpdateIntervalS != to.Options.ProgressUpdateIntervalS: + t.interval = time.Duration(to.Options.ProgressUpdateIntervalS) * time.Second + if t.interval < time.Second { + t.interval = time.Second + } + l.Debugln("progress emitter: updated interval", t.interval) + case !t.disabled && to.Options.ProgressUpdateIntervalS < 0: + t.clearLocked() + t.disabled = true + l.Debugln("progress emitter: disabled") } t.minBlocks = to.Options.TempIndexMinBlocks - l.Debugln("progress emitter: updated interval", t.interval) return true } @@ -209,13 +222,18 @@ func (t *ProgressEmitter) Stop() { func (t *ProgressEmitter) Register(s *sharedPullerState) { t.mut.Lock() defer t.mut.Unlock() + if t.disabled { + l.Debugln("progress emitter: disabled, skip registering") + return + } l.Debugln("progress emitter: registering", s.folder, s.file.Name) - if len(t.registry) == 0 { + if t.emptyLocked() { t.timer.Reset(t.interval) } - // Separate the folder ID (arbitrary string) and the file name by "//" - // because it never appears in a valid file name. - t.registry[s.folder+"//"+s.file.Name] = s + if _, ok := t.registry[s.folder]; !ok { + t.registry[s.folder] = make(map[string]*sharedPullerState) + } + t.registry[s.folder][s.file.Name] = s } // Deregister a puller which will stop broadcasting pullers state. @@ -223,9 +241,13 @@ func (t *ProgressEmitter) Deregister(s *sharedPullerState) { t.mut.Lock() defer t.mut.Unlock() - l.Debugln("progress emitter: deregistering", s.folder, s.file.Name) + if t.disabled { + l.Debugln("progress emitter: disabled, skip deregistering") + return + } - delete(t.registry, s.folder+"//"+s.file.Name) + l.Debugln("progress emitter: deregistering", s.folder, s.file.Name) + delete(t.registry[s.folder], s.file.Name) } // BytesCompleted returns the number of bytes completed in the given folder. @@ -233,10 +255,8 @@ func (t *ProgressEmitter) BytesCompleted(folder string) (bytes int64) { t.mut.Lock() defer t.mut.Unlock() - for _, s := range t.registry { - if s.folder == folder { - bytes += s.Progress().BytesDone - } + for _, s := range t.registry[folder] { + bytes += s.Progress().BytesDone } l.Debugf("progress emitter: bytes completed for %s: %d", folder, bytes) return @@ -249,40 +269,53 @@ func (t *ProgressEmitter) String() string { func (t *ProgressEmitter) lenRegistry() int { t.mut.Lock() defer t.mut.Unlock() - return len(t.registry) + return t.lenRegistryLocked() +} + +func (t *ProgressEmitter) lenRegistryLocked() (out int) { + for _, pullers := range t.registry { + out += len(pullers) + } + return out +} + +func (t *ProgressEmitter) emptyLocked() bool { + for _, pullers := range t.registry { + if len(pullers) != 0 { + return false + } + } + return true } func (t *ProgressEmitter) temporaryIndexSubscribe(conn protocol.Connection, folders []string) { t.mut.Lock() - for _, folder := range folders { - t.connections[folder] = append(t.connections[folder], conn) - } - t.mut.Unlock() + defer t.mut.Unlock() + t.connections[conn.ID()] = conn + t.foldersByConns[conn.ID()] = folders } func (t *ProgressEmitter) temporaryIndexUnsubscribe(conn protocol.Connection) { t.mut.Lock() - left := make(map[string][]protocol.Connection, len(t.connections)) - for folder, conns := range t.connections { - connsLeft := connsWithout(conns, conn) - if len(connsLeft) > 0 { - left[folder] = connsLeft - } - } - t.connections = left - t.mut.Unlock() + defer t.mut.Unlock() + delete(t.connections, conn.ID()) + delete(t.foldersByConns, conn.ID()) } -func connsWithout(conns []protocol.Connection, conn protocol.Connection) []protocol.Connection { - if len(conns) == 0 { - return nil - } - - newConns := make([]protocol.Connection, 0, len(conns)-1) - for _, existingConn := range conns { - if existingConn != conn { - newConns = append(newConns, existingConn) +func (t *ProgressEmitter) clearLocked() { + for id, state := range t.sentDownloadStates { + conn, ok := t.connections[id] + if !ok { + continue + } + for _, folder := range state.folders() { + if updates := state.cleanup(folder); len(updates) > 0 { + conn.DownloadProgress(folder, updates) + } } } - return newConns + t.registry = make(map[string]map[string]*sharedPullerState) + t.sentDownloadStates = make(map[protocol.DeviceID]*sentDownloadState) + t.connections = make(map[protocol.DeviceID]protocol.Connection) + t.foldersByConns = make(map[protocol.DeviceID][]string) } diff --git a/lib/model/progressemitter_test.go b/lib/model/progressemitter_test.go index 5e60dae0..55df0803 100644 --- a/lib/model/progressemitter_test.go +++ b/lib/model/progressemitter_test.go @@ -114,6 +114,9 @@ func TestSendDownloadProgressMessages(t *testing.T) { p := NewProgressEmitter(c) p.temporaryIndexSubscribe(fc, []string{"folder", "folder2"}) + p.registry["folder"] = make(map[string]*sharedPullerState) + p.registry["folder2"] = make(map[string]*sharedPullerState) + p.registry["folderXXX"] = make(map[string]*sharedPullerState) expect := func(updateIdx int, state *sharedPullerState, updateType protocol.FileDownloadProgressUpdateType, version protocol.Vector, blocks []int32, remove bool) { messageIdx := -1 @@ -202,39 +205,39 @@ func TestSendDownloadProgressMessages(t *testing.T) { mut: sync.NewRWMutex(), availableUpdated: time.Now(), } - p.registry["1"] = state1 + p.registry["folder"]["1"] = state1 // Has no blocks, hence no message is sent - p.sendDownloadProgressMessages() + sendMsgs(p) expectEmpty() // Returns update for puller with new extra blocks state1.available = []int32{1} - p.sendDownloadProgressMessages() + sendMsgs(p) expect(0, state1, protocol.UpdateTypeAppend, v1, []int32{1}, true) expectEmpty() // Does nothing if nothing changes - p.sendDownloadProgressMessages() + sendMsgs(p) expectEmpty() // Does nothing if timestamp updated, but no new blocks (should never happen) state1.availableUpdated = tick() - p.sendDownloadProgressMessages() + sendMsgs(p) expectEmpty() // Does not return an update if date blocks change but date does not (should never happen) state1.available = []int32{1, 2} - p.sendDownloadProgressMessages() + sendMsgs(p) expectEmpty() // If the date and blocks changes, returns only the diff state1.availableUpdated = tick() - p.sendDownloadProgressMessages() + sendMsgs(p) expect(0, state1, protocol.UpdateTypeAppend, v1, []int32{2}, true) expectEmpty() @@ -242,7 +245,7 @@ func TestSendDownloadProgressMessages(t *testing.T) { // Returns forget and update if puller version has changed state1.file.Version = v2 - p.sendDownloadProgressMessages() + sendMsgs(p) expect(0, state1, protocol.UpdateTypeForget, v1, nil, false) expect(1, state1, protocol.UpdateTypeAppend, v2, []int32{1, 2}, true) @@ -254,7 +257,7 @@ func TestSendDownloadProgressMessages(t *testing.T) { state1.availableUpdated = tick() state1.created = tick() - p.sendDownloadProgressMessages() + sendMsgs(p) expect(0, state1, protocol.UpdateTypeForget, v2, nil, false) expect(1, state1, protocol.UpdateTypeAppend, v2, []int32{1}, true) @@ -265,7 +268,7 @@ func TestSendDownloadProgressMessages(t *testing.T) { state1.available = nil state1.availableUpdated = tick() - p.sendDownloadProgressMessages() + sendMsgs(p) expect(0, state1, protocol.UpdateTypeForget, v2, nil, false) expect(1, state1, protocol.UpdateTypeAppend, v1, nil, true) @@ -308,11 +311,11 @@ func TestSendDownloadProgressMessages(t *testing.T) { available: []int32{1, 2, 3}, availableUpdated: time.Now(), } - p.registry["2"] = state2 - p.registry["3"] = state3 - p.registry["4"] = state4 + p.registry["folder2"]["2"] = state2 + p.registry["folder"]["3"] = state3 + p.registry["folder2"]["4"] = state4 - p.sendDownloadProgressMessages() + sendMsgs(p) expect(-1, state1, protocol.UpdateTypeAppend, v1, []int32{1, 2, 3}, false) expect(-1, state3, protocol.UpdateTypeAppend, v1, []int32{1, 2, 3}, true) @@ -326,10 +329,10 @@ func TestSendDownloadProgressMessages(t *testing.T) { state2.available = []int32{1, 2, 3, 4, 5} state2.availableUpdated = tick() - delete(p.registry, "3") - delete(p.registry, "4") + delete(p.registry["folder"], "3") + delete(p.registry["folder2"], "4") - p.sendDownloadProgressMessages() + sendMsgs(p) expect(-1, state1, protocol.UpdateTypeAppend, v1, []int32{4, 5}, false) expect(-1, state3, protocol.UpdateTypeForget, v1, nil, true) @@ -338,8 +341,8 @@ func TestSendDownloadProgressMessages(t *testing.T) { expectEmpty() // Deletions are sent only once (actual bug I found writing the tests) - p.sendDownloadProgressMessages() - p.sendDownloadProgressMessages() + sendMsgs(p) + sendMsgs(p) expectEmpty() // Not sent for "inactive" (symlinks, dirs, or wrong folder) pullers @@ -392,31 +395,31 @@ func TestSendDownloadProgressMessages(t *testing.T) { available: []int32{1, 2, 3}, availableUpdated: time.Now(), } - p.registry["5"] = state5 - p.registry["6"] = state6 - p.registry["7"] = state7 - p.registry["8"] = state8 + p.registry["folder"]["5"] = state5 + p.registry["folder"]["6"] = state6 + p.registry["folderXXX"]["7"] = state7 + p.registry["folder"]["8"] = state8 - p.sendDownloadProgressMessages() + sendMsgs(p) expectEmpty() // Device is no longer subscribed to a particular folder - delete(p.registry, "1") // Clean up first - delete(p.registry, "2") // Clean up first + delete(p.registry["folder"], "1") // Clean up first + delete(p.registry["folder2"], "2") // Clean up first - p.sendDownloadProgressMessages() + sendMsgs(p) expect(-1, state1, protocol.UpdateTypeForget, v1, nil, true) expect(-1, state2, protocol.UpdateTypeForget, v1, nil, true) expectEmpty() - p.registry["1"] = state1 - p.registry["2"] = state2 - p.registry["3"] = state3 - p.registry["4"] = state4 + p.registry["folder"]["1"] = state1 + p.registry["folder2"]["2"] = state2 + p.registry["folder"]["3"] = state3 + p.registry["folder2"]["4"] = state4 - p.sendDownloadProgressMessages() + sendMsgs(p) expect(-1, state1, protocol.UpdateTypeAppend, v1, []int32{1, 2, 3, 4, 5}, false) expect(-1, state3, protocol.UpdateTypeAppend, v1, []int32{1, 2, 3}, true) @@ -427,7 +430,7 @@ func TestSendDownloadProgressMessages(t *testing.T) { p.temporaryIndexUnsubscribe(fc) p.temporaryIndexSubscribe(fc, []string{"folder"}) - p.sendDownloadProgressMessages() + sendMsgs(p) // See progressemitter.go for explanation why this is commented out. // Search for state.cleanup @@ -439,9 +442,15 @@ func TestSendDownloadProgressMessages(t *testing.T) { // Cleanup when device no longer exists p.temporaryIndexUnsubscribe(fc) - p.sendDownloadProgressMessages() + sendMsgs(p) _, ok := p.sentDownloadStates[fc.ID()] if ok { t.Error("Should not be there") } } + +func sendMsgs(p *ProgressEmitter) { + p.mut.Lock() + defer p.mut.Unlock() + p.sendDownloadProgressMessagesLocked() +}