diff --git a/cmd/syncthing/gui.go b/cmd/syncthing/gui.go
index 1ea2cbc2..5fa63e10 100644
--- a/cmd/syncthing/gui.go
+++ b/cmd/syncthing/gui.go
@@ -32,6 +32,7 @@ import (
"github.com/syncthing/syncthing/lib/discover"
"github.com/syncthing/syncthing/lib/events"
"github.com/syncthing/syncthing/lib/logger"
+ "github.com/syncthing/syncthing/lib/model"
"github.com/syncthing/syncthing/lib/osutil"
"github.com/syncthing/syncthing/lib/protocol"
"github.com/syncthing/syncthing/lib/relay"
@@ -85,7 +86,7 @@ type modelIntf interface {
CurrentFolderFile(folder string, file string) (protocol.FileInfo, bool)
CurrentGlobalFile(folder string, file string) (protocol.FileInfo, bool)
ResetFolder(folder string)
- Availability(folder, file string) []protocol.DeviceID
+ Availability(folder, file string, version protocol.Vector, block protocol.BlockInfo) []model.Availability
GetIgnores(folder string) ([]string, []string, error)
SetIgnores(folder string, content []string) error
PauseDevice(device protocol.DeviceID)
@@ -696,7 +697,7 @@ func (s *apiService) getDBFile(w http.ResponseWriter, r *http.Request) {
return
}
- av := s.model.Availability(folder, file)
+ av := s.model.Availability(folder, file, protocol.Vector{}, protocol.BlockInfo{})
sendJSON(w, map[string]interface{}{
"global": jsonFileInfo(gf),
"local": jsonFileInfo(lf),
diff --git a/cmd/syncthing/mocked_model_test.go b/cmd/syncthing/mocked_model_test.go
index 141c9f75..4db67082 100644
--- a/cmd/syncthing/mocked_model_test.go
+++ b/cmd/syncthing/mocked_model_test.go
@@ -10,6 +10,7 @@ import (
"time"
"github.com/syncthing/syncthing/lib/db"
+ "github.com/syncthing/syncthing/lib/model"
"github.com/syncthing/syncthing/lib/protocol"
"github.com/syncthing/syncthing/lib/stats"
)
@@ -57,7 +58,7 @@ func (m *mockedModel) CurrentGlobalFile(folder string, file string) (protocol.Fi
func (m *mockedModel) ResetFolder(folder string) {
}
-func (m *mockedModel) Availability(folder, file string) []protocol.DeviceID {
+func (m *mockedModel) Availability(folder, file string, version protocol.Vector, block protocol.BlockInfo) []model.Availability {
return nil
}
diff --git a/lib/config/config_test.go b/lib/config/config_test.go
index 70241e85..2067746b 100644
--- a/lib/config/config_test.go
+++ b/lib/config/config_test.go
@@ -62,6 +62,7 @@ func TestDefaultValues(t *testing.T) {
ReleasesURL: "https://api.github.com/repos/syncthing/syncthing/releases?per_page=30",
AlwaysLocalNets: []string{},
OverwriteNames: false,
+ TempIndexMinBlocks: 10,
}
cfg := New(device1)
@@ -192,6 +193,7 @@ func TestOverriddenValues(t *testing.T) {
ReleasesURL: "https://localhost/releases",
AlwaysLocalNets: []string{},
OverwriteNames: true,
+ TempIndexMinBlocks: 100,
}
cfg, err := Load("testdata/overridenvalues.xml", device1)
diff --git a/lib/config/folderconfiguration.go b/lib/config/folderconfiguration.go
index f5c5504b..3cab0d25 100644
--- a/lib/config/folderconfiguration.go
+++ b/lib/config/folderconfiguration.go
@@ -37,6 +37,7 @@ type FolderConfiguration struct {
PullerPauseS int `xml:"pullerPauseS" json:"pullerPauseS"`
MaxConflicts int `xml:"maxConflicts" json:"maxConflicts"`
DisableSparseFiles bool `xml:"disableSparseFiles" json:"disableSparseFiles"`
+ DisableTempIndexes bool `xml:"disableTempIndexes" json:"disableTempIndexes"`
Invalid string `xml:"-" json:"invalid"` // Set at runtime when there is an error, not saved
cachedPath string
diff --git a/lib/config/optionsconfiguration.go b/lib/config/optionsconfiguration.go
index cfca3161..35c3834f 100644
--- a/lib/config/optionsconfiguration.go
+++ b/lib/config/optionsconfiguration.go
@@ -40,6 +40,7 @@ type OptionsConfiguration struct {
ReleasesURL string `xml:"releasesURL" json:"releasesURL" default:"https://api.github.com/repos/syncthing/syncthing/releases?per_page=30"`
AlwaysLocalNets []string `xml:"alwaysLocalNet" json:"alwaysLocalNets"`
OverwriteNames bool `xml:"overwriteNames" json:"overwriteNames" default:"false"`
+ TempIndexMinBlocks int `xml:"tempIndexMinBlocks" json:"tempIndexMinBlocks" default:"10"`
DeprecatedUPnPEnabled bool `xml:"upnpEnabled"`
DeprecatedUPnPLeaseM int `xml:"upnpLeaseMinutes"`
diff --git a/lib/config/testdata/overridenvalues.xml b/lib/config/testdata/overridenvalues.xml
index bc60ab39..01341bb3 100755
--- a/lib/config/testdata/overridenvalues.xml
+++ b/lib/config/testdata/overridenvalues.xml
@@ -35,5 +35,6 @@
true
https://localhost/releases
true
+ 100
diff --git a/lib/model/deviceactivity.go b/lib/model/deviceactivity.go
index f6c5ba66..35fae7c7 100644
--- a/lib/model/deviceactivity.go
+++ b/lib/model/deviceactivity.go
@@ -26,28 +26,30 @@ func newDeviceActivity() *deviceActivity {
}
}
-func (m *deviceActivity) leastBusy(availability []protocol.DeviceID) protocol.DeviceID {
+func (m *deviceActivity) leastBusy(availability []Availability) (Availability, bool) {
m.mut.Lock()
low := 2<<30 - 1
- var selected protocol.DeviceID
- for _, device := range availability {
- if usage := m.act[device]; usage < low {
+ found := false
+ var selected Availability
+ for _, info := range availability {
+ if usage := m.act[info.ID]; usage < low {
low = usage
- selected = device
+ selected = info
+ found = true
}
}
m.mut.Unlock()
- return selected
+ return selected, found
}
-func (m *deviceActivity) using(device protocol.DeviceID) {
+func (m *deviceActivity) using(availability Availability) {
m.mut.Lock()
- m.act[device]++
+ m.act[availability.ID]++
m.mut.Unlock()
}
-func (m *deviceActivity) done(device protocol.DeviceID) {
+func (m *deviceActivity) done(availability Availability) {
m.mut.Lock()
- m.act[device]--
+ m.act[availability.ID]--
m.mut.Unlock()
}
diff --git a/lib/model/deviceactivity_test.go b/lib/model/deviceactivity_test.go
index 46595771..04d617a2 100644
--- a/lib/model/deviceactivity_test.go
+++ b/lib/model/deviceactivity_test.go
@@ -13,46 +13,48 @@ import (
)
func TestDeviceActivity(t *testing.T) {
- n0 := protocol.DeviceID([32]byte{1, 2, 3, 4})
- n1 := protocol.DeviceID([32]byte{5, 6, 7, 8})
- n2 := protocol.DeviceID([32]byte{9, 10, 11, 12})
- devices := []protocol.DeviceID{n0, n1, n2}
+ n0 := Availability{protocol.DeviceID([32]byte{1, 2, 3, 4}), false}
+ n1 := Availability{protocol.DeviceID([32]byte{5, 6, 7, 8}), true}
+ n2 := Availability{protocol.DeviceID([32]byte{9, 10, 11, 12}), false}
+ devices := []Availability{n0, n1, n2}
na := newDeviceActivity()
- if lb := na.leastBusy(devices); lb != n0 {
+ if lb, ok := na.leastBusy(devices); !ok || lb != n0 {
t.Errorf("Least busy device should be n0 (%v) not %v", n0, lb)
}
- if lb := na.leastBusy(devices); lb != n0 {
+ if lb, ok := na.leastBusy(devices); !ok || lb != n0 {
t.Errorf("Least busy device should still be n0 (%v) not %v", n0, lb)
}
- na.using(na.leastBusy(devices))
- if lb := na.leastBusy(devices); lb != n1 {
+ lb, _ := na.leastBusy(devices)
+ na.using(lb)
+ if lb, ok := na.leastBusy(devices); !ok || lb != n1 {
t.Errorf("Least busy device should be n1 (%v) not %v", n1, lb)
}
-
- na.using(na.leastBusy(devices))
- if lb := na.leastBusy(devices); lb != n2 {
+ lb, _ = na.leastBusy(devices)
+ na.using(lb)
+ if lb, ok := na.leastBusy(devices); !ok || lb != n2 {
t.Errorf("Least busy device should be n2 (%v) not %v", n2, lb)
}
- na.using(na.leastBusy(devices))
- if lb := na.leastBusy(devices); lb != n0 {
+ lb, _ = na.leastBusy(devices)
+ na.using(lb)
+ if lb, ok := na.leastBusy(devices); !ok || lb != n0 {
t.Errorf("Least busy device should be n0 (%v) not %v", n0, lb)
}
na.done(n1)
- if lb := na.leastBusy(devices); lb != n1 {
+ if lb, ok := na.leastBusy(devices); !ok || lb != n1 {
t.Errorf("Least busy device should be n1 (%v) not %v", n1, lb)
}
na.done(n2)
- if lb := na.leastBusy(devices); lb != n1 {
+ if lb, ok := na.leastBusy(devices); !ok || lb != n1 {
t.Errorf("Least busy device should still be n1 (%v) not %v", n1, lb)
}
na.done(n0)
- if lb := na.leastBusy(devices); lb != n0 {
+ if lb, ok := na.leastBusy(devices); !ok || lb != n0 {
t.Errorf("Least busy device should be n0 (%v) not %v", n0, lb)
}
}
diff --git a/lib/model/devicedownloadstate.go b/lib/model/devicedownloadstate.go
new file mode 100644
index 00000000..f95b2631
--- /dev/null
+++ b/lib/model/devicedownloadstate.go
@@ -0,0 +1,156 @@
+// Copyright (C) 2015 The Syncthing Authors.
+//
+// This Source Code Form is subject to the terms of the Mozilla Public
+// License, v. 2.0. If a copy of the MPL was not distributed with this file,
+// You can obtain one at http://mozilla.org/MPL/2.0/.
+
+package model
+
+import (
+ "github.com/syncthing/syncthing/lib/protocol"
+ "github.com/syncthing/syncthing/lib/sync"
+)
+
+// deviceFolderFileDownloadState holds current download state of a file that
+// a remote device has advertised. blockIndexes represends indexes within
+// FileInfo.Blocks that the remote device already has, and version represents
+// the version of the file that the remote device is downloading.
+type deviceFolderFileDownloadState struct {
+ blockIndexes []int32
+ version protocol.Vector
+}
+
+// deviceFolderDownloadState holds current download state of all files that
+// a remote device is currently downloading in a specific folder.
+type deviceFolderDownloadState struct {
+ mut sync.RWMutex
+ files map[string]deviceFolderFileDownloadState
+ numberOfBlocksInProgress int
+}
+
+// Has returns wether a block at that specific index, and that specific version of the file
+// is currently available on the remote device for pulling from a temporary file.
+func (p *deviceFolderDownloadState) Has(file string, version protocol.Vector, index int32) bool {
+ p.mut.RLock()
+ defer p.mut.RUnlock()
+
+ local, ok := p.files[file]
+
+ if !ok || !local.version.Equal(version) {
+ return false
+ }
+
+ for _, existingIndex := range local.blockIndexes {
+ if existingIndex == index {
+ return true
+ }
+ }
+ return false
+}
+
+// Update updates internal state of what has been downloaded into the temporary
+// files by the remote device for this specific folder.
+func (p *deviceFolderDownloadState) Update(updates []protocol.FileDownloadProgressUpdate) {
+ p.mut.Lock()
+ defer p.mut.Unlock()
+
+ for _, update := range updates {
+ local, ok := p.files[update.Name]
+ if update.UpdateType == protocol.UpdateTypeForget && ok && local.version.Equal(update.Version) {
+ p.numberOfBlocksInProgress -= len(local.blockIndexes)
+ delete(p.files, update.Name)
+ } else if update.UpdateType == protocol.UpdateTypeAppend {
+ if !ok {
+ local = deviceFolderFileDownloadState{
+ blockIndexes: update.BlockIndexes,
+ version: update.Version,
+ }
+ } else if !local.version.Equal(update.Version) {
+ p.numberOfBlocksInProgress -= len(local.blockIndexes)
+ local.blockIndexes = append(local.blockIndexes[:0], update.BlockIndexes...)
+ local.version = update.Version
+ } else {
+ local.blockIndexes = append(local.blockIndexes, update.BlockIndexes...)
+ }
+ p.files[update.Name] = local
+ p.numberOfBlocksInProgress += len(update.BlockIndexes)
+ }
+ }
+}
+
+// NumberOfBlocksInProgress returns the number of blocks the device has downloaded
+// for a specific folder.
+func (p *deviceFolderDownloadState) NumberOfBlocksInProgress() int {
+ p.mut.RLock()
+ n := p.numberOfBlocksInProgress
+ p.mut.RUnlock()
+ return n
+}
+
+// deviceDownloadState represents the state of all in progress downloads
+// for all folders of a specific device.
+type deviceDownloadState struct {
+ mut sync.RWMutex
+ folders map[string]*deviceFolderDownloadState
+ numberOfBlocksInProgress int
+}
+
+// Update updates internal state of what has been downloaded into the temporary
+// files by the remote device for this specific folder.
+func (t *deviceDownloadState) Update(folder string, updates []protocol.FileDownloadProgressUpdate) {
+ t.mut.RLock()
+ f, ok := t.folders[folder]
+ t.mut.RUnlock()
+
+ if !ok {
+ f = &deviceFolderDownloadState{
+ mut: sync.NewRWMutex(),
+ files: make(map[string]deviceFolderFileDownloadState),
+ }
+ t.mut.Lock()
+ t.folders[folder] = f
+ t.mut.Unlock()
+ }
+
+ f.Update(updates)
+}
+
+// Has returns wether block at that specific index, and that specific version of the file
+// is currently available on the remote device for pulling from a temporary file.
+func (t *deviceDownloadState) Has(folder, file string, version protocol.Vector, index int32) bool {
+ if t == nil {
+ return false
+ }
+ t.mut.RLock()
+ f, ok := t.folders[folder]
+ t.mut.RUnlock()
+
+ if !ok {
+ return false
+ }
+
+ return f.Has(file, version, index)
+}
+
+// NumberOfBlocksInProgress returns the number of blocks the device has downloaded
+// for all folders.
+func (t *deviceDownloadState) NumberOfBlocksInProgress() int {
+ if t == nil {
+ return 0
+ }
+
+ n := 0
+ t.mut.RLock()
+ for _, folder := range t.folders {
+ n += folder.NumberOfBlocksInProgress()
+ }
+ t.mut.RUnlock()
+ return n
+}
+
+func newDeviceDownloadState() *deviceDownloadState {
+ return &deviceDownloadState{
+ mut: sync.NewRWMutex(),
+ folders: make(map[string]*deviceFolderDownloadState),
+ }
+}
diff --git a/lib/model/devicedownloadstate_test.go b/lib/model/devicedownloadstate_test.go
new file mode 100644
index 00000000..62e122c7
--- /dev/null
+++ b/lib/model/devicedownloadstate_test.go
@@ -0,0 +1,111 @@
+// Copyright (C) 2016 The Syncthing Authors.
+//
+// This Source Code Form is subject to the terms of the Mozilla Public
+// License, v. 2.0. If a copy of the MPL was not distributed with this file,
+// You can obtain one at http://mozilla.org/MPL/2.0/.
+
+package model
+
+import (
+ "testing"
+
+ "github.com/syncthing/syncthing/lib/protocol"
+)
+
+func TestDeviceDownloadState(t *testing.T) {
+ v1 := (protocol.Vector{}).Update(0)
+ v2 := (protocol.Vector{}).Update(1)
+
+ // file 1 version 1 part 1
+ f1v1p1 := protocol.FileDownloadProgressUpdate{protocol.UpdateTypeAppend, "f1", v1, []int32{0, 1, 2}}
+ f1v1p2 := protocol.FileDownloadProgressUpdate{protocol.UpdateTypeAppend, "f1", v1, []int32{3, 4, 5}}
+ f1v1del := protocol.FileDownloadProgressUpdate{protocol.UpdateTypeForget, "f1", v1, nil}
+ f1v2p1 := protocol.FileDownloadProgressUpdate{protocol.UpdateTypeAppend, "f1", v2, []int32{10, 11, 12}}
+ f1v2p2 := protocol.FileDownloadProgressUpdate{protocol.UpdateTypeAppend, "f1", v2, []int32{13, 14, 15}}
+ f1v2del := protocol.FileDownloadProgressUpdate{protocol.UpdateTypeForget, "f1", v2, nil}
+
+ f2v1p1 := protocol.FileDownloadProgressUpdate{protocol.UpdateTypeAppend, "f2", v1, []int32{20, 21, 22}}
+ f2v1p2 := protocol.FileDownloadProgressUpdate{protocol.UpdateTypeAppend, "f2", v1, []int32{23, 24, 25}}
+ f2v1del := protocol.FileDownloadProgressUpdate{protocol.UpdateTypeForget, "f2", v1, nil}
+
+ tests := []struct {
+ updates []protocol.FileDownloadProgressUpdate
+ shouldHaveIndexesFrom []protocol.FileDownloadProgressUpdate
+ shouldNotHaveIndexesFrom []protocol.FileDownloadProgressUpdate
+ }{
+ { //1
+ []protocol.FileDownloadProgressUpdate{f1v1p1},
+ []protocol.FileDownloadProgressUpdate{f1v1p1},
+ []protocol.FileDownloadProgressUpdate{f1v1p2, f1v2p1, f1v2p2},
+ },
+ { //2
+ []protocol.FileDownloadProgressUpdate{f1v1p1, f1v1p2},
+ []protocol.FileDownloadProgressUpdate{f1v1p1, f1v1p2},
+ []protocol.FileDownloadProgressUpdate{f1v2p1, f1v2p2},
+ },
+ { //3
+ []protocol.FileDownloadProgressUpdate{f1v1p1, f1v1p2, f1v1del},
+ nil,
+ []protocol.FileDownloadProgressUpdate{f1v1p1, f1v1p2, f1v2p1, f1v2p2}},
+ { //4
+ []protocol.FileDownloadProgressUpdate{f1v1p1, f1v1p2, f1v2del},
+ []protocol.FileDownloadProgressUpdate{f1v1p1, f1v1p2},
+ []protocol.FileDownloadProgressUpdate{f1v2p1, f1v2p2},
+ },
+ { //5
+ // v2 replaces old v1 data
+ []protocol.FileDownloadProgressUpdate{f1v1p1, f1v1p2, f1v2p1},
+ []protocol.FileDownloadProgressUpdate{f1v2p1},
+ []protocol.FileDownloadProgressUpdate{f1v1p1, f1v1p2, f1v2p2},
+ },
+ { //6
+ // v1 delete on v2 data does nothing
+ []protocol.FileDownloadProgressUpdate{f1v1p1, f1v1p2, f1v2p1, f1v1del},
+ []protocol.FileDownloadProgressUpdate{f1v2p1},
+ []protocol.FileDownloadProgressUpdate{f1v1p1, f1v1p2, f1v2p2},
+ },
+ { //7
+ // v2 replacees v1, v2 gets deleted, and v2 part 2 gets added.
+ []protocol.FileDownloadProgressUpdate{f1v1p1, f1v1p2, f1v2p1, f1v2del, f1v2p2},
+ []protocol.FileDownloadProgressUpdate{f1v2p2},
+ []protocol.FileDownloadProgressUpdate{f1v1p1, f1v1p2, f1v2p1},
+ },
+ // Multiple files in one go
+ { //8
+ []protocol.FileDownloadProgressUpdate{f1v1p1, f2v1p1},
+ []protocol.FileDownloadProgressUpdate{f1v1p1, f2v1p1},
+ []protocol.FileDownloadProgressUpdate{f1v1p2, f2v1p2},
+ },
+ { //9
+ []protocol.FileDownloadProgressUpdate{f1v1p1, f2v1p1, f2v1del},
+ []protocol.FileDownloadProgressUpdate{f1v1p1},
+ []protocol.FileDownloadProgressUpdate{f2v1p1, f2v1p1},
+ },
+ { //10
+ []protocol.FileDownloadProgressUpdate{f1v1p1, f2v1del, f2v1p1},
+ []protocol.FileDownloadProgressUpdate{f1v1p1, f2v1p1},
+ []protocol.FileDownloadProgressUpdate{f1v1p2, f2v1p2},
+ },
+ }
+
+ for i, test := range tests {
+ s := newDeviceDownloadState()
+ s.Update("folder", test.updates)
+
+ for _, expected := range test.shouldHaveIndexesFrom {
+ for _, n := range expected.BlockIndexes {
+ if !s.Has("folder", expected.Name, expected.Version, n) {
+ t.Error("Test", i+1, "error:", expected.Name, expected.Version, "missing", n)
+ }
+ }
+ }
+
+ for _, unexpected := range test.shouldNotHaveIndexesFrom {
+ for _, n := range unexpected.BlockIndexes {
+ if s.Has("folder", unexpected.Name, unexpected.Version, n) {
+ t.Error("Test", i+1, "error:", unexpected.Name, unexpected.Version, "has extra", n)
+ }
+ }
+ }
+ }
+}
diff --git a/lib/model/model.go b/lib/model/model.go
index f2550adb..fdf7b3e4 100644
--- a/lib/model/model.go
+++ b/lib/model/model.go
@@ -60,6 +60,11 @@ type service interface {
getState() (folderState, time.Time, error)
}
+type Availability struct {
+ ID protocol.DeviceID `json:"id"`
+ FromTemporary bool `json:"fromTemporary"`
+}
+
type Model struct {
*suture.Supervisor
@@ -87,10 +92,12 @@ type Model struct {
folderStatRefs map[string]*stats.FolderStatisticsReference // folder -> statsRef
fmut sync.RWMutex // protects the above
- conn map[protocol.DeviceID]Connection
- helloMessages map[protocol.DeviceID]protocol.HelloMessage
- devicePaused map[protocol.DeviceID]bool
- pmut sync.RWMutex // protects the above
+ conn map[protocol.DeviceID]Connection
+ helloMessages map[protocol.DeviceID]protocol.HelloMessage
+ deviceClusterConf map[protocol.DeviceID]protocol.ClusterConfigMessage
+ devicePaused map[protocol.DeviceID]bool
+ deviceDownloads map[protocol.DeviceID]*deviceDownloadState
+ pmut sync.RWMutex // protects the above
}
var (
@@ -129,10 +136,11 @@ func NewModel(cfg *config.Wrapper, id protocol.DeviceID, deviceName, clientName,
folderStatRefs: make(map[string]*stats.FolderStatisticsReference),
conn: make(map[protocol.DeviceID]Connection),
helloMessages: make(map[protocol.DeviceID]protocol.HelloMessage),
+ deviceClusterConf: make(map[protocol.DeviceID]protocol.ClusterConfigMessage),
devicePaused: make(map[protocol.DeviceID]bool),
-
- fmut: sync.NewRWMutex(),
- pmut: sync.NewRWMutex(),
+ deviceDownloads: make(map[protocol.DeviceID]*deviceDownloadState),
+ fmut: sync.NewRWMutex(),
+ pmut: sync.NewRWMutex(),
}
if cfg.Options().ProgressUpdateIntervalS > -1 {
go m.progressEmitter.Serve()
@@ -388,6 +396,11 @@ func (m *Model) Completion(device protocol.DeviceID, folder string) float64 {
return true
})
+ // This might might be more than it really is, because some blocks can be of a smaller size.
+ m.pmut.RLock()
+ need -= int64(m.deviceDownloads[device].NumberOfBlocksInProgress() * protocol.BlockSize)
+ m.pmut.RUnlock()
+
needRatio := float64(need) / float64(tot)
completionPct := 100 * (1 - needRatio)
l.Debugf("%v Completion(%s, %q): %f (%d / %d = %f)", m, device, folder, completionPct, need, tot, needRatio)
@@ -609,6 +622,10 @@ func (m *Model) folderSharedWithUnlocked(folder string, deviceID protocol.Device
func (m *Model) ClusterConfig(deviceID protocol.DeviceID, cm protocol.ClusterConfigMessage) {
// Check the peer device's announced folders against our own. Emits events
// for folders that we don't expect (unknown or not shared).
+ // Also, collect a list of folders we do share, and if he's interested in
+ // temporary indexes, subscribe the connection.
+
+ tempIndexFolders := make([]string, 0, len(cm.Folders))
m.fmut.Lock()
nextFolder:
@@ -635,9 +652,24 @@ nextFolder:
l.Infof("Unexpected folder ID %q sent from device %q; ensure that the folder exists and that this device is selected under \"Share With\" in the folder configuration.", folder.ID, deviceID)
continue
}
+ if folder.Flags&protocol.FlagFolderDisabledTempIndexes == 0 {
+ tempIndexFolders = append(tempIndexFolders, folder.ID)
+ }
}
m.fmut.Unlock()
+ // This breaks if we send multiple CM messages during the same connection.
+ if len(tempIndexFolders) > 0 {
+ m.pmut.RLock()
+ conn, ok := m.conn[deviceID]
+ m.pmut.RUnlock()
+ // In case we've got ClusterConfig, and the connection disappeared
+ // from infront of our nose.
+ if ok {
+ m.progressEmitter.temporaryIndexSubscribe(conn, tempIndexFolders)
+ }
+ }
+
var changed bool
if m.cfg.Devices()[deviceID].Introducer {
@@ -645,9 +677,6 @@ nextFolder:
// and devices and add what we are missing.
for _, folder := range cm.Folders {
- // If we don't have this folder yet, skip it. Ideally, we'd
- // offer up something in the GUI to create the folder, but for the
- // moment we only handle folders that we already have.
if _, ok := m.folderDevices[folder.ID]; !ok {
continue
}
@@ -736,10 +765,13 @@ func (m *Model) Close(device protocol.DeviceID, err error) {
conn, ok := m.conn[device]
if ok {
+ m.progressEmitter.temporaryIndexUnsubscribe(conn)
closeRawConn(conn)
}
delete(m.conn, device)
delete(m.helloMessages, device)
+ delete(m.deviceClusterConf, device)
+ delete(m.deviceDownloads, device)
m.pmut.Unlock()
}
@@ -752,19 +784,20 @@ func (m *Model) Request(deviceID protocol.DeviceID, folder, name string, offset
if !m.folderSharedWith(folder, deviceID) {
l.Warnf("Request from %s for file %s in unshared folder %q", deviceID, name, folder)
- return protocol.ErrInvalid
+ return protocol.ErrNoSuchFile
}
- if flags != 0 {
- // We don't currently support or expect any flags.
- return protocol.ErrInvalid
+ if flags != 0 && flags != protocol.FlagFromTemporary {
+ // We currently support only no flags, or FromTemporary flag.
+ return fmt.Errorf("protocol error: unknown flags 0x%x in Request message", flags)
}
if deviceID != protocol.LocalDeviceID {
- l.Debugf("%v REQ(in): %s: %q / %q o=%d s=%d", m, deviceID, folder, name, offset, len(buf))
+ l.Debugf("%v REQ(in): %s: %q / %q o=%d s=%d f=%d", m, deviceID, folder, name, offset, len(buf), flags)
}
m.fmut.RLock()
- folderPath := m.folderCfgs[folder].Path()
+ folderCfg := m.folderCfgs[folder]
+ folderPath := folderCfg.Path()
folderIgnores := m.folderIgnores[folder]
m.fmut.RUnlock()
@@ -802,8 +835,6 @@ func (m *Model) Request(deviceID protocol.DeviceID, folder, name string, offset
}
}
- var reader io.ReaderAt
- var err error
if info, err := os.Lstat(fn); err == nil && info.Mode()&os.ModeSymlink != 0 {
target, _, err := symlinks.Read(fn)
if err != nil {
@@ -813,28 +844,30 @@ func (m *Model) Request(deviceID protocol.DeviceID, folder, name string, offset
}
return protocol.ErrGeneric
}
- reader = strings.NewReader(target)
- } else {
- // Cannot easily cache fd's because we might need to delete the file
- // at any moment.
- reader, err = os.Open(fn)
- if err != nil {
- l.Debugln("os.Open:", err)
- if os.IsNotExist(err) {
- return protocol.ErrNoSuchFile
- }
+ if _, err := strings.NewReader(target).ReadAt(buf, offset); err != nil {
+ l.Debugln("symlink.Reader.ReadAt", err)
return protocol.ErrGeneric
}
-
- defer reader.(*os.File).Close()
+ return nil
}
- _, err = reader.ReadAt(buf, offset)
- if err != nil {
- l.Debugln("reader.ReadAt:", err)
+ // Only check temp files if the flag is set, and if we are set to advertise
+ // the temp indexes.
+ if flags&protocol.FlagFromTemporary != 0 && !folderCfg.DisableTempIndexes {
+ tempFn := filepath.Join(folderPath, defTempNamer.TempName(name))
+ if err := readOffsetIntoBuf(tempFn, offset, buf); err == nil {
+ return nil
+ }
+ // Fall through to reading from a non-temp file, just incase the temp
+ // file has finished downloading.
+ }
+
+ err := readOffsetIntoBuf(fn, offset, buf)
+ if os.IsNotExist(err) {
+ return protocol.ErrNoSuchFile
+ } else if err != nil {
return protocol.ErrGeneric
}
-
return nil
}
@@ -986,6 +1019,7 @@ func (m *Model) AddConnection(conn Connection, hello protocol.HelloMessage) {
panic("add existing device")
}
m.conn[deviceID] = conn
+ m.deviceDownloads[deviceID] = newDeviceDownloadState()
m.helloMessages[deviceID] = hello
@@ -1040,6 +1074,24 @@ func (m *Model) PauseDevice(device protocol.DeviceID) {
events.Default.Log(events.DevicePaused, map[string]string{"device": device.String()})
}
+func (m *Model) DownloadProgress(device protocol.DeviceID, folder string, updates []protocol.FileDownloadProgressUpdate, flags uint32, options []protocol.Option) {
+ if !m.folderSharedWith(folder, device) {
+ return
+ }
+
+ m.fmut.RLock()
+ cfg, ok := m.folderCfgs[folder]
+ m.fmut.RUnlock()
+
+ if !ok || cfg.ReadOnly || cfg.DisableTempIndexes {
+ return
+ }
+
+ m.pmut.RLock()
+ m.deviceDownloads[device].Update(folder, updates)
+ m.pmut.RUnlock()
+}
+
func (m *Model) ResumeDevice(device protocol.DeviceID) {
m.pmut.Lock()
m.devicePaused[device] = false
@@ -1211,7 +1263,7 @@ func (m *Model) updateLocals(folder string, fs []protocol.FileInfo) {
})
}
-func (m *Model) requestGlobal(deviceID protocol.DeviceID, folder, name string, offset int64, size int, hash []byte, flags uint32, options []protocol.Option) ([]byte, error) {
+func (m *Model) requestGlobal(deviceID protocol.DeviceID, folder, name string, offset int64, size int, hash []byte, fromTemporary bool) ([]byte, error) {
m.pmut.RLock()
nc, ok := m.conn[deviceID]
m.pmut.RUnlock()
@@ -1220,9 +1272,9 @@ func (m *Model) requestGlobal(deviceID protocol.DeviceID, folder, name string, o
return nil, fmt.Errorf("requestGlobal: no such device: %s", deviceID)
}
- l.Debugf("%v REQ(out): %s: %q / %q o=%d s=%d h=%x f=%x op=%s", m, deviceID, folder, name, offset, size, hash, flags, options)
+ l.Debugf("%v REQ(out): %s: %q / %q o=%d s=%d h=%x ft=%t op=%s", m, deviceID, folder, name, offset, size, hash, fromTemporary)
- return nc.Request(folder, name, offset, size, hash, flags, options)
+ return nc.Request(folder, name, offset, size, hash, fromTemporary)
}
func (m *Model) AddFolder(cfg config.FolderConfiguration) {
@@ -1553,6 +1605,9 @@ func (m *Model) generateClusterConfig(device protocol.DeviceID) protocol.Cluster
if folderCfg.IgnoreDelete {
flags |= protocol.FlagFolderIgnoreDelete
}
+ if folderCfg.DisableTempIndexes {
+ flags |= protocol.FlagFolderDisabledTempIndexes
+ }
protocolFolder.Flags = flags
for _, device := range m.folderDevices[folder] {
// DeviceID is a value type, but with an underlying array. Copy it
@@ -1736,7 +1791,7 @@ func (m *Model) GlobalDirectoryTree(folder, prefix string, levels int, dirsonly
return output
}
-func (m *Model) Availability(folder, file string) []protocol.DeviceID {
+func (m *Model) Availability(folder, file string, version protocol.Vector, block protocol.BlockInfo) []Availability {
// Acquire this lock first, as the value returned from foldersFiles can
// get heavily modified on Close()
m.pmut.RLock()
@@ -1744,19 +1799,27 @@ func (m *Model) Availability(folder, file string) []protocol.DeviceID {
m.fmut.RLock()
fs, ok := m.folderFiles[folder]
+ devices := m.folderDevices[folder]
m.fmut.RUnlock()
if !ok {
return nil
}
- availableDevices := []protocol.DeviceID{}
+ var availabilities []Availability
for _, device := range fs.Availability(file) {
_, ok := m.conn[device]
if ok {
- availableDevices = append(availableDevices, device)
+ availabilities = append(availabilities, Availability{ID: device, FromTemporary: false})
}
}
- return availableDevices
+
+ for _, device := range devices {
+ if m.deviceDownloads[device].Has(folder, file, version, int32(block.Offset/protocol.BlockSize)) {
+ availabilities = append(availabilities, Availability{ID: device, FromTemporary: true})
+ }
+ }
+
+ return availabilities
}
// BringToFront bumps the given files priority in the job queue.
@@ -2086,6 +2149,21 @@ func stringSliceWithout(ss []string, s string) []string {
return ss
}
+func readOffsetIntoBuf(file string, offset int64, buf []byte) error {
+ fd, err := os.Open(file)
+ if err != nil {
+ l.Debugln("readOffsetIntoBuf.Open", file, err)
+ return err
+ }
+
+ defer fd.Close()
+ _, err = fd.ReadAt(buf, offset)
+ if err != nil {
+ l.Debugln("readOffsetIntoBuf.ReadAt", file, err)
+ }
+ return err
+}
+
// The exists function is expected to return true for all known paths
// (excluding "" and ".")
func unifySubs(dirs []string, exists func(dir string) bool) []string {
diff --git a/lib/model/model_test.go b/lib/model/model_test.go
index 828437cc..24c7352f 100644
--- a/lib/model/model_test.go
+++ b/lib/model/model_test.go
@@ -203,9 +203,17 @@ func benchmarkIndexUpdate(b *testing.B, nfiles, nufiles int) {
b.ReportAllocs()
}
+type downloadProgressMessage struct {
+ folder string
+ updates []protocol.FileDownloadProgressUpdate
+ flags uint32
+ options []protocol.Option
+}
+
type FakeConnection struct {
- id protocol.DeviceID
- requestData []byte
+ id protocol.DeviceID
+ requestData []byte
+ downloadProgressMessages []downloadProgressMessage
}
func (FakeConnection) Close() error {
@@ -235,7 +243,7 @@ func (FakeConnection) IndexUpdate(string, []protocol.FileInfo, uint32, []protoco
return nil
}
-func (f FakeConnection) Request(folder, name string, offset int64, size int, hash []byte, flags uint32, options []protocol.Option) ([]byte, error) {
+func (f FakeConnection) Request(folder, name string, offset int64, size int, hash []byte, fromTemporary bool) ([]byte, error) {
return f.requestData, nil
}
@@ -253,6 +261,15 @@ func (FakeConnection) Statistics() protocol.Statistics {
return protocol.Statistics{}
}
+func (f *FakeConnection) DownloadProgress(folder string, updates []protocol.FileDownloadProgressUpdate, flags uint32, options []protocol.Option) {
+ f.downloadProgressMessages = append(f.downloadProgressMessages, downloadProgressMessage{
+ folder: folder,
+ updates: updates,
+ flags: flags,
+ options: options,
+ })
+}
+
func BenchmarkRequest(b *testing.B) {
db := db.OpenMemory()
m := NewModel(defaultConfig, protocol.LocalDeviceID, "device", "syncthing", "dev", db, nil)
@@ -271,7 +288,7 @@ func BenchmarkRequest(b *testing.B) {
}
}
- fc := FakeConnection{
+ fc := &FakeConnection{
id: device1,
requestData: []byte("some data to return"),
}
@@ -284,7 +301,7 @@ func BenchmarkRequest(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
- data, err := m.requestGlobal(device1, "default", files[i%n].Name, 0, 32, nil, 0, nil)
+ data, err := m.requestGlobal(device1, "default", files[i%n].Name, 0, 32, nil, false)
if err != nil {
b.Error(err)
}
@@ -318,7 +335,7 @@ func TestDeviceRename(t *testing.T) {
conn := Connection{
&net.TCPConn{},
- FakeConnection{
+ &FakeConnection{
id: device1,
requestData: []byte("some data to return"),
},
diff --git a/lib/model/progressemitter.go b/lib/model/progressemitter.go
index 70b7ba9d..a4d225d7 100755
--- a/lib/model/progressemitter.go
+++ b/lib/model/progressemitter.go
@@ -9,19 +9,22 @@ package model
import (
"fmt"
"path/filepath"
- "reflect"
"time"
"github.com/syncthing/syncthing/lib/config"
"github.com/syncthing/syncthing/lib/events"
+ "github.com/syncthing/syncthing/lib/protocol"
"github.com/syncthing/syncthing/lib/sync"
)
type ProgressEmitter struct {
- registry map[string]*sharedPullerState
- interval time.Duration
- last map[string]map[string]*pullerProgress
- mut sync.Mutex
+ registry map[string]*sharedPullerState
+ interval time.Duration
+ minBlocks int
+ lastUpdate time.Time
+ sentDownloadStates map[protocol.DeviceID]*sentDownloadState // States representing what we've sent to the other peer via DownloadProgress messages.
+ connections map[string][]protocol.Connection
+ mut sync.Mutex
timer *time.Timer
@@ -32,11 +35,12 @@ type ProgressEmitter struct {
// DownloadProgress events every interval.
func NewProgressEmitter(cfg *config.Wrapper) *ProgressEmitter {
t := &ProgressEmitter{
- stop: make(chan struct{}),
- registry: make(map[string]*sharedPullerState),
- last: make(map[string]map[string]*pullerProgress),
- timer: time.NewTimer(time.Millisecond),
- mut: sync.NewMutex(),
+ stop: make(chan struct{}),
+ registry: make(map[string]*sharedPullerState),
+ timer: time.NewTimer(time.Millisecond),
+ sentDownloadStates: make(map[protocol.DeviceID]*sentDownloadState),
+ connections: make(map[string][]protocol.Connection),
+ mut: sync.NewMutex(),
}
t.CommitConfiguration(config.Configuration{}, cfg.Raw())
@@ -48,6 +52,8 @@ func NewProgressEmitter(cfg *config.Wrapper) *ProgressEmitter {
// Serve starts the progress emitter which starts emitting DownloadProgress
// events as the progress happens.
func (t *ProgressEmitter) Serve() {
+ var lastUpdate time.Time
+ var lastCount, newCount int
for {
select {
case <-t.stop:
@@ -56,21 +62,28 @@ func (t *ProgressEmitter) Serve() {
case <-t.timer.C:
t.mut.Lock()
l.Debugln("progress emitter: timer - looking after", len(t.registry))
- output := make(map[string]map[string]*pullerProgress)
+
+ newLastUpdated := lastUpdate
+ newCount = len(t.registry)
for _, puller := range t.registry {
- if output[puller.folder] == nil {
- output[puller.folder] = make(map[string]*pullerProgress)
+ updated := puller.Updated()
+ if updated.After(newLastUpdated) {
+ newLastUpdated = updated
}
- output[puller.folder][puller.file.Name] = puller.Progress()
}
- if !reflect.DeepEqual(t.last, output) {
- events.Default.Log(events.DownloadProgress, output)
- t.last = output
- l.Debugf("progress emitter: emitting %#v", output)
+
+ if !newLastUpdated.Equal(lastUpdate) || newCount != lastCount {
+ lastUpdate = newLastUpdated
+ lastCount = newCount
+ t.sendDownloadProgressEvent()
+ if len(t.connections) > 0 {
+ t.sendDownloadProgressMessages()
+ }
} else {
l.Debugln("progress emitter: nothing new")
}
- if len(t.registry) != 0 {
+
+ if newCount != 0 {
t.timer.Reset(t.interval)
}
t.mut.Unlock()
@@ -78,6 +91,95 @@ func (t *ProgressEmitter) Serve() {
}
}
+func (t *ProgressEmitter) sendDownloadProgressEvent() {
+ // registry lock already held
+ output := make(map[string]map[string]*pullerProgress)
+ for _, puller := range t.registry {
+ if output[puller.folder] == nil {
+ output[puller.folder] = make(map[string]*pullerProgress)
+ }
+ output[puller.folder][puller.file.Name] = puller.Progress()
+ }
+ events.Default.Log(events.DownloadProgress, output)
+ l.Debugf("progress emitter: emitting %#v", output)
+}
+
+func (t *ProgressEmitter) sendDownloadProgressMessages() {
+ // registry lock already held
+ sharedFolders := make(map[protocol.DeviceID][]string)
+ deviceConns := make(map[protocol.DeviceID]protocol.Connection)
+ subscribers := t.connections
+ for folder, conns := range subscribers {
+ for _, conn := range conns {
+ id := conn.ID()
+
+ deviceConns[id] = conn
+ sharedFolders[id] = append(sharedFolders[id], folder)
+
+ state, ok := t.sentDownloadStates[id]
+ if !ok {
+ state = &sentDownloadState{
+ folderStates: make(map[string]*sentFolderDownloadState),
+ }
+ t.sentDownloadStates[id] = state
+ }
+
+ var activePullers []*sharedPullerState
+ for _, puller := range t.registry {
+ if puller.folder != folder || puller.file.IsSymlink() || puller.file.IsDirectory() || len(puller.file.Blocks) <= t.minBlocks {
+ continue
+ }
+ activePullers = append(activePullers, puller)
+ }
+
+ // For every new puller that hasn't yet been seen, it will send all the blocks the puller has available
+ // For every existing puller, it will check for new blocks, and send update for the new blocks only
+ // For every puller that we've seen before but is no longer there, we will send a forget message
+ updates := state.update(folder, activePullers)
+
+ if len(updates) > 0 {
+ conn.DownloadProgress(folder, updates, 0, nil)
+ }
+ }
+ }
+
+ // Clean up sentDownloadStates for devices which we are no longer connected to.
+ for id := range t.sentDownloadStates {
+ _, ok := deviceConns[id]
+ if !ok {
+ // Null out outstanding entries for device
+ delete(t.sentDownloadStates, id)
+ }
+ }
+
+ // If a folder was unshared from some device, tell it that all temp files
+ // are now gone.
+ for id, sharedDeviceFolders := range sharedFolders {
+ state := t.sentDownloadStates[id]
+ nextFolder:
+ // For each of the folders that the state is aware of,
+ // try to match it with a shared folder we've discovered above,
+ for _, folder := range state.folders() {
+ for _, existingFolder := range sharedDeviceFolders {
+ if existingFolder == folder {
+ continue nextFolder
+ }
+ }
+
+ // If we fail to find that folder, we tell the state to forget about it
+ // and return us a list of updates which would clean up the state
+ // on the remote end.
+ updates := state.cleanup(folder)
+ if len(updates) > 0 {
+ // XXX: Don't send this now, as the only way we've unshared a folder
+ // is by breaking the connection and reconnecting, hence sending
+ // forget messages for some random folder currently makes no sense.
+ // deviceConns[id].DownloadProgress(folder, updates, 0, nil)
+ }
+ }
+ }
+}
+
// VerifyConfiguration implements the config.Committer interface
func (t *ProgressEmitter) VerifyConfiguration(from, to config.Configuration) error {
return nil
@@ -89,6 +191,7 @@ func (t *ProgressEmitter) CommitConfiguration(from, to config.Configuration) boo
defer t.mut.Unlock()
t.interval = time.Duration(to.Options.ProgressUpdateIntervalS) * time.Second
+ t.minBlocks = to.Options.TempIndexMinBlocks
l.Debugln("progress emitter: updated interval", t.interval)
return true
@@ -115,7 +218,9 @@ func (t *ProgressEmitter) Register(s *sharedPullerState) {
func (t *ProgressEmitter) Deregister(s *sharedPullerState) {
t.mut.Lock()
defer t.mut.Unlock()
+
l.Debugln("progress emitter: deregistering", s.folder, s.file.Name)
+
delete(t.registry, filepath.Join(s.folder, s.file.Name))
}
@@ -142,3 +247,38 @@ func (t *ProgressEmitter) lenRegistry() int {
defer t.mut.Unlock()
return len(t.registry)
}
+
+func (t *ProgressEmitter) temporaryIndexSubscribe(conn protocol.Connection, folders []string) {
+ t.mut.Lock()
+ for _, folder := range folders {
+ t.connections[folder] = append(t.connections[folder], conn)
+ }
+ t.mut.Unlock()
+}
+
+func (t *ProgressEmitter) temporaryIndexUnsubscribe(conn protocol.Connection) {
+ t.mut.Lock()
+ left := make(map[string][]protocol.Connection, len(t.connections))
+ for folder, conns := range t.connections {
+ connsLeft := connsWithout(conns, conn)
+ if len(connsLeft) > 0 {
+ left[folder] = connsLeft
+ }
+ }
+ t.connections = left
+ t.mut.Unlock()
+}
+
+func connsWithout(conns []protocol.Connection, conn protocol.Connection) []protocol.Connection {
+ if len(conns) == 0 {
+ return nil
+ }
+
+ newConns := make([]protocol.Connection, 0, len(conns)-1)
+ for _, existingConn := range conns {
+ if existingConn != conn {
+ newConns = append(newConns, existingConn)
+ }
+ }
+ return newConns
+}
diff --git a/lib/model/progressemitter_test.go b/lib/model/progressemitter_test.go
index cd8bba22..bb20c224 100644
--- a/lib/model/progressemitter_test.go
+++ b/lib/model/progressemitter_test.go
@@ -7,34 +7,46 @@
package model
import (
+ "fmt"
+ "path/filepath"
+ "runtime"
"testing"
"time"
"github.com/syncthing/syncthing/lib/config"
"github.com/syncthing/syncthing/lib/events"
+ "github.com/syncthing/syncthing/lib/protocol"
"github.com/syncthing/syncthing/lib/sync"
)
-var timeout = 10 * time.Millisecond
+var timeout = 100 * time.Millisecond
+
+func caller(skip int) string {
+ _, file, line, ok := runtime.Caller(skip + 1)
+ if !ok {
+ return "unknown"
+ }
+ return fmt.Sprintf("%s:%d", filepath.Base(file), line)
+}
func expectEvent(w *events.Subscription, t *testing.T, size int) {
event, err := w.Poll(timeout)
if err != nil {
- t.Fatal("Unexpected error:", err)
+ t.Fatal("Unexpected error:", err, "at", caller(1))
}
if event.Type != events.DownloadProgress {
- t.Fatal("Unexpected event:", event)
+ t.Fatal("Unexpected event:", event, "at", caller(1))
}
data := event.Data.(map[string]map[string]*pullerProgress)
if len(data) != size {
- t.Fatal("Unexpected event data size:", data)
+ t.Fatal("Unexpected event data size:", data, "at", caller(1))
}
}
func expectTimeout(w *events.Subscription, t *testing.T) {
_, err := w.Poll(timeout)
if err != events.ErrTimeout {
- t.Fatal("Unexpected non-Timeout error:", err)
+ t.Fatal("Unexpected non-Timeout error:", err, "at", caller(1))
}
}
@@ -52,14 +64,15 @@ func TestProgressEmitter(t *testing.T) {
expectTimeout(w, t)
s := sharedPullerState{
- mut: sync.NewMutex(),
+ updated: time.Now(),
+ mut: sync.NewRWMutex(),
}
p.Register(&s)
expectEvent(w, t, 1)
expectTimeout(w, t)
- s.copyDone()
+ s.copyDone(protocol.BlockInfo{})
expectEvent(w, t, 1)
expectTimeout(w, t)
@@ -74,7 +87,7 @@ func TestProgressEmitter(t *testing.T) {
expectEvent(w, t, 1)
expectTimeout(w, t)
- s.pullDone()
+ s.pullDone(protocol.BlockInfo{})
expectEvent(w, t, 1)
expectTimeout(w, t)
@@ -85,3 +98,335 @@ func TestProgressEmitter(t *testing.T) {
expectTimeout(w, t)
}
+
+func TestSendDownloadProgressMessages(t *testing.T) {
+
+ c := config.Wrap("/tmp/test", config.Configuration{})
+ c.SetOptions(config.OptionsConfiguration{
+ ProgressUpdateIntervalS: 0,
+ TempIndexMinBlocks: 10,
+ })
+
+ fc := &FakeConnection{}
+
+ p := NewProgressEmitter(c)
+ p.temporaryIndexSubscribe(fc, []string{"folder", "folder2"})
+
+ expect := func(updateIdx int, state *sharedPullerState, updateType uint32, version protocol.Vector, blocks []int32, remove bool) {
+ messageIdx := -1
+ for i, msg := range fc.downloadProgressMessages {
+ if msg.folder == state.folder {
+ messageIdx = i
+ break
+ }
+ }
+ if messageIdx < 0 {
+ t.Errorf("Message for folder %s does not exist at %s", state.folder, caller(1))
+ }
+
+ msg := fc.downloadProgressMessages[messageIdx]
+
+ // Don't know the index (it's random due to iterating maps)
+ if updateIdx == -1 {
+ for i, upd := range msg.updates {
+ if upd.Name == state.file.Name {
+ updateIdx = i
+ break
+ }
+ }
+ }
+
+ if updateIdx == -1 {
+ t.Errorf("Could not find update for %s at %s", state.file.Name, caller(1))
+ }
+
+ if updateIdx > len(msg.updates)-1 {
+ t.Errorf("Update at index %d does not exist at %s", updateIdx, caller(1))
+ }
+
+ update := msg.updates[updateIdx]
+
+ if update.UpdateType != updateType {
+ t.Errorf("Wrong update type at %s", caller(1))
+ }
+
+ if !update.Version.Equal(version) {
+ t.Errorf("Wrong version at %s", caller(1))
+ }
+
+ if len(update.BlockIndexes) != len(blocks) {
+ t.Errorf("Wrong indexes. Have %d expect %d at %s", len(update.BlockIndexes), len(blocks), caller(1))
+ }
+ for i := range update.BlockIndexes {
+ if update.BlockIndexes[i] != blocks[i] {
+ t.Errorf("Index %d incorrect at %s", i, caller(1))
+ }
+ }
+
+ if remove {
+ fc.downloadProgressMessages = append(fc.downloadProgressMessages[:messageIdx], fc.downloadProgressMessages[messageIdx+1:]...)
+ }
+ }
+ expectEmpty := func() {
+ if len(fc.downloadProgressMessages) > 0 {
+ t.Errorf("Still have something at %s: %#v", caller(1), fc.downloadProgressMessages)
+ }
+ }
+
+ now := time.Now()
+ tick := func() time.Time {
+ now = now.Add(time.Second)
+ return now
+ }
+
+ if len(fc.downloadProgressMessages) != 0 {
+ t.Error("Expected no requests")
+ }
+
+ v1 := (protocol.Vector{}).Update(0)
+ v2 := (protocol.Vector{}).Update(1)
+
+ // Requires more than 10 blocks to work.
+ blocks := make([]protocol.BlockInfo, 11, 11)
+
+ state1 := &sharedPullerState{
+ folder: "folder",
+ file: protocol.FileInfo{
+ Name: "state1",
+ Version: v1,
+ Blocks: blocks,
+ },
+ mut: sync.NewRWMutex(),
+ availableUpdated: time.Now(),
+ }
+ p.registry["1"] = state1
+
+ // Has no blocks, hence no message is sent
+ p.sendDownloadProgressMessages()
+ expectEmpty()
+
+ // Returns update for puller with new extra blocks
+ state1.available = []int32{1}
+ p.sendDownloadProgressMessages()
+
+ expect(0, state1, protocol.UpdateTypeAppend, v1, []int32{1}, true)
+ expectEmpty()
+
+ // Does nothing if nothing changes
+ p.sendDownloadProgressMessages()
+ expectEmpty()
+
+ // Does nothing if timestamp updated, but no new blocks (should never happen)
+ state1.availableUpdated = tick()
+
+ p.sendDownloadProgressMessages()
+ expectEmpty()
+
+ // Does not return an update if date blocks change but date does not (should never happen)
+ state1.available = []int32{1, 2}
+
+ p.sendDownloadProgressMessages()
+ expectEmpty()
+
+ // If the date and blocks changes, returns only the diff
+ state1.availableUpdated = tick()
+
+ p.sendDownloadProgressMessages()
+
+ expect(0, state1, protocol.UpdateTypeAppend, v1, []int32{2}, true)
+ expectEmpty()
+
+ // Returns forget and update if puller version has changed
+ state1.file.Version = v2
+
+ p.sendDownloadProgressMessages()
+
+ expect(0, state1, protocol.UpdateTypeForget, v1, nil, false)
+ expect(1, state1, protocol.UpdateTypeAppend, v2, []int32{1, 2}, true)
+ expectEmpty()
+
+ // Sends an empty update if new file exists, but does not have any blocks yet. (To indicate that the old blocks are no longer available)
+ state1.file.Version = v1
+ state1.available = nil
+ state1.availableUpdated = tick()
+
+ p.sendDownloadProgressMessages()
+
+ expect(0, state1, protocol.UpdateTypeForget, v2, nil, false)
+ expect(1, state1, protocol.UpdateTypeAppend, v1, nil, true)
+ expectEmpty()
+
+ // Updates for multiple files and folders can be combined
+ state1.available = []int32{1, 2, 3}
+ state1.availableUpdated = tick()
+
+ state2 := &sharedPullerState{
+ folder: "folder2",
+ file: protocol.FileInfo{
+ Name: "state2",
+ Version: v1,
+ Blocks: blocks,
+ },
+ mut: sync.NewRWMutex(),
+ available: []int32{1, 2, 3},
+ availableUpdated: time.Now(),
+ }
+ state3 := &sharedPullerState{
+ folder: "folder",
+ file: protocol.FileInfo{
+ Name: "state3",
+ Version: v1,
+ Blocks: blocks,
+ },
+ mut: sync.NewRWMutex(),
+ available: []int32{1, 2, 3},
+ availableUpdated: time.Now(),
+ }
+ state4 := &sharedPullerState{
+ folder: "folder2",
+ file: protocol.FileInfo{
+ Name: "state4",
+ Version: v1,
+ Blocks: blocks,
+ },
+ mut: sync.NewRWMutex(),
+ available: []int32{1, 2, 3},
+ availableUpdated: time.Now(),
+ }
+ p.registry["2"] = state2
+ p.registry["3"] = state3
+ p.registry["4"] = state4
+
+ p.sendDownloadProgressMessages()
+
+ expect(-1, state1, protocol.UpdateTypeAppend, v1, []int32{1, 2, 3}, false)
+ expect(-1, state3, protocol.UpdateTypeAppend, v1, []int32{1, 2, 3}, true)
+ expect(-1, state2, protocol.UpdateTypeAppend, v1, []int32{1, 2, 3}, false)
+ expect(-1, state4, protocol.UpdateTypeAppend, v1, []int32{1, 2, 3}, true)
+ expectEmpty()
+
+ // Returns forget if puller no longer exists, as well as updates if it has been updated.
+ state1.available = []int32{1, 2, 3, 4, 5}
+ state1.availableUpdated = tick()
+ state2.available = []int32{1, 2, 3, 4, 5}
+ state2.availableUpdated = tick()
+
+ delete(p.registry, "3")
+ delete(p.registry, "4")
+
+ p.sendDownloadProgressMessages()
+
+ expect(-1, state1, protocol.UpdateTypeAppend, v1, []int32{4, 5}, false)
+ expect(-1, state3, protocol.UpdateTypeForget, v1, nil, true)
+ expect(-1, state2, protocol.UpdateTypeAppend, v1, []int32{4, 5}, false)
+ expect(-1, state4, protocol.UpdateTypeForget, v1, nil, true)
+ expectEmpty()
+
+ // Deletions are sent only once (actual bug I found writing the tests)
+ p.sendDownloadProgressMessages()
+ p.sendDownloadProgressMessages()
+ expectEmpty()
+
+ // Not sent for "inactive" (symlinks, dirs, or wrong folder) pullers
+ // Directory
+ state5 := &sharedPullerState{
+ folder: "folder",
+ file: protocol.FileInfo{
+ Name: "state5",
+ Version: v1,
+ Flags: protocol.FlagDirectory,
+ Blocks: blocks,
+ },
+ mut: sync.NewRWMutex(),
+ available: []int32{1, 2, 3},
+ availableUpdated: time.Now(),
+ }
+ // Symlink
+ state6 := &sharedPullerState{
+ folder: "folder",
+ file: protocol.FileInfo{
+ Name: "state6",
+ Version: v1,
+ Flags: protocol.FlagSymlink,
+ },
+ mut: sync.NewRWMutex(),
+ available: []int32{1, 2, 3},
+ availableUpdated: time.Now(),
+ }
+ // Some other directory
+ state7 := &sharedPullerState{
+ folder: "folderXXX",
+ file: protocol.FileInfo{
+ Name: "state7",
+ Version: v1,
+ Blocks: blocks,
+ },
+ mut: sync.NewRWMutex(),
+ available: []int32{1, 2, 3},
+ availableUpdated: time.Now(),
+ }
+ // Less than 10 blocks
+ state8 := &sharedPullerState{
+ folder: "folder",
+ file: protocol.FileInfo{
+ Name: "state8",
+ Version: v1,
+ Blocks: blocks[:3],
+ },
+ mut: sync.NewRWMutex(),
+ available: []int32{1, 2, 3},
+ availableUpdated: time.Now(),
+ }
+ p.registry["5"] = state5
+ p.registry["6"] = state6
+ p.registry["7"] = state7
+ p.registry["8"] = state8
+
+ p.sendDownloadProgressMessages()
+
+ expectEmpty()
+
+ // Device is no longer subscribed to a particular folder
+ delete(p.registry, "1") // Clean up first
+ delete(p.registry, "2") // Clean up first
+
+ p.sendDownloadProgressMessages()
+ expect(-1, state1, protocol.UpdateTypeForget, v1, nil, true)
+ expect(-1, state2, protocol.UpdateTypeForget, v1, nil, true)
+
+ expectEmpty()
+
+ p.registry["1"] = state1
+ p.registry["2"] = state2
+ p.registry["3"] = state3
+ p.registry["4"] = state4
+
+ p.sendDownloadProgressMessages()
+
+ expect(-1, state1, protocol.UpdateTypeAppend, v1, []int32{1, 2, 3, 4, 5}, false)
+ expect(-1, state3, protocol.UpdateTypeAppend, v1, []int32{1, 2, 3}, true)
+ expect(-1, state2, protocol.UpdateTypeAppend, v1, []int32{1, 2, 3, 4, 5}, false)
+ expect(-1, state4, protocol.UpdateTypeAppend, v1, []int32{1, 2, 3}, true)
+ expectEmpty()
+
+ p.temporaryIndexUnsubscribe(fc)
+ p.temporaryIndexSubscribe(fc, []string{"folder"})
+
+ p.sendDownloadProgressMessages()
+
+ // See progressemitter.go for explanation why this is commented out.
+ // Search for state.cleanup
+ //expect(-1, state2, protocol.UpdateTypeForget, v1, nil, false)
+ //expect(-1, state4, protocol.UpdateTypeForget, v1, nil, true)
+
+ expectEmpty()
+
+ // Cleanup when device no longer exists
+ p.temporaryIndexUnsubscribe(fc)
+
+ p.sendDownloadProgressMessages()
+ _, ok := p.sentDownloadStates[fc.ID()]
+ if ok {
+ t.Error("Should not be there")
+ }
+}
diff --git a/lib/model/rwfolder.go b/lib/model/rwfolder.go
index a0b17e4f..3947f38f 100644
--- a/lib/model/rwfolder.go
+++ b/lib/model/rwfolder.go
@@ -970,9 +970,9 @@ func (p *rwFolder) handleFile(file protocol.FileInfo, copyChan chan<- copyBlocks
scanner.PopulateOffsets(file.Blocks)
- reused := 0
var blocks []protocol.BlockInfo
var blocksSize int64
+ var reused []int32
// Check for an old temporary file which might have some blocks we could
// reuse.
@@ -988,25 +988,27 @@ func (p *rwFolder) handleFile(file protocol.FileInfo, copyChan chan<- copyBlocks
}
// Since the blocks are already there, we don't need to get them.
- for _, block := range file.Blocks {
+ for i, block := range file.Blocks {
_, ok := existingBlocks[block.String()]
if !ok {
blocks = append(blocks, block)
blocksSize += int64(block.Size)
+ } else {
+ reused = append(reused, int32(i))
}
}
// The sharedpullerstate will know which flags to use when opening the
// temp file depending if we are reusing any blocks or not.
- reused = len(file.Blocks) - len(blocks)
- if reused == 0 {
+ if len(reused) == 0 {
// Otherwise, discard the file ourselves in order for the
// sharedpuller not to panic when it fails to exclusively create a
// file which already exists
osutil.InWritableDir(osutil.Remove, tempName)
}
} else {
- blocks = file.Blocks
+ // Copy the blocks, as we don't want to shuffle them on the FileInfo
+ blocks = append(blocks, file.Blocks...)
blocksSize = file.Size()
}
@@ -1018,6 +1020,12 @@ func (p *rwFolder) handleFile(file protocol.FileInfo, copyChan chan<- copyBlocks
}
}
+ // Shuffle the blocks
+ for i := range blocks {
+ j := rand.Intn(i + 1)
+ blocks[i], blocks[j] = blocks[j], blocks[i]
+ }
+
events.Default.Log(events.ItemStarted, map[string]string{
"folder": p.folder,
"item": file.Name,
@@ -1026,17 +1034,20 @@ func (p *rwFolder) handleFile(file protocol.FileInfo, copyChan chan<- copyBlocks
})
s := sharedPullerState{
- file: file,
- folder: p.folder,
- tempName: tempName,
- realName: realName,
- copyTotal: len(blocks),
- copyNeeded: len(blocks),
- reused: reused,
- ignorePerms: p.ignorePermissions(file),
- version: curFile.Version,
- mut: sync.NewMutex(),
- sparse: p.allowSparse,
+ file: file,
+ folder: p.folder,
+ tempName: tempName,
+ realName: realName,
+ copyTotal: len(blocks),
+ copyNeeded: len(blocks),
+ reused: len(reused),
+ updated: time.Now(),
+ available: reused,
+ availableUpdated: time.Now(),
+ ignorePerms: p.ignorePermissions(file),
+ version: curFile.Version,
+ mut: sync.NewRWMutex(),
+ sparse: p.allowSparse,
}
l.Debugf("%v need file %s; copy %d, reused %v", p, file.Name, len(blocks), reused)
@@ -1184,7 +1195,7 @@ func (p *rwFolder) copierRoutine(in <-chan copyBlocksState, pullChan chan<- pull
}
pullChan <- ps
} else {
- state.copyDone()
+ state.copyDone(block)
}
}
out <- state.sharedPullerState
@@ -1210,19 +1221,19 @@ func (p *rwFolder) pullerRoutine(in <-chan pullBlockState, out chan<- *sharedPul
if p.allowSparse && state.reused == 0 && state.block.IsEmpty() {
// There is no need to request a block of all zeroes. Pretend we
// requested it and handled it correctly.
- state.pullDone()
+ state.pullDone(state.block)
out <- state.sharedPullerState
continue
}
var lastError error
- potentialDevices := p.model.Availability(p.folder, state.file.Name)
+ candidates := p.model.Availability(p.folder, state.file.Name, state.file.Version, state.block)
for {
// Select the least busy device to pull the block from. If we found no
// feasible device at all, fail the block (and in the long run, the
// file).
- selected := activity.leastBusy(potentialDevices)
- if selected == (protocol.DeviceID{}) {
+ selected, found := activity.leastBusy(candidates)
+ if !found {
if lastError != nil {
state.fail("pull", lastError)
} else {
@@ -1231,12 +1242,12 @@ func (p *rwFolder) pullerRoutine(in <-chan pullBlockState, out chan<- *sharedPul
break
}
- potentialDevices = removeDevice(potentialDevices, selected)
+ candidates = removeAvailability(candidates, selected)
// Fetch the block, while marking the selected device as in use so that
// leastBusy can select another device when someone else asks.
activity.using(selected)
- buf, lastError := p.model.requestGlobal(selected, p.folder, state.file.Name, state.block.Offset, int(state.block.Size), state.block.Hash, 0, nil)
+ buf, lastError := p.model.requestGlobal(selected.ID, p.folder, state.file.Name, state.block.Offset, int(state.block.Size), state.block.Hash, selected.FromTemporary)
activity.done(selected)
if lastError != nil {
l.Debugln("request:", p.folder, state.file.Name, state.block.Offset, state.block.Size, "returned error:", lastError)
@@ -1256,7 +1267,7 @@ func (p *rwFolder) pullerRoutine(in <-chan pullBlockState, out chan<- *sharedPul
if err != nil {
state.fail("save", err)
} else {
- state.pullDone()
+ state.pullDone(state.block)
}
break
}
@@ -1481,14 +1492,24 @@ func (p *rwFolder) inConflict(current, replacement protocol.Vector) bool {
return false
}
-func removeDevice(devices []protocol.DeviceID, device protocol.DeviceID) []protocol.DeviceID {
- for i := range devices {
- if devices[i] == device {
- devices[i] = devices[len(devices)-1]
- return devices[:len(devices)-1]
+func invalidateFolder(cfg *config.Configuration, folderID string, err error) {
+ for i := range cfg.Folders {
+ folder := &cfg.Folders[i]
+ if folder.ID == folderID {
+ folder.Invalid = err.Error()
+ return
}
}
- return devices
+}
+
+func removeAvailability(availabilities []Availability, availability Availability) []Availability {
+ for i := range availabilities {
+ if availabilities[i] == availability {
+ availabilities[i] = availabilities[len(availabilities)-1]
+ return availabilities[:len(availabilities)-1]
+ }
+ }
+ return availabilities
}
func (p *rwFolder) moveForConflict(name string) error {
diff --git a/lib/model/rwfolder_test.go b/lib/model/rwfolder_test.go
index a7fb4b28..62f0e5b0 100644
--- a/lib/model/rwfolder_test.go
+++ b/lib/model/rwfolder_test.go
@@ -104,9 +104,16 @@ func TestHandleFile(t *testing.T) {
t.Errorf("Unexpected count of copy blocks: %d != 8", len(toCopy.blocks))
}
- for i, block := range toCopy.blocks {
- if string(block.Hash) != string(blocks[i+1].Hash) {
- t.Errorf("Block mismatch: %s != %s", block.String(), blocks[i+1].String())
+ for _, block := range blocks[1:] {
+ found := false
+ for _, toCopyBlock := range toCopy.blocks {
+ if string(toCopyBlock.Hash) == string(block.Hash) {
+ found = true
+ break
+ }
+ }
+ if !found {
+ t.Errorf("Did not find block %s", block.String())
}
}
}
@@ -138,9 +145,17 @@ func TestHandleFileWithTemp(t *testing.T) {
t.Errorf("Unexpected count of copy blocks: %d != 4", len(toCopy.blocks))
}
- for i, eq := range []int{1, 5, 6, 8} {
- if string(toCopy.blocks[i].Hash) != string(blocks[eq].Hash) {
- t.Errorf("Block mismatch: %s != %s", toCopy.blocks[i].String(), blocks[eq].String())
+ for _, idx := range []int{1, 5, 6, 8} {
+ found := false
+ block := blocks[idx]
+ for _, toCopyBlock := range toCopy.blocks {
+ if string(toCopyBlock.Hash) == string(block.Hash) {
+ found = true
+ break
+ }
+ }
+ if !found {
+ t.Errorf("Did not find block %s", block.String())
}
}
}
@@ -187,13 +202,22 @@ func TestCopierFinder(t *testing.T) {
default:
}
- // Verify that the right blocks went into the pull list
- for i, eq := range []int{1, 5, 6, 8} {
- if string(pulls[i].block.Hash) != string(blocks[eq].Hash) {
- t.Errorf("Block %d mismatch: %s != %s", eq, pulls[i].block.String(), blocks[eq].String())
+ // Verify that the right blocks went into the pull list.
+ // They are pulled in random order.
+ for _, idx := range []int{1, 5, 6, 8} {
+ found := false
+ block := blocks[idx]
+ for _, pulledBlock := range pulls {
+ if string(pulledBlock.block.Hash) == string(block.Hash) {
+ found = true
+ break
+ }
}
- if string(finish.file.Blocks[eq-1].Hash) != string(blocks[eq].Hash) {
- t.Errorf("Block %d mismatch: %s != %s", eq, finish.file.Blocks[eq-1].String(), blocks[eq].String())
+ if !found {
+ t.Errorf("Did not find block %s", block.String())
+ }
+ if string(finish.file.Blocks[idx-1].Hash) != string(blocks[idx].Hash) {
+ t.Errorf("Block %d mismatch: %s != %s", idx, finish.file.Blocks[idx-1].String(), blocks[idx].String())
}
}
diff --git a/lib/model/sentdownloadstate.go b/lib/model/sentdownloadstate.go
new file mode 100644
index 00000000..9fd50b0b
--- /dev/null
+++ b/lib/model/sentdownloadstate.go
@@ -0,0 +1,184 @@
+// Copyright (C) 2015 The Syncthing Authors.
+//
+// This Source Code Form is subject to the terms of the Mozilla Public
+// License, v. 2.0. If a copy of the MPL was not distributed with this file,
+// You can obtain one at http://mozilla.org/MPL/2.0/.
+
+package model
+
+import (
+ "time"
+
+ "github.com/syncthing/syncthing/lib/protocol"
+)
+
+// sentFolderFileDownloadState represents a state of what we've announced as available
+// to some remote device for a specific file.
+type sentFolderFileDownloadState struct {
+ blockIndexes []int32
+ version protocol.Vector
+ updated time.Time
+}
+
+// sentFolderDownloadState represents a state of what we've announced as available
+// to some remote device for a specific folder.
+type sentFolderDownloadState struct {
+ files map[string]*sentFolderFileDownloadState
+}
+
+// update takes a set of currently active sharedPullerStates, and returns a list
+// of updates which we need to send to the client to become up to date.
+func (s *sentFolderDownloadState) update(pullers []*sharedPullerState) []protocol.FileDownloadProgressUpdate {
+ var name string
+ var updates []protocol.FileDownloadProgressUpdate
+ seen := make(map[string]struct{}, len(pullers))
+
+ for _, puller := range pullers {
+ name = puller.file.Name
+
+ seen[name] = struct{}{}
+
+ pullerBlockIndexes := puller.Available()
+ pullerVersion := puller.file.Version
+ pullerBlockIndexesUpdated := puller.AvailableUpdated()
+
+ localFile, ok := s.files[name]
+
+ // New file we haven't seen before
+ if !ok {
+ // Only send an update if the file actually has some blocks.
+ if len(pullerBlockIndexes) > 0 {
+ s.files[name] = &sentFolderFileDownloadState{
+ blockIndexes: pullerBlockIndexes,
+ updated: pullerBlockIndexesUpdated,
+ version: pullerVersion,
+ }
+
+ updates = append(updates, protocol.FileDownloadProgressUpdate{
+ Name: name,
+ Version: pullerVersion,
+ UpdateType: protocol.UpdateTypeAppend,
+ BlockIndexes: pullerBlockIndexes,
+ })
+ }
+ continue
+ }
+
+ // Existing file we've already sent an update for.
+ if pullerBlockIndexesUpdated.Equal(localFile.updated) && pullerVersion.Equal(localFile.version) {
+ // The file state hasn't changed, go to next.
+ continue
+ }
+
+ if !pullerVersion.Equal(localFile.version) {
+ // The version has changed, clean up whatever we had for the old
+ // file, and advertise the new file.
+ updates = append(updates, protocol.FileDownloadProgressUpdate{
+ Name: name,
+ Version: localFile.version,
+ UpdateType: protocol.UpdateTypeForget,
+ })
+ updates = append(updates, protocol.FileDownloadProgressUpdate{
+ Name: name,
+ Version: pullerVersion,
+ UpdateType: protocol.UpdateTypeAppend,
+ BlockIndexes: pullerBlockIndexes,
+ })
+ localFile.blockIndexes = pullerBlockIndexes
+ localFile.updated = pullerBlockIndexesUpdated
+ localFile.version = pullerVersion
+ continue
+ }
+
+ // Relies on the fact that sharedPullerState.Available() should always
+ // append.
+ newBlocks := pullerBlockIndexes[len(localFile.blockIndexes):]
+
+ localFile.blockIndexes = append(localFile.blockIndexes, newBlocks...)
+ localFile.updated = pullerBlockIndexesUpdated
+
+ // If there are new blocks, send the update.
+ if len(newBlocks) > 0 {
+ updates = append(updates, protocol.FileDownloadProgressUpdate{
+ Name: name,
+ Version: localFile.version,
+ UpdateType: protocol.UpdateTypeAppend,
+ BlockIndexes: newBlocks,
+ })
+ }
+ }
+
+ // For each file that we are tracking, see if there still is a puller for it
+ // if not, the file completed or errored out.
+ for name, info := range s.files {
+ _, ok := seen[name]
+ if !ok {
+ updates = append(updates, protocol.FileDownloadProgressUpdate{
+ Name: name,
+ Version: info.version,
+ UpdateType: protocol.UpdateTypeForget,
+ })
+ delete(s.files, name)
+ }
+ }
+
+ return updates
+}
+
+// destroy removes all stored state, and returns a set of updates we need to
+// dispatch to clean up the state on the remote end.
+func (s *sentFolderDownloadState) destroy() []protocol.FileDownloadProgressUpdate {
+ updates := make([]protocol.FileDownloadProgressUpdate, 0, len(s.files))
+ for name, info := range s.files {
+ updates = append(updates, protocol.FileDownloadProgressUpdate{
+ Name: name,
+ Version: info.version,
+ UpdateType: protocol.UpdateTypeForget,
+ })
+ delete(s.files, name)
+ }
+ return updates
+}
+
+// sentDownloadState represents a state of what we've announced as available
+// to some remote device. It is used from within the progress emitter
+// which only has one routine, hence is deemed threadsafe.
+type sentDownloadState struct {
+ folderStates map[string]*sentFolderDownloadState
+}
+
+// update receives a folder, and a slice of pullers that are currently available
+// for the given folder, and according to the state of what we've seen before
+// returns a set of updates which we should send to the remote device to make
+// it aware of everything that we currently have available.
+func (s *sentDownloadState) update(folder string, pullers []*sharedPullerState) []protocol.FileDownloadProgressUpdate {
+ fs, ok := s.folderStates[folder]
+ if !ok {
+ fs = &sentFolderDownloadState{
+ files: make(map[string]*sentFolderFileDownloadState),
+ }
+ s.folderStates[folder] = fs
+ }
+ return fs.update(pullers)
+}
+
+// folders returns a set of folders this state is currently aware off.
+func (s *sentDownloadState) folders() []string {
+ folders := make([]string, 0, len(s.folderStates))
+ for key := range s.folderStates {
+ folders = append(folders, key)
+ }
+ return folders
+}
+
+// cleanup cleans up all state related to a folder, and returns a set of updates
+// which would clean up the state on the remote device.
+func (s *sentDownloadState) cleanup(folder string) []protocol.FileDownloadProgressUpdate {
+ fs, ok := s.folderStates[folder]
+ if ok {
+ updates := fs.destroy()
+ delete(s.folderStates, folder)
+ return updates
+ }
+ return nil
+}
diff --git a/lib/model/sharedpullerstate.go b/lib/model/sharedpullerstate.go
index 509b1fcf..f079fdad 100644
--- a/lib/model/sharedpullerstate.go
+++ b/lib/model/sharedpullerstate.go
@@ -10,6 +10,7 @@ import (
"io"
"os"
"path/filepath"
+ "time"
"github.com/syncthing/syncthing/lib/db"
"github.com/syncthing/syncthing/lib/protocol"
@@ -30,15 +31,18 @@ type sharedPullerState struct {
sparse bool
// Mutable, must be locked for access
- err error // The first error we hit
- fd *os.File // The fd of the temp file
- copyTotal int // Total number of copy actions for the whole job
- pullTotal int // Total number of pull actions for the whole job
- copyOrigin int // Number of blocks copied from the original file
- copyNeeded int // Number of copy actions still pending
- pullNeeded int // Number of block pulls still pending
- closed bool // True if the file has been finalClosed.
- mut sync.Mutex // Protects the above
+ err error // The first error we hit
+ fd *os.File // The fd of the temp file
+ copyTotal int // Total number of copy actions for the whole job
+ pullTotal int // Total number of pull actions for the whole job
+ copyOrigin int // Number of blocks copied from the original file
+ copyNeeded int // Number of copy actions still pending
+ pullNeeded int // Number of block pulls still pending
+ updated time.Time // Time when any of the counters above were last updated
+ closed bool // True if the file has been finalClosed.
+ available []int32 // Indexes of the blocks that are available in the temporary file
+ availableUpdated time.Time // Time when list of available blocks was last updated
+ mut sync.RWMutex // Protects the above
}
// A momentary state representing the progress of the puller
@@ -56,7 +60,7 @@ type pullerProgress struct {
// A lockedWriterAt synchronizes WriteAt calls with an external mutex.
// WriteAt() is goroutine safe by itself, but not against for example Close().
type lockedWriterAt struct {
- mut *sync.Mutex
+ mut *sync.RWMutex
wr io.WriterAt
}
@@ -196,15 +200,19 @@ func (s *sharedPullerState) failLocked(context string, err error) {
}
func (s *sharedPullerState) failed() error {
- s.mut.Lock()
- defer s.mut.Unlock()
+ s.mut.RLock()
+ err := s.err
+ s.mut.RUnlock()
- return s.err
+ return err
}
-func (s *sharedPullerState) copyDone() {
+func (s *sharedPullerState) copyDone(block protocol.BlockInfo) {
s.mut.Lock()
s.copyNeeded--
+ s.updated = time.Now()
+ s.available = append(s.available, int32(block.Offset/protocol.BlockSize))
+ s.availableUpdated = time.Now()
l.Debugln("sharedPullerState", s.folder, s.file.Name, "copyNeeded ->", s.copyNeeded)
s.mut.Unlock()
}
@@ -212,6 +220,7 @@ func (s *sharedPullerState) copyDone() {
func (s *sharedPullerState) copiedFromOrigin() {
s.mut.Lock()
s.copyOrigin++
+ s.updated = time.Now()
s.mut.Unlock()
}
@@ -221,13 +230,17 @@ func (s *sharedPullerState) pullStarted() {
s.copyNeeded--
s.pullTotal++
s.pullNeeded++
+ s.updated = time.Now()
l.Debugln("sharedPullerState", s.folder, s.file.Name, "pullNeeded start ->", s.pullNeeded)
s.mut.Unlock()
}
-func (s *sharedPullerState) pullDone() {
+func (s *sharedPullerState) pullDone(block protocol.BlockInfo) {
s.mut.Lock()
s.pullNeeded--
+ s.updated = time.Now()
+ s.available = append(s.available, int32(block.Offset/protocol.BlockSize))
+ s.availableUpdated = time.Now()
l.Debugln("sharedPullerState", s.folder, s.file.Name, "pullNeeded done ->", s.pullNeeded)
s.mut.Unlock()
}
@@ -265,10 +278,10 @@ func (s *sharedPullerState) finalClose() (bool, error) {
return true, s.err
}
-// Returns the momentarily progress for the puller
+// Progress returns the momentarily progress for the puller
func (s *sharedPullerState) Progress() *pullerProgress {
- s.mut.Lock()
- defer s.mut.Unlock()
+ s.mut.RLock()
+ defer s.mut.RUnlock()
total := s.reused + s.copyTotal + s.pullTotal
done := total - s.copyNeeded - s.pullNeeded
return &pullerProgress{
@@ -282,3 +295,27 @@ func (s *sharedPullerState) Progress() *pullerProgress {
BytesDone: db.BlocksToSize(done),
}
}
+
+// Updated returns the time when any of the progress related counters was last updated.
+func (s *sharedPullerState) Updated() time.Time {
+ s.mut.RLock()
+ t := s.updated
+ s.mut.RUnlock()
+ return t
+}
+
+// AvailableUpdated returns the time last time list of available blocks was updated
+func (s *sharedPullerState) AvailableUpdated() time.Time {
+ s.mut.RLock()
+ t := s.availableUpdated
+ s.mut.RUnlock()
+ return t
+}
+
+// Available returns blocks available in the current temporary file
+func (s *sharedPullerState) Available() []int32 {
+ s.mut.RLock()
+ blocks := s.available
+ s.mut.RUnlock()
+ return blocks
+}
diff --git a/lib/model/sharedpullerstate_test.go b/lib/model/sharedpullerstate_test.go
index edcafde0..315c3d41 100644
--- a/lib/model/sharedpullerstate_test.go
+++ b/lib/model/sharedpullerstate_test.go
@@ -16,7 +16,7 @@ import (
func TestSourceFileOK(t *testing.T) {
s := sharedPullerState{
realName: "testdata/foo",
- mut: sync.NewMutex(),
+ mut: sync.NewRWMutex(),
}
fd, err := s.sourceFile()
@@ -45,7 +45,7 @@ func TestSourceFileOK(t *testing.T) {
func TestSourceFileBad(t *testing.T) {
s := sharedPullerState{
realName: "nonexistent",
- mut: sync.NewMutex(),
+ mut: sync.NewRWMutex(),
}
fd, err := s.sourceFile()
@@ -71,7 +71,7 @@ func TestReadOnlyDir(t *testing.T) {
s := sharedPullerState{
tempName: "testdata/read_only_dir/.temp_name",
- mut: sync.NewMutex(),
+ mut: sync.NewRWMutex(),
}
fd, err := s.tempFile()
diff --git a/lib/protocol/common_test.go b/lib/protocol/common_test.go
index c3b8cce5..849b4e1d 100644
--- a/lib/protocol/common_test.go
+++ b/lib/protocol/common_test.go
@@ -49,6 +49,9 @@ func (t *TestModel) Close(deviceID DeviceID, err error) {
func (t *TestModel) ClusterConfig(deviceID DeviceID, config ClusterConfigMessage) {
}
+func (t *TestModel) DownloadProgress(DeviceID, string, []FileDownloadProgressUpdate, uint32, []Option) {
+}
+
func (t *TestModel) closedError() error {
select {
case <-t.closedCh:
diff --git a/lib/protocol/message.go b/lib/protocol/message.go
index 92626b75..9e195661 100644
--- a/lib/protocol/message.go
+++ b/lib/protocol/message.go
@@ -138,6 +138,13 @@ type ClusterConfigMessage struct {
Options []Option // max:64
}
+type DownloadProgressMessage struct {
+ Folder string // max:64
+ Updates []FileDownloadProgressUpdate // max:1000000
+ Flags uint32
+ Options []Option // max:64
+}
+
func (o *ClusterConfigMessage) GetOption(key string) string {
for _, option := range o.Options {
if option.Key == key {
@@ -166,6 +173,13 @@ type Device struct {
Options []Option // max:64
}
+type FileDownloadProgressUpdate struct {
+ UpdateType uint32
+ Name string // max:8192
+ Version Vector
+ BlockIndexes []int32 // max:1000000
+}
+
type Option struct {
Key string // max:64
Value string // max:1024
diff --git a/lib/protocol/message_xdr.go b/lib/protocol/message_xdr.go
index a7d1cec4..206bd394 100644
--- a/lib/protocol/message_xdr.go
+++ b/lib/protocol/message_xdr.go
@@ -690,6 +690,135 @@ func (o *ClusterConfigMessage) UnmarshalXDRFrom(u *xdr.Unmarshaller) error {
/*
+DownloadProgressMessage Structure:
+
+ 0 1 2 3
+ 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+/ /
+\ Folder (length + padded data) \
+/ /
++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+| Number of Updates |
++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+/ /
+\ Zero or more FileDownloadProgressUpdate Structures \
+/ /
++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+| Flags |
++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+| Number of Options |
++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+/ /
+\ Zero or more Option Structures \
+/ /
++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+
+
+struct DownloadProgressMessage {
+ string Folder<64>;
+ FileDownloadProgressUpdate Updates<1000000>;
+ unsigned int Flags;
+ Option Options<64>;
+}
+
+*/
+
+func (o DownloadProgressMessage) XDRSize() int {
+ return 4 + len(o.Folder) + xdr.Padding(len(o.Folder)) +
+ 4 + xdr.SizeOfSlice(o.Updates) + 4 +
+ 4 + xdr.SizeOfSlice(o.Options)
+}
+
+func (o DownloadProgressMessage) MarshalXDR() ([]byte, error) {
+ buf := make([]byte, o.XDRSize())
+ m := &xdr.Marshaller{Data: buf}
+ return buf, o.MarshalXDRInto(m)
+}
+
+func (o DownloadProgressMessage) MustMarshalXDR() []byte {
+ bs, err := o.MarshalXDR()
+ if err != nil {
+ panic(err)
+ }
+ return bs
+}
+
+func (o DownloadProgressMessage) MarshalXDRInto(m *xdr.Marshaller) error {
+ if l := len(o.Folder); l > 64 {
+ return xdr.ElementSizeExceeded("Folder", l, 64)
+ }
+ m.MarshalString(o.Folder)
+ if l := len(o.Updates); l > 1000000 {
+ return xdr.ElementSizeExceeded("Updates", l, 1000000)
+ }
+ m.MarshalUint32(uint32(len(o.Updates)))
+ for i := range o.Updates {
+ if err := o.Updates[i].MarshalXDRInto(m); err != nil {
+ return err
+ }
+ }
+ m.MarshalUint32(o.Flags)
+ if l := len(o.Options); l > 64 {
+ return xdr.ElementSizeExceeded("Options", l, 64)
+ }
+ m.MarshalUint32(uint32(len(o.Options)))
+ for i := range o.Options {
+ if err := o.Options[i].MarshalXDRInto(m); err != nil {
+ return err
+ }
+ }
+ return m.Error
+}
+
+func (o *DownloadProgressMessage) UnmarshalXDR(bs []byte) error {
+ u := &xdr.Unmarshaller{Data: bs}
+ return o.UnmarshalXDRFrom(u)
+}
+func (o *DownloadProgressMessage) UnmarshalXDRFrom(u *xdr.Unmarshaller) error {
+ o.Folder = u.UnmarshalStringMax(64)
+ _UpdatesSize := int(u.UnmarshalUint32())
+ if _UpdatesSize < 0 {
+ return xdr.ElementSizeExceeded("Updates", _UpdatesSize, 1000000)
+ } else if _UpdatesSize == 0 {
+ o.Updates = nil
+ } else {
+ if _UpdatesSize > 1000000 {
+ return xdr.ElementSizeExceeded("Updates", _UpdatesSize, 1000000)
+ }
+ if _UpdatesSize <= len(o.Updates) {
+ o.Updates = o.Updates[:_UpdatesSize]
+ } else {
+ o.Updates = make([]FileDownloadProgressUpdate, _UpdatesSize)
+ }
+ for i := range o.Updates {
+ (&o.Updates[i]).UnmarshalXDRFrom(u)
+ }
+ }
+ o.Flags = u.UnmarshalUint32()
+ _OptionsSize := int(u.UnmarshalUint32())
+ if _OptionsSize < 0 {
+ return xdr.ElementSizeExceeded("Options", _OptionsSize, 64)
+ } else if _OptionsSize == 0 {
+ o.Options = nil
+ } else {
+ if _OptionsSize > 64 {
+ return xdr.ElementSizeExceeded("Options", _OptionsSize, 64)
+ }
+ if _OptionsSize <= len(o.Options) {
+ o.Options = o.Options[:_OptionsSize]
+ } else {
+ o.Options = make([]Option, _OptionsSize)
+ }
+ for i := range o.Options {
+ (&o.Options[i]).UnmarshalXDRFrom(u)
+ }
+ }
+ return u.Error
+}
+
+/*
+
Folder Structure:
0 1 2 3
@@ -996,6 +1125,109 @@ func (o *Device) UnmarshalXDRFrom(u *xdr.Unmarshaller) error {
/*
+FileDownloadProgressUpdate Structure:
+
+ 0 1 2 3
+ 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+| Update Type |
++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+/ /
+\ Name (length + padded data) \
+/ /
++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+/ /
+\ Vector Structure \
+/ /
++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+| Number of Block Indexes |
++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+/ /
+| Block Indexes (n items) |
+/ /
++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+
+
+struct FileDownloadProgressUpdate {
+ unsigned int UpdateType;
+ string Name<8192>;
+ Vector Version;
+ int BlockIndexes<1000000>;
+}
+
+*/
+
+func (o FileDownloadProgressUpdate) XDRSize() int {
+ return 4 +
+ 4 + len(o.Name) + xdr.Padding(len(o.Name)) +
+ o.Version.XDRSize() +
+ 4 + len(o.BlockIndexes)*4
+}
+
+func (o FileDownloadProgressUpdate) MarshalXDR() ([]byte, error) {
+ buf := make([]byte, o.XDRSize())
+ m := &xdr.Marshaller{Data: buf}
+ return buf, o.MarshalXDRInto(m)
+}
+
+func (o FileDownloadProgressUpdate) MustMarshalXDR() []byte {
+ bs, err := o.MarshalXDR()
+ if err != nil {
+ panic(err)
+ }
+ return bs
+}
+
+func (o FileDownloadProgressUpdate) MarshalXDRInto(m *xdr.Marshaller) error {
+ m.MarshalUint32(o.UpdateType)
+ if l := len(o.Name); l > 8192 {
+ return xdr.ElementSizeExceeded("Name", l, 8192)
+ }
+ m.MarshalString(o.Name)
+ if err := o.Version.MarshalXDRInto(m); err != nil {
+ return err
+ }
+ if l := len(o.BlockIndexes); l > 1000000 {
+ return xdr.ElementSizeExceeded("BlockIndexes", l, 1000000)
+ }
+ m.MarshalUint32(uint32(len(o.BlockIndexes)))
+ for i := range o.BlockIndexes {
+ m.MarshalUint32(uint32(o.BlockIndexes[i]))
+ }
+ return m.Error
+}
+
+func (o *FileDownloadProgressUpdate) UnmarshalXDR(bs []byte) error {
+ u := &xdr.Unmarshaller{Data: bs}
+ return o.UnmarshalXDRFrom(u)
+}
+func (o *FileDownloadProgressUpdate) UnmarshalXDRFrom(u *xdr.Unmarshaller) error {
+ o.UpdateType = u.UnmarshalUint32()
+ o.Name = u.UnmarshalStringMax(8192)
+ (&o.Version).UnmarshalXDRFrom(u)
+ _BlockIndexesSize := int(u.UnmarshalUint32())
+ if _BlockIndexesSize < 0 {
+ return xdr.ElementSizeExceeded("BlockIndexes", _BlockIndexesSize, 1000000)
+ } else if _BlockIndexesSize == 0 {
+ o.BlockIndexes = nil
+ } else {
+ if _BlockIndexesSize > 1000000 {
+ return xdr.ElementSizeExceeded("BlockIndexes", _BlockIndexesSize, 1000000)
+ }
+ if _BlockIndexesSize <= len(o.BlockIndexes) {
+ o.BlockIndexes = o.BlockIndexes[:_BlockIndexesSize]
+ } else {
+ o.BlockIndexes = make([]int32, _BlockIndexesSize)
+ }
+ for i := range o.BlockIndexes {
+ o.BlockIndexes[i] = int32(u.UnmarshalUint32())
+ }
+ }
+ return u.Error
+}
+
+/*
+
Option Structure:
0 1 2 3
diff --git a/lib/protocol/nativemodel_darwin.go b/lib/protocol/nativemodel_darwin.go
index eb755a6e..34910425 100644
--- a/lib/protocol/nativemodel_darwin.go
+++ b/lib/protocol/nativemodel_darwin.go
@@ -9,32 +9,24 @@ package protocol
import "golang.org/x/text/unicode/norm"
type nativeModel struct {
- next Model
+ Model
}
func (m nativeModel) Index(deviceID DeviceID, folder string, files []FileInfo, flags uint32, options []Option) {
for i := range files {
files[i].Name = norm.NFD.String(files[i].Name)
}
- m.next.Index(deviceID, folder, files, flags, options)
+ m.Model.Index(deviceID, folder, files, flags, options)
}
func (m nativeModel) IndexUpdate(deviceID DeviceID, folder string, files []FileInfo, flags uint32, options []Option) {
for i := range files {
files[i].Name = norm.NFD.String(files[i].Name)
}
- m.next.IndexUpdate(deviceID, folder, files, flags, options)
+ m.Model.IndexUpdate(deviceID, folder, files, flags, options)
}
func (m nativeModel) Request(deviceID DeviceID, folder string, name string, offset int64, hash []byte, flags uint32, options []Option, buf []byte) error {
name = norm.NFD.String(name)
- return m.next.Request(deviceID, folder, name, offset, hash, flags, options, buf)
-}
-
-func (m nativeModel) ClusterConfig(deviceID DeviceID, config ClusterConfigMessage) {
- m.next.ClusterConfig(deviceID, config)
-}
-
-func (m nativeModel) Close(deviceID DeviceID, err error) {
- m.next.Close(deviceID, err)
+ return m.Model.Request(deviceID, folder, name, offset, hash, flags, options, buf)
}
diff --git a/lib/protocol/nativemodel_unix.go b/lib/protocol/nativemodel_unix.go
index 0611865e..ea6ec8db 100644
--- a/lib/protocol/nativemodel_unix.go
+++ b/lib/protocol/nativemodel_unix.go
@@ -7,25 +7,5 @@ package protocol
// Normal Unixes uses NFC and slashes, which is the wire format.
type nativeModel struct {
- next Model
-}
-
-func (m nativeModel) Index(deviceID DeviceID, folder string, files []FileInfo, flags uint32, options []Option) {
- m.next.Index(deviceID, folder, files, flags, options)
-}
-
-func (m nativeModel) IndexUpdate(deviceID DeviceID, folder string, files []FileInfo, flags uint32, options []Option) {
- m.next.IndexUpdate(deviceID, folder, files, flags, options)
-}
-
-func (m nativeModel) Request(deviceID DeviceID, folder string, name string, offset int64, hash []byte, flags uint32, options []Option, buf []byte) error {
- return m.next.Request(deviceID, folder, name, offset, hash, flags, options, buf)
-}
-
-func (m nativeModel) ClusterConfig(deviceID DeviceID, config ClusterConfigMessage) {
- m.next.ClusterConfig(deviceID, config)
-}
-
-func (m nativeModel) Close(deviceID DeviceID, err error) {
- m.next.Close(deviceID, err)
+ Model
}
diff --git a/lib/protocol/nativemodel_windows.go b/lib/protocol/nativemodel_windows.go
index 36a1d274..186ddfa7 100644
--- a/lib/protocol/nativemodel_windows.go
+++ b/lib/protocol/nativemodel_windows.go
@@ -21,30 +21,22 @@ var disallowedCharacters = string([]rune{
})
type nativeModel struct {
- next Model
+ Model
}
func (m nativeModel) Index(deviceID DeviceID, folder string, files []FileInfo, flags uint32, options []Option) {
fixupFiles(folder, files)
- m.next.Index(deviceID, folder, files, flags, options)
+ m.Model.Index(deviceID, folder, files, flags, options)
}
func (m nativeModel) IndexUpdate(deviceID DeviceID, folder string, files []FileInfo, flags uint32, options []Option) {
fixupFiles(folder, files)
- m.next.IndexUpdate(deviceID, folder, files, flags, options)
+ m.Model.IndexUpdate(deviceID, folder, files, flags, options)
}
func (m nativeModel) Request(deviceID DeviceID, folder string, name string, offset int64, hash []byte, flags uint32, options []Option, buf []byte) error {
name = filepath.FromSlash(name)
- return m.next.Request(deviceID, folder, name, offset, hash, flags, options, buf)
-}
-
-func (m nativeModel) ClusterConfig(deviceID DeviceID, config ClusterConfigMessage) {
- m.next.ClusterConfig(deviceID, config)
-}
-
-func (m nativeModel) Close(deviceID DeviceID, err error) {
- m.next.Close(deviceID, err)
+ return m.Model.Request(deviceID, folder, name, offset, hash, flags, options, buf)
}
func fixupFiles(folder string, files []FileInfo) {
diff --git a/lib/protocol/protocol.go b/lib/protocol/protocol.go
index dfc16e05..fffffec6 100644
--- a/lib/protocol/protocol.go
+++ b/lib/protocol/protocol.go
@@ -24,13 +24,14 @@ const (
)
const (
- messageTypeClusterConfig = 0
- messageTypeIndex = 1
- messageTypeRequest = 2
- messageTypeResponse = 3
- messageTypePing = 4
- messageTypeIndexUpdate = 6
- messageTypeClose = 7
+ messageTypeClusterConfig = 0
+ messageTypeIndex = 1
+ messageTypeRequest = 2
+ messageTypeResponse = 3
+ messageTypePing = 4
+ messageTypeIndexUpdate = 6
+ messageTypeClose = 7
+ messageTypeDownloadProgress = 8
)
const (
@@ -52,22 +53,29 @@ const (
SymlinkTypeMask = FlagDirectory | FlagSymlinkMissingTarget
)
-// IndexMessage message flags (for IndexUpdate)
-const (
- FlagIndexTemporary uint32 = 1 << iota
-)
-
// Request message flags
const (
- FlagRequestTemporary uint32 = 1 << iota
+ FlagFromTemporary uint32 = 1 << iota
+)
+
+// FileDownloadProgressUpdate update types
+const (
+ UpdateTypeAppend uint32 = iota
+ UpdateTypeForget
+)
+
+// CLusterConfig flags
+const (
+ FlagClusterConfigTemporaryIndexes uint32 = 1 << 0
)
// ClusterConfigMessage.Folders flags
const (
- FlagFolderReadOnly uint32 = 1 << 0
- FlagFolderIgnorePerms = 1 << 1
- FlagFolderIgnoreDelete = 1 << 2
- FlagFolderAll = 1<<3 - 1
+ FlagFolderReadOnly uint32 = 1 << 0
+ FlagFolderIgnorePerms = 1 << 1
+ FlagFolderIgnoreDelete = 1 << 2
+ FlagFolderDisabledTempIndexes = 1 << 3
+ FlagFolderAll = 1<<4 - 1
)
// ClusterConfigMessage.Folders.Devices flags
@@ -97,6 +105,8 @@ type Model interface {
ClusterConfig(deviceID DeviceID, config ClusterConfigMessage)
// The peer device closed the connection
Close(deviceID DeviceID, err error)
+ // The peer device sent progress updates for the files it is currently downloading
+ DownloadProgress(deviceID DeviceID, folder string, updates []FileDownloadProgressUpdate, flags uint32, options []Option)
}
type Connection interface {
@@ -105,8 +115,9 @@ type Connection interface {
Name() string
Index(folder string, files []FileInfo, flags uint32, options []Option) error
IndexUpdate(folder string, files []FileInfo, flags uint32, options []Option) error
- Request(folder string, name string, offset int64, size int, hash []byte, flags uint32, options []Option) ([]byte, error)
+ Request(folder string, name string, offset int64, size int, hash []byte, fromTemporary bool) ([]byte, error)
ClusterConfig(config ClusterConfigMessage)
+ DownloadProgress(folder string, updates []FileDownloadProgressUpdate, flags uint32, options []Option)
Statistics() Statistics
Closed() bool
}
@@ -242,7 +253,7 @@ func (c *rawConnection) IndexUpdate(folder string, idx []FileInfo, flags uint32,
}
// Request returns the bytes for the specified block after fetching them from the connected peer.
-func (c *rawConnection) Request(folder string, name string, offset int64, size int, hash []byte, flags uint32, options []Option) ([]byte, error) {
+func (c *rawConnection) Request(folder string, name string, offset int64, size int, hash []byte, fromTemporary bool) ([]byte, error) {
var id int
select {
case id = <-c.nextID:
@@ -250,6 +261,12 @@ func (c *rawConnection) Request(folder string, name string, offset int64, size i
return nil, ErrClosed
}
+ var flags uint32
+
+ if fromTemporary {
+ flags |= FlagFromTemporary
+ }
+
c.awaitingMut.Lock()
if ch := c.awaiting[id]; ch != nil {
panic("id taken")
@@ -265,7 +282,7 @@ func (c *rawConnection) Request(folder string, name string, offset int64, size i
Size: int32(size),
Hash: hash,
Flags: flags,
- Options: options,
+ Options: nil,
}, nil)
if !ok {
return nil, ErrClosed
@@ -292,6 +309,16 @@ func (c *rawConnection) Closed() bool {
}
}
+// DownloadProgress sends the progress updates for the files that are currently being downloaded.
+func (c *rawConnection) DownloadProgress(folder string, updates []FileDownloadProgressUpdate, flags uint32, options []Option) {
+ c.send(-1, messageTypeDownloadProgress, DownloadProgressMessage{
+ Folder: folder,
+ Updates: updates,
+ Flags: flags,
+ Options: options,
+ }, nil)
+}
+
func (c *rawConnection) ping() bool {
var id int
select {
@@ -359,6 +386,12 @@ func (c *rawConnection) readerLoop() (err error) {
}
c.handleResponse(hdr.msgID, msg)
+ case DownloadProgressMessage:
+ if state != stateReady {
+ return fmt.Errorf("protocol error: response message in state %d", state)
+ }
+ c.receiver.DownloadProgress(c.id, msg.Folder, msg.Updates, msg.Flags, msg.Options)
+
case pingMessage:
if state != stateReady {
return fmt.Errorf("protocol error: ping message in state %d", state)
@@ -469,6 +502,14 @@ func (c *rawConnection) readMessage() (hdr header, msg encodable, err error) {
err = cm.UnmarshalXDR(msgBuf)
msg = cm
+ case messageTypeDownloadProgress:
+ var dp DownloadProgressMessage
+ err := dp.UnmarshalXDR(msgBuf)
+ if xdrErr, ok := err.(isEofer); ok && xdrErr.IsEOF() {
+ err = nil
+ }
+ msg = dp
+
default:
err = fmt.Errorf("protocol error: %s: unknown message type %#x", c.id, hdr.msgType)
}
diff --git a/lib/protocol/protocol_test.go b/lib/protocol/protocol_test.go
index f17c76da..82cdd075 100644
--- a/lib/protocol/protocol_test.go
+++ b/lib/protocol/protocol_test.go
@@ -183,7 +183,7 @@ func TestClose(t *testing.T) {
c0.Index("default", nil, 0, nil)
c0.Index("default", nil, 0, nil)
- if _, err := c0.Request("default", "foo", 0, 0, nil, 0, nil); err == nil {
+ if _, err := c0.Request("default", "foo", 0, 0, nil, false); err == nil {
t.Error("Request should return an error")
}
}
diff --git a/lib/protocol/wireformat.go b/lib/protocol/wireformat.go
index a0d3cd58..6847bb37 100644
--- a/lib/protocol/wireformat.go
+++ b/lib/protocol/wireformat.go
@@ -34,7 +34,7 @@ func (c wireFormatConnection) IndexUpdate(folder string, fs []FileInfo, flags ui
return c.Connection.IndexUpdate(folder, myFs, flags, options)
}
-func (c wireFormatConnection) Request(folder, name string, offset int64, size int, hash []byte, flags uint32, options []Option) ([]byte, error) {
+func (c wireFormatConnection) Request(folder, name string, offset int64, size int, hash []byte, fromTemporary bool) ([]byte, error) {
name = norm.NFC.String(filepath.ToSlash(name))
- return c.Connection.Request(folder, name, offset, size, hash, flags, options)
+ return c.Connection.Request(folder, name, offset, size, hash, fromTemporary)
}