Push model data instead of pull (fixes #1434)
This commit is contained in:
@@ -49,6 +49,11 @@ var (
|
||||
eventSub *events.BufferedSubscription
|
||||
)
|
||||
|
||||
var (
|
||||
lastEventRequest time.Time
|
||||
lastEventRequestMut sync.Mutex
|
||||
)
|
||||
|
||||
func init() {
|
||||
l.AddHandler(logger.LevelWarn, showGuiError)
|
||||
sub := events.Default.Subscribe(events.AllEvents)
|
||||
@@ -179,6 +184,9 @@ func startGUI(cfg config.GUIConfiguration, assetDir string, m *model.Model) erro
|
||||
ReadTimeout: 10 * time.Second,
|
||||
}
|
||||
|
||||
csrv := &folderSummarySvc{model: m}
|
||||
go csrv.Serve()
|
||||
|
||||
go func() {
|
||||
err := srv.Serve(listener)
|
||||
if err != nil {
|
||||
@@ -293,8 +301,14 @@ func restGetCompletion(m *model.Model, w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
|
||||
func restGetModel(m *model.Model, w http.ResponseWriter, r *http.Request) {
|
||||
var qs = r.URL.Query()
|
||||
var folder = qs.Get("folder")
|
||||
qs := r.URL.Query()
|
||||
folder := qs.Get("folder")
|
||||
res := folderSummary(m, folder)
|
||||
w.Header().Set("Content-Type", "application/json; charset=utf-8")
|
||||
json.NewEncoder(w).Encode(res)
|
||||
}
|
||||
|
||||
func folderSummary(m *model.Model, folder string) map[string]interface{} {
|
||||
var res = make(map[string]interface{})
|
||||
|
||||
res["invalid"] = cfg.Folders()[folder].Invalid
|
||||
@@ -322,8 +336,7 @@ func restGetModel(m *model.Model, w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
}
|
||||
|
||||
w.Header().Set("Content-Type", "application/json; charset=utf-8")
|
||||
json.NewEncoder(w).Encode(res)
|
||||
return res
|
||||
}
|
||||
|
||||
func restPostOverride(m *model.Model, w http.ResponseWriter, r *http.Request) {
|
||||
@@ -598,6 +611,10 @@ func restGetEvents(w http.ResponseWriter, r *http.Request) {
|
||||
since, _ := strconv.Atoi(sinceStr)
|
||||
limit, _ := strconv.Atoi(limitStr)
|
||||
|
||||
lastEventRequestMut.Lock()
|
||||
lastEventRequest = time.Now()
|
||||
lastEventRequestMut.Unlock()
|
||||
|
||||
w.Header().Set("Content-Type", "application/json; charset=utf-8")
|
||||
|
||||
// Flush before blocking, to indicate that we've received the request
|
||||
|
||||
@@ -61,7 +61,10 @@ const (
|
||||
exitUpgrading = 4
|
||||
)
|
||||
|
||||
const bepProtocolName = "bep/1.0"
|
||||
const (
|
||||
bepProtocolName = "bep/1.0"
|
||||
pingEventInterval = time.Minute
|
||||
)
|
||||
|
||||
var l = logger.DefaultLogger
|
||||
|
||||
@@ -613,7 +616,7 @@ func syncthingMain() {
|
||||
}
|
||||
|
||||
events.Default.Log(events.StartupComplete, nil)
|
||||
go generateEvents()
|
||||
go generatePingEvents()
|
||||
|
||||
code := <-stop
|
||||
|
||||
@@ -701,9 +704,9 @@ func defaultConfig(myName string) config.Configuration {
|
||||
return newCfg
|
||||
}
|
||||
|
||||
func generateEvents() {
|
||||
func generatePingEvents() {
|
||||
for {
|
||||
time.Sleep(300 * time.Second)
|
||||
time.Sleep(pingEventInterval)
|
||||
events.Default.Log(events.Ping, nil)
|
||||
}
|
||||
}
|
||||
|
||||
159
cmd/syncthing/summarysvc.go
Normal file
159
cmd/syncthing/summarysvc.go
Normal file
@@ -0,0 +1,159 @@
|
||||
// Copyright (C) 2015 The Syncthing Authors.
|
||||
//
|
||||
// This Source Code Form is subject to the terms of the Mozilla Public
|
||||
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
|
||||
// You can obtain one at http://mozilla.org/MPL/2.0/.
|
||||
|
||||
package main
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/syncthing/syncthing/internal/events"
|
||||
"github.com/syncthing/syncthing/internal/model"
|
||||
"github.com/thejerf/suture"
|
||||
)
|
||||
|
||||
// The folderSummarySvc adds summary information events (FolderSummary and
|
||||
// FolderCompletion) into the event stream at certain intervals.
|
||||
type folderSummarySvc struct {
|
||||
model *model.Model
|
||||
srv suture.Service
|
||||
stop chan struct{}
|
||||
|
||||
// For keeping track of folders to recalculate for
|
||||
foldersMut sync.Mutex
|
||||
folders map[string]struct{}
|
||||
}
|
||||
|
||||
func (c *folderSummarySvc) Serve() {
|
||||
srv := suture.NewSimple("folderSummarySvc")
|
||||
srv.Add(serviceFunc(c.listenForUpdates))
|
||||
srv.Add(serviceFunc(c.calculateSummaries))
|
||||
|
||||
c.stop = make(chan struct{})
|
||||
c.folders = make(map[string]struct{})
|
||||
c.srv = srv
|
||||
|
||||
srv.Serve()
|
||||
}
|
||||
|
||||
func (c *folderSummarySvc) Stop() {
|
||||
// c.srv.Stop() is mostly a no-op here, but we need to call it anyway so
|
||||
// c.srv doesn't try to restart the serviceFuncs when they exit after we
|
||||
// close the stop channel.
|
||||
c.srv.Stop()
|
||||
close(c.stop)
|
||||
}
|
||||
|
||||
// listenForUpdates subscribes to the event bus and makes note of folders that
|
||||
// need their data recalculated.
|
||||
func (c *folderSummarySvc) listenForUpdates() {
|
||||
sub := events.Default.Subscribe(events.LocalIndexUpdated | events.RemoteIndexUpdated)
|
||||
defer events.Default.Unsubscribe(sub)
|
||||
|
||||
for {
|
||||
// This loop needs to be fast so we don't miss too many events.
|
||||
|
||||
select {
|
||||
case ev := <-sub.C():
|
||||
// Whenever the local or remote index is updated for a given
|
||||
// folder we make a note of it.
|
||||
|
||||
data := ev.Data.(map[string]interface{})
|
||||
folder := data["folder"].(string)
|
||||
c.foldersMut.Lock()
|
||||
c.folders[folder] = struct{}{}
|
||||
c.foldersMut.Unlock()
|
||||
|
||||
case <-c.stop:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// calculateSummaries periodically recalculates folder summaries and
|
||||
// completion percentage, and sends the results on the event bus.
|
||||
func (c *folderSummarySvc) calculateSummaries() {
|
||||
const pumpInterval = 2 * time.Second
|
||||
pump := time.NewTimer(pumpInterval)
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-pump.C:
|
||||
// We only recalculate sumamries if someone is listening to events
|
||||
// (a request to /rest/events has been made within the last
|
||||
// pingEventInterval).
|
||||
|
||||
lastEventRequestMut.Lock()
|
||||
// XXX: Reaching out to a global var here is very ugly :( Should
|
||||
// we make the gui stuff a proper object with methods on it that
|
||||
// we can query about this kind of thing?
|
||||
last := lastEventRequest
|
||||
lastEventRequestMut.Unlock()
|
||||
|
||||
t0 := time.Now()
|
||||
if time.Since(last) < pingEventInterval {
|
||||
for _, folder := range c.foldersToHandle() {
|
||||
// The folder summary contains how many bytes, files etc
|
||||
// are in the folder and how in sync we are.
|
||||
data := folderSummary(c.model, folder)
|
||||
events.Default.Log(events.FolderSummary, map[string]interface{}{
|
||||
"folder": folder,
|
||||
"summary": data,
|
||||
})
|
||||
|
||||
for _, devCfg := range cfg.Folders()[folder].Devices {
|
||||
if devCfg.DeviceID.Equals(myID) {
|
||||
// We already know about ourselves.
|
||||
continue
|
||||
}
|
||||
if !c.model.ConnectedTo(devCfg.DeviceID) {
|
||||
// We're not interested in disconnected devices.
|
||||
continue
|
||||
}
|
||||
|
||||
// Get completion percentage of this folder for the
|
||||
// remote device.
|
||||
comp := c.model.Completion(devCfg.DeviceID, folder)
|
||||
events.Default.Log(events.FolderCompletion, map[string]interface{}{
|
||||
"folder": folder,
|
||||
"device": devCfg.DeviceID.String(),
|
||||
"completion": comp,
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// We don't want to spend all our time calculating summaries. Lets
|
||||
// set an arbitrary limit at not spending more than about 30% of
|
||||
// our time here...
|
||||
wait := 2*time.Since(t0) + pumpInterval
|
||||
pump.Reset(wait)
|
||||
|
||||
case <-c.stop:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// foldersToHandle returns the list of folders needing a summary update, and
|
||||
// clears the list.
|
||||
func (c *folderSummarySvc) foldersToHandle() []string {
|
||||
c.foldersMut.Lock()
|
||||
res := make([]string, 0, len(c.folders))
|
||||
for folder := range c.folders {
|
||||
res = append(res, folder)
|
||||
delete(c.folders, folder)
|
||||
}
|
||||
c.foldersMut.Unlock()
|
||||
return res
|
||||
}
|
||||
|
||||
// serviceFunc wraps a function to create a suture.Service without stop
|
||||
// functionality.
|
||||
type serviceFunc func()
|
||||
|
||||
func (f serviceFunc) Serve() { f() }
|
||||
func (f serviceFunc) Stop() {}
|
||||
Reference in New Issue
Block a user