Only buffer file names, not full &FileInfo

This commit is contained in:
Jakob Borg 2014-12-30 09:31:34 +01:00
parent 34deb82aea
commit 2496185629
5 changed files with 68 additions and 82 deletions

View File

@ -79,7 +79,7 @@ const (
type service interface { type service interface {
Serve() Serve()
Stop() Stop()
Jobs() ([]protocol.FileInfoTruncated, []protocol.FileInfoTruncated) // In progress, Queued Jobs() ([]string, []string) // In progress, Queued
Bump(string) Bump(string)
} }
@ -427,20 +427,25 @@ func (m *Model) NeedFolderFiles(folder string, max int) ([]protocol.FileInfoTrun
m.fmut.RLock() m.fmut.RLock()
defer m.fmut.RUnlock() defer m.fmut.RUnlock()
if rf, ok := m.folderFiles[folder]; ok { if rf, ok := m.folderFiles[folder]; ok {
progress := []protocol.FileInfoTruncated{} var progress, queued, rest []protocol.FileInfoTruncated
queued := []protocol.FileInfoTruncated{} var seen map[string]bool
rest := []protocol.FileInfoTruncated{}
seen := map[string]struct{}{}
runner, ok := m.folderRunners[folder] runner, ok := m.folderRunners[folder]
if ok { if ok {
progress, queued = runner.Jobs() progressNames, queuedNames := runner.Jobs()
seen = make(map[string]struct{}, len(progress)+len(queued))
for _, file := range progress { progress = make([]protocol.FileInfoTruncated, len(progressNames))
seen[file.Name] = struct{}{} queued = make([]protocol.FileInfoTruncated, len(queuedNames))
seen = make(map[string]bool, len(progressNames)+len(queuedNames))
for i, name := range progressNames {
progress[i] = rf.GetGlobal(name).ToTruncated() /// XXX: Should implement GetGlobalTruncated directly
seen[name] = true
} }
for _, file := range queued {
seen[file.Name] = struct{}{} for i, name := range queuedNames {
queued[i] = rf.GetGlobal(name).ToTruncated() /// XXX: Should implement GetGlobalTruncated directly
seen[name] = true
} }
} }
left := max - len(progress) - len(queued) left := max - len(progress) - len(queued)
@ -448,8 +453,7 @@ func (m *Model) NeedFolderFiles(folder string, max int) ([]protocol.FileInfoTrun
rf.WithNeedTruncated(protocol.LocalDeviceID, func(f protocol.FileIntf) bool { rf.WithNeedTruncated(protocol.LocalDeviceID, func(f protocol.FileIntf) bool {
left-- left--
ft := f.(protocol.FileInfoTruncated) ft := f.(protocol.FileInfoTruncated)
_, ok := seen[ft.Name] if !seen[ft.Name] {
if !ok {
rest = append(rest, ft) rest = append(rest, ft)
} }
return max < 1 || left > 0 return max < 1 || left > 0

View File

@ -340,7 +340,7 @@ func (p *Puller) pullerIteration(checksum bool, ignores *ignore.Matcher) int {
default: default:
// A new or changed file or symlink. This is the only case where we // A new or changed file or symlink. This is the only case where we
// do stuff concurrently in the background // do stuff concurrently in the background
p.queue.Push(&file) p.queue.Push(file.Name)
} }
changed++ changed++
@ -348,11 +348,12 @@ func (p *Puller) pullerIteration(checksum bool, ignores *ignore.Matcher) int {
}) })
for { for {
f := p.queue.Pop() fileName, ok := p.queue.Pop()
if f == nil { if !ok {
break break
} }
p.handleFile(*f, copyChan, finisherChan) f := p.model.CurrentGlobalFile(p.folder, fileName)
p.handleFile(f, copyChan, finisherChan)
} }
// Signal copy and puller routines that we are done with the in data for // Signal copy and puller routines that we are done with the in data for
@ -492,7 +493,7 @@ func (p *Puller) handleFile(file protocol.FileInfo, copyChan chan<- copyBlocksSt
if debug { if debug {
l.Debugln(p, "taking shortcut on", file.Name) l.Debugln(p, "taking shortcut on", file.Name)
} }
p.queue.Done(&file) p.queue.Done(file.Name)
if file.IsSymlink() { if file.IsSymlink() {
p.shortcutSymlink(curFile, file) p.shortcutSymlink(curFile, file)
} else { } else {
@ -860,7 +861,7 @@ func (p *Puller) finisherRoutine(in <-chan *sharedPullerState) {
continue continue
} }
p.queue.Done(&state.file) p.queue.Done(state.file.Name)
p.performFinish(state) p.performFinish(state)
p.model.receivedFile(p.folder, state.file.Name) p.model.receivedFile(p.folder, state.file.Name)
if p.progressEmitter != nil { if p.progressEmitter != nil {
@ -875,7 +876,7 @@ func (p *Puller) Bump(filename string) {
p.queue.Bump(filename) p.queue.Bump(filename)
} }
func (p *Puller) Jobs() ([]protocol.FileInfoTruncated, []protocol.FileInfoTruncated) { func (p *Puller) Jobs() ([]string, []string) {
return p.queue.Jobs() return p.queue.Jobs()
} }

View File

@ -15,15 +15,11 @@
package model package model
import ( import "sync"
"sync"
"github.com/syncthing/syncthing/internal/protocol"
)
type JobQueue struct { type JobQueue struct {
progress []*protocol.FileInfo progress []string
queued []*protocol.FileInfo queued []string
mut sync.Mutex mut sync.Mutex
} }
@ -31,26 +27,26 @@ func NewJobQueue() *JobQueue {
return &JobQueue{} return &JobQueue{}
} }
func (q *JobQueue) Push(file *protocol.FileInfo) { func (q *JobQueue) Push(file string) {
q.mut.Lock() q.mut.Lock()
q.queued = append(q.queued, file) q.queued = append(q.queued, file)
q.mut.Unlock() q.mut.Unlock()
} }
func (q *JobQueue) Pop() *protocol.FileInfo { func (q *JobQueue) Pop() (string, bool) {
q.mut.Lock() q.mut.Lock()
defer q.mut.Unlock() defer q.mut.Unlock()
if len(q.queued) == 0 { if len(q.queued) == 0 {
return nil return "", false
} }
var f *protocol.FileInfo var f string
f, q.queued[0] = q.queued[0], nil f = q.queued[0]
q.queued = q.queued[1:] q.queued = q.queued[1:]
q.progress = append(q.progress, f) q.progress = append(q.progress, f)
return f return f, true
} }
func (q *JobQueue) Bump(filename string) { func (q *JobQueue) Bump(filename string) {
@ -58,40 +54,35 @@ func (q *JobQueue) Bump(filename string) {
defer q.mut.Unlock() defer q.mut.Unlock()
for i := range q.queued { for i := range q.queued {
if q.queued[i].Name == filename { if q.queued[i] == filename {
q.queued[0], q.queued[i] = q.queued[i], q.queued[0] q.queued[0], q.queued[i] = q.queued[i], q.queued[0]
return return
} }
} }
} }
func (q *JobQueue) Done(file *protocol.FileInfo) { func (q *JobQueue) Done(file string) {
q.mut.Lock() q.mut.Lock()
defer q.mut.Unlock() defer q.mut.Unlock()
for i := range q.progress { for i := range q.progress {
if q.progress[i].Name == file.Name { if q.progress[i] == file {
copy(q.progress[i:], q.progress[i+1:]) copy(q.progress[i:], q.progress[i+1:])
q.progress[len(q.progress)-1] = nil
q.progress = q.progress[:len(q.progress)-1] q.progress = q.progress[:len(q.progress)-1]
return return
} }
} }
} }
func (q *JobQueue) Jobs() ([]protocol.FileInfoTruncated, []protocol.FileInfoTruncated) { func (q *JobQueue) Jobs() ([]string, []string) {
q.mut.Lock() q.mut.Lock()
defer q.mut.Unlock() defer q.mut.Unlock()
progress := make([]protocol.FileInfoTruncated, len(q.progress)) progress := make([]string, len(q.progress))
for i := range q.progress { copy(progress, q.progress)
progress[i] = q.progress[i].ToTruncated()
}
queued := make([]protocol.FileInfoTruncated, len(q.queued)) queued := make([]string, len(q.queued))
for i := range q.queued { copy(queued, q.queued)
queued[i] = q.queued[i].ToTruncated()
}
return progress, queued return progress, queued
} }

View File

@ -18,25 +18,15 @@ package model
import ( import (
"fmt" "fmt"
"testing" "testing"
"github.com/syncthing/syncthing/internal/protocol"
)
var (
f1 = &protocol.FileInfo{Name: "f1"}
f2 = &protocol.FileInfo{Name: "f2"}
f3 = &protocol.FileInfo{Name: "f3"}
f4 = &protocol.FileInfo{Name: "f4"}
f5 = &protocol.FileInfo{Name: "f5"}
) )
func TestJobQueue(t *testing.T) { func TestJobQueue(t *testing.T) {
// Some random actions // Some random actions
q := NewJobQueue() q := NewJobQueue()
q.Push(f1) q.Push("f1")
q.Push(f2) q.Push("f2")
q.Push(f3) q.Push("f3")
q.Push(f4) q.Push("f4")
progress, queued := q.Jobs() progress, queued := q.Jobs()
if len(progress) != 0 || len(queued) != 4 { if len(progress) != 0 || len(queued) != 4 {
@ -44,8 +34,8 @@ func TestJobQueue(t *testing.T) {
} }
for i := 1; i < 5; i++ { for i := 1; i < 5; i++ {
n := q.Pop() n, ok := q.Pop()
if n == nil || n.Name != fmt.Sprintf("f%d", i) { if !ok || n != fmt.Sprintf("f%d", i) {
t.Fatal("Wrong element") t.Fatal("Wrong element")
} }
progress, queued = q.Jobs() progress, queued = q.Jobs()
@ -65,7 +55,7 @@ func TestJobQueue(t *testing.T) {
t.Fatal("Wrong length") t.Fatal("Wrong length")
} }
q.Done(f5) // Does not exist q.Done("f5") // Does not exist
progress, queued = q.Jobs() progress, queued = q.Jobs()
if len(progress) != 0 || len(queued) != 4 { if len(progress) != 0 || len(queued) != 4 {
t.Fatal("Wrong length") t.Fatal("Wrong length")
@ -90,8 +80,8 @@ func TestJobQueue(t *testing.T) {
t.Fatal("Wrong length") t.Fatal("Wrong length")
} }
n := q.Pop() n, ok := q.Pop()
if n == nil || n.Name != s { if !ok || n != s {
t.Fatal("Wrong element") t.Fatal("Wrong element")
} }
progress, queued = q.Jobs() progress, queued = q.Jobs()
@ -99,24 +89,26 @@ func TestJobQueue(t *testing.T) {
t.Fatal("Wrong length") t.Fatal("Wrong length")
} }
q.Done(f5) // Does not exist q.Done("f5") // Does not exist
progress, queued = q.Jobs() progress, queued = q.Jobs()
if len(progress) != 5-i || len(queued) != i-1 { if len(progress) != 5-i || len(queued) != i-1 {
t.Fatal("Wrong length") t.Fatal("Wrong length")
} }
} }
if len(q.progress) != 4 || q.Pop() != nil { _, ok := q.Pop()
if len(q.progress) != 4 || ok {
t.Fatal("Wrong length") t.Fatal("Wrong length")
} }
q.Done(f1) q.Done("f1")
q.Done(f2) q.Done("f2")
q.Done(f3) q.Done("f3")
q.Done(f4) q.Done("f4")
q.Done(f5) // Does not exist q.Done("f5") // Does not exist
if len(q.progress) != 0 || q.Pop() != nil { _, ok = q.Pop()
if len(q.progress) != 0 || ok {
t.Fatal("Wrong length") t.Fatal("Wrong length")
} }
@ -125,7 +117,7 @@ func TestJobQueue(t *testing.T) {
t.Fatal("Wrong length") t.Fatal("Wrong length")
} }
q.Bump("") q.Bump("")
q.Done(f5) // Does not exist q.Done("f5") // Does not exist
progress, queued = q.Jobs() progress, queued = q.Jobs()
if len(progress) != 0 || len(queued) != 0 { if len(progress) != 0 || len(queued) != 0 {
t.Fatal("Wrong length") t.Fatal("Wrong length")
@ -178,8 +170,8 @@ func BenchmarkJobQueueBump(b *testing.B) {
files := genFiles(b.N) files := genFiles(b.N)
q := NewJobQueue() q := NewJobQueue()
for j := range files { for _, f := range files {
q.Push(&files[j]) q.Push(f.Name)
} }
b.ResetTimer() b.ResetTimer()
@ -194,11 +186,11 @@ func BenchmarkJobQueuePushPopDone10k(b *testing.B) {
b.ResetTimer() b.ResetTimer()
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
q := NewJobQueue() q := NewJobQueue()
for j := range files { for _, f := range files {
q.Push(&files[j]) q.Push(f.Name)
} }
for range files { for range files {
n := q.Pop() n, _ := q.Pop()
q.Done(n) q.Done(n)
} }
} }

View File

@ -18,8 +18,6 @@ package model
import ( import (
"fmt" "fmt"
"time" "time"
"github.com/syncthing/syncthing/internal/protocol"
) )
type Scanner struct { type Scanner struct {
@ -80,6 +78,6 @@ func (s *Scanner) String() string {
func (s *Scanner) Bump(string) {} func (s *Scanner) Bump(string) {}
func (s *Scanner) Jobs() ([]protocol.FileInfoTruncated, []protocol.FileInfoTruncated) { func (s *Scanner) Jobs() ([]string, []string) {
return nil, nil return nil, nil
} }