mv internal lib

Jakob Borg
2015-08-06 11:29:25 +02:00
parent 0a803891a4
commit 7705a6c1f1
197 changed files with 158 additions and 158 deletions

2
lib/model/.gitignore vendored Normal file

@@ -0,0 +1,2 @@
.stfolder
.stignore

19
lib/model/debug.go Normal file

@@ -0,0 +1,19 @@
// Copyright (C) 2014 The Syncthing Authors.
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
// You can obtain one at http://mozilla.org/MPL/2.0/.
package model
import (
"os"
"strings"
"github.com/calmh/logger"
)
var (
debug = strings.Contains(os.Getenv("STTRACE"), "model") || os.Getenv("STTRACE") == "all"
l = logger.DefaultLogger
)

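Debug output in the files below is switched on at startup through the STTRACE environment variable: any value containing "model", or the exact value "all", enables it for this package.
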
53
lib/model/deviceactivity.go Normal file

@@ -0,0 +1,53 @@
// Copyright (C) 2014 The Syncthing Authors.
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
// You can obtain one at http://mozilla.org/MPL/2.0/.
package model
import (
"github.com/syncthing/protocol"
"github.com/syncthing/syncthing/lib/sync"
)
// deviceActivity tracks the number of outstanding requests per device and can
// answer which device is least busy. It is safe for use from multiple
// goroutines.
type deviceActivity struct {
act map[protocol.DeviceID]int
mut sync.Mutex
}
func newDeviceActivity() *deviceActivity {
return &deviceActivity{
act: make(map[protocol.DeviceID]int),
mut: sync.NewMutex(),
}
}
func (m *deviceActivity) leastBusy(availability []protocol.DeviceID) protocol.DeviceID {
m.mut.Lock()
low := 2<<30 - 1
var selected protocol.DeviceID
for _, device := range availability {
if usage := m.act[device]; usage < low {
low = usage
selected = device
}
}
m.mut.Unlock()
return selected
}
func (m *deviceActivity) using(device protocol.DeviceID) {
m.mut.Lock()
m.act[device]++
m.mut.Unlock()
}
func (m *deviceActivity) done(device protocol.DeviceID) {
m.mut.Lock()
m.act[device]--
m.mut.Unlock()
}

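As its comment says, the tracker above answers which connected device is currently least busy. A minimal sketch of the intended call pattern, written as if it sat next to the code above in package model; the helper name and the request callback are illustrative, not part of this commit:

// requestFromLeastBusy is a hypothetical helper: pick the device with the
// fewest outstanding requests, count this request against it while it is in
// flight, and release it again when done.
func requestFromLeastBusy(act *deviceActivity, available []protocol.DeviceID, request func(protocol.DeviceID) error) error {
    device := act.leastBusy(available)
    act.using(device)
    defer act.done(device)
    return request(device)
}
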
58
lib/model/deviceactivity_test.go Normal file

@@ -0,0 +1,58 @@
// Copyright (C) 2014 The Syncthing Authors.
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
// You can obtain one at http://mozilla.org/MPL/2.0/.
package model
import (
"testing"
"github.com/syncthing/protocol"
)
func TestDeviceActivity(t *testing.T) {
n0 := protocol.DeviceID([32]byte{1, 2, 3, 4})
n1 := protocol.DeviceID([32]byte{5, 6, 7, 8})
n2 := protocol.DeviceID([32]byte{9, 10, 11, 12})
devices := []protocol.DeviceID{n0, n1, n2}
na := newDeviceActivity()
if lb := na.leastBusy(devices); lb != n0 {
t.Errorf("Least busy device should be n0 (%v) not %v", n0, lb)
}
if lb := na.leastBusy(devices); lb != n0 {
t.Errorf("Least busy device should still be n0 (%v) not %v", n0, lb)
}
na.using(na.leastBusy(devices))
if lb := na.leastBusy(devices); lb != n1 {
t.Errorf("Least busy device should be n1 (%v) not %v", n1, lb)
}
na.using(na.leastBusy(devices))
if lb := na.leastBusy(devices); lb != n2 {
t.Errorf("Least busy device should be n2 (%v) not %v", n2, lb)
}
na.using(na.leastBusy(devices))
if lb := na.leastBusy(devices); lb != n0 {
t.Errorf("Least busy device should be n0 (%v) not %v", n0, lb)
}
na.done(n1)
if lb := na.leastBusy(devices); lb != n1 {
t.Errorf("Least busy device should be n1 (%v) not %v", n1, lb)
}
na.done(n2)
if lb := na.leastBusy(devices); lb != n1 {
t.Errorf("Least busy device should still be n1 (%v) not %v", n1, lb)
}
na.done(n0)
if lb := na.leastBusy(devices); lb != n0 {
t.Errorf("Least busy device should be n0 (%v) not %v", n0, lb)
}
}

8
lib/model/doc.go Normal file

@@ -0,0 +1,8 @@
// Copyright (C) 2014 The Syncthing Authors.
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
// You can obtain one at http://mozilla.org/MPL/2.0/.
// Package model implements folder abstraction and file pulling mechanisms
package model

135
lib/model/folderstate.go Normal file

@@ -0,0 +1,135 @@
// Copyright (C) 2015 The Syncthing Authors.
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
// You can obtain one at http://mozilla.org/MPL/2.0/.
package model
import (
"time"
"github.com/syncthing/syncthing/lib/events"
"github.com/syncthing/syncthing/lib/sync"
)
type folderState int
const (
FolderIdle folderState = iota
FolderScanning
FolderSyncing
FolderError
)
func (s folderState) String() string {
switch s {
case FolderIdle:
return "idle"
case FolderScanning:
return "scanning"
case FolderSyncing:
return "syncing"
case FolderError:
return "error"
default:
return "unknown"
}
}
type stateTracker struct {
folder string
mut sync.Mutex
current folderState
err error
changed time.Time
}
// setState sets the new folder state, for states other than FolderError.
func (s *stateTracker) setState(newState folderState) {
if newState == FolderError {
panic("must use setError")
}
s.mut.Lock()
if newState != s.current {
/* This should hold later...
if s.current != FolderIdle && (newState == FolderScanning || newState == FolderSyncing) {
panic("illegal state transition " + s.current.String() + " -> " + newState.String())
}
*/
eventData := map[string]interface{}{
"folder": s.folder,
"to": newState.String(),
"from": s.current.String(),
}
if !s.changed.IsZero() {
eventData["duration"] = time.Since(s.changed).Seconds()
}
s.current = newState
s.changed = time.Now()
events.Default.Log(events.StateChanged, eventData)
}
s.mut.Unlock()
}
// getState returns the current state, the time when it last changed, and the
// current error or nil.
func (s *stateTracker) getState() (current folderState, changed time.Time, err error) {
s.mut.Lock()
current, changed, err = s.current, s.changed, s.err
s.mut.Unlock()
return
}
// setError sets the folder state to FolderError with the specified error.
func (s *stateTracker) setError(err error) {
s.mut.Lock()
if s.current != FolderError || s.err.Error() != err.Error() {
eventData := map[string]interface{}{
"folder": s.folder,
"to": FolderError.String(),
"from": s.current.String(),
"error": err.Error(),
}
if !s.changed.IsZero() {
eventData["duration"] = time.Since(s.changed).Seconds()
}
s.current = FolderError
s.err = err
s.changed = time.Now()
events.Default.Log(events.StateChanged, eventData)
}
s.mut.Unlock()
}
// clearError sets the folder state to FolderIdle and clears the error
func (s *stateTracker) clearError() {
s.mut.Lock()
if s.current == FolderError {
eventData := map[string]interface{}{
"folder": s.folder,
"to": FolderIdle.String(),
"from": s.current.String(),
}
if !s.changed.IsZero() {
eventData["duration"] = time.Since(s.changed).Seconds()
}
s.current = FolderIdle
s.err = nil
s.changed = time.Now()
events.Default.Log(events.StateChanged, eventData)
}
s.mut.Unlock()
}

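Every transition recorded by the tracker above is published as a StateChanged event, including how long the previous state lasted. A hedged sketch of how a folder routine might drive it around a scan, assuming package model context; the wrapper function is illustrative, not part of this commit:

// scanWithStateTracking is a hypothetical wrapper: mark the folder as
// scanning, record any error via setError, and otherwise return to idle.
func scanWithStateTracking(s *stateTracker, scan func() error) {
    s.setState(FolderScanning)
    if err := scan(); err != nil {
        s.setError(err) // moves to FolderError and emits StateChanged
        return
    }
    s.clearError()         // leaves FolderError if we happened to be in it
    s.setState(FolderIdle) // back to idle, emitting another StateChanged
}
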
1945
lib/model/model.go Normal file

File diff suppressed because it is too large

1211
lib/model/model_test.go Normal file

File diff suppressed because it is too large

152
lib/model/progressemitter.go Executable file

@@ -0,0 +1,152 @@
// Copyright (C) 2014 The Syncthing Authors.
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
// You can obtain one at http://mozilla.org/MPL/2.0/.
package model
import (
"fmt"
"path/filepath"
"reflect"
"time"
"github.com/syncthing/syncthing/lib/config"
"github.com/syncthing/syncthing/lib/events"
"github.com/syncthing/syncthing/lib/sync"
)
type ProgressEmitter struct {
registry map[string]*sharedPullerState
interval time.Duration
last map[string]map[string]*pullerProgress
mut sync.Mutex
timer *time.Timer
stop chan struct{}
}
// NewProgressEmitter creates a new progress emitter which emits
// DownloadProgress events every interval.
func NewProgressEmitter(cfg *config.Wrapper) *ProgressEmitter {
t := &ProgressEmitter{
stop: make(chan struct{}),
registry: make(map[string]*sharedPullerState),
last: make(map[string]map[string]*pullerProgress),
timer: time.NewTimer(time.Millisecond),
mut: sync.NewMutex(),
}
t.CommitConfiguration(config.Configuration{}, cfg.Raw())
cfg.Subscribe(t)
return t
}
// Serve starts the progress emitter which starts emitting DownloadProgress
// events as the progress happens.
func (t *ProgressEmitter) Serve() {
for {
select {
case <-t.stop:
if debug {
l.Debugln("progress emitter: stopping")
}
return
case <-t.timer.C:
t.mut.Lock()
if debug {
l.Debugln("progress emitter: timer - looking after", len(t.registry))
}
output := make(map[string]map[string]*pullerProgress)
for _, puller := range t.registry {
if output[puller.folder] == nil {
output[puller.folder] = make(map[string]*pullerProgress)
}
output[puller.folder][puller.file.Name] = puller.Progress()
}
if !reflect.DeepEqual(t.last, output) {
events.Default.Log(events.DownloadProgress, output)
t.last = output
if debug {
l.Debugf("progress emitter: emitting %#v", output)
}
} else if debug {
l.Debugln("progress emitter: nothing new")
}
if len(t.registry) != 0 {
t.timer.Reset(t.interval)
}
t.mut.Unlock()
}
}
}
// VerifyConfiguration implements the config.Committer interface
func (t *ProgressEmitter) VerifyConfiguration(from, to config.Configuration) error {
return nil
}
// CommitConfiguration implements the config.Committer interface
func (t *ProgressEmitter) CommitConfiguration(from, to config.Configuration) bool {
t.mut.Lock()
defer t.mut.Unlock()
t.interval = time.Duration(to.Options.ProgressUpdateIntervalS) * time.Second
if debug {
l.Debugln("progress emitter: updated interval", t.interval)
}
return true
}
// Stop stops the emitter.
func (t *ProgressEmitter) Stop() {
t.stop <- struct{}{}
}
// Register a puller with the emitter, which will start broadcasting the
// puller's progress.
func (t *ProgressEmitter) Register(s *sharedPullerState) {
t.mut.Lock()
defer t.mut.Unlock()
if debug {
l.Debugln("progress emitter: registering", s.folder, s.file.Name)
}
if len(t.registry) == 0 {
t.timer.Reset(t.interval)
}
t.registry[filepath.Join(s.folder, s.file.Name)] = s
}
// Deregister a puller, which will stop broadcasting the puller's state.
func (t *ProgressEmitter) Deregister(s *sharedPullerState) {
t.mut.Lock()
defer t.mut.Unlock()
if debug {
l.Debugln("progress emitter: deregistering", s.folder, s.file.Name)
}
delete(t.registry, filepath.Join(s.folder, s.file.Name))
}
// BytesCompleted returns the number of bytes completed in the given folder.
func (t *ProgressEmitter) BytesCompleted(folder string) (bytes int64) {
t.mut.Lock()
defer t.mut.Unlock()
for _, s := range t.registry {
if s.folder == folder {
bytes += s.Progress().BytesDone
}
}
if debug {
l.Debugf("progress emitter: bytes completed for %s: %d", folder, bytes)
}
return
}
func (t *ProgressEmitter) String() string {
return fmt.Sprintf("ProgressEmitter@%p", t)
}

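The DownloadProgress events emitted from Serve above carry a map keyed first by folder and then by file name, with one *pullerProgress per entry. A hedged consumer sketch, assuming package model context; the function name and the polling interval are illustrative:

// logDownloadProgress is a hypothetical consumer of the events produced by
// the ProgressEmitter: it polls a subscription and logs per-file byte counts.
func logDownloadProgress(sub *events.Subscription) {
    for {
        ev, err := sub.Poll(time.Minute)
        if err == events.ErrTimeout {
            continue
        } else if err != nil {
            return // subscription gone
        }
        for folder, files := range ev.Data.(map[string]map[string]*pullerProgress) {
            for file, p := range files {
                l.Debugf("%s / %s: %d of %d bytes done", folder, file, p.BytesDone, p.BytesTotal)
            }
        }
    }
}
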
87
lib/model/progressemitter_test.go Normal file

@@ -0,0 +1,87 @@
// Copyright (C) 2014 The Syncthing Authors.
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
// You can obtain one at http://mozilla.org/MPL/2.0/.
package model
import (
"testing"
"time"
"github.com/syncthing/syncthing/lib/config"
"github.com/syncthing/syncthing/lib/events"
"github.com/syncthing/syncthing/lib/sync"
)
var timeout = 10 * time.Millisecond
func expectEvent(w *events.Subscription, t *testing.T, size int) {
event, err := w.Poll(timeout)
if err != nil {
t.Fatal("Unexpected error:", err)
}
if event.Type != events.DownloadProgress {
t.Fatal("Unexpected event:", event)
}
data := event.Data.(map[string]map[string]*pullerProgress)
if len(data) != size {
t.Fatal("Unexpected event data size:", data)
}
}
func expectTimeout(w *events.Subscription, t *testing.T) {
_, err := w.Poll(timeout)
if err != events.ErrTimeout {
t.Fatal("Unexpected non-Timeout error:", err)
}
}
func TestProgressEmitter(t *testing.T) {
w := events.Default.Subscribe(events.DownloadProgress)
c := config.Wrap("/tmp/test", config.Configuration{})
c.SetOptions(config.OptionsConfiguration{
ProgressUpdateIntervalS: 0,
})
p := NewProgressEmitter(c)
go p.Serve()
expectTimeout(w, t)
s := sharedPullerState{
mut: sync.NewMutex(),
}
p.Register(&s)
expectEvent(w, t, 1)
expectTimeout(w, t)
s.copyDone()
expectEvent(w, t, 1)
expectTimeout(w, t)
s.copiedFromOrigin()
expectEvent(w, t, 1)
expectTimeout(w, t)
s.pullStarted()
expectEvent(w, t, 1)
expectTimeout(w, t)
s.pullDone()
expectEvent(w, t, 1)
expectTimeout(w, t)
p.Deregister(&s)
expectEvent(w, t, 0)
expectTimeout(w, t)
}

152
lib/model/queue.go Normal file

@@ -0,0 +1,152 @@
// Copyright (C) 2014 The Syncthing Authors.
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
// You can obtain one at http://mozilla.org/MPL/2.0/.
package model
import (
"math/rand"
"sort"
"github.com/syncthing/syncthing/lib/sync"
)
type jobQueue struct {
progress []string
queued []jobQueueEntry
mut sync.Mutex
}
type jobQueueEntry struct {
name string
size int64
modified int64
}
func newJobQueue() *jobQueue {
return &jobQueue{
mut: sync.NewMutex(),
}
}
func (q *jobQueue) Push(file string, size, modified int64) {
q.mut.Lock()
q.queued = append(q.queued, jobQueueEntry{file, size, modified})
q.mut.Unlock()
}
func (q *jobQueue) Pop() (string, bool) {
q.mut.Lock()
defer q.mut.Unlock()
if len(q.queued) == 0 {
return "", false
}
f := q.queued[0].name
q.queued = q.queued[1:]
q.progress = append(q.progress, f)
return f, true
}
func (q *jobQueue) BringToFront(filename string) {
q.mut.Lock()
defer q.mut.Unlock()
for i, cur := range q.queued {
if cur.name == filename {
if i > 0 {
// Shift the elements before the selected element one step to
// the right, overwriting the selected element
copy(q.queued[1:i+1], q.queued[0:])
// Put the selected element at the front
q.queued[0] = cur
}
return
}
}
}
func (q *jobQueue) Done(file string) {
q.mut.Lock()
defer q.mut.Unlock()
for i := range q.progress {
if q.progress[i] == file {
copy(q.progress[i:], q.progress[i+1:])
q.progress = q.progress[:len(q.progress)-1]
return
}
}
}
func (q *jobQueue) Jobs() ([]string, []string) {
q.mut.Lock()
defer q.mut.Unlock()
progress := make([]string, len(q.progress))
copy(progress, q.progress)
queued := make([]string, len(q.queued))
for i := range q.queued {
queued[i] = q.queued[i].name
}
return progress, queued
}
func (q *jobQueue) Shuffle() {
q.mut.Lock()
defer q.mut.Unlock()
l := len(q.queued)
for i := range q.queued {
r := rand.Intn(l)
q.queued[i], q.queued[r] = q.queued[r], q.queued[i]
}
}
func (q *jobQueue) SortSmallestFirst() {
q.mut.Lock()
defer q.mut.Unlock()
sort.Sort(smallestFirst(q.queued))
}
func (q *jobQueue) SortLargestFirst() {
q.mut.Lock()
defer q.mut.Unlock()
sort.Sort(sort.Reverse(smallestFirst(q.queued)))
}
func (q *jobQueue) SortOldestFirst() {
q.mut.Lock()
defer q.mut.Unlock()
sort.Sort(oldestFirst(q.queued))
}
func (q *jobQueue) SortNewestFirst() {
q.mut.Lock()
defer q.mut.Unlock()
sort.Sort(sort.Reverse(oldestFirst(q.queued)))
}
// The usual sort.Interface boilerplate
type smallestFirst []jobQueueEntry
func (q smallestFirst) Len() int { return len(q) }
func (q smallestFirst) Less(a, b int) bool { return q[a].size < q[b].size }
func (q smallestFirst) Swap(a, b int) { q[a], q[b] = q[b], q[a] }
type oldestFirst []jobQueueEntry
func (q oldestFirst) Len() int { return len(q) }
func (q oldestFirst) Less(a, b int) bool { return q[a].modified < q[b].modified }
func (q oldestFirst) Swap(a, b int) { q[a], q[b] = q[b], q[a] }

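BringToFront above avoids any reallocation: it shifts the entries ahead of the match one slot to the right with copy, which is defined to handle overlapping slices, and then drops the match into slot zero. The same idiom on a plain string slice, as a standalone illustration:

// bringToFront moves s[i] to s[0] and shifts s[0:i] one step to the right,
// mirroring the copy-based rotation in jobQueue.BringToFront.
func bringToFront(s []string, i int) {
    if i <= 0 || i >= len(s) {
        return
    }
    front := s[i]
    copy(s[1:i+1], s[0:i]) // overlapping copy behaves like memmove
    s[0] = front
}
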
280
lib/model/queue_test.go Normal file

@@ -0,0 +1,280 @@
// Copyright (C) 2014 The Syncthing Authors.
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
// You can obtain one at http://mozilla.org/MPL/2.0/.
package model
import (
"fmt"
"reflect"
"testing"
)
func TestJobQueue(t *testing.T) {
// Some random actions
q := newJobQueue()
q.Push("f1", 0, 0)
q.Push("f2", 0, 0)
q.Push("f3", 0, 0)
q.Push("f4", 0, 0)
progress, queued := q.Jobs()
if len(progress) != 0 || len(queued) != 4 {
t.Fatal("Wrong length")
}
for i := 1; i < 5; i++ {
n, ok := q.Pop()
if !ok || n != fmt.Sprintf("f%d", i) {
t.Fatal("Wrong element")
}
progress, queued = q.Jobs()
if len(progress) != 1 || len(queued) != 3 {
t.Log(progress)
t.Log(queued)
t.Fatal("Wrong length")
}
q.Done(n)
progress, queued = q.Jobs()
if len(progress) != 0 || len(queued) != 3 {
t.Fatal("Wrong length", len(progress), len(queued))
}
q.Push(n, 0, 0)
progress, queued = q.Jobs()
if len(progress) != 0 || len(queued) != 4 {
t.Fatal("Wrong length")
}
q.Done("f5") // Does not exist
progress, queued = q.Jobs()
if len(progress) != 0 || len(queued) != 4 {
t.Fatal("Wrong length")
}
}
if len(q.progress) > 0 || len(q.queued) != 4 {
t.Fatal("Wrong length")
}
for i := 4; i > 0; i-- {
progress, queued = q.Jobs()
if len(progress) != 4-i || len(queued) != i {
t.Fatal("Wrong length")
}
s := fmt.Sprintf("f%d", i)
q.BringToFront(s)
progress, queued = q.Jobs()
if len(progress) != 4-i || len(queued) != i {
t.Fatal("Wrong length")
}
n, ok := q.Pop()
if !ok || n != s {
t.Fatal("Wrong element")
}
progress, queued = q.Jobs()
if len(progress) != 5-i || len(queued) != i-1 {
t.Fatal("Wrong length")
}
q.Done("f5") // Does not exist
progress, queued = q.Jobs()
if len(progress) != 5-i || len(queued) != i-1 {
t.Fatal("Wrong length")
}
}
_, ok := q.Pop()
if len(q.progress) != 4 || ok {
t.Fatal("Wrong length")
}
q.Done("f1")
q.Done("f2")
q.Done("f3")
q.Done("f4")
q.Done("f5") // Does not exist
_, ok = q.Pop()
if len(q.progress) != 0 || ok {
t.Fatal("Wrong length")
}
progress, queued = q.Jobs()
if len(progress) != 0 || len(queued) != 0 {
t.Fatal("Wrong length")
}
q.BringToFront("")
q.Done("f5") // Does not exist
progress, queued = q.Jobs()
if len(progress) != 0 || len(queued) != 0 {
t.Fatal("Wrong length")
}
}
func TestBringToFront(t *testing.T) {
q := newJobQueue()
q.Push("f1", 0, 0)
q.Push("f2", 0, 0)
q.Push("f3", 0, 0)
q.Push("f4", 0, 0)
_, queued := q.Jobs()
if !reflect.DeepEqual(queued, []string{"f1", "f2", "f3", "f4"}) {
t.Errorf("Incorrect order %v at start", queued)
}
q.BringToFront("f1") // corner case: does nothing
_, queued = q.Jobs()
if !reflect.DeepEqual(queued, []string{"f1", "f2", "f3", "f4"}) {
t.Errorf("Incorrect order %v", queued)
}
q.BringToFront("f3")
_, queued = q.Jobs()
if !reflect.DeepEqual(queued, []string{"f3", "f1", "f2", "f4"}) {
t.Errorf("Incorrect order %v", queued)
}
q.BringToFront("f2")
_, queued = q.Jobs()
if !reflect.DeepEqual(queued, []string{"f2", "f3", "f1", "f4"}) {
t.Errorf("Incorrect order %v", queued)
}
q.BringToFront("f4") // corner case: last element
_, queued = q.Jobs()
if !reflect.DeepEqual(queued, []string{"f4", "f2", "f3", "f1"}) {
t.Errorf("Incorrect order %v", queued)
}
}
func TestShuffle(t *testing.T) {
q := newJobQueue()
q.Push("f1", 0, 0)
q.Push("f2", 0, 0)
q.Push("f3", 0, 0)
q.Push("f4", 0, 0)
// This test will fail once in eight million times (1 / (4!)^5) :)
for i := 0; i < 5; i++ {
q.Shuffle()
_, queued := q.Jobs()
if l := len(queued); l != 4 {
t.Fatalf("Weird length %d returned from Jobs()", l)
}
t.Logf("%v", queued)
if !reflect.DeepEqual(queued, []string{"f1", "f2", "f3", "f4"}) {
// The queue was shuffled
return
}
}
t.Error("Queue was not shuffled after five attempts.")
}
func TestSortBySize(t *testing.T) {
q := newJobQueue()
q.Push("f1", 20, 0)
q.Push("f2", 40, 0)
q.Push("f3", 30, 0)
q.Push("f4", 10, 0)
q.SortSmallestFirst()
_, actual := q.Jobs()
if l := len(actual); l != 4 {
t.Fatalf("Weird length %d returned from Jobs()", l)
}
expected := []string{"f4", "f1", "f3", "f2"}
if !reflect.DeepEqual(actual, expected) {
t.Errorf("SortSmallestFirst(): %#v != %#v", actual, expected)
}
q.SortLargestFirst()
_, actual = q.Jobs()
if l := len(actual); l != 4 {
t.Fatalf("Weird length %d returned from Jobs()", l)
}
expected = []string{"f2", "f3", "f1", "f4"}
if !reflect.DeepEqual(actual, expected) {
t.Errorf("SortLargestFirst(): %#v != %#v", actual, expected)
}
}
func TestSortByAge(t *testing.T) {
q := newJobQueue()
q.Push("f1", 0, 20)
q.Push("f2", 0, 40)
q.Push("f3", 0, 30)
q.Push("f4", 0, 10)
q.SortOldestFirst()
_, actual := q.Jobs()
if l := len(actual); l != 4 {
t.Fatalf("Weird length %d returned from Jobs()", l)
}
expected := []string{"f4", "f1", "f3", "f2"}
if !reflect.DeepEqual(actual, expected) {
t.Errorf("SortOldestFirst(): %#v != %#v", actual, expected)
}
q.SortNewestFirst()
_, actual = q.Jobs()
if l := len(actual); l != 4 {
t.Fatalf("Weird length %d returned from Jobs()", l)
}
expected = []string{"f2", "f3", "f1", "f4"}
if !reflect.DeepEqual(actual, expected) {
t.Errorf("SortNewestFirst(): %#v != %#v", actual, expected)
}
}
func BenchmarkJobQueueBump(b *testing.B) {
files := genFiles(b.N)
q := newJobQueue()
for _, f := range files {
q.Push(f.Name, 0, 0)
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
q.BringToFront(files[i].Name)
}
}
func BenchmarkJobQueuePushPopDone10k(b *testing.B) {
files := genFiles(10000)
b.ResetTimer()
for i := 0; i < b.N; i++ {
q := newJobQueue()
for _, f := range files {
q.Push(f.Name, 0, 0)
}
for range files {
n, _ := q.Pop()
q.Done(n)
}
}
}

164
lib/model/rofolder.go Normal file

@@ -0,0 +1,164 @@
// Copyright (C) 2014 The Syncthing Authors.
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
// You can obtain one at http://mozilla.org/MPL/2.0/.
package model
import (
"fmt"
"math/rand"
"time"
"github.com/syncthing/syncthing/lib/sync"
)
type roFolder struct {
stateTracker
folder string
intv time.Duration
timer *time.Timer
model *Model
stop chan struct{}
scanNow chan rescanRequest
delayScan chan time.Duration
}
type rescanRequest struct {
subs []string
err chan error
}
func newROFolder(model *Model, folder string, interval time.Duration) *roFolder {
return &roFolder{
stateTracker: stateTracker{
folder: folder,
mut: sync.NewMutex(),
},
folder: folder,
intv: interval,
timer: time.NewTimer(time.Millisecond),
model: model,
stop: make(chan struct{}),
scanNow: make(chan rescanRequest),
delayScan: make(chan time.Duration),
}
}
func (s *roFolder) Serve() {
if debug {
l.Debugln(s, "starting")
defer l.Debugln(s, "exiting")
}
defer func() {
s.timer.Stop()
}()
reschedule := func() {
if s.intv == 0 {
return
}
// Sleep a random time between 3/4 and 5/4 of the configured interval.
sleepNanos := (s.intv.Nanoseconds()*3 + rand.Int63n(2*s.intv.Nanoseconds())) / 4
s.timer.Reset(time.Duration(sleepNanos) * time.Nanosecond)
}
initialScanCompleted := false
for {
select {
case <-s.stop:
return
case <-s.timer.C:
if err := s.model.CheckFolderHealth(s.folder); err != nil {
l.Infoln("Skipping folder", s.folder, "scan due to folder error:", err)
reschedule()
continue
}
if debug {
l.Debugln(s, "rescan")
}
if err := s.model.internalScanFolderSubs(s.folder, nil); err != nil {
// Potentially sets the error twice, once in the scanner just
// by doing a check, and once here, if the error returned is
// the same one as returned by CheckFolderHealth, though
// duplicate set is handled by setError.
s.setError(err)
reschedule()
continue
}
if !initialScanCompleted {
l.Infoln("Completed initial scan (ro) of folder", s.folder)
initialScanCompleted = true
}
if s.intv == 0 {
continue
}
reschedule()
case req := <-s.scanNow:
if err := s.model.CheckFolderHealth(s.folder); err != nil {
l.Infoln("Skipping folder", s.folder, "scan due to folder error:", err)
req.err <- err
continue
}
if debug {
l.Debugln(s, "forced rescan")
}
if err := s.model.internalScanFolderSubs(s.folder, req.subs); err != nil {
// Potentially sets the error twice, once in the scanner just
// by doing a check, and once here, if the error returned is
// the same one as returned by CheckFolderHealth, though
// duplicate set is handled by setError.
s.setError(err)
req.err <- err
continue
}
req.err <- nil
case next := <-s.delayScan:
s.timer.Reset(next)
}
}
}
func (s *roFolder) Stop() {
close(s.stop)
}
func (s *roFolder) IndexUpdated() {
}
func (s *roFolder) Scan(subs []string) error {
req := rescanRequest{
subs: subs,
err: make(chan error),
}
s.scanNow <- req
return <-req.err
}
func (s *roFolder) String() string {
return fmt.Sprintf("roFolder/%s@%p", s.folder, s)
}
func (s *roFolder) BringToFront(string) {}
func (s *roFolder) Jobs() ([]string, []string) {
return nil, nil
}
func (s *roFolder) DelayScan(next time.Duration) {
s.delayScan <- next
}

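The reschedule closure above jitters the rescan timer to somewhere between three quarters and five quarters of the configured interval; for a 60 s interval that is (180 s + [0 s, 120 s)) / 4, i.e. [45 s, 75 s). The same arithmetic as a standalone helper, reusing the math/rand and time imports already present above; the function name is illustrative:

// nextScanDelay restates roFolder's reschedule arithmetic: a uniformly
// random duration in [3/4*intv, 5/4*intv).
func nextScanDelay(intv time.Duration) time.Duration {
    n := intv.Nanoseconds()
    if n <= 0 {
        return 0 // a zero interval means periodic rescans are disabled
    }
    return time.Duration((n*3+rand.Int63n(2*n))/4) * time.Nanosecond
}
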
1502
lib/model/rwfolder.go Normal file

File diff suppressed because it is too large

546
lib/model/rwfolder_test.go Normal file

@@ -0,0 +1,546 @@
// Copyright (C) 2014 The Syncthing Authors.
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
// You can obtain one at http://mozilla.org/MPL/2.0/.
package model
import (
"os"
"path/filepath"
"testing"
"time"
"github.com/syncthing/protocol"
"github.com/syncthing/syncthing/lib/scanner"
"github.com/syncthing/syncthing/lib/sync"
"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/storage"
)
func init() {
// We do this to make sure that the temp file required for the tests does
// not get removed during the tests.
future := time.Now().Add(time.Hour)
err := os.Chtimes(filepath.Join("testdata", defTempNamer.TempName("file")), future, future)
if err != nil {
panic(err)
}
}
var blocks = []protocol.BlockInfo{
{Hash: []uint8{0xfa, 0x43, 0x23, 0x9b, 0xce, 0xe7, 0xb9, 0x7c, 0xa6, 0x2f, 0x0, 0x7c, 0xc6, 0x84, 0x87, 0x56, 0xa, 0x39, 0xe1, 0x9f, 0x74, 0xf3, 0xdd, 0xe7, 0x48, 0x6d, 0xb3, 0xf9, 0x8d, 0xf8, 0xe4, 0x71}}, // Zero'ed out block
{Offset: 0, Size: 0x20000, Hash: []uint8{0x7e, 0xad, 0xbc, 0x36, 0xae, 0xbb, 0xcf, 0x74, 0x43, 0xe2, 0x7a, 0x5a, 0x4b, 0xb8, 0x5b, 0xce, 0xe6, 0x9e, 0x1e, 0x10, 0xf9, 0x8a, 0xbc, 0x77, 0x95, 0x2, 0x29, 0x60, 0x9e, 0x96, 0xae, 0x6c}},
{Offset: 131072, Size: 0x20000, Hash: []uint8{0x3c, 0xc4, 0x20, 0xf4, 0xb, 0x2e, 0xcb, 0xb9, 0x5d, 0xce, 0x34, 0xa8, 0xc3, 0x92, 0xea, 0xf3, 0xda, 0x88, 0x33, 0xee, 0x7a, 0xb6, 0xe, 0xf1, 0x82, 0x5e, 0xb0, 0xa9, 0x26, 0xa9, 0xc0, 0xef}},
{Offset: 262144, Size: 0x20000, Hash: []uint8{0x76, 0xa8, 0xc, 0x69, 0xd7, 0x5c, 0x52, 0xfd, 0xdf, 0x55, 0xef, 0x44, 0xc1, 0xd6, 0x25, 0x48, 0x4d, 0x98, 0x48, 0x4d, 0xaa, 0x50, 0xf6, 0x6b, 0x32, 0x47, 0x55, 0x81, 0x6b, 0xed, 0xee, 0xfb}},
{Offset: 393216, Size: 0x20000, Hash: []uint8{0x44, 0x1e, 0xa4, 0xf2, 0x8d, 0x1f, 0xc3, 0x1b, 0x9d, 0xa5, 0x18, 0x5e, 0x59, 0x1b, 0xd8, 0x5c, 0xba, 0x7d, 0xb9, 0x8d, 0x70, 0x11, 0x5c, 0xea, 0xa1, 0x57, 0x4d, 0xcb, 0x3c, 0x5b, 0xf8, 0x6c}},
{Offset: 524288, Size: 0x20000, Hash: []uint8{0x8, 0x40, 0xd0, 0x5e, 0x80, 0x0, 0x0, 0x7c, 0x8b, 0xb3, 0x8b, 0xf7, 0x7b, 0x23, 0x26, 0x28, 0xab, 0xda, 0xcf, 0x86, 0x8f, 0xc2, 0x8a, 0x39, 0xc6, 0xe6, 0x69, 0x59, 0x97, 0xb6, 0x1a, 0x43}},
{Offset: 655360, Size: 0x20000, Hash: []uint8{0x38, 0x8e, 0x44, 0xcb, 0x30, 0xd8, 0x90, 0xf, 0xce, 0x7, 0x4b, 0x58, 0x86, 0xde, 0xce, 0x59, 0xa2, 0x46, 0xd2, 0xf9, 0xba, 0xaf, 0x35, 0x87, 0x38, 0xdf, 0xd2, 0xd, 0xf9, 0x45, 0xed, 0x91}},
{Offset: 786432, Size: 0x20000, Hash: []uint8{0x32, 0x28, 0xcd, 0xf, 0x37, 0x21, 0xe5, 0xd4, 0x1e, 0x58, 0x87, 0x73, 0x8e, 0x36, 0xdf, 0xb2, 0x70, 0x78, 0x56, 0xc3, 0x42, 0xff, 0xf7, 0x8f, 0x37, 0x95, 0x0, 0x26, 0xa, 0xac, 0x54, 0x72}},
{Offset: 917504, Size: 0x20000, Hash: []uint8{0x96, 0x6b, 0x15, 0x6b, 0xc4, 0xf, 0x19, 0x18, 0xca, 0xbb, 0x5f, 0xd6, 0xbb, 0xa2, 0xc6, 0x2a, 0xac, 0xbb, 0x8a, 0xb9, 0xce, 0xec, 0x4c, 0xdb, 0x78, 0xec, 0x57, 0x5d, 0x33, 0xf9, 0x8e, 0xaf}},
}
// Layout of the files: (indexes from the above array)
// 12345678 - Required file
// 02005008 - Existing file (currently in the index)
// 02340070 - Temp file on the disk
func TestHandleFile(t *testing.T) {
// After the diff between required and existing we should:
// Copy: 2, 5, 8
// Pull: 1, 3, 4, 6, 7
// Create existing file
existingFile := protocol.FileInfo{
Name: "filex",
Flags: 0,
Modified: 0,
Blocks: []protocol.BlockInfo{
blocks[0], blocks[2], blocks[0], blocks[0],
blocks[5], blocks[0], blocks[0], blocks[8],
},
}
// Create target file
requiredFile := existingFile
requiredFile.Blocks = blocks[1:]
db, _ := leveldb.Open(storage.NewMemStorage(), nil)
m := NewModel(defaultConfig, protocol.LocalDeviceID, "device", "syncthing", "dev", db)
m.AddFolder(defaultFolderConfig)
// Update index
m.updateLocals("default", []protocol.FileInfo{existingFile})
p := rwFolder{
folder: "default",
dir: "testdata",
model: m,
errors: make(map[string]string),
errorsMut: sync.NewMutex(),
}
copyChan := make(chan copyBlocksState, 1)
p.handleFile(requiredFile, copyChan, nil)
// Receive the results
toCopy := <-copyChan
if len(toCopy.blocks) != 8 {
t.Errorf("Unexpected count of copy blocks: %d != 8", len(toCopy.blocks))
}
for i, block := range toCopy.blocks {
if string(block.Hash) != string(blocks[i+1].Hash) {
t.Errorf("Block mismatch: %s != %s", block.String(), blocks[i+1].String())
}
}
}
func TestHandleFileWithTemp(t *testing.T) {
// After diff between required and existing we should:
// Copy: 2, 5, 8
// Pull: 1, 3, 4, 6, 7
// After dropping out blocks already on the temp file we should:
// Copy: 5, 8
// Pull: 1, 6
// Create existing file
existingFile := protocol.FileInfo{
Name: "file",
Flags: 0,
Modified: 0,
Blocks: []protocol.BlockInfo{
blocks[0], blocks[2], blocks[0], blocks[0],
blocks[5], blocks[0], blocks[0], blocks[8],
},
}
// Create target file
requiredFile := existingFile
requiredFile.Blocks = blocks[1:]
db, _ := leveldb.Open(storage.NewMemStorage(), nil)
m := NewModel(defaultConfig, protocol.LocalDeviceID, "device", "syncthing", "dev", db)
m.AddFolder(defaultFolderConfig)
// Update index
m.updateLocals("default", []protocol.FileInfo{existingFile})
p := rwFolder{
folder: "default",
dir: "testdata",
model: m,
errors: make(map[string]string),
errorsMut: sync.NewMutex(),
}
copyChan := make(chan copyBlocksState, 1)
p.handleFile(requiredFile, copyChan, nil)
// Receive the results
toCopy := <-copyChan
if len(toCopy.blocks) != 4 {
t.Errorf("Unexpected count of copy blocks: %d != 4", len(toCopy.blocks))
}
for i, eq := range []int{1, 5, 6, 8} {
if string(toCopy.blocks[i].Hash) != string(blocks[eq].Hash) {
t.Errorf("Block mismatch: %s != %s", toCopy.blocks[i].String(), blocks[eq].String())
}
}
}
func TestCopierFinder(t *testing.T) {
// After diff between required and existing we should:
// Copy: 1, 2, 3, 4, 6, 7, 8
// Since there is no existing file, nor a temp file
// After dropping out blocks found locally:
// Pull: 1, 5, 6, 8
tempFile := filepath.Join("testdata", defTempNamer.TempName("file2"))
err := os.Remove(tempFile)
if err != nil && !os.IsNotExist(err) {
t.Error(err)
}
// Create existing file
existingFile := protocol.FileInfo{
Name: defTempNamer.TempName("file"),
Flags: 0,
Modified: 0,
Blocks: []protocol.BlockInfo{
blocks[0], blocks[2], blocks[3], blocks[4],
blocks[0], blocks[0], blocks[7], blocks[0],
},
}
// Create target file
requiredFile := existingFile
requiredFile.Blocks = blocks[1:]
requiredFile.Name = "file2"
db, _ := leveldb.Open(storage.NewMemStorage(), nil)
m := NewModel(defaultConfig, protocol.LocalDeviceID, "device", "syncthing", "dev", db)
m.AddFolder(defaultFolderConfig)
// Update index
m.updateLocals("default", []protocol.FileInfo{existingFile})
iterFn := func(folder, file string, index int32) bool {
return true
}
// Verify that the blocks we say exist on file, really exist in the db.
for _, idx := range []int{2, 3, 4, 7} {
if m.finder.Iterate(blocks[idx].Hash, iterFn) == false {
t.Error("Didn't find block")
}
}
p := rwFolder{
folder: "default",
dir: "testdata",
model: m,
errors: make(map[string]string),
errorsMut: sync.NewMutex(),
}
copyChan := make(chan copyBlocksState)
pullChan := make(chan pullBlockState, 4)
finisherChan := make(chan *sharedPullerState, 1)
// Run a single fetcher routine
go p.copierRoutine(copyChan, pullChan, finisherChan)
p.handleFile(requiredFile, copyChan, finisherChan)
pulls := []pullBlockState{<-pullChan, <-pullChan, <-pullChan, <-pullChan}
finish := <-finisherChan
select {
case <-pullChan:
t.Fatal("Finisher channel has data to be read")
case <-finisherChan:
t.Fatal("Finisher channel has data to be read")
default:
}
// Verify that the right blocks went into the pull list
for i, eq := range []int{1, 5, 6, 8} {
if string(pulls[i].block.Hash) != string(blocks[eq].Hash) {
t.Errorf("Block %d mismatch: %s != %s", eq, pulls[i].block.String(), blocks[eq].String())
}
if string(finish.file.Blocks[eq-1].Hash) != string(blocks[eq].Hash) {
t.Errorf("Block %d mismatch: %s != %s", eq, finish.file.Blocks[eq-1].String(), blocks[eq].String())
}
}
// Verify that the fetched blocks have actually been written to the temp file
blks, err := scanner.HashFile(tempFile, protocol.BlockSize)
if err != nil {
t.Log(err)
}
for _, eq := range []int{2, 3, 4, 7} {
if string(blks[eq-1].Hash) != string(blocks[eq].Hash) {
t.Errorf("Block %d mismatch: %s != %s", eq, blks[eq-1].String(), blocks[eq].String())
}
}
finish.fd.Close()
os.Remove(tempFile)
}
// Test that updating a file removes its old blocks from the blockmap
func TestCopierCleanup(t *testing.T) {
iterFn := func(folder, file string, index int32) bool {
return true
}
db, _ := leveldb.Open(storage.NewMemStorage(), nil)
m := NewModel(defaultConfig, protocol.LocalDeviceID, "device", "syncthing", "dev", db)
m.AddFolder(defaultFolderConfig)
// Create a file
file := protocol.FileInfo{
Name: "test",
Flags: 0,
Modified: 0,
Blocks: []protocol.BlockInfo{blocks[0]},
}
// Add file to index
m.updateLocals("default", []protocol.FileInfo{file})
if !m.finder.Iterate(blocks[0].Hash, iterFn) {
t.Error("Expected block not found")
}
file.Blocks = []protocol.BlockInfo{blocks[1]}
file.Version = file.Version.Update(protocol.LocalDeviceID.Short())
// Update index (removing old blocks)
m.updateLocals("default", []protocol.FileInfo{file})
if m.finder.Iterate(blocks[0].Hash, iterFn) {
t.Error("Unexpected block found")
}
if !m.finder.Iterate(blocks[1].Hash, iterFn) {
t.Error("Expected block not found")
}
file.Blocks = []protocol.BlockInfo{blocks[0]}
file.Version = file.Version.Update(protocol.LocalDeviceID.Short())
// Update index (removing old blocks)
m.updateLocals("default", []protocol.FileInfo{file})
if !m.finder.Iterate(blocks[0].Hash, iterFn) {
t.Error("Unexpected block found")
}
if m.finder.Iterate(blocks[1].Hash, iterFn) {
t.Error("Expected block not found")
}
}
// Make sure that the copier routine hashes the content when asked, and pulls
// if it fails to find the block.
func TestLastResortPulling(t *testing.T) {
db, _ := leveldb.Open(storage.NewMemStorage(), nil)
m := NewModel(defaultConfig, protocol.LocalDeviceID, "device", "syncthing", "dev", db)
m.AddFolder(defaultFolderConfig)
// Add a file to index (with the incorrect block representation, as content
// doesn't actually match the block list)
file := protocol.FileInfo{
Name: "empty",
Flags: 0,
Modified: 0,
Blocks: []protocol.BlockInfo{blocks[0]},
}
m.updateLocals("default", []protocol.FileInfo{file})
// Pretend that we are handling a new file of the same content but
// with a different name (causing it to copy that particular block)
file.Name = "newfile"
iterFn := func(folder, file string, index int32) bool {
return true
}
// Check that that particular block is there
if !m.finder.Iterate(blocks[0].Hash, iterFn) {
t.Error("Expected block not found")
}
p := rwFolder{
folder: "default",
dir: "testdata",
model: m,
errors: make(map[string]string),
errorsMut: sync.NewMutex(),
}
copyChan := make(chan copyBlocksState)
pullChan := make(chan pullBlockState, 1)
finisherChan := make(chan *sharedPullerState, 1)
// Run a single copier routine
go p.copierRoutine(copyChan, pullChan, finisherChan)
p.handleFile(file, copyChan, finisherChan)
// Copier should hash empty file, realise that the region it has read
// doesn't match the hash which was advertised by the block map, fix it
// and ask to pull the block.
<-pullChan
// Verify that it did fix the incorrect hash.
if m.finder.Iterate(blocks[0].Hash, iterFn) {
t.Error("Found unexpected block")
}
if !m.finder.Iterate(scanner.SHA256OfNothing, iterFn) {
t.Error("Expected block not found")
}
(<-finisherChan).fd.Close()
os.Remove(filepath.Join("testdata", defTempNamer.TempName("newfile")))
}
func TestDeregisterOnFailInCopy(t *testing.T) {
file := protocol.FileInfo{
Name: "filex",
Flags: 0,
Modified: 0,
Blocks: []protocol.BlockInfo{
blocks[0], blocks[2], blocks[0], blocks[0],
blocks[5], blocks[0], blocks[0], blocks[8],
},
}
defer os.Remove("testdata/" + defTempNamer.TempName("filex"))
db, _ := leveldb.Open(storage.NewMemStorage(), nil)
m := NewModel(defaultConfig, protocol.LocalDeviceID, "device", "syncthing", "dev", db)
m.AddFolder(defaultFolderConfig)
emitter := NewProgressEmitter(defaultConfig)
go emitter.Serve()
p := rwFolder{
folder: "default",
dir: "testdata",
model: m,
queue: newJobQueue(),
progressEmitter: emitter,
errors: make(map[string]string),
errorsMut: sync.NewMutex(),
}
// queue.Done should be called by the finisher routine
p.queue.Push("filex", 0, 0)
p.queue.Pop()
if len(p.queue.progress) != 1 {
t.Fatal("Expected file in progress")
}
copyChan := make(chan copyBlocksState)
pullChan := make(chan pullBlockState)
finisherBufferChan := make(chan *sharedPullerState)
finisherChan := make(chan *sharedPullerState)
go p.copierRoutine(copyChan, pullChan, finisherBufferChan)
go p.finisherRoutine(finisherChan)
p.handleFile(file, copyChan, finisherChan)
// Receive a block at puller, to indicate that at least a single copier
// loop has been performed.
toPull := <-pullChan
// Wait until copier is trying to pass something down to the puller again
time.Sleep(100 * time.Millisecond)
// Close the file
toPull.sharedPullerState.fail("test", os.ErrNotExist)
// Unblock copier
<-pullChan
select {
case state := <-finisherBufferChan:
// At this point the file should still be registered with both the job
// queue, and the progress emitter. Verify this.
if len(p.progressEmitter.registry) != 1 || len(p.queue.progress) != 1 || len(p.queue.queued) != 0 {
t.Fatal("Could not find file")
}
// Pass the file down the real finisher, and give it time to consume
finisherChan <- state
time.Sleep(100 * time.Millisecond)
if state.fd != nil {
t.Fatal("File not closed?")
}
if len(p.progressEmitter.registry) != 0 || len(p.queue.progress) != 0 || len(p.queue.queued) != 0 {
t.Fatal("Still registered", len(p.progressEmitter.registry), len(p.queue.progress), len(p.queue.queued))
}
// Doing it again should have no effect
finisherChan <- state
time.Sleep(100 * time.Millisecond)
if len(p.progressEmitter.registry) != 0 || len(p.queue.progress) != 0 || len(p.queue.queued) != 0 {
t.Fatal("Still registered")
}
case <-time.After(time.Second):
t.Fatal("Didn't get anything to the finisher")
}
}
func TestDeregisterOnFailInPull(t *testing.T) {
file := protocol.FileInfo{
Name: "filex",
Flags: 0,
Modified: 0,
Blocks: []protocol.BlockInfo{
blocks[0], blocks[2], blocks[0], blocks[0],
blocks[5], blocks[0], blocks[0], blocks[8],
},
}
defer os.Remove("testdata/" + defTempNamer.TempName("filex"))
db, _ := leveldb.Open(storage.NewMemStorage(), nil)
m := NewModel(defaultConfig, protocol.LocalDeviceID, "device", "syncthing", "dev", db)
m.AddFolder(defaultFolderConfig)
emitter := NewProgressEmitter(defaultConfig)
go emitter.Serve()
p := rwFolder{
folder: "default",
dir: "testdata",
model: m,
queue: newJobQueue(),
progressEmitter: emitter,
errors: make(map[string]string),
errorsMut: sync.NewMutex(),
}
// queue.Done should be called by the finisher routine
p.queue.Push("filex", 0, 0)
p.queue.Pop()
if len(p.queue.progress) != 1 {
t.Fatal("Expected file in progress")
}
copyChan := make(chan copyBlocksState)
pullChan := make(chan pullBlockState)
finisherBufferChan := make(chan *sharedPullerState)
finisherChan := make(chan *sharedPullerState)
go p.copierRoutine(copyChan, pullChan, finisherBufferChan)
go p.pullerRoutine(pullChan, finisherBufferChan)
go p.finisherRoutine(finisherChan)
p.handleFile(file, copyChan, finisherChan)
// Receive at the finisher; we should error out as the puller has nowhere to pull
// from.
select {
case state := <-finisherBufferChan:
// At this point the file should still be registered with both the job
// queue, and the progress emitter. Verify this.
if len(p.progressEmitter.registry) != 1 || len(p.queue.progress) != 1 || len(p.queue.queued) != 0 {
t.Fatal("Could not find file")
}
// Pass the file down the real finisher, and give it time to consume
finisherChan <- state
time.Sleep(100 * time.Millisecond)
if state.fd != nil {
t.Fatal("File not closed?")
}
if len(p.progressEmitter.registry) != 0 || len(p.queue.progress) != 0 || len(p.queue.queued) != 0 {
t.Fatal("Still registered", len(p.progressEmitter.registry), len(p.queue.progress), len(p.queue.queued))
}
// Doing it again should have no effect
finisherChan <- state
time.Sleep(100 * time.Millisecond)
if len(p.progressEmitter.registry) != 0 || len(p.queue.progress) != 0 || len(p.queue.queued) != 0 {
t.Fatal("Still registered")
}
case <-time.After(time.Second):
t.Fatal("Didn't get anything to the finisher")
}
}

262
lib/model/sharedpullerstate.go Normal file

@@ -0,0 +1,262 @@
// Copyright (C) 2014 The Syncthing Authors.
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
// You can obtain one at http://mozilla.org/MPL/2.0/.
package model
import (
"io"
"os"
"path/filepath"
"github.com/syncthing/protocol"
"github.com/syncthing/syncthing/lib/db"
"github.com/syncthing/syncthing/lib/sync"
)
// A sharedPullerState is kept for each file that is being synced and is kept
// updated along the way.
type sharedPullerState struct {
// Immutable, does not require locking
file protocol.FileInfo // The new file (desired end state)
folder string
tempName string
realName string
reused int // Number of blocks reused from temporary file
ignorePerms bool
version protocol.Vector // The current (old) version
// Mutable, must be locked for access
err error // The first error we hit
fd *os.File // The fd of the temp file
copyTotal int // Total number of copy actions for the whole job
pullTotal int // Total number of pull actions for the whole job
copyOrigin int // Number of blocks copied from the original file
copyNeeded int // Number of copy actions still pending
pullNeeded int // Number of block pulls still pending
closed bool // True if the file has been finalClosed.
mut sync.Mutex // Protects the above
}
// A momentary state representing the progress of the puller
type pullerProgress struct {
Total int `json:"total"`
Reused int `json:"reused"`
CopiedFromOrigin int `json:"copiedFromOrigin"`
CopiedFromElsewhere int `json:"copiedFromElsewhere"`
Pulled int `json:"pulled"`
Pulling int `json:"pulling"`
BytesDone int64 `json:"bytesDone"`
BytesTotal int64 `json:"bytesTotal"`
}
// A lockedWriterAt synchronizes WriteAt calls with an external mutex.
// WriteAt() is goroutine safe by itself, but not against, for example, Close().
type lockedWriterAt struct {
mut *sync.Mutex
wr io.WriterAt
}
func (w lockedWriterAt) WriteAt(p []byte, off int64) (n int, err error) {
(*w.mut).Lock()
defer (*w.mut).Unlock()
return w.wr.WriteAt(p, off)
}
// tempFile returns the fd for the temporary file, reusing an open fd
// or creating the file as necessary.
func (s *sharedPullerState) tempFile() (io.WriterAt, error) {
s.mut.Lock()
defer s.mut.Unlock()
// If we've already hit an error, return early
if s.err != nil {
return nil, s.err
}
// If the temp file is already open, return the file descriptor
if s.fd != nil {
return lockedWriterAt{&s.mut, s.fd}, nil
}
// Ensure that the parent directory is writable. This is
// osutil.InWritableDir except we need to do more stuff so we duplicate it
// here.
dir := filepath.Dir(s.tempName)
if info, err := os.Stat(dir); err != nil {
s.failLocked("dst stat dir", err)
return nil, err
} else if info.Mode()&0200 == 0 {
err := os.Chmod(dir, 0755)
if !s.ignorePerms && err == nil {
defer func() {
err := os.Chmod(dir, info.Mode().Perm())
if err != nil {
panic(err)
}
}()
}
}
// Attempt to create the temp file
flags := os.O_WRONLY
if s.reused == 0 {
flags |= os.O_CREATE | os.O_EXCL
} else {
// With sufficiently bad luck when exiting or crashing, we may have
// had time to chmod the temp file to a read-only state but not yet
// moved it to its final name. This leaves us with a read-only temp
// file that we're going to try to reuse. To handle that, we need to
// make sure we have write permissions on the file before opening it.
err := os.Chmod(s.tempName, 0644)
if !s.ignorePerms && err != nil {
s.failLocked("dst create chmod", err)
return nil, err
}
}
fd, err := os.OpenFile(s.tempName, flags, 0666)
if err != nil {
s.failLocked("dst create", err)
return nil, err
}
// Same fd will be used by all writers
s.fd = fd
return lockedWriterAt{&s.mut, s.fd}, nil
}
// sourceFile opens the existing source file for reading
func (s *sharedPullerState) sourceFile() (*os.File, error) {
s.mut.Lock()
defer s.mut.Unlock()
// If we've already hit an error, return early
if s.err != nil {
return nil, s.err
}
// Attempt to open the existing file
fd, err := os.Open(s.realName)
if err != nil {
s.failLocked("src open", err)
return nil, err
}
return fd, nil
}
// fail prints a warning message composed of the context and error, and
// marks the sharedPullerState as failed. It is a no-op when called on an
// already failed state.
func (s *sharedPullerState) fail(context string, err error) {
s.mut.Lock()
defer s.mut.Unlock()
s.failLocked(context, err)
}
func (s *sharedPullerState) failLocked(context string, err error) {
if s.err != nil {
return
}
l.Infof("Puller (folder %q, file %q): %s: %v", s.folder, s.file.Name, context, err)
s.err = err
}
func (s *sharedPullerState) failed() error {
s.mut.Lock()
defer s.mut.Unlock()
return s.err
}
func (s *sharedPullerState) copyDone() {
s.mut.Lock()
s.copyNeeded--
if debug {
l.Debugln("sharedPullerState", s.folder, s.file.Name, "copyNeeded ->", s.copyNeeded)
}
s.mut.Unlock()
}
func (s *sharedPullerState) copiedFromOrigin() {
s.mut.Lock()
s.copyOrigin++
s.mut.Unlock()
}
func (s *sharedPullerState) pullStarted() {
s.mut.Lock()
s.copyTotal--
s.copyNeeded--
s.pullTotal++
s.pullNeeded++
if debug {
l.Debugln("sharedPullerState", s.folder, s.file.Name, "pullNeeded start ->", s.pullNeeded)
}
s.mut.Unlock()
}
func (s *sharedPullerState) pullDone() {
s.mut.Lock()
s.pullNeeded--
if debug {
l.Debugln("sharedPullerState", s.folder, s.file.Name, "pullNeeded done ->", s.pullNeeded)
}
s.mut.Unlock()
}
// finalClose atomically closes and returns closed status of a file. A true
// first return value means the file was closed and should be finished, with
// the error indicating the success or failure of the close. A false first
// return value indicates the file is not ready to be closed, or is already
// closed and should in either case not be finished off now.
func (s *sharedPullerState) finalClose() (bool, error) {
s.mut.Lock()
defer s.mut.Unlock()
if s.closed {
// Already closed
return false, nil
}
if s.pullNeeded+s.copyNeeded != 0 && s.err == nil {
// Not done yet, and not errored
return false, nil
}
if s.fd != nil {
if closeErr := s.fd.Close(); closeErr != nil && s.err == nil {
// This is our error if we weren't errored before. Otherwise we
// keep the earlier error.
s.err = closeErr
}
s.fd = nil
}
s.closed = true
return true, s.err
}
// Progress returns the momentary progress of the puller
func (s *sharedPullerState) Progress() *pullerProgress {
s.mut.Lock()
defer s.mut.Unlock()
total := s.reused + s.copyTotal + s.pullTotal
done := total - s.copyNeeded - s.pullNeeded
return &pullerProgress{
Total: total,
Reused: s.reused,
CopiedFromOrigin: s.copyOrigin,
CopiedFromElsewhere: s.copyTotal - s.copyNeeded - s.copyOrigin,
Pulled: s.pullTotal - s.pullNeeded,
Pulling: s.pullNeeded,
BytesTotal: db.BlocksToSize(total),
BytesDone: db.BlocksToSize(done),
}
}

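copyDone, pullStarted and pullDone above keep the outstanding-work counters in step, and finalClose is written so that exactly one caller observes a true return once everything is accounted for or an error has been recorded. A hedged sketch of the finisher side, assuming package model context; the function name and the commit callback are illustrative:

// finishIfDone is a hypothetical finisher step: once finalClose reports the
// temp file as closed, either give up on the recorded error or let the
// caller commit the result, for example by renaming the temp file into place.
func finishIfDone(s *sharedPullerState, commit func() error) {
    closed, err := s.finalClose()
    if !closed {
        return // still in flight, or someone else already finished it
    }
    if err != nil {
        l.Infof("Puller (folder %q, file %q): finishing: %v", s.folder, s.file.Name, err)
        return
    }
    if err := commit(); err != nil {
        l.Infof("Puller (folder %q, file %q): committing: %v", s.folder, s.file.Name, err)
    }
}
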
87
lib/model/sharedpullerstate_test.go Normal file

@@ -0,0 +1,87 @@
// Copyright (C) 2014 The Syncthing Authors.
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
// You can obtain one at http://mozilla.org/MPL/2.0/.
package model
import (
"os"
"testing"
"github.com/syncthing/syncthing/lib/sync"
)
func TestSourceFileOK(t *testing.T) {
s := sharedPullerState{
realName: "testdata/foo",
mut: sync.NewMutex(),
}
fd, err := s.sourceFile()
if err != nil {
t.Fatal(err)
}
if fd == nil {
t.Fatal("Unexpected nil fd")
}
bs := make([]byte, 6)
n, err := fd.Read(bs)
if n != len(bs) {
t.Fatalf("Wrong read length %d != %d", n, len(bs))
}
if string(bs) != "foobar" {
t.Fatalf("Wrong contents %s != foobar", string(bs))
}
if err := s.failed(); err != nil {
t.Fatal(err)
}
}
func TestSourceFileBad(t *testing.T) {
s := sharedPullerState{
realName: "nonexistent",
mut: sync.NewMutex(),
}
fd, err := s.sourceFile()
if err == nil {
t.Fatal("Unexpected nil error")
}
if fd != nil {
t.Fatal("Unexpected non-nil fd")
}
if err := s.failed(); err == nil {
t.Fatal("Unexpected nil failed()")
}
}
// Test creating a temporary file inside a read-only directory
func TestReadOnlyDir(t *testing.T) {
// Create a read only directory, clean it up afterwards.
os.Mkdir("testdata/read_only_dir", 0555)
defer func() {
os.Chmod("testdata/read_only_dir", 0755)
os.RemoveAll("testdata/read_only_dir")
}()
s := sharedPullerState{
tempName: "testdata/read_only_dir/.temp_name",
mut: sync.NewMutex(),
}
fd, err := s.tempFile()
if err != nil {
t.Fatal(err)
}
if fd == nil {
t.Fatal("Unexpected nil fd")
}
s.fail("Test done", nil)
s.finalClose()
}

45
lib/model/tempname.go Normal file

@@ -0,0 +1,45 @@
// Copyright (C) 2015 The Syncthing Authors.
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
// You can obtain one at http://mozilla.org/MPL/2.0/.
package model
import (
"crypto/md5"
"fmt"
"path/filepath"
"runtime"
"strings"
)
type tempNamer struct {
prefix string
}
var defTempNamer tempNamer
func init() {
if runtime.GOOS == "windows" {
defTempNamer = tempNamer{"~syncthing~"}
} else {
defTempNamer = tempNamer{".syncthing."}
}
}
func (t tempNamer) IsTemporary(name string) bool {
return strings.HasPrefix(filepath.Base(name), t.prefix)
}
func (t tempNamer) TempName(name string) string {
tdir := filepath.Dir(name)
tbase := filepath.Base(name)
if len(tbase) > 240 {
hash := md5.New()
hash.Write([]byte(name))
tbase = fmt.Sprintf("%x", hash.Sum(nil))
}
tname := fmt.Sprintf("%s%s.tmp", t.prefix, tbase)
return filepath.Join(tdir, tname)
}

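For ordinary names, TempName above simply brackets the base name with the platform prefix and a .tmp suffix: on a non-Windows platform photos/summer.jpg is staged as photos/.syncthing.summer.jpg.tmp, while on Windows the prefix is ~syncthing~. Only when the base name exceeds 240 characters is it replaced by an MD5 digest of the full path, which is the case the test in the next section exercises.
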
26
lib/model/tempname_test.go Normal file

@@ -0,0 +1,26 @@
// Copyright (C) 2015 The Syncthing Authors.
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
// You can obtain one at http://mozilla.org/MPL/2.0/.
package model
import (
"strings"
"testing"
)
func TestLongTempFilename(t *testing.T) {
filename := ""
for i := 0; i < 300; i++ {
filename += "l"
}
tFile := defTempNamer.TempName(filename)
if len(tFile) < 10 || len(tFile) > 200 {
t.Fatal("Invalid long filename")
}
if !strings.HasSuffix(defTempNamer.TempName("short"), "short.tmp") {
t.Fatal("Invalid short filename", defTempNamer.TempName("short"))
}
}

2
lib/model/testdata/.stignore vendored Normal file

@@ -0,0 +1,2 @@
.*
quux

BIN
lib/model/testdata/.syncthing.file.tmp vendored Normal file

Binary file not shown.

1
lib/model/testdata/bar vendored Normal file

@@ -0,0 +1 @@
foobarbaz

1
lib/model/testdata/baz/quux vendored Normal file

@@ -0,0 +1 @@
baazquux

0
lib/model/testdata/empty vendored Normal file

1
lib/model/testdata/foo vendored Normal file

@@ -0,0 +1 @@
foobar

BIN
lib/model/testdata/~syncthing~file.tmp vendored Normal file

Binary file not shown.

36
lib/model/util.go Normal file

@@ -0,0 +1,36 @@
// Copyright (C) 2014 The Syncthing Authors.
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
// You can obtain one at http://mozilla.org/MPL/2.0/.
package model
import (
"sync"
"time"
)
func deadlockDetect(mut sync.Locker, timeout time.Duration) {
go func() {
for {
time.Sleep(timeout / 4)
ok := make(chan bool, 2)
go func() {
mut.Lock()
mut.Unlock()
ok <- true
}()
go func() {
time.Sleep(timeout)
ok <- false
}()
if r := <-ok; !r {
panic("deadlock detected")
}
}
}()
}
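
deadlockDetect above is a watchdog: every timeout/4 it spawns one goroutine that takes and releases the lock and another that sleeps for the full timeout, and if the sleeper reports back first the process panics instead of hanging silently. A hedged usage sketch with the standard library mutex; the variable name and the 30-minute window are illustrative:

// Illustration only: after this init, failing to acquire modelMut within
// roughly 30 minutes crashes the process with "deadlock detected".
var modelMut sync.Mutex

func init() {
    deadlockDetect(&modelMut, 30*time.Minute)
}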