diff --git a/cmd/syncthing/gui.go b/cmd/syncthing/gui.go
index 98a80933..cbf2da76 100644
--- a/cmd/syncthing/gui.go
+++ b/cmd/syncthing/gui.go
@@ -149,6 +149,7 @@ func startGUI(cfg config.GUIConfiguration, assetDir string, m *model.Model) erro
postRestMux.HandleFunc("/rest/shutdown", restPostShutdown)
postRestMux.HandleFunc("/rest/upgrade", restPostUpgrade)
postRestMux.HandleFunc("/rest/scan", withModel(m, restPostScan))
+ postRestMux.HandleFunc("/rest/bump", withModel(m, restPostBump))
// A handler that splits requests between the two above and disables
// caching
@@ -314,19 +315,12 @@ func restGetNeed(m *model.Model, w http.ResponseWriter, r *http.Request) {
var qs = r.URL.Query()
var folder = qs.Get("folder")
- files := m.NeedFolderFilesLimited(folder, 100) // max 100 files
+ progress, queued, rest := m.NeedFolderFiles(folder, 100)
// Convert the struct to a more loose structure, and inject the size.
- output := make([]map[string]interface{}, 0, len(files))
- for _, file := range files {
- output = append(output, map[string]interface{}{
- "Name": file.Name,
- "Flags": file.Flags,
- "Modified": file.Modified,
- "Version": file.Version,
- "LocalVersion": file.LocalVersion,
- "NumBlocks": file.NumBlocks,
- "Size": protocol.BlocksToSize(file.NumBlocks),
- })
+ output := map[string][]map[string]interface{}{
+ "progress": toNeedSlice(progress),
+ "queued": toNeedSlice(queued),
+ "rest": toNeedSlice(rest),
}
w.Header().Set("Content-Type", "application/json; charset=utf-8")
@@ -650,6 +644,14 @@ func restPostScan(m *model.Model, w http.ResponseWriter, r *http.Request) {
}
}
+func restPostBump(m *model.Model, w http.ResponseWriter, r *http.Request) {
+ qs := r.URL.Query()
+ folder := qs.Get("folder")
+ file := qs.Get("file")
+ m.BringToFront(folder, file)
+ restGetNeed(m, w, r)
+}
+
func getQR(w http.ResponseWriter, r *http.Request) {
var qs = r.URL.Query()
var text = qs.Get("text")
@@ -775,3 +777,19 @@ func mimeTypeForFile(file string) string {
return mime.TypeByExtension(ext)
}
}
+
+func toNeedSlice(files []protocol.FileInfoTruncated) []map[string]interface{} {
+ output := make([]map[string]interface{}, len(files))
+ for i, file := range files {
+ output[i] = map[string]interface{}{
+ "Name": file.Name,
+ "Flags": file.Flags,
+ "Modified": file.Modified,
+ "Version": file.Version,
+ "LocalVersion": file.LocalVersion,
+ "NumBlocks": file.NumBlocks,
+ "Size": protocol.BlocksToSize(file.NumBlocks),
+ }
+ }
+ return output
+}
diff --git a/gui/index.html b/gui/index.html
index 890c8da3..b1fb4251 100644
--- a/gui/index.html
+++ b/gui/index.html
@@ -801,21 +801,37 @@
-
+
| {{needActions[a]}} |
{{f.Name | basename}} |
-
-
-
-
-
-
-
-
- {{progress[neededFolder][f.Name].BytesDone | binary}}B / {{progress[neededFolder][f.Name].BytesTotal | binary}}B
-
-
+ |
+
+
+
+
+
+
+
+ {{progress[neededFolder][f.Name].BytesDone | binary}}B / {{progress[neededFolder][f.Name].BytesTotal | binary}}B
+
+
|
+
+ {{f.Size | binary}}B
+ |
+
+
+ | {{needActions[a]}} |
+ {{f.Name | basename}} |
+
+
+ {{f.Size | binary}}B
+ |
+
+
+ | {{needActions[a]}} |
+ {{f.Name | basename}} |
+ {{f.Size | binary}}B |
diff --git a/gui/scripts/syncthing/core/controllers/syncthingController.js b/gui/scripts/syncthing/core/controllers/syncthingController.js
index 14142c1f..387030d3 100644
--- a/gui/scripts/syncthing/core/controllers/syncthingController.js
+++ b/gui/scripts/syncthing/core/controllers/syncthingController.js
@@ -1056,6 +1056,15 @@ angular.module('syncthing.core')
$http.post(urlbase + "/scan?folder=" + encodeURIComponent(folder));
};
+ $scope.bumpFile = function (folder, file) {
+ $http.post(urlbase + "/bump?folder=" + encodeURIComponent(folder) + "&file=" + encodeURIComponent(file)).success(function (data) {
+ if ($scope.neededFolder == folder) {
+ console.log("bumpFile", folder, data);
+ $scope.needed = data;
+ }
+ });
+ };
+
// pseudo main. called on all definitions assigned
initController();
});
diff --git a/internal/auto/gui.files.go b/internal/auto/gui.files.go
index 6f7d1b08..7b5b2ad0 100644
--- a/internal/auto/gui.files.go
+++ b/internal/auto/gui.files.go
@@ -147,7 +147,7 @@ func Assets() map[string][]byte {
bs, _ = ioutil.ReadAll(gr)
assets["assets/lang/valid-langs.js"] = bs
- bs, _ = base64.StdEncoding.DecodeString("")
+ bs, _ = base64.StdEncoding.DecodeString("")
gr, _ = gzip.NewReader(bytes.NewBuffer(bs))
bs, _ = ioutil.ReadAll(gr)
assets["index.html"] = bs
@@ -167,7 +167,7 @@ func Assets() map[string][]byte {
bs, _ = ioutil.ReadAll(gr)
assets["scripts/syncthing/core/controllers/eventController.js"] = bs
- bs, _ = base64.StdEncoding.DecodeString("")
+ bs, _ = base64.StdEncoding.DecodeString("")
gr, _ = gzip.NewReader(bytes.NewBuffer(bs))
bs, _ = ioutil.ReadAll(gr)
assets["scripts/syncthing/core/controllers/syncthingController.js"] = bs
diff --git a/internal/model/model.go b/internal/model/model.go
index c1a64d1e..c17bc366 100644
--- a/internal/model/model.go
+++ b/internal/model/model.go
@@ -79,6 +79,8 @@ const (
type service interface {
Serve()
Stop()
+ Jobs() ([]string, []string) // In progress, Queued
+ BringToFront(string)
}
type Model struct {
@@ -189,6 +191,7 @@ func (m *Model) StartFolderRW(folder string) {
copiers: cfg.Copiers,
pullers: cfg.Pullers,
finishers: cfg.Finishers,
+ queue: newJobQueue(),
}
m.folderRunners[folder] = p
m.fmut.Unlock()
@@ -416,22 +419,50 @@ func (m *Model) NeedSize(folder string) (files int, bytes int64) {
return
}
-// NeedFiles returns the list of currently needed files, stopping at maxFiles
-// files. Limit <= 0 is ignored.
-func (m *Model) NeedFolderFilesLimited(folder string, maxFiles int) []protocol.FileInfoTruncated {
+// NeedFiles returns the list of currently needed files in progress, queued,
+// and to be queued on next puller iteration. Also takes a soft cap which is
+// only respected when adding files from the model rather than the runner queue.
+func (m *Model) NeedFolderFiles(folder string, max int) ([]protocol.FileInfoTruncated, []protocol.FileInfoTruncated, []protocol.FileInfoTruncated) {
defer m.leveldbPanicWorkaround()
m.fmut.RLock()
defer m.fmut.RUnlock()
if rf, ok := m.folderFiles[folder]; ok {
- fs := make([]protocol.FileInfoTruncated, 0, maxFiles)
- rf.WithNeedTruncated(protocol.LocalDeviceID, func(f protocol.FileIntf) bool {
- fs = append(fs, f.(protocol.FileInfoTruncated))
- return maxFiles <= 0 || len(fs) < maxFiles
- })
- return fs
+ var progress, queued, rest []protocol.FileInfoTruncated
+ var seen map[string]bool
+
+ runner, ok := m.folderRunners[folder]
+ if ok {
+ progressNames, queuedNames := runner.Jobs()
+
+ progress = make([]protocol.FileInfoTruncated, len(progressNames))
+ queued = make([]protocol.FileInfoTruncated, len(queuedNames))
+ seen = make(map[string]bool, len(progressNames)+len(queuedNames))
+
+ for i, name := range progressNames {
+ progress[i] = rf.GetGlobal(name).ToTruncated() /// XXX: Should implement GetGlobalTruncated directly
+ seen[name] = true
+ }
+
+ for i, name := range queuedNames {
+ queued[i] = rf.GetGlobal(name).ToTruncated() /// XXX: Should implement GetGlobalTruncated directly
+ seen[name] = true
+ }
+ }
+ left := max - len(progress) - len(queued)
+ if max < 1 || left > 0 {
+ rf.WithNeedTruncated(protocol.LocalDeviceID, func(f protocol.FileIntf) bool {
+ left--
+ ft := f.(protocol.FileInfoTruncated)
+ if !seen[ft.Name] {
+ rest = append(rest, ft)
+ }
+ return max < 1 || left > 0
+ })
+ }
+ return progress, queued, rest
}
- return nil
+ return nil, nil, nil
}
// Index is called when a new device is connected and we receive their full index.
@@ -1336,7 +1367,7 @@ func (m *Model) RemoteLocalVersion(folder string) uint64 {
return ver
}
-func (m *Model) availability(folder string, file string) []protocol.DeviceID {
+func (m *Model) availability(folder, file string) []protocol.DeviceID {
// Acquire this lock first, as the value returned from foldersFiles can
// gen heavily modified on Close()
m.pmut.RLock()
@@ -1359,6 +1390,17 @@ func (m *Model) availability(folder string, file string) []protocol.DeviceID {
return availableDevices
}
+// Bump the given files priority in the job queue
+func (m *Model) BringToFront(folder, file string) {
+ m.pmut.RLock()
+ defer m.pmut.RUnlock()
+
+ runner, ok := m.folderRunners[folder]
+ if ok {
+ runner.BringToFront(file)
+ }
+}
+
func (m *Model) String() string {
return fmt.Sprintf("model@%p", m)
}
diff --git a/internal/model/puller.go b/internal/model/puller.go
index 1d9fea68..e1d063dd 100644
--- a/internal/model/puller.go
+++ b/internal/model/puller.go
@@ -78,6 +78,7 @@ type Puller struct {
copiers int
pullers int
finishers int
+ queue *jobQueue
}
// Serve will run scans and pulls. It will return when Stop()ed or on a
@@ -337,15 +338,23 @@ func (p *Puller) pullerIteration(checksum bool, ignores *ignore.Matcher) int {
p.handleDir(file)
default:
// A new or changed file or symlink. This is the only case where we
- // do stuff in the background; the other three are done
- // synchronously.
- p.handleFile(file, copyChan, finisherChan)
+ // do stuff concurrently in the background
+ p.queue.Push(file.Name)
}
changed++
return true
})
+ for {
+ fileName, ok := p.queue.Pop()
+ if !ok {
+ break
+ }
+ f := p.model.CurrentGlobalFile(p.folder, fileName)
+ p.handleFile(f, copyChan, finisherChan)
+ }
+
// Signal copy and puller routines that we are done with the in data for
// this iteration. Wait for them to finish.
close(copyChan)
@@ -483,6 +492,7 @@ func (p *Puller) handleFile(file protocol.FileInfo, copyChan chan<- copyBlocksSt
if debug {
l.Debugln(p, "taking shortcut on", file.Name)
}
+ p.queue.Done(file.Name)
if file.IsSymlink() {
p.shortcutSymlink(curFile, file)
} else {
@@ -850,6 +860,7 @@ func (p *Puller) finisherRoutine(in <-chan *sharedPullerState) {
continue
}
+ p.queue.Done(state.file.Name)
p.performFinish(state)
p.model.receivedFile(p.folder, state.file.Name)
if p.progressEmitter != nil {
@@ -859,6 +870,15 @@ func (p *Puller) finisherRoutine(in <-chan *sharedPullerState) {
}
}
+// Moves the given filename to the front of the job queue
+func (p *Puller) BringToFront(filename string) {
+ p.queue.BringToFront(filename)
+}
+
+func (p *Puller) Jobs() ([]string, []string) {
+ return p.queue.Jobs()
+}
+
func invalidateFolder(cfg *config.Configuration, folderID string, err error) {
for i := range cfg.Folders {
folder := &cfg.Folders[i]
diff --git a/internal/model/queue.go b/internal/model/queue.go
new file mode 100644
index 00000000..a4f34dab
--- /dev/null
+++ b/internal/model/queue.go
@@ -0,0 +1,94 @@
+// Copyright (C) 2014 The Syncthing Authors.
+//
+// This program is free software: you can redistribute it and/or modify it
+// under the terms of the GNU General Public License as published by the Free
+// Software Foundation, either version 3 of the License, or (at your option)
+// any later version.
+//
+// This program is distributed in the hope that it will be useful, but WITHOUT
+// ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+// FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
+// more details.
+//
+// You should have received a copy of the GNU General Public License along
+// with this program. If not, see .
+
+package model
+
+import "sync"
+
+type jobQueue struct {
+ progress []string
+ queued []string
+ mut sync.Mutex
+}
+
+func newJobQueue() *jobQueue {
+ return &jobQueue{}
+}
+
+func (q *jobQueue) Push(file string) {
+ q.mut.Lock()
+ q.queued = append(q.queued, file)
+ q.mut.Unlock()
+}
+
+func (q *jobQueue) Pop() (string, bool) {
+ q.mut.Lock()
+ defer q.mut.Unlock()
+
+ if len(q.queued) == 0 {
+ return "", false
+ }
+
+ var f string
+ f = q.queued[0]
+ q.queued = q.queued[1:]
+ q.progress = append(q.progress, f)
+
+ return f, true
+}
+
+func (q *jobQueue) BringToFront(filename string) {
+ q.mut.Lock()
+ defer q.mut.Unlock()
+
+ for i, cur := range q.queued {
+ if cur == filename {
+ if i > 0 {
+ // Shift the elements before the selected element one step to
+ // the right, overwriting the selected element
+ copy(q.queued[1:i+1], q.queued[0:])
+ // Put the selected element at the front
+ q.queued[0] = cur
+ }
+ return
+ }
+ }
+}
+
+func (q *jobQueue) Done(file string) {
+ q.mut.Lock()
+ defer q.mut.Unlock()
+
+ for i := range q.progress {
+ if q.progress[i] == file {
+ copy(q.progress[i:], q.progress[i+1:])
+ q.progress = q.progress[:len(q.progress)-1]
+ return
+ }
+ }
+}
+
+func (q *jobQueue) Jobs() ([]string, []string) {
+ q.mut.Lock()
+ defer q.mut.Unlock()
+
+ progress := make([]string, len(q.progress))
+ copy(progress, q.progress)
+
+ queued := make([]string, len(q.queued))
+ copy(queued, q.queued)
+
+ return progress, queued
+}
diff --git a/internal/model/queue_test.go b/internal/model/queue_test.go
new file mode 100644
index 00000000..37456644
--- /dev/null
+++ b/internal/model/queue_test.go
@@ -0,0 +1,200 @@
+// Copyright (C) 2014 The Syncthing Authors.
+//
+// This program is free software: you can redistribute it and/or modify it
+// under the terms of the GNU General Public License as published by the Free
+// Software Foundation, either version 3 of the License, or (at your option)
+// any later version.
+//
+// This program is distributed in the hope that it will be useful, but WITHOUT
+// ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+// FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
+// more details.
+//
+// You should have received a copy of the GNU General Public License along
+// with this program. If not, see .
+
+package model
+
+import (
+ "fmt"
+ "reflect"
+ "testing"
+)
+
+func TestJobQueue(t *testing.T) {
+ // Some random actions
+ q := newJobQueue()
+ q.Push("f1")
+ q.Push("f2")
+ q.Push("f3")
+ q.Push("f4")
+
+ progress, queued := q.Jobs()
+ if len(progress) != 0 || len(queued) != 4 {
+ t.Fatal("Wrong length")
+ }
+
+ for i := 1; i < 5; i++ {
+ n, ok := q.Pop()
+ if !ok || n != fmt.Sprintf("f%d", i) {
+ t.Fatal("Wrong element")
+ }
+ progress, queued = q.Jobs()
+ if len(progress) != 1 || len(queued) != 3 {
+ t.Log(progress)
+ t.Log(queued)
+ t.Fatal("Wrong length")
+ }
+
+ q.Done(n)
+ progress, queued = q.Jobs()
+ if len(progress) != 0 || len(queued) != 3 {
+ t.Fatal("Wrong length", len(progress), len(queued))
+ }
+
+ q.Push(n)
+ progress, queued = q.Jobs()
+ if len(progress) != 0 || len(queued) != 4 {
+ t.Fatal("Wrong length")
+ }
+
+ q.Done("f5") // Does not exist
+ progress, queued = q.Jobs()
+ if len(progress) != 0 || len(queued) != 4 {
+ t.Fatal("Wrong length")
+ }
+ }
+
+ if len(q.progress) > 0 || len(q.queued) != 4 {
+ t.Fatal("Wrong length")
+ }
+
+ for i := 4; i > 0; i-- {
+ progress, queued = q.Jobs()
+ if len(progress) != 4-i || len(queued) != i {
+ t.Fatal("Wrong length")
+ }
+
+ s := fmt.Sprintf("f%d", i)
+
+ q.BringToFront(s)
+ progress, queued = q.Jobs()
+ if len(progress) != 4-i || len(queued) != i {
+ t.Fatal("Wrong length")
+ }
+
+ n, ok := q.Pop()
+ if !ok || n != s {
+ t.Fatal("Wrong element")
+ }
+ progress, queued = q.Jobs()
+ if len(progress) != 5-i || len(queued) != i-1 {
+ t.Fatal("Wrong length")
+ }
+
+ q.Done("f5") // Does not exist
+ progress, queued = q.Jobs()
+ if len(progress) != 5-i || len(queued) != i-1 {
+ t.Fatal("Wrong length")
+ }
+ }
+
+ _, ok := q.Pop()
+ if len(q.progress) != 4 || ok {
+ t.Fatal("Wrong length")
+ }
+
+ q.Done("f1")
+ q.Done("f2")
+ q.Done("f3")
+ q.Done("f4")
+ q.Done("f5") // Does not exist
+
+ _, ok = q.Pop()
+ if len(q.progress) != 0 || ok {
+ t.Fatal("Wrong length")
+ }
+
+ progress, queued = q.Jobs()
+ if len(progress) != 0 || len(queued) != 0 {
+ t.Fatal("Wrong length")
+ }
+ q.BringToFront("")
+ q.Done("f5") // Does not exist
+ progress, queued = q.Jobs()
+ if len(progress) != 0 || len(queued) != 0 {
+ t.Fatal("Wrong length")
+ }
+}
+
+func TestBringToFront(t *testing.T) {
+ q := newJobQueue()
+ q.Push("f1")
+ q.Push("f2")
+ q.Push("f3")
+ q.Push("f4")
+
+ _, queued := q.Jobs()
+ if !reflect.DeepEqual(queued, []string{"f1", "f2", "f3", "f4"}) {
+ t.Errorf("Incorrect order %v at start", queued)
+ }
+
+ q.BringToFront("f1") // corner case: does nothing
+
+ _, queued = q.Jobs()
+ if !reflect.DeepEqual(queued, []string{"f1", "f2", "f3", "f4"}) {
+ t.Errorf("Incorrect order %v", queued)
+ }
+
+ q.BringToFront("f3")
+
+ _, queued = q.Jobs()
+ if !reflect.DeepEqual(queued, []string{"f3", "f1", "f2", "f4"}) {
+ t.Errorf("Incorrect order %v", queued)
+ }
+
+ q.BringToFront("f2")
+
+ _, queued = q.Jobs()
+ if !reflect.DeepEqual(queued, []string{"f2", "f3", "f1", "f4"}) {
+ t.Errorf("Incorrect order %v", queued)
+ }
+
+ q.BringToFront("f4") // corner case: last element
+
+ _, queued = q.Jobs()
+ if !reflect.DeepEqual(queued, []string{"f4", "f2", "f3", "f1"}) {
+ t.Errorf("Incorrect order %v", queued)
+ }
+}
+
+func BenchmarkJobQueueBump(b *testing.B) {
+ files := genFiles(b.N)
+
+ q := newJobQueue()
+ for _, f := range files {
+ q.Push(f.Name)
+ }
+
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ q.BringToFront(files[i].Name)
+ }
+}
+
+func BenchmarkJobQueuePushPopDone10k(b *testing.B) {
+ files := genFiles(10000)
+
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ q := newJobQueue()
+ for _, f := range files {
+ q.Push(f.Name)
+ }
+ for range files {
+ n, _ := q.Pop()
+ q.Done(n)
+ }
+ }
+
+}
diff --git a/internal/model/scanner.go b/internal/model/scanner.go
index 98c1fc2d..c2824b60 100644
--- a/internal/model/scanner.go
+++ b/internal/model/scanner.go
@@ -75,3 +75,9 @@ func (s *Scanner) Stop() {
func (s *Scanner) String() string {
return fmt.Sprintf("scanner/%s@%p", s.folder, s)
}
+
+func (s *Scanner) BringToFront(string) {}
+
+func (s *Scanner) Jobs() ([]string, []string) {
+ return nil, nil
+}
diff --git a/internal/protocol/message.go b/internal/protocol/message.go
index 8cc191d8..ae04a9da 100644
--- a/internal/protocol/message.go
+++ b/internal/protocol/message.go
@@ -69,6 +69,17 @@ func (f FileInfo) HasPermissionBits() bool {
return f.Flags&FlagNoPermBits == 0
}
+func (f FileInfo) ToTruncated() FileInfoTruncated {
+ return FileInfoTruncated{
+ Name: f.Name,
+ Flags: f.Flags,
+ Modified: f.Modified,
+ Version: f.Version,
+ LocalVersion: f.LocalVersion,
+ NumBlocks: uint32(len(f.Blocks)),
+ }
+}
+
// Used for unmarshalling a FileInfo structure but skipping the actual block list
type FileInfoTruncated struct {
Name string // max:8192