all: Add folder pause, make pauses permanent (fixes #3407, fixes #215, fixes #3001)

GitHub-Pull-Request: https://github.com/syncthing/syncthing/pull/3520
This commit is contained in:
Audrius Butkevicius
2016-12-21 18:41:25 +00:00
committed by Jakob Borg
parent 0725e3af38
commit bab7c8ebbf
20 changed files with 548 additions and 307 deletions

View File

@@ -93,12 +93,12 @@ type Model struct {
folderStatRefs map[string]*stats.FolderStatisticsReference // folder -> statsRef
fmut sync.RWMutex // protects the above
conn map[protocol.DeviceID]connections.Connection
closed map[protocol.DeviceID]chan struct{}
helloMessages map[protocol.DeviceID]protocol.HelloResult
devicePaused map[protocol.DeviceID]bool
deviceDownloads map[protocol.DeviceID]*deviceDownloadState
pmut sync.RWMutex // protects the above
conn map[protocol.DeviceID]connections.Connection
closed map[protocol.DeviceID]chan struct{}
helloMessages map[protocol.DeviceID]protocol.HelloResult
deviceDownloads map[protocol.DeviceID]*deviceDownloadState
remotePausedFolders map[protocol.DeviceID][]string // deviceID -> folders
pmut sync.RWMutex // protects the above
}
type folderFactory func(*Model, config.FolderConfiguration, versioner.Versioner, *fs.MtimeFS) service
@@ -134,33 +134,33 @@ func NewModel(cfg *config.Wrapper, id protocol.DeviceID, deviceName, clientName,
l.Debugln(line)
},
}),
cfg: cfg,
db: ldb,
finder: db.NewBlockFinder(ldb),
progressEmitter: NewProgressEmitter(cfg),
id: id,
shortID: id.Short(),
cacheIgnoredFiles: cfg.Options().CacheIgnoredFiles,
protectedFiles: protectedFiles,
deviceName: deviceName,
clientName: clientName,
clientVersion: clientVersion,
folderCfgs: make(map[string]config.FolderConfiguration),
folderFiles: make(map[string]*db.FileSet),
folderDevices: make(folderDeviceSet),
deviceFolders: make(map[protocol.DeviceID][]string),
deviceStatRefs: make(map[protocol.DeviceID]*stats.DeviceStatisticsReference),
folderIgnores: make(map[string]*ignore.Matcher),
folderRunners: make(map[string]service),
folderRunnerTokens: make(map[string][]suture.ServiceToken),
folderStatRefs: make(map[string]*stats.FolderStatisticsReference),
conn: make(map[protocol.DeviceID]connections.Connection),
closed: make(map[protocol.DeviceID]chan struct{}),
helloMessages: make(map[protocol.DeviceID]protocol.HelloResult),
devicePaused: make(map[protocol.DeviceID]bool),
deviceDownloads: make(map[protocol.DeviceID]*deviceDownloadState),
fmut: sync.NewRWMutex(),
pmut: sync.NewRWMutex(),
cfg: cfg,
db: ldb,
finder: db.NewBlockFinder(ldb),
progressEmitter: NewProgressEmitter(cfg),
id: id,
shortID: id.Short(),
cacheIgnoredFiles: cfg.Options().CacheIgnoredFiles,
protectedFiles: protectedFiles,
deviceName: deviceName,
clientName: clientName,
clientVersion: clientVersion,
folderCfgs: make(map[string]config.FolderConfiguration),
folderFiles: make(map[string]*db.FileSet),
folderDevices: make(folderDeviceSet),
deviceFolders: make(map[protocol.DeviceID][]string),
deviceStatRefs: make(map[protocol.DeviceID]*stats.DeviceStatisticsReference),
folderIgnores: make(map[string]*ignore.Matcher),
folderRunners: make(map[string]service),
folderRunnerTokens: make(map[string][]suture.ServiceToken),
folderStatRefs: make(map[string]*stats.FolderStatisticsReference),
conn: make(map[protocol.DeviceID]connections.Connection),
closed: make(map[protocol.DeviceID]chan struct{}),
helloMessages: make(map[protocol.DeviceID]protocol.HelloResult),
deviceDownloads: make(map[protocol.DeviceID]*deviceDownloadState),
remotePausedFolders: make(map[protocol.DeviceID][]string),
fmut: sync.NewRWMutex(),
pmut: sync.NewRWMutex(),
}
if cfg.Options().ProgressUpdateIntervalS > -1 {
go m.progressEmitter.Serve()
@@ -183,8 +183,10 @@ func (m *Model) StartDeadlockDetector(timeout time.Duration) {
// StartFolder constructs the folder service and starts it.
func (m *Model) StartFolder(folder string) {
m.fmut.Lock()
m.pmut.Lock()
folderType := m.startFolderLocked(folder)
folderCfg := m.folderCfgs[folder]
m.pmut.Unlock()
m.fmut.Unlock()
l.Infof("Ready to synchronize %s (%s)", folderCfg.Description(), folderType)
@@ -218,6 +220,11 @@ func (m *Model) startFolderLocked(folder string) config.FolderType {
}
}
// Close connections to affected devices
for _, id := range cfg.DeviceIDs() {
m.closeLocked(id)
}
v, ok := fs.Sequence(protocol.LocalDeviceID), true
indexHasFiles := ok && v > 0
if !indexHasFiles {
@@ -366,13 +373,16 @@ func (m *Model) RestartFolder(cfg config.FolderConfiguration) {
m.pmut.Lock()
m.tearDownFolderLocked(cfg.ID)
m.addFolderLocked(cfg)
folderType := m.startFolderLocked(cfg.ID)
if !cfg.Paused {
m.addFolderLocked(cfg)
folderType := m.startFolderLocked(cfg.ID)
l.Infoln("Restarted folder", cfg.Description(), fmt.Sprintf("(%s)", folderType))
} else {
l.Infoln("Paused folder", cfg.Description())
}
m.pmut.Unlock()
m.fmut.Unlock()
l.Infoln("Restarted folder", cfg.ID, fmt.Sprintf("(%s)", folderType))
}
type ConnectionInfo struct {
@@ -405,7 +415,7 @@ func (m *Model) ConnectionStats() map[string]interface{} {
res := make(map[string]interface{})
devs := m.cfg.Devices()
conns := make(map[string]ConnectionInfo, len(devs))
for device := range devs {
for device, deviceCfg := range devs {
hello := m.helloMessages[device]
versionString := hello.ClientVersion
if hello.ClientName != "syncthing" {
@@ -413,7 +423,7 @@ func (m *Model) ConnectionStats() map[string]interface{} {
}
ci := ConnectionInfo{
ClientVersion: strings.TrimSpace(versionString),
Paused: m.devicePaused[device],
Paused: deviceCfg.Paused,
}
if conn, ok := m.conn[device]; ok {
ci.Type = conn.Type()
@@ -783,7 +793,17 @@ func (m *Model) ClusterConfig(deviceID protocol.DeviceID, cm protocol.ClusterCon
}
m.fmut.Lock()
var paused []string
for _, folder := range cm.Folders {
if folder.Paused {
paused = append(paused, folder.ID)
continue
}
if cfg, ok := m.cfg.Folder(folder.ID); ok && cfg.Paused {
continue
}
if !m.folderSharedWithLocked(folder.ID, deviceID) {
events.Default.Log(events.FolderRejected, map[string]string{
"folder": folder.ID,
@@ -871,6 +891,10 @@ func (m *Model) ClusterConfig(deviceID protocol.DeviceID, cm protocol.ClusterCon
go sendIndexes(conn, folder.ID, fs, m.folderIgnores[folder.ID], startSequence, dbLocation, dropSymlinks)
}
m.pmut.Lock()
m.remotePausedFolders[deviceID] = paused
m.pmut.Unlock()
// This breaks if we send multiple CM messages during the same connection.
if len(tempIndexFolders) > 0 {
m.pmut.RLock()
@@ -1058,6 +1082,7 @@ func (m *Model) Closed(conn protocol.Connection, err error) {
delete(m.conn, device)
delete(m.helloMessages, device)
delete(m.deviceDownloads, device)
delete(m.remotePausedFolders, device)
closed := m.closed[device]
delete(m.closed, device)
m.pmut.Unlock()
@@ -1070,6 +1095,24 @@ func (m *Model) Closed(conn protocol.Connection, err error) {
close(closed)
}
// close will close the underlying connection for a given device
func (m *Model) close(device protocol.DeviceID) {
m.pmut.Lock()
m.closeLocked(device)
m.pmut.Unlock()
}
// closeLocked will close the underlying connection for a given device
func (m *Model) closeLocked(device protocol.DeviceID) {
conn, ok := m.conn[device]
if !ok {
// There is no connection to close
return
}
closeRawConn(conn)
}
// Request returns the specified data segment by reading it from local disk.
// Implements the protocol.Model interface.
func (m *Model) Request(deviceID protocol.DeviceID, folder, name string, offset int64, hash []byte, fromTemporary bool, buf []byte) error {
@@ -1257,16 +1300,15 @@ func (m *Model) SetIgnores(folder string, content []string) error {
// This allows us to extract some information from the Hello message
// and add it to a list of known devices ahead of any checks.
func (m *Model) OnHello(remoteID protocol.DeviceID, addr net.Addr, hello protocol.HelloResult) error {
if m.IsPaused(remoteID) {
return errDevicePaused
}
if m.cfg.IgnoredDevice(remoteID) {
return errDeviceIgnored
}
if _, ok := m.cfg.Device(remoteID); ok {
if cfg, ok := m.cfg.Device(remoteID); ok {
// The device exists
if cfg.Paused {
return errDevicePaused
}
return nil
}
@@ -1349,17 +1391,6 @@ func (m *Model) AddConnection(conn connections.Connection, hello protocol.HelloR
m.deviceWasSeen(deviceID)
}
func (m *Model) PauseDevice(device protocol.DeviceID) {
m.pmut.Lock()
m.devicePaused[device] = true
conn, ok := m.conn[device]
m.pmut.Unlock()
if ok {
closeRawConn(conn)
}
events.Default.Log(events.DevicePaused, map[string]string{"device": device.String()})
}
func (m *Model) DownloadProgress(device protocol.DeviceID, folder string, updates []protocol.FileDownloadProgressUpdate) {
if !m.folderSharedWith(folder, device) {
return
@@ -1385,20 +1416,6 @@ func (m *Model) DownloadProgress(device protocol.DeviceID, folder string, update
})
}
func (m *Model) ResumeDevice(device protocol.DeviceID) {
m.pmut.Lock()
m.devicePaused[device] = false
m.pmut.Unlock()
events.Default.Log(events.DeviceResumed, map[string]string{"device": device.String()})
}
func (m *Model) IsPaused(device protocol.DeviceID) bool {
m.pmut.Lock()
paused := m.devicePaused[device]
m.pmut.Unlock()
return paused
}
func (m *Model) deviceStatRef(deviceID protocol.DeviceID) *stats.DeviceStatisticsReference {
m.fmut.Lock()
defer m.fmut.Unlock()
@@ -1983,6 +2000,7 @@ func (m *Model) generateClusterConfig(device protocol.DeviceID) protocol.Cluster
IgnorePermissions: folderCfg.IgnorePerms,
IgnoreDelete: folderCfg.IgnoreDelete,
DisableTempIndexes: folderCfg.DisableTempIndexes,
Paused: folderCfg.Paused,
}
// Devices are sorted, so we always get the same order.
@@ -2192,7 +2210,13 @@ func (m *Model) Availability(folder, file string, version protocol.Vector, block
}
var availabilities []Availability
next:
for _, device := range fs.Availability(file) {
for _, pausedFolder := range m.remotePausedFolders[device] {
if pausedFolder == folder {
continue next
}
}
_, ok := m.conn[device]
if ok {
availabilities = append(availabilities, Availability{ID: device, FromTemporary: false})
@@ -2350,16 +2374,6 @@ func (m *Model) CommitConfiguration(from, to config.Configuration) bool {
l.Debugln(m, "adding folder", folderID)
m.AddFolder(cfg)
m.StartFolder(folderID)
// Drop connections to all devices that can now share the new
// folder.
m.pmut.Lock()
for _, dev := range cfg.DeviceIDs() {
if conn, ok := m.conn[dev]; ok {
closeRawConn(conn)
}
}
m.pmut.Unlock()
}
}
@@ -2381,6 +2395,15 @@ func (m *Model) CommitConfiguration(from, to config.Configuration) bool {
if !reflect.DeepEqual(fromCfgCopy, toCfgCopy) {
m.RestartFolder(toCfg)
}
// Emit the folder pause/resume event
if fromCfg.Paused != toCfg.Paused {
eventType := events.FolderResumed
if toCfg.Paused {
eventType = events.FolderPaused
}
events.Default.Log(eventType, map[string]string{"id": toCfg.ID, "label": toCfg.Label})
}
}
// Removing a device. We actually don't need to do anything.
@@ -2390,6 +2413,24 @@ func (m *Model) CommitConfiguration(from, to config.Configuration) bool {
// At some point model.Close() will get called for that device which will
// clean residue device state that is not part of any folder.
// Pausing a device, unpausing is handled by the connection service.
fromDevices := mapDeviceConfigs(from.Devices)
toDevices := mapDeviceConfigs(to.Devices)
for deviceID, toCfg := range toDevices {
fromCfg, ok := fromDevices[deviceID]
if !ok || fromCfg.Paused == toCfg.Paused {
continue
}
if toCfg.Paused {
l.Infoln("Pausing", deviceID)
m.close(deviceID)
events.Default.Log(events.DevicePaused, map[string]string{"device": deviceID.String()})
} else {
events.Default.Log(events.DeviceResumed, map[string]string{"device": deviceID.String()})
}
}
// Some options don't require restart as those components handle it fine
// by themselves.
from.Options.URAccepted = to.Options.URAccepted
@@ -2431,6 +2472,16 @@ func mapDevices(devices []protocol.DeviceID) map[protocol.DeviceID]struct{} {
return m
}
// mapDeviceConfigs returns a map of device ID to device configuration for the given
// slice of folder configurations.
func mapDeviceConfigs(devices []config.DeviceConfiguration) map[protocol.DeviceID]config.DeviceConfiguration {
m := make(map[protocol.DeviceID]config.DeviceConfiguration, len(devices))
for _, dev := range devices {
m[dev.DeviceID] = dev
}
return m
}
func symlinkInvalid(folder string, fi db.FileIntf) bool {
if !symlinks.Supported && fi.IsSymlink() && !fi.IsInvalid() && !fi.IsDeleted() {
symlinkWarning.Do(func() {