Configurable file pull order (alphabetic, random, by size or age)
This commit is contained in:
@@ -6,23 +6,34 @@
|
||||
|
||||
package model
|
||||
|
||||
import "github.com/syncthing/syncthing/internal/sync"
|
||||
import (
|
||||
"math/rand"
|
||||
"sort"
|
||||
|
||||
"github.com/syncthing/syncthing/internal/sync"
|
||||
)
|
||||
|
||||
type jobQueue struct {
|
||||
progress []string
|
||||
queued []string
|
||||
queued []jobQueueEntry
|
||||
mut sync.Mutex
|
||||
}
|
||||
|
||||
type jobQueueEntry struct {
|
||||
name string
|
||||
size int64
|
||||
modified int64
|
||||
}
|
||||
|
||||
func newJobQueue() *jobQueue {
|
||||
return &jobQueue{
|
||||
mut: sync.NewMutex(),
|
||||
}
|
||||
}
|
||||
|
||||
func (q *jobQueue) Push(file string) {
|
||||
func (q *jobQueue) Push(file string, size, modified int64) {
|
||||
q.mut.Lock()
|
||||
q.queued = append(q.queued, file)
|
||||
q.queued = append(q.queued, jobQueueEntry{file, size, modified})
|
||||
q.mut.Unlock()
|
||||
}
|
||||
|
||||
@@ -34,8 +45,7 @@ func (q *jobQueue) Pop() (string, bool) {
|
||||
return "", false
|
||||
}
|
||||
|
||||
var f string
|
||||
f = q.queued[0]
|
||||
f := q.queued[0].name
|
||||
q.queued = q.queued[1:]
|
||||
q.progress = append(q.progress, f)
|
||||
|
||||
@@ -47,7 +57,7 @@ func (q *jobQueue) BringToFront(filename string) {
|
||||
defer q.mut.Unlock()
|
||||
|
||||
for i, cur := range q.queued {
|
||||
if cur == filename {
|
||||
if cur.name == filename {
|
||||
if i > 0 {
|
||||
// Shift the elements before the selected element one step to
|
||||
// the right, overwriting the selected element
|
||||
@@ -81,7 +91,62 @@ func (q *jobQueue) Jobs() ([]string, []string) {
|
||||
copy(progress, q.progress)
|
||||
|
||||
queued := make([]string, len(q.queued))
|
||||
copy(queued, q.queued)
|
||||
for i := range q.queued {
|
||||
queued[i] = q.queued[i].name
|
||||
}
|
||||
|
||||
return progress, queued
|
||||
}
|
||||
|
||||
func (q *jobQueue) Shuffle() {
|
||||
q.mut.Lock()
|
||||
defer q.mut.Unlock()
|
||||
|
||||
l := len(q.queued)
|
||||
for i := range q.queued {
|
||||
r := rand.Intn(l)
|
||||
q.queued[i], q.queued[r] = q.queued[r], q.queued[i]
|
||||
}
|
||||
}
|
||||
|
||||
func (q *jobQueue) SortSmallestFirst() {
|
||||
q.mut.Lock()
|
||||
defer q.mut.Unlock()
|
||||
|
||||
sort.Sort(smallestFirst(q.queued))
|
||||
}
|
||||
|
||||
func (q *jobQueue) SortLargestFirst() {
|
||||
q.mut.Lock()
|
||||
defer q.mut.Unlock()
|
||||
|
||||
sort.Sort(sort.Reverse(smallestFirst(q.queued)))
|
||||
}
|
||||
|
||||
func (q *jobQueue) SortOldestFirst() {
|
||||
q.mut.Lock()
|
||||
defer q.mut.Unlock()
|
||||
|
||||
sort.Sort(oldestFirst(q.queued))
|
||||
}
|
||||
|
||||
func (q *jobQueue) SortNewestFirst() {
|
||||
q.mut.Lock()
|
||||
defer q.mut.Unlock()
|
||||
|
||||
sort.Sort(sort.Reverse(oldestFirst(q.queued)))
|
||||
}
|
||||
|
||||
// The usual sort.Interface boilerplate
|
||||
|
||||
type smallestFirst []jobQueueEntry
|
||||
|
||||
func (q smallestFirst) Len() int { return len(q) }
|
||||
func (q smallestFirst) Less(a, b int) bool { return q[a].size < q[b].size }
|
||||
func (q smallestFirst) Swap(a, b int) { q[a], q[b] = q[b], q[a] }
|
||||
|
||||
type oldestFirst []jobQueueEntry
|
||||
|
||||
func (q oldestFirst) Len() int { return len(q) }
|
||||
func (q oldestFirst) Less(a, b int) bool { return q[a].modified < q[b].modified }
|
||||
func (q oldestFirst) Swap(a, b int) { q[a], q[b] = q[b], q[a] }
|
||||
|
||||
@@ -15,10 +15,10 @@ import (
|
||||
func TestJobQueue(t *testing.T) {
|
||||
// Some random actions
|
||||
q := newJobQueue()
|
||||
q.Push("f1")
|
||||
q.Push("f2")
|
||||
q.Push("f3")
|
||||
q.Push("f4")
|
||||
q.Push("f1", 0, 0)
|
||||
q.Push("f2", 0, 0)
|
||||
q.Push("f3", 0, 0)
|
||||
q.Push("f4", 0, 0)
|
||||
|
||||
progress, queued := q.Jobs()
|
||||
if len(progress) != 0 || len(queued) != 4 {
|
||||
@@ -43,7 +43,7 @@ func TestJobQueue(t *testing.T) {
|
||||
t.Fatal("Wrong length", len(progress), len(queued))
|
||||
}
|
||||
|
||||
q.Push(n)
|
||||
q.Push(n, 0, 0)
|
||||
progress, queued = q.Jobs()
|
||||
if len(progress) != 0 || len(queued) != 4 {
|
||||
t.Fatal("Wrong length")
|
||||
@@ -120,10 +120,10 @@ func TestJobQueue(t *testing.T) {
|
||||
|
||||
func TestBringToFront(t *testing.T) {
|
||||
q := newJobQueue()
|
||||
q.Push("f1")
|
||||
q.Push("f2")
|
||||
q.Push("f3")
|
||||
q.Push("f4")
|
||||
q.Push("f1", 0, 0)
|
||||
q.Push("f2", 0, 0)
|
||||
q.Push("f3", 0, 0)
|
||||
q.Push("f4", 0, 0)
|
||||
|
||||
_, queued := q.Jobs()
|
||||
if !reflect.DeepEqual(queued, []string{"f1", "f2", "f3", "f4"}) {
|
||||
@@ -159,12 +159,101 @@ func TestBringToFront(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestShuffle(t *testing.T) {
|
||||
q := newJobQueue()
|
||||
q.Push("f1", 0, 0)
|
||||
q.Push("f2", 0, 0)
|
||||
q.Push("f3", 0, 0)
|
||||
q.Push("f4", 0, 0)
|
||||
|
||||
// This test will fail once in eight million times (1 / (4!)^5) :)
|
||||
for i := 0; i < 5; i++ {
|
||||
q.Shuffle()
|
||||
_, queued := q.Jobs()
|
||||
if l := len(queued); l != 4 {
|
||||
t.Fatalf("Weird length %d returned from Jobs()", l)
|
||||
}
|
||||
|
||||
t.Logf("%v", queued)
|
||||
if !reflect.DeepEqual(queued, []string{"f1", "f2", "f3", "f4"}) {
|
||||
// The queue was shuffled
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
t.Error("Queue was not shuffled after five attempts.")
|
||||
}
|
||||
|
||||
func TestSortBySize(t *testing.T) {
|
||||
q := newJobQueue()
|
||||
q.Push("f1", 20, 0)
|
||||
q.Push("f2", 40, 0)
|
||||
q.Push("f3", 30, 0)
|
||||
q.Push("f4", 10, 0)
|
||||
|
||||
q.SortSmallestFirst()
|
||||
|
||||
_, actual := q.Jobs()
|
||||
if l := len(actual); l != 4 {
|
||||
t.Fatalf("Weird length %d returned from Jobs()", l)
|
||||
}
|
||||
expected := []string{"f4", "f1", "f3", "f2"}
|
||||
|
||||
if !reflect.DeepEqual(actual, expected) {
|
||||
t.Errorf("SortSmallestFirst(): %#v != %#v", actual, expected)
|
||||
}
|
||||
|
||||
q.SortLargestFirst()
|
||||
|
||||
_, actual = q.Jobs()
|
||||
if l := len(actual); l != 4 {
|
||||
t.Fatalf("Weird length %d returned from Jobs()", l)
|
||||
}
|
||||
expected = []string{"f2", "f3", "f1", "f4"}
|
||||
|
||||
if !reflect.DeepEqual(actual, expected) {
|
||||
t.Errorf("SortLargestFirst(): %#v != %#v", actual, expected)
|
||||
}
|
||||
}
|
||||
|
||||
func TestSortByAge(t *testing.T) {
|
||||
q := newJobQueue()
|
||||
q.Push("f1", 0, 20)
|
||||
q.Push("f2", 0, 40)
|
||||
q.Push("f3", 0, 30)
|
||||
q.Push("f4", 0, 10)
|
||||
|
||||
q.SortOldestFirst()
|
||||
|
||||
_, actual := q.Jobs()
|
||||
if l := len(actual); l != 4 {
|
||||
t.Fatalf("Weird length %d returned from Jobs()", l)
|
||||
}
|
||||
expected := []string{"f4", "f1", "f3", "f2"}
|
||||
|
||||
if !reflect.DeepEqual(actual, expected) {
|
||||
t.Errorf("SortOldestFirst(): %#v != %#v", actual, expected)
|
||||
}
|
||||
|
||||
q.SortNewestFirst()
|
||||
|
||||
_, actual = q.Jobs()
|
||||
if l := len(actual); l != 4 {
|
||||
t.Fatalf("Weird length %d returned from Jobs()", l)
|
||||
}
|
||||
expected = []string{"f2", "f3", "f1", "f4"}
|
||||
|
||||
if !reflect.DeepEqual(actual, expected) {
|
||||
t.Errorf("SortNewestFirst(): %#v != %#v", actual, expected)
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkJobQueueBump(b *testing.B) {
|
||||
files := genFiles(b.N)
|
||||
|
||||
q := newJobQueue()
|
||||
for _, f := range files {
|
||||
q.Push(f.Name)
|
||||
q.Push(f.Name, 0, 0)
|
||||
}
|
||||
|
||||
b.ResetTimer()
|
||||
@@ -180,7 +269,7 @@ func BenchmarkJobQueuePushPopDone10k(b *testing.B) {
|
||||
for i := 0; i < b.N; i++ {
|
||||
q := newJobQueue()
|
||||
for _, f := range files {
|
||||
q.Push(f.Name)
|
||||
q.Push(f.Name, 0, 0)
|
||||
}
|
||||
for _ = range files {
|
||||
n, _ := q.Pop()
|
||||
|
||||
@@ -69,6 +69,7 @@ type rwFolder struct {
|
||||
copiers int
|
||||
pullers int
|
||||
shortID uint64
|
||||
order config.PullOrder
|
||||
|
||||
stop chan struct{}
|
||||
queue *jobQueue
|
||||
@@ -93,6 +94,7 @@ func newRWFolder(m *Model, shortID uint64, cfg config.FolderConfiguration) *rwFo
|
||||
copiers: cfg.Copiers,
|
||||
pullers: cfg.Pullers,
|
||||
shortID: shortID,
|
||||
order: cfg.Order,
|
||||
|
||||
stop: make(chan struct{}),
|
||||
queue: newJobQueue(),
|
||||
@@ -346,13 +348,9 @@ func (p *rwFolder) pullerIteration(ignores *ignore.Matcher) int {
|
||||
buckets := map[string][]protocol.FileInfo{}
|
||||
|
||||
folderFiles.WithNeed(protocol.LocalDeviceID, func(intf db.FileIntf) bool {
|
||||
|
||||
// Needed items are delivered sorted lexicographically. This isn't
|
||||
// really optimal from a performance point of view - it would be
|
||||
// better if files were handled in random order, to spread the load
|
||||
// over the cluster. But it means that we can be sure that we fully
|
||||
// handle directories before the files that go inside them, which is
|
||||
// nice.
|
||||
// Needed items are delivered sorted lexicographically. We'll handle
|
||||
// directories as they come along, so parents before children. Files
|
||||
// are queued and the order may be changed later.
|
||||
|
||||
file := intf.(protocol.FileInfo)
|
||||
|
||||
@@ -392,13 +390,32 @@ func (p *rwFolder) pullerIteration(ignores *ignore.Matcher) int {
|
||||
default:
|
||||
// A new or changed file or symlink. This is the only case where we
|
||||
// do stuff concurrently in the background
|
||||
p.queue.Push(file.Name)
|
||||
p.queue.Push(file.Name, file.Size(), file.Modified)
|
||||
}
|
||||
|
||||
changed++
|
||||
return true
|
||||
})
|
||||
|
||||
// Reorder the file queue according to configuration
|
||||
|
||||
switch p.order {
|
||||
case config.OrderRandom:
|
||||
p.queue.Shuffle()
|
||||
case config.OrderAlphabetic:
|
||||
// The queue is already in alphabetic order.
|
||||
case config.OrderSmallestFirst:
|
||||
p.queue.SortSmallestFirst()
|
||||
case config.OrderLargestFirst:
|
||||
p.queue.SortLargestFirst()
|
||||
case config.OrderOldestFirst:
|
||||
p.queue.SortOldestFirst()
|
||||
case config.OrderNewestFirst:
|
||||
p.queue.SortOldestFirst()
|
||||
}
|
||||
|
||||
// Process the file queue
|
||||
|
||||
nextFile:
|
||||
for {
|
||||
fileName, ok := p.queue.Pop()
|
||||
|
||||
@@ -393,7 +393,7 @@ func TestDeregisterOnFailInCopy(t *testing.T) {
|
||||
}
|
||||
|
||||
// queue.Done should be called by the finisher routine
|
||||
p.queue.Push("filex")
|
||||
p.queue.Push("filex", 0, 0)
|
||||
p.queue.Pop()
|
||||
|
||||
if len(p.queue.progress) != 1 {
|
||||
@@ -480,7 +480,7 @@ func TestDeregisterOnFailInPull(t *testing.T) {
|
||||
}
|
||||
|
||||
// queue.Done should be called by the finisher routine
|
||||
p.queue.Push("filex")
|
||||
p.queue.Push("filex", 0, 0)
|
||||
p.queue.Pop()
|
||||
|
||||
if len(p.queue.progress) != 1 {
|
||||
|
||||
Reference in New Issue
Block a user