Add job queue (fixes #629)

A request to terminate currently ongoing downloads and jump to the bumped file
is incoming in 3, 2, 1.

This also has a slightly strange effect: we pop a job off the queue, but the
copyChan is still busy and blocks, even though the job has already been moved
to the progress slice in the JobQueue. It therefore looks like it's in progress
when it isn't; it's still waiting to be picked up from copyChan.

As a result, the progress emitter doesn't register the task, so the file gets
no progress bar, yet it has already left the queue and can no longer be
displaced by a bump.

I guess I could fix the progress bar issue by moving the progressEmitter.Register
call to just before the file is handed to copyChan, but then we are back to the
initial problem: a file with a progress bar, but no progress happening, because
it's stuck on the write to copyChan.

I checked whether there is a way to test a channel for writability (before
popping), but got struck by lightning just for bringing the idea up in #go-nuts.

My ideal scenario would be to check whether copyChan is writable, and only then
pop a job from the queue and shove it down handleFile. That way jobs would stay
in the queue for as long as they cannot be handled, meaning that `Bump` could
still move your file further up.
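
For what it's worth, the closest thing Go offers to that writability check is a
non-blocking send via select with a default case. Below is a minimal sketch of
the idea, using simplified stand-in types rather than the actual Puller/JobQueue
code from this commit: the job only leaves the queue once the channel actually
accepts it, so it stays bumpable right up to the hand-off.

// A sketch only: stand-in types, not the Puller/JobQueue from this commit.
package main

import (
	"fmt"
	"time"
)

type job struct{ name string }

func main() {
	copyChan := make(chan job) // unbuffered, like a busy copier
	queue := []job{{"a"}, {"b"}, {"c"}}

	done := make(chan struct{})
	go func() {
		for j := range copyChan {
			fmt.Println("copying", j.name)
		}
		close(done)
	}()

	for len(queue) > 0 {
		select {
		case copyChan <- queue[0]:
			// The copier accepted the job; only now does it leave the
			// queue, so a Bump could still have reordered it until here.
			queue = queue[1:]
		default:
			// Send would block: keep the job queued and retry shortly.
			time.Sleep(10 * time.Millisecond)
		}
	}
	close(copyChan)
	<-done
}

The cost is the retry/sleep in the default branch, which is presumably part of
why this commit just drains the queue up front and lives with the
progress-display quirk described above.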
Author:       Audrius Butkevicius
Date:         2014-12-01 19:23:06 +00:00
Committed by: Jakob Borg
Parent:       e94bd90782
Commit:       fd0a147ae6

9 changed files with 282 additions and 40 deletions

(changed file)

@@ -149,6 +149,7 @@ func startGUI(cfg config.GUIConfiguration, assetDir string, m *model.Model) error {
     postRestMux.HandleFunc("/rest/shutdown", restPostShutdown)
     postRestMux.HandleFunc("/rest/upgrade", restPostUpgrade)
     postRestMux.HandleFunc("/rest/scan", withModel(m, restPostScan))
+    postRestMux.HandleFunc("/rest/bump", withModel(m, restPostBump))
 
     // A handler that splits requests between the two above and disables
     // caching
@@ -314,19 +315,12 @@ func restGetNeed(m *model.Model, w http.ResponseWriter, r *http.Request) {
     var qs = r.URL.Query()
     var folder = qs.Get("folder")
 
-    files := m.NeedFolderFilesLimited(folder, 100) // max 100 files
+    progress, queued, rest := m.NeedFolderFiles(folder, 100)
     // Convert the struct to a more loose structure, and inject the size.
-    output := make([]map[string]interface{}, 0, len(files))
-    for _, file := range files {
-        output = append(output, map[string]interface{}{
-            "Name":         file.Name,
-            "Flags":        file.Flags,
-            "Modified":     file.Modified,
-            "Version":      file.Version,
-            "LocalVersion": file.LocalVersion,
-            "NumBlocks":    file.NumBlocks,
-            "Size":         protocol.BlocksToSize(file.NumBlocks),
-        })
+    output := map[string][]map[string]interface{}{
+        "progress": toNeedSlice(progress),
+        "queued":   toNeedSlice(queued),
+        "rest":     toNeedSlice(rest),
     }
 
     w.Header().Set("Content-Type", "application/json; charset=utf-8")
@@ -650,6 +644,14 @@ func restPostScan(m *model.Model, w http.ResponseWriter, r *http.Request) {
     }
 }
 
+func restPostBump(m *model.Model, w http.ResponseWriter, r *http.Request) {
+    qs := r.URL.Query()
+    folder := qs.Get("folder")
+    file := qs.Get("file")
+    m.Bump(folder, file)
+    restGetNeed(m, w, r)
+}
+
 func getQR(w http.ResponseWriter, r *http.Request) {
     var qs = r.URL.Query()
     var text = qs.Get("text")
@@ -775,3 +777,19 @@ func mimeTypeForFile(file string) string {
         return mime.TypeByExtension(ext)
     }
 }
+
+func toNeedSlice(files []protocol.FileInfoTruncated) []map[string]interface{} {
+    output := make([]map[string]interface{}, len(files))
+    for i, file := range files {
+        output[i] = map[string]interface{}{
+            "Name":         file.Name,
+            "Flags":        file.Flags,
+            "Modified":     file.Modified,
+            "Version":      file.Version,
+            "LocalVersion": file.LocalVersion,
+            "NumBlocks":    file.NumBlocks,
+            "Size":         protocol.BlocksToSize(file.NumBlocks),
+        }
+    }
+    return output
+}
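
For completeness, one way to poke the new endpoint from a small Go client. The
listen address, folder ID and file path below are made-up examples, and any API
key or CSRF handling the GUI may require is omitted.

// Hypothetical client-side call to the new /rest/bump endpoint; the address,
// folder ID and file path are illustrative only.
package main

import (
	"fmt"
	"io/ioutil"
	"net/http"
	"net/url"
)

func main() {
	qs := url.Values{}
	qs.Set("folder", "default")
	qs.Set("file", "some/large/file.iso")

	resp, err := http.Post("http://127.0.0.1:8080/rest/bump?"+qs.Encode(), "application/json", nil)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	// The handler responds with the same structure as /rest/need:
	// {"progress": [...], "queued": [...], "rest": [...]}
	body, _ := ioutil.ReadAll(resp.Body)
	fmt.Println(string(body))
}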

(next changed file)

@@ -801,21 +801,37 @@
             <hr/>
             <table class="table table-striped table-condensed">
-                <tr ng-repeat="f in needed" ng-init="a = needAction(f)">
+                <tr ng-repeat="f in needed.progress" ng-init="a = needAction(f)">
                     <td class="small-data"><span class="glyphicon glyphicon-{{needIcons[a]}}"></span> {{needActions[a]}}</td>
                     <td title="{{f.Name}}">{{f.Name | basename}}</td>
-                    <td>
-                        <span ng-if="a == 'sync' && progress[neededFolder] && progress[neededFolder][f.Name]">
+                    <td ng-if="a == 'sync' && progress[neededFolder] && progress[neededFolder][f.Name]">
                         <div class="progress">
                             <div class="progress-bar progress-bar-success" style="width: {{progress[neededFolder][f.Name].Reused}}%"></div>
                             <div class="progress-bar" style="width: {{progress[neededFolder][f.Name].CopiedFromOrigin}}%"></div>
                             <div class="progress-bar progress-bar-info" style="width: {{progress[neededFolder][f.Name].CopiedFromElsewhere}}%"></div>
                             <div class="progress-bar progress-bar-warning" style="width: {{progress[neededFolder][f.Name].Pulled}}%"></div>
                             <div class="progress-bar progress-bar-danger progress-bar-striped active" style="width: {{progress[neededFolder][f.Name].Pulling}}%"></div>
-                            <span class="show frontal">{{progress[neededFolder][f.Name].BytesDone | binary}}B / {{progress[neededFolder][f.Name].BytesTotal | binary}}B</span>
-                        </div>
-                        </span>
+                            <span class="show frontal">
+                                {{progress[neededFolder][f.Name].BytesDone | binary}}B / {{progress[neededFolder][f.Name].BytesTotal | binary}}B
+                            </span>
+                        </div>
                     </td>
+                    <td class="text-right small-data" ng-if="a != 'sync' || !progress[neededFolder] || !progress[neededFolder][f.Name]">
+                        <span ng-if="f.Size > 0">{{f.Size | binary}}B</span>
+                    </td>
+                </tr>
+                <tr ng-repeat="f in needed.queued" ng-init="a = needAction(f)">
+                    <td class="small-data"><span class="glyphicon glyphicon-{{needIcons[a]}}"></span> {{needActions[a]}}</td>
+                    <td title="{{f.Name}}">{{f.Name | basename}}</td>
+                    <td class="text-right small-data">
+                        <span ng-if="$index != 0" class="glyphicon glyphicon-chevron-up" ng-click="bumpFile(neededFolder, f.Name)"></span>
+                        <span ng-if="f.Size > 0">{{f.Size | binary}}B</span>
+                    </td>
+                </tr>
+                <tr ng-repeat="f in needed.rest" ng-init="a = needAction(f)">
+                    <td class="small-data"><span class="glyphicon glyphicon-{{needIcons[a]}}"></span> {{needActions[a]}}</td>
+                    <td title="{{f.Name}}">{{f.Name | basename}}</td>
+                    <td class="text-right small-data"><span ng-if="f.Size > 0">{{f.Size | binary}}B</span></td>
                 </tr>
             </table>

(next changed file)

@@ -1056,6 +1056,15 @@ angular.module('syncthing.core')
         $http.post(urlbase + "/scan?folder=" + encodeURIComponent(folder));
     };
 
+    $scope.bumpFile = function (folder, file) {
+        $http.post(urlbase + "/bump?folder=" + encodeURIComponent(folder) + "&file=" + encodeURIComponent(file)).success(function (data) {
+            if ($scope.neededFolder == folder) {
+                console.log("bumpFile", folder, data);
+                $scope.needed = data;
+            }
+        });
+    };
+
     // pseudo main. called on all definitions assigned
     initController();
 });

(next changed file: diff suppressed because one or more lines are too long)

(next changed file)

@@ -79,6 +79,8 @@ const (
 type service interface {
     Serve()
     Stop()
+    Jobs() ([]protocol.FileInfoTruncated, []protocol.FileInfoTruncated) // In progress, Queued
+    Bump(string)
 }
 
 type Model struct {
@@ -416,22 +418,46 @@ func (m *Model) NeedSize(folder string) (files int, bytes int64) {
     return
 }
 
-// NeedFiles returns the list of currently needed files, stopping at maxFiles
-// files. Limit <= 0 is ignored.
-func (m *Model) NeedFolderFilesLimited(folder string, maxFiles int) []protocol.FileInfoTruncated {
+// NeedFiles returns the list of currently needed files in progress, queued,
+// and to be queued on next puller iteration. Also takes a soft cap which is
+// only respected when adding files from the model rather than the runner queue.
+func (m *Model) NeedFolderFiles(folder string, max int) ([]protocol.FileInfoTruncated, []protocol.FileInfoTruncated, []protocol.FileInfoTruncated) {
     defer m.leveldbPanicWorkaround()
 
     m.fmut.RLock()
     defer m.fmut.RUnlock()
     if rf, ok := m.folderFiles[folder]; ok {
-        fs := make([]protocol.FileInfoTruncated, 0, maxFiles)
-        rf.WithNeedTruncated(protocol.LocalDeviceID, func(f protocol.FileIntf) bool {
-            fs = append(fs, f.(protocol.FileInfoTruncated))
-            return maxFiles <= 0 || len(fs) < maxFiles
-        })
-        return fs
+        progress := []protocol.FileInfoTruncated{}
+        queued := []protocol.FileInfoTruncated{}
+        rest := []protocol.FileInfoTruncated{}
+        seen := map[string]struct{}{}
+
+        runner, ok := m.folderRunners[folder]
+        if ok {
+            progress, queued = runner.Jobs()
+            seen = make(map[string]struct{}, len(progress)+len(queued))
+            for _, file := range progress {
+                seen[file.Name] = struct{}{}
+            }
+            for _, file := range queued {
+                seen[file.Name] = struct{}{}
+            }
+        }
+
+        left := max - len(progress) - len(queued)
+        if max < 1 || left > 0 {
+            rf.WithNeedTruncated(protocol.LocalDeviceID, func(f protocol.FileIntf) bool {
+                left--
+                ft := f.(protocol.FileInfoTruncated)
+                _, ok := seen[ft.Name]
+                if !ok {
+                    rest = append(rest, ft)
+                }
+                return max < 1 || left > 0
+            })
+        }
+        return progress, queued, rest
     }
-    return nil
+    return nil, nil, nil
 }
 
 // Index is called when a new device is connected and we receive their full index.
@@ -1336,7 +1362,7 @@ func (m *Model) RemoteLocalVersion(folder string) uint64 {
     return ver
 }
 
-func (m *Model) availability(folder string, file string) []protocol.DeviceID {
+func (m *Model) availability(folder, file string) []protocol.DeviceID {
     // Acquire this lock first, as the value returned from foldersFiles can
     // get heavily modified on Close()
     m.pmut.RLock()
@@ -1359,6 +1385,17 @@ func (m *Model) availability(folder string, file string) []protocol.DeviceID {
     return availableDevices
 }
 
+// Bump the given files priority in the job queue
+func (m *Model) Bump(folder, file string) {
+    m.pmut.RLock()
+    defer m.pmut.RUnlock()
+
+    runner, ok := m.folderRunners[folder]
+    if ok {
+        runner.Bump(file)
+    }
+}
+
 func (m *Model) String() string {
     return fmt.Sprintf("model@%p", m)
 }

(next changed file)

@@ -78,6 +78,7 @@ type Puller struct {
     copiers         int
     pullers         int
     finishers       int
+    queue           *JobQueue
 }
 
 // Serve will run scans and pulls. It will return when Stop()ed or on a
@@ -89,6 +90,7 @@ func (p *Puller) Serve() {
     }
 
     p.stop = make(chan struct{})
+    p.queue = NewJobQueue()
 
     pullTimer := time.NewTimer(checkPullIntv)
     scanTimer := time.NewTimer(time.Millisecond) // The first scan should be done immediately.
@@ -337,15 +339,22 @@ func (p *Puller) pullerIteration(checksum bool, ignores *ignore.Matcher) int {
             p.handleDir(file)
         default:
             // A new or changed file or symlink. This is the only case where we
-            // do stuff in the background; the other three are done
-            // synchronously.
-            p.handleFile(file, copyChan, finisherChan)
+            // do stuff concurrently in the background
+            p.queue.Push(&file)
         }
 
         changed++
         return true
     })
 
+    for {
+        f := p.queue.Pop()
+        if f == nil {
+            break
+        }
+        p.handleFile(*f, copyChan, finisherChan)
+    }
+
     // Signal copy and puller routines that we are done with the in data for
     // this iteration. Wait for them to finish.
     close(copyChan)
@@ -483,6 +492,7 @@ func (p *Puller) handleFile(file protocol.FileInfo, copyChan chan<- copyBlocksState,
     if debug {
         l.Debugln(p, "taking shortcut on", file.Name)
     }
+    p.queue.Done(&file)
     if file.IsSymlink() {
         p.shortcutSymlink(curFile, file)
     } else {
@@ -850,6 +860,7 @@ func (p *Puller) finisherRoutine(in <-chan *sharedPullerState) {
             continue
         }
 
+        p.queue.Done(&state.file)
         p.performFinish(state)
         p.model.receivedFile(p.folder, state.file.Name)
         if p.progressEmitter != nil {
@@ -859,6 +870,32 @@
         }
     }
 }
 
+// Moves the given filename to the front of the job queue
+func (p *Puller) Bump(filename string) {
+    p.queue.Bump(filename)
+}
+
+func (p *Puller) Jobs() ([]protocol.FileInfoTruncated, []protocol.FileInfoTruncated) {
+    return p.queue.Jobs()
+}
+
+// clean deletes orphaned temporary files
+func (p *Puller) clean() {
+    keep := time.Duration(p.model.cfg.Options().KeepTemporariesH) * time.Hour
+    now := time.Now()
+    filepath.Walk(p.dir, func(path string, info os.FileInfo, err error) error {
+        if err != nil {
+            return err
+        }
+
+        if info.Mode().IsRegular() && defTempNamer.IsTemporary(path) && info.ModTime().Add(keep).Before(now) {
+            os.Remove(path)
+        }
+
+        return nil
+    })
+}
+
 func invalidateFolder(cfg *config.Configuration, folderID string, err error) {
     for i := range cfg.Folders {
         folder := &cfg.Folders[i]

(new file: internal/model/queue.go, 106 lines)

@@ -0,0 +1,106 @@
// Copyright (C) 2014 The Syncthing Authors.
//
// This program is free software: you can redistribute it and/or modify it
// under the terms of the GNU General Public License as published by the Free
// Software Foundation, either version 3 of the License, or (at your option)
// any later version.
//
// This program is distributed in the hope that it will be useful, but WITHOUT
// ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
// FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
// more details.
//
// You should have received a copy of the GNU General Public License along
// with this program. If not, see <http://www.gnu.org/licenses/>.

package model

import (
    "container/list"
    "sync"

    "github.com/syncthing/syncthing/internal/protocol"
)

type JobQueue struct {
    progress []*protocol.FileInfo
    queued   *list.List
    lookup   map[string]*list.Element // O(1) lookups
    mut      sync.Mutex
}

func NewJobQueue() *JobQueue {
    return &JobQueue{
        progress: []*protocol.FileInfo{},
        queued:   list.New(),
        lookup:   make(map[string]*list.Element),
    }
}

func (q *JobQueue) Push(file *protocol.FileInfo) {
    q.mut.Lock()
    defer q.mut.Unlock()
    q.lookup[file.Name] = q.queued.PushBack(file)
}

func (q *JobQueue) Pop() *protocol.FileInfo {
    q.mut.Lock()
    defer q.mut.Unlock()

    if q.queued.Len() == 0 {
        return nil
    }

    f := q.queued.Remove(q.queued.Front()).(*protocol.FileInfo)
    delete(q.lookup, f.Name)
    q.progress = append(q.progress, f)

    return f
}

func (q *JobQueue) Bump(filename string) {
    q.mut.Lock()
    defer q.mut.Unlock()

    ele, ok := q.lookup[filename]
    if ok {
        q.queued.MoveToFront(ele)
    }
}

func (q *JobQueue) Done(file *protocol.FileInfo) {
    q.mut.Lock()
    defer q.mut.Unlock()

    for i := range q.progress {
        if q.progress[i].Name == file.Name {
            copy(q.progress[i:], q.progress[i+1:])
            q.progress[len(q.progress)-1] = nil
            q.progress = q.progress[:len(q.progress)-1]
            return
        }
    }
}

func (q *JobQueue) Jobs() ([]protocol.FileInfoTruncated, []protocol.FileInfoTruncated) {
    q.mut.Lock()
    defer q.mut.Unlock()

    progress := make([]protocol.FileInfoTruncated, len(q.progress))
    for i := range q.progress {
        progress[i] = q.progress[i].ToTruncated()
    }

    queued := make([]protocol.FileInfoTruncated, q.queued.Len())
    i := 0
    for e := q.queued.Front(); e != nil; e = e.Next() {
        fi := e.Value.(*protocol.FileInfo)
        queued[i] = fi.ToTruncated()
        i++
    }

    return progress, queued
}
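
To make the semantics above concrete, here is a small hypothetical walk-through
of the type, written as a Go example function and not part of the commit: Push
enqueues at the back, Bump moves a queued name to the front, Pop shifts the
front item into the in-progress list, Done drops it from that list, and Jobs
returns truncated snapshots of both.

package model

import (
    "fmt"

    "github.com/syncthing/syncthing/internal/protocol"
)

// ExampleJobQueue is an illustrative sketch of the queue semantics only.
func ExampleJobQueue() {
    q := NewJobQueue()
    q.Push(&protocol.FileInfo{Name: "f1"})
    q.Push(&protocol.FileInfo{Name: "f2"})
    q.Push(&protocol.FileInfo{Name: "f3"})

    // Prioritise f3: it moves to the front of the queued list.
    q.Bump("f3")

    // Pop moves the front item (now f3) into the in-progress list.
    f := q.Pop()
    fmt.Println(f.Name) // f3

    progress, queued := q.Jobs()
    fmt.Println(len(progress), len(queued)) // 1 2

    // Done drops the file from the in-progress list once it is finished.
    q.Done(f)
    progress, _ = q.Jobs()
    fmt.Println(len(progress)) // 0
}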

(next changed file)

@@ -18,6 +18,8 @@ package model
 import (
     "fmt"
     "time"
+
+    "github.com/syncthing/syncthing/internal/protocol"
 )
 
 type Scanner struct {
@@ -75,3 +77,9 @@ func (s *Scanner) Stop() {
 func (s *Scanner) String() string {
     return fmt.Sprintf("scanner/%s@%p", s.folder, s)
 }
+
+func (s *Scanner) Bump(string) {}
+
+func (s *Scanner) Jobs() ([]protocol.FileInfoTruncated, []protocol.FileInfoTruncated) {
+    return nil, nil
+}

(next changed file)

@@ -69,6 +69,17 @@ func (f FileInfo) HasPermissionBits() bool {
     return f.Flags&FlagNoPermBits == 0
 }
 
+func (f FileInfo) ToTruncated() FileInfoTruncated {
+    return FileInfoTruncated{
+        Name:         f.Name,
+        Flags:        f.Flags,
+        Modified:     f.Modified,
+        Version:      f.Version,
+        LocalVersion: f.LocalVersion,
+        NumBlocks:    uint32(len(f.Blocks)),
+    }
+}
+
 // Used for unmarshalling a FileInfo structure but skipping the actual block list
 type FileInfoTruncated struct {
     Name         string // max:8192