Rename Repository -> Folder, Node -> Device (fixes #739)

This commit is contained in:
Audrius Butkevicius
2014-09-28 12:00:38 +01:00
parent 9d816694ba
commit 6c352dca74
61 changed files with 2118 additions and 2118 deletions

View File

@@ -0,0 +1,51 @@
// Copyright (C) 2014 Jakob Borg and Contributors (see the CONTRIBUTORS file).
// All rights reserved. Use of this source code is governed by an MIT-style
// license that can be found in the LICENSE file.
package model
import (
"sync"
"github.com/syncthing/syncthing/internal/protocol"
)
// deviceActivity tracks the number of outstanding requests per device and can
// answer which device is least busy. It is safe for use from multiple
// goroutines.
type deviceActivity struct {
	act map[protocol.DeviceID]int // number of outstanding requests per device
	mut sync.Mutex                // guards act
}
// newDeviceActivity returns a ready-to-use activity tracker with an
// empty per-device request count.
func newDeviceActivity() *deviceActivity {
	a := new(deviceActivity)
	a.act = make(map[protocol.DeviceID]int)
	return a
}
// leastBusy returns the device in availability with the fewest
// outstanding requests. Ties are broken by position in the slice; an
// empty slice yields the zero DeviceID.
//
// The receiver must be a pointer: a value receiver would copy the
// struct and its sync.Mutex, so every call would lock a private copy
// and the map access would be unsynchronized (go vet: copylocks).
func (m *deviceActivity) leastBusy(availability []protocol.DeviceID) protocol.DeviceID {
	m.mut.Lock()
	defer m.mut.Unlock()
	low := 2<<30 - 1 // effectively "infinite" request count
	var selected protocol.DeviceID
	for _, device := range availability {
		if usage := m.act[device]; usage < low {
			low = usage
			selected = device
		}
	}
	return selected
}
// using records that one more request is outstanding toward device.
// Pointer receiver is required so the mutex (and not a copy of it)
// actually guards the shared map.
func (m *deviceActivity) using(device protocol.DeviceID) {
	m.mut.Lock()
	defer m.mut.Unlock()
	m.act[device]++
}
// done records that a request toward device has completed. Pointer
// receiver is required so the mutex (and not a copy of it) actually
// guards the shared map.
func (m *deviceActivity) done(device protocol.DeviceID) {
	m.mut.Lock()
	defer m.mut.Unlock()
	m.act[device]--
}

View File

@@ -0,0 +1,56 @@
// Copyright (C) 2014 Jakob Borg and Contributors (see the CONTRIBUTORS file).
// All rights reserved. Use of this source code is governed by an MIT-style
// license that can be found in the LICENSE file.
package model
import (
"testing"
"github.com/syncthing/syncthing/internal/protocol"
)
// TestDeviceActivity exercises the least-busy selection: usage counts
// steer selection away from busy devices, and ties are broken by
// position in the availability slice.
func TestDeviceActivity(t *testing.T) {
	d0 := protocol.DeviceID{1, 2, 3, 4}
	d1 := protocol.DeviceID{5, 6, 7, 8}
	d2 := protocol.DeviceID{9, 10, 11, 12}
	devices := []protocol.DeviceID{d0, d1, d2}

	tracker := newDeviceActivity()

	// With no outstanding requests, the first device wins the tie.
	if lb := tracker.leastBusy(devices); lb != d0 {
		t.Errorf("Least busy device should be n0 (%v) not %v", d0, lb)
	}
	// leastBusy itself must not mutate the counts.
	if lb := tracker.leastBusy(devices); lb != d0 {
		t.Errorf("Least busy device should still be n0 (%v) not %v", d0, lb)
	}

	// Marking each successive winner as busy rotates the selection.
	tracker.using(tracker.leastBusy(devices))
	if lb := tracker.leastBusy(devices); lb != d1 {
		t.Errorf("Least busy device should be n1 (%v) not %v", d1, lb)
	}
	tracker.using(tracker.leastBusy(devices))
	if lb := tracker.leastBusy(devices); lb != d2 {
		t.Errorf("Least busy device should be n2 (%v) not %v", d2, lb)
	}
	tracker.using(tracker.leastBusy(devices))
	if lb := tracker.leastBusy(devices); lb != d0 {
		t.Errorf("Least busy device should be n0 (%v) not %v", d0, lb)
	}

	// Completing requests frees devices up again, earliest slice
	// position winning ties among equally idle devices.
	tracker.done(d1)
	if lb := tracker.leastBusy(devices); lb != d1 {
		t.Errorf("Least busy device should be n1 (%v) not %v", d1, lb)
	}
	tracker.done(d2)
	if lb := tracker.leastBusy(devices); lb != d1 {
		t.Errorf("Least busy device should still be n1 (%v) not %v", d1, lb)
	}
	tracker.done(d0)
	if lb := tracker.leastBusy(devices); lb != d0 {
		t.Errorf("Least busy device should be n0 (%v) not %v", d0, lb)
	}
}

View File

@@ -2,5 +2,5 @@
// All rights reserved. Use of this source code is governed by an MIT-style
// license that can be found in the LICENSE file.
// Package model implements repository abstraction and file pulling mechanisms
// Package model implements folder abstraction and file pulling mechanisms
package model

File diff suppressed because it is too large Load Diff

View File

@@ -17,11 +17,11 @@ import (
"github.com/syndtr/goleveldb/leveldb/storage"
)
var node1, node2 protocol.NodeID
var device1, device2 protocol.DeviceID
func init() {
node1, _ = protocol.NodeIDFromString("AIR6LPZ-7K4PTTV-UXQSMUU-CPQ5YWH-OEDFIIQ-JUG777G-2YQXXR5-YD6AWQR")
node2, _ = protocol.NodeIDFromString("GYRZZQB-IRNPV4Z-T7TC52W-EQYJ3TT-FDQW6MW-DFLMU42-SSSU6EM-FBK2VAY")
device1, _ = protocol.DeviceIDFromString("AIR6LPZ-7K4PTTV-UXQSMUU-CPQ5YWH-OEDFIIQ-JUG777G-2YQXXR5-YD6AWQR")
device2, _ = protocol.DeviceIDFromString("GYRZZQB-IRNPV4Z-T7TC52W-EQYJ3TT-FDQW6MW-DFLMU42-SSSU6EM-FBK2VAY")
}
var testDataExpected = map[string]protocol.FileInfo{
@@ -57,11 +57,11 @@ func init() {
func TestRequest(t *testing.T) {
db, _ := leveldb.Open(storage.NewMemStorage(), nil)
m := NewModel("/tmp", &config.Configuration{}, "node", "syncthing", "dev", db)
m.AddRepo(config.RepositoryConfiguration{ID: "default", Directory: "testdata"})
m.ScanRepo("default")
m := NewModel("/tmp", &config.Configuration{}, "device", "syncthing", "dev", db)
m.AddFolder(config.FolderConfiguration{ID: "default", Directory: "testdata"})
m.ScanFolder("default")
bs, err := m.Request(node1, "default", "foo", 0, 6)
bs, err := m.Request(device1, "default", "foo", 0, 6)
if err != nil {
t.Fatal(err)
}
@@ -69,7 +69,7 @@ func TestRequest(t *testing.T) {
t.Errorf("Incorrect data from request: %q", string(bs))
}
bs, err = m.Request(node1, "default", "../walk.go", 0, 6)
bs, err = m.Request(device1, "default", "../walk.go", 0, 6)
if err == nil {
t.Error("Unexpected nil error on insecure file read")
}
@@ -94,76 +94,76 @@ func genFiles(n int) []protocol.FileInfo {
func BenchmarkIndex10000(b *testing.B) {
db, _ := leveldb.Open(storage.NewMemStorage(), nil)
m := NewModel("/tmp", nil, "node", "syncthing", "dev", db)
m.AddRepo(config.RepositoryConfiguration{ID: "default", Directory: "testdata"})
m.ScanRepo("default")
m := NewModel("/tmp", nil, "device", "syncthing", "dev", db)
m.AddFolder(config.FolderConfiguration{ID: "default", Directory: "testdata"})
m.ScanFolder("default")
files := genFiles(10000)
b.ResetTimer()
for i := 0; i < b.N; i++ {
m.Index(node1, "default", files)
m.Index(device1, "default", files)
}
}
func BenchmarkIndex00100(b *testing.B) {
db, _ := leveldb.Open(storage.NewMemStorage(), nil)
m := NewModel("/tmp", nil, "node", "syncthing", "dev", db)
m.AddRepo(config.RepositoryConfiguration{ID: "default", Directory: "testdata"})
m.ScanRepo("default")
m := NewModel("/tmp", nil, "device", "syncthing", "dev", db)
m.AddFolder(config.FolderConfiguration{ID: "default", Directory: "testdata"})
m.ScanFolder("default")
files := genFiles(100)
b.ResetTimer()
for i := 0; i < b.N; i++ {
m.Index(node1, "default", files)
m.Index(device1, "default", files)
}
}
func BenchmarkIndexUpdate10000f10000(b *testing.B) {
db, _ := leveldb.Open(storage.NewMemStorage(), nil)
m := NewModel("/tmp", nil, "node", "syncthing", "dev", db)
m.AddRepo(config.RepositoryConfiguration{ID: "default", Directory: "testdata"})
m.ScanRepo("default")
m := NewModel("/tmp", nil, "device", "syncthing", "dev", db)
m.AddFolder(config.FolderConfiguration{ID: "default", Directory: "testdata"})
m.ScanFolder("default")
files := genFiles(10000)
m.Index(node1, "default", files)
m.Index(device1, "default", files)
b.ResetTimer()
for i := 0; i < b.N; i++ {
m.IndexUpdate(node1, "default", files)
m.IndexUpdate(device1, "default", files)
}
}
func BenchmarkIndexUpdate10000f00100(b *testing.B) {
db, _ := leveldb.Open(storage.NewMemStorage(), nil)
m := NewModel("/tmp", nil, "node", "syncthing", "dev", db)
m.AddRepo(config.RepositoryConfiguration{ID: "default", Directory: "testdata"})
m.ScanRepo("default")
m := NewModel("/tmp", nil, "device", "syncthing", "dev", db)
m.AddFolder(config.FolderConfiguration{ID: "default", Directory: "testdata"})
m.ScanFolder("default")
files := genFiles(10000)
m.Index(node1, "default", files)
m.Index(device1, "default", files)
ufiles := genFiles(100)
b.ResetTimer()
for i := 0; i < b.N; i++ {
m.IndexUpdate(node1, "default", ufiles)
m.IndexUpdate(device1, "default", ufiles)
}
}
func BenchmarkIndexUpdate10000f00001(b *testing.B) {
db, _ := leveldb.Open(storage.NewMemStorage(), nil)
m := NewModel("/tmp", nil, "node", "syncthing", "dev", db)
m.AddRepo(config.RepositoryConfiguration{ID: "default", Directory: "testdata"})
m.ScanRepo("default")
m := NewModel("/tmp", nil, "device", "syncthing", "dev", db)
m.AddFolder(config.FolderConfiguration{ID: "default", Directory: "testdata"})
m.ScanFolder("default")
files := genFiles(10000)
m.Index(node1, "default", files)
m.Index(device1, "default", files)
ufiles := genFiles(1)
b.ResetTimer()
for i := 0; i < b.N; i++ {
m.IndexUpdate(node1, "default", ufiles)
m.IndexUpdate(device1, "default", ufiles)
}
}
type FakeConnection struct {
id protocol.NodeID
id protocol.DeviceID
requestData []byte
}
@@ -171,7 +171,7 @@ func (FakeConnection) Close() error {
return nil
}
func (f FakeConnection) ID() protocol.NodeID {
func (f FakeConnection) ID() protocol.DeviceID {
return f.id
}
@@ -191,7 +191,7 @@ func (FakeConnection) IndexUpdate(string, []protocol.FileInfo) error {
return nil
}
func (f FakeConnection) Request(repo, name string, offset int64, size int) ([]byte, error) {
func (f FakeConnection) Request(folder, name string, offset int64, size int) ([]byte, error) {
return f.requestData, nil
}
@@ -207,9 +207,9 @@ func (FakeConnection) Statistics() protocol.Statistics {
func BenchmarkRequest(b *testing.B) {
db, _ := leveldb.Open(storage.NewMemStorage(), nil)
m := NewModel("/tmp", nil, "node", "syncthing", "dev", db)
m.AddRepo(config.RepositoryConfiguration{ID: "default", Directory: "testdata"})
m.ScanRepo("default")
m := NewModel("/tmp", nil, "device", "syncthing", "dev", db)
m.AddFolder(config.FolderConfiguration{ID: "default", Directory: "testdata"})
m.ScanFolder("default")
const n = 1000
files := make([]protocol.FileInfo, n)
@@ -223,15 +223,15 @@ func BenchmarkRequest(b *testing.B) {
}
fc := FakeConnection{
id: node1,
id: device1,
requestData: []byte("some data to return"),
}
m.AddConnection(fc, fc)
m.Index(node1, "default", files)
m.Index(device1, "default", files)
b.ResetTimer()
for i := 0; i < b.N; i++ {
data, err := m.requestGlobal(node1, "default", files[i%n].Name, 0, 32, nil)
data, err := m.requestGlobal(device1, "default", files[i%n].Name, 0, 32, nil)
if err != nil {
b.Error(err)
}
@@ -241,28 +241,28 @@ func BenchmarkRequest(b *testing.B) {
}
}
func TestNodeRename(t *testing.T) {
func TestDeviceRename(t *testing.T) {
ccm := protocol.ClusterConfigMessage{
ClientName: "syncthing",
ClientVersion: "v0.9.4",
}
cfg := config.New("/tmp/test", node1)
cfg.Nodes = []config.NodeConfiguration{
cfg := config.New("/tmp/test", device1)
cfg.Devices = []config.DeviceConfiguration{
{
NodeID: node1,
DeviceID: device1,
},
}
db, _ := leveldb.Open(storage.NewMemStorage(), nil)
m := NewModel("/tmp", &cfg, "node", "syncthing", "dev", db)
if cfg.Nodes[0].Name != "" {
t.Errorf("Node already has a name")
m := NewModel("/tmp", &cfg, "device", "syncthing", "dev", db)
if cfg.Devices[0].Name != "" {
t.Errorf("Device already has a name")
}
m.ClusterConfig(node1, ccm)
if cfg.Nodes[0].Name != "" {
t.Errorf("Node already has a name")
m.ClusterConfig(device1, ccm)
if cfg.Devices[0].Name != "" {
t.Errorf("Device already has a name")
}
ccm.Options = []protocol.Option{
@@ -271,96 +271,96 @@ func TestNodeRename(t *testing.T) {
Value: "tester",
},
}
m.ClusterConfig(node1, ccm)
if cfg.Nodes[0].Name != "tester" {
t.Errorf("Node did not get a name")
m.ClusterConfig(device1, ccm)
if cfg.Devices[0].Name != "tester" {
t.Errorf("Device did not get a name")
}
ccm.Options[0].Value = "tester2"
m.ClusterConfig(node1, ccm)
if cfg.Nodes[0].Name != "tester" {
t.Errorf("Node name got overwritten")
m.ClusterConfig(device1, ccm)
if cfg.Devices[0].Name != "tester" {
t.Errorf("Device name got overwritten")
}
}
func TestClusterConfig(t *testing.T) {
cfg := config.New("/tmp/test", node1)
cfg.Nodes = []config.NodeConfiguration{
cfg := config.New("/tmp/test", device1)
cfg.Devices = []config.DeviceConfiguration{
{
NodeID: node1,
DeviceID: device1,
Introducer: true,
},
{
NodeID: node2,
DeviceID: device2,
},
}
cfg.Repositories = []config.RepositoryConfiguration{
cfg.Folders = []config.FolderConfiguration{
{
ID: "repo1",
Nodes: []config.RepositoryNodeConfiguration{
{NodeID: node1},
{NodeID: node2},
ID: "folder1",
Devices: []config.FolderDeviceConfiguration{
{DeviceID: device1},
{DeviceID: device2},
},
},
{
ID: "repo2",
Nodes: []config.RepositoryNodeConfiguration{
{NodeID: node1},
{NodeID: node2},
ID: "folder2",
Devices: []config.FolderDeviceConfiguration{
{DeviceID: device1},
{DeviceID: device2},
},
},
}
db, _ := leveldb.Open(storage.NewMemStorage(), nil)
m := NewModel("/tmp", &cfg, "node", "syncthing", "dev", db)
m.AddRepo(cfg.Repositories[0])
m.AddRepo(cfg.Repositories[1])
m := NewModel("/tmp", &cfg, "device", "syncthing", "dev", db)
m.AddFolder(cfg.Folders[0])
m.AddFolder(cfg.Folders[1])
cm := m.clusterConfig(node2)
cm := m.clusterConfig(device2)
if l := len(cm.Repositories); l != 2 {
t.Fatalf("Incorrect number of repos %d != 2", l)
if l := len(cm.Folders); l != 2 {
t.Fatalf("Incorrect number of folders %d != 2", l)
}
r := cm.Repositories[0]
if r.ID != "repo1" {
t.Errorf("Incorrect repo %q != repo1", r.ID)
r := cm.Folders[0]
if r.ID != "folder1" {
t.Errorf("Incorrect folder %q != folder1", r.ID)
}
if l := len(r.Nodes); l != 2 {
t.Errorf("Incorrect number of nodes %d != 2", l)
if l := len(r.Devices); l != 2 {
t.Errorf("Incorrect number of devices %d != 2", l)
}
if id := r.Nodes[0].ID; bytes.Compare(id, node1[:]) != 0 {
t.Errorf("Incorrect node ID %x != %x", id, node1)
if id := r.Devices[0].ID; bytes.Compare(id, device1[:]) != 0 {
t.Errorf("Incorrect device ID %x != %x", id, device1)
}
if r.Nodes[0].Flags&protocol.FlagIntroducer == 0 {
t.Error("Node1 should be flagged as Introducer")
if r.Devices[0].Flags&protocol.FlagIntroducer == 0 {
t.Error("Device1 should be flagged as Introducer")
}
if id := r.Nodes[1].ID; bytes.Compare(id, node2[:]) != 0 {
t.Errorf("Incorrect node ID %x != %x", id, node2)
if id := r.Devices[1].ID; bytes.Compare(id, device2[:]) != 0 {
t.Errorf("Incorrect device ID %x != %x", id, device2)
}
if r.Nodes[1].Flags&protocol.FlagIntroducer != 0 {
t.Error("Node2 should not be flagged as Introducer")
if r.Devices[1].Flags&protocol.FlagIntroducer != 0 {
t.Error("Device2 should not be flagged as Introducer")
}
r = cm.Repositories[1]
if r.ID != "repo2" {
t.Errorf("Incorrect repo %q != repo2", r.ID)
r = cm.Folders[1]
if r.ID != "folder2" {
t.Errorf("Incorrect folder %q != folder2", r.ID)
}
if l := len(r.Nodes); l != 2 {
t.Errorf("Incorrect number of nodes %d != 2", l)
if l := len(r.Devices); l != 2 {
t.Errorf("Incorrect number of devices %d != 2", l)
}
if id := r.Nodes[0].ID; bytes.Compare(id, node1[:]) != 0 {
t.Errorf("Incorrect node ID %x != %x", id, node1)
if id := r.Devices[0].ID; bytes.Compare(id, device1[:]) != 0 {
t.Errorf("Incorrect device ID %x != %x", id, device1)
}
if r.Nodes[0].Flags&protocol.FlagIntroducer == 0 {
t.Error("Node1 should be flagged as Introducer")
if r.Devices[0].Flags&protocol.FlagIntroducer == 0 {
t.Error("Device1 should be flagged as Introducer")
}
if id := r.Nodes[1].ID; bytes.Compare(id, node2[:]) != 0 {
t.Errorf("Incorrect node ID %x != %x", id, node2)
if id := r.Devices[1].ID; bytes.Compare(id, device2[:]) != 0 {
t.Errorf("Incorrect device ID %x != %x", id, device2)
}
if r.Nodes[1].Flags&protocol.FlagIntroducer != 0 {
t.Error("Node2 should not be flagged as Introducer")
if r.Devices[1].Flags&protocol.FlagIntroducer != 0 {
t.Error("Device2 should not be flagged as Introducer")
}
}
@@ -379,8 +379,8 @@ func TestIgnores(t *testing.T) {
}
db, _ := leveldb.Open(storage.NewMemStorage(), nil)
m := NewModel("/tmp", nil, "node", "syncthing", "dev", db)
m.AddRepo(config.RepositoryConfiguration{ID: "default", Directory: "testdata"})
m := NewModel("/tmp", nil, "device", "syncthing", "dev", db)
m.AddFolder(config.FolderConfiguration{ID: "default", Directory: "testdata"})
expected := []string{
".*",
@@ -440,7 +440,7 @@ func TestIgnores(t *testing.T) {
t.Error("No error")
}
m.AddRepo(config.RepositoryConfiguration{ID: "fresh", Directory: "XXX"})
m.AddFolder(config.FolderConfiguration{ID: "fresh", Directory: "XXX"})
ignores, err = m.GetIgnores("fresh")
if err != nil {
t.Error(err)

View File

@@ -1,51 +0,0 @@
// Copyright (C) 2014 Jakob Borg and Contributors (see the CONTRIBUTORS file).
// All rights reserved. Use of this source code is governed by an MIT-style
// license that can be found in the LICENSE file.
package model
import (
"sync"
"github.com/syncthing/syncthing/internal/protocol"
)
// nodeActivity tracks the number of outstanding requests per node and can
// answer which node is least busy. It is safe for use from multiple
// goroutines.
type nodeActivity struct {
	act map[protocol.NodeID]int // number of outstanding requests per node
	mut sync.Mutex              // guards act
}
// newNodeActivity returns a ready-to-use activity tracker with an
// empty per-node request count.
func newNodeActivity() *nodeActivity {
	a := new(nodeActivity)
	a.act = make(map[protocol.NodeID]int)
	return a
}
// leastBusy returns the node in availability with the fewest
// outstanding requests. Ties are broken by position in the slice; an
// empty slice yields the zero NodeID.
//
// The receiver must be a pointer: a value receiver would copy the
// struct and its sync.Mutex, so every call would lock a private copy
// and the map access would be unsynchronized (go vet: copylocks).
func (m *nodeActivity) leastBusy(availability []protocol.NodeID) protocol.NodeID {
	m.mut.Lock()
	defer m.mut.Unlock()
	low := 2<<30 - 1 // effectively "infinite" request count
	var selected protocol.NodeID
	for _, node := range availability {
		if usage := m.act[node]; usage < low {
			low = usage
			selected = node
		}
	}
	return selected
}
// using records that one more request is outstanding toward node.
// Pointer receiver is required so the mutex (and not a copy of it)
// actually guards the shared map.
func (m *nodeActivity) using(node protocol.NodeID) {
	m.mut.Lock()
	defer m.mut.Unlock()
	m.act[node]++
}
// done records that a request toward node has completed. Pointer
// receiver is required so the mutex (and not a copy of it) actually
// guards the shared map.
func (m *nodeActivity) done(node protocol.NodeID) {
	m.mut.Lock()
	defer m.mut.Unlock()
	m.act[node]--
}

View File

@@ -1,56 +0,0 @@
// Copyright (C) 2014 Jakob Borg and Contributors (see the CONTRIBUTORS file).
// All rights reserved. Use of this source code is governed by an MIT-style
// license that can be found in the LICENSE file.
package model
import (
"testing"
"github.com/syncthing/syncthing/internal/protocol"
)
// TestNodeActivity exercises the least-busy selection: usage counts
// steer selection away from busy nodes, and ties are broken by
// position in the availability slice.
func TestNodeActivity(t *testing.T) {
	a := protocol.NodeID{1, 2, 3, 4}
	b := protocol.NodeID{5, 6, 7, 8}
	c := protocol.NodeID{9, 10, 11, 12}
	nodes := []protocol.NodeID{a, b, c}

	tracker := newNodeActivity()

	// With no outstanding requests, the first node wins the tie.
	if lb := tracker.leastBusy(nodes); lb != a {
		t.Errorf("Least busy node should be n0 (%v) not %v", a, lb)
	}
	// leastBusy itself must not mutate the counts.
	if lb := tracker.leastBusy(nodes); lb != a {
		t.Errorf("Least busy node should still be n0 (%v) not %v", a, lb)
	}

	// Marking each successive winner as busy rotates the selection.
	tracker.using(tracker.leastBusy(nodes))
	if lb := tracker.leastBusy(nodes); lb != b {
		t.Errorf("Least busy node should be n1 (%v) not %v", b, lb)
	}
	tracker.using(tracker.leastBusy(nodes))
	if lb := tracker.leastBusy(nodes); lb != c {
		t.Errorf("Least busy node should be n2 (%v) not %v", c, lb)
	}
	tracker.using(tracker.leastBusy(nodes))
	if lb := tracker.leastBusy(nodes); lb != a {
		t.Errorf("Least busy node should be n0 (%v) not %v", a, lb)
	}

	// Completing requests frees nodes up again, earliest slice
	// position winning ties among equally idle nodes.
	tracker.done(b)
	if lb := tracker.leastBusy(nodes); lb != b {
		t.Errorf("Least busy node should be n1 (%v) not %v", b, lb)
	}
	tracker.done(c)
	if lb := tracker.leastBusy(nodes); lb != b {
		t.Errorf("Least busy node should still be n1 (%v) not %v", b, lb)
	}
	tracker.done(a)
	if lb := tracker.leastBusy(nodes); lb != a {
		t.Errorf("Least busy node should be n0 (%v) not %v", a, lb)
	}
}

View File

@@ -23,9 +23,9 @@ import (
// TODO: Stop on errors
const (
copiersPerRepo = 1
pullersPerRepo = 16
finishersPerRepo = 2
copiersPerFolder = 1
pullersPerFolder = 16
finishersPerFolder = 2
pauseIntv = 60 * time.Second
nextPullIntv = 10 * time.Second
checkPullIntv = 1 * time.Second
@@ -46,12 +46,12 @@ type copyBlocksState struct {
}
var (
activity = newNodeActivity()
errNoNode = errors.New("no available source node")
activity = newDeviceActivity()
errNoDevice = errors.New("no available source device")
)
type Puller struct {
repo string
folder string
dir string
scanIntv time.Duration
model *Model
@@ -75,8 +75,8 @@ func (p *Puller) Serve() {
defer func() {
pullTimer.Stop()
scanTimer.Stop()
// TODO: Should there be an actual RepoStopped state?
p.model.setState(p.repo, RepoIdle)
// TODO: Should there be an actual FolderStopped state?
p.model.setState(p.folder, FolderIdle)
}()
var prevVer uint64
@@ -94,10 +94,10 @@ loop:
// Index(), so that we immediately start a pull when new index
// information is available. Before that though, I'd like to build a
// repeatable benchmark of how long it takes to sync a change from
// node A to node B, so we have something to work against.
// device A to device B, so we have something to work against.
case <-pullTimer.C:
// RemoteLocalVersion() is a fast call, doesn't touch the database.
curVer := p.model.RemoteLocalVersion(p.repo)
curVer := p.model.RemoteLocalVersion(p.folder)
if curVer == prevVer {
pullTimer.Reset(checkPullIntv)
continue
@@ -106,11 +106,11 @@ loop:
if debug {
l.Debugln(p, "pulling", prevVer, curVer)
}
p.model.setState(p.repo, RepoSyncing)
p.model.setState(p.folder, FolderSyncing)
tries := 0
for {
tries++
changed := p.pullerIteration(copiersPerRepo, pullersPerRepo, finishersPerRepo)
changed := p.pullerIteration(copiersPerFolder, pullersPerFolder, finishersPerFolder)
if debug {
l.Debugln(p, "changed", changed)
}
@@ -120,8 +120,8 @@ loop:
// sync. Remember the local version number and
// schedule a resync a little bit into the future.
if lv := p.model.RemoteLocalVersion(p.repo); lv < curVer {
// There's a corner case where the node we needed
if lv := p.model.RemoteLocalVersion(p.folder); lv < curVer {
// There's a corner case where the device we needed
// files from disconnected during the puller
// iteration. The files will have been removed from
// the index, so we've concluded that we don't need
@@ -142,12 +142,12 @@ loop:
// we're not making it. Probably there are write
// errors preventing us. Flag this with a warning and
// wait a bit longer before retrying.
l.Warnf("Repo %q isn't making progress - check logs for possible root cause. Pausing puller for %v.", p.repo, pauseIntv)
l.Warnf("Folder %q isn't making progress - check logs for possible root cause. Pausing puller for %v.", p.folder, pauseIntv)
pullTimer.Reset(pauseIntv)
break
}
}
p.model.setState(p.repo, RepoIdle)
p.model.setState(p.folder, FolderIdle)
// The reason for running the scanner from within the puller is that
// this is the easiest way to make sure we are not doing both at the
@@ -156,12 +156,12 @@ loop:
if debug {
l.Debugln(p, "rescan")
}
p.model.setState(p.repo, RepoScanning)
if err := p.model.ScanRepo(p.repo); err != nil {
invalidateRepo(p.model.cfg, p.repo, err)
p.model.setState(p.folder, FolderScanning)
if err := p.model.ScanFolder(p.folder); err != nil {
invalidateFolder(p.model.cfg, p.folder, err)
break loop
}
p.model.setState(p.repo, RepoIdle)
p.model.setState(p.folder, FolderIdle)
scanTimer.Reset(p.scanIntv)
}
}
@@ -172,13 +172,13 @@ func (p *Puller) Stop() {
}
func (p *Puller) String() string {
return fmt.Sprintf("puller/%s@%p", p.repo, p)
return fmt.Sprintf("puller/%s@%p", p.folder, p)
}
// pullerIteration runs a single puller iteration for the given repo and
// pullerIteration runs a single puller iteration for the given folder and
// returns the number items that should have been synced (even those that
// might have failed). One puller iteration handles all files currently
// flagged as needed in the repo. The specified number of copier, puller and
// flagged as needed in the folder. The specified number of copier, puller and
// finisher routines are used. It's seldom efficient to use more than one
// copier routine, while multiple pullers are essential and multiple finishers
// may be useful (they are primarily CPU bound due to hashing).
@@ -218,7 +218,7 @@ func (p *Puller) pullerIteration(ncopiers, npullers, nfinishers int) int {
}
p.model.rmut.RLock()
files := p.model.repoFiles[p.repo]
files := p.model.folderFiles[p.folder]
p.model.rmut.RUnlock()
// !!!
@@ -228,7 +228,7 @@ func (p *Puller) pullerIteration(ncopiers, npullers, nfinishers int) int {
// !!!
changed := 0
files.WithNeed(protocol.LocalNodeID, func(intf protocol.FileIntf) bool {
files.WithNeed(protocol.LocalDeviceID, func(intf protocol.FileIntf) bool {
// Needed items are delivered sorted lexicographically. This isn't
// really optimal from a performance point of view - it would be
@@ -240,7 +240,7 @@ func (p *Puller) pullerIteration(ncopiers, npullers, nfinishers int) int {
file := intf.(protocol.FileInfo)
events.Default.Log(events.ItemStarted, map[string]string{
"repo": p.repo,
"folder": p.folder,
"item": file.Name,
})
@@ -290,7 +290,7 @@ func (p *Puller) handleDir(file protocol.FileInfo) {
mode := os.FileMode(file.Flags & 0777)
if debug {
curFile := p.model.CurrentRepoFile(p.repo, file.Name)
curFile := p.model.CurrentFolderFile(p.folder, file.Name)
l.Debugf("need dir\n\t%v\n\t%v", file, curFile)
}
@@ -307,19 +307,19 @@ func (p *Puller) handleDir(file protocol.FileInfo) {
}
if err = osutil.InWritableDir(mkdir, realName); err == nil {
p.model.updateLocal(p.repo, file)
p.model.updateLocal(p.folder, file)
} else {
l.Infof("Puller (repo %q, file %q): %v", p.repo, file.Name, err)
l.Infof("Puller (folder %q, file %q): %v", p.folder, file.Name, err)
}
return
}
// Weird error when stat()'ing the dir. Probably won't work to do
// anything else with it if we can't even stat() it.
l.Infof("Puller (repo %q, file %q): %v", p.repo, file.Name, err)
l.Infof("Puller (folder %q, file %q): %v", p.folder, file.Name, err)
return
} else if !info.IsDir() {
l.Infof("Puller (repo %q, file %q): should be dir, but is not", p.repo, file.Name)
l.Infof("Puller (folder %q, file %q): should be dir, but is not", p.folder, file.Name)
return
}
@@ -328,9 +328,9 @@ func (p *Puller) handleDir(file protocol.FileInfo) {
// It's OK to change mode bits on stuff within non-writable directories.
if err := os.Chmod(realName, mode); err == nil {
p.model.updateLocal(p.repo, file)
p.model.updateLocal(p.folder, file)
} else {
l.Infof("Puller (repo %q, file %q): %v", p.repo, file.Name, err)
l.Infof("Puller (folder %q, file %q): %v", p.folder, file.Name, err)
}
}
@@ -339,7 +339,7 @@ func (p *Puller) deleteDir(file protocol.FileInfo) {
realName := filepath.Join(p.dir, file.Name)
err := osutil.InWritableDir(os.Remove, realName)
if err == nil || os.IsNotExist(err) {
p.model.updateLocal(p.repo, file)
p.model.updateLocal(p.folder, file)
}
}
@@ -355,16 +355,16 @@ func (p *Puller) deleteFile(file protocol.FileInfo) {
}
if err != nil {
l.Infof("Puller (repo %q, file %q): delete: %v", p.repo, file.Name, err)
l.Infof("Puller (folder %q, file %q): delete: %v", p.folder, file.Name, err)
} else {
p.model.updateLocal(p.repo, file)
p.model.updateLocal(p.folder, file)
}
}
// handleFile queues the copies and pulls as necessary for a single new or
// changed file.
func (p *Puller) handleFile(file protocol.FileInfo, copyChan chan<- copyBlocksState, pullChan chan<- pullBlockState) {
curFile := p.model.CurrentRepoFile(p.repo, file.Name)
curFile := p.model.CurrentFolderFile(p.folder, file.Name)
copyBlocks, pullBlocks := scanner.BlockDiff(curFile.Blocks, file.Blocks)
if len(copyBlocks) == len(curFile.Blocks) && len(pullBlocks) == 0 {
@@ -384,7 +384,7 @@ func (p *Puller) handleFile(file protocol.FileInfo, copyChan chan<- copyBlocksSt
s := sharedPullerState{
file: file,
repo: p.repo,
folder: p.folder,
tempName: tempName,
realName: realName,
pullNeeded: len(pullBlocks),
@@ -422,18 +422,18 @@ func (p *Puller) shortcutFile(file protocol.FileInfo) {
realName := filepath.Join(p.dir, file.Name)
err := os.Chmod(realName, os.FileMode(file.Flags&0777))
if err != nil {
l.Infof("Puller (repo %q, file %q): shortcut: %v", p.repo, file.Name, err)
l.Infof("Puller (folder %q, file %q): shortcut: %v", p.folder, file.Name, err)
return
}
t := time.Unix(file.Modified, 0)
err = os.Chtimes(realName, t, t)
if err != nil {
l.Infof("Puller (repo %q, file %q): shortcut: %v", p.repo, file.Name, err)
l.Infof("Puller (folder %q, file %q): shortcut: %v", p.folder, file.Name, err)
return
}
p.model.updateLocal(p.repo, file)
p.model.updateLocal(p.folder, file)
}
// copierRoutine reads pullerStates until the in channel closes and performs
@@ -487,13 +487,13 @@ nextBlock:
continue nextBlock
}
// Select the least busy node to pull the block from. If we found no
// feasible node at all, fail the block (and in the long run, the
// Select the least busy device to pull the block from. If we found no
// feasible device at all, fail the block (and in the long run, the
// file).
potentialNodes := p.model.availability(p.repo, state.file.Name)
selected := activity.leastBusy(potentialNodes)
if selected == (protocol.NodeID{}) {
state.earlyClose("pull", errNoNode)
potentialDevices := p.model.availability(p.folder, state.file.Name)
selected := activity.leastBusy(potentialDevices)
if selected == (protocol.DeviceID{}) {
state.earlyClose("pull", errNoDevice)
continue nextBlock
}
@@ -505,10 +505,10 @@ nextBlock:
continue nextBlock
}
// Fetch the block, while marking the selected node as in use so that
// leastBusy can select another node when someone else asks.
// Fetch the block, while marking the selected device as in use so that
// leastBusy can select another device when someone else asks.
activity.using(selected)
buf, err := p.model.requestGlobal(selected, p.repo, state.file.Name, state.block.Offset, int(state.block.Size), state.block.Hash)
buf, err := p.model.requestGlobal(selected, p.folder, state.file.Name, state.block.Offset, int(state.block.Size), state.block.Hash)
activity.done(selected)
if err != nil {
state.earlyClose("pull", err)
@@ -589,7 +589,7 @@ func (p *Puller) finisherRoutine(in <-chan *sharedPullerState) {
}
// Record the updated file in the index
p.model.updateLocal(p.repo, state.file)
p.model.updateLocal(p.folder, state.file)
}
}
}
@@ -609,11 +609,11 @@ func (p *Puller) clean() {
})
}
func invalidateRepo(cfg *config.Configuration, repoID string, err error) {
for i := range cfg.Repositories {
repo := &cfg.Repositories[i]
if repo.ID == repoID {
repo.Invalid = err.Error()
func invalidateFolder(cfg *config.Configuration, folderID string, err error) {
for i := range cfg.Folders {
folder := &cfg.Folders[i]
if folder.ID == folderID {
folder.Invalid = err.Error()
return
}
}

View File

@@ -17,7 +17,7 @@ import (
type sharedPullerState struct {
// Immutable, does not require locking
file protocol.FileInfo
repo string
folder string
tempName string
realName string
@@ -113,7 +113,7 @@ func (s *sharedPullerState) earlyCloseLocked(context string, err error) {
return
}
l.Infof("Puller (repo %q, file %q): %s: %v", s.repo, s.file.Name, context, err)
l.Infof("Puller (folder %q, file %q): %s: %v", s.folder, s.file.Name, context, err)
s.err = err
if s.fd != nil {
s.fd.Close()
@@ -133,7 +133,7 @@ func (s *sharedPullerState) copyDone() {
s.mut.Lock()
s.copyNeeded--
if debug {
l.Debugln("sharedPullerState", s.repo, s.file.Name, "copyNeeded ->", s.pullNeeded)
l.Debugln("sharedPullerState", s.folder, s.file.Name, "copyNeeded ->", s.pullNeeded)
}
s.mut.Unlock()
}
@@ -142,7 +142,7 @@ func (s *sharedPullerState) pullDone() {
s.mut.Lock()
s.pullNeeded--
if debug {
l.Debugln("sharedPullerState", s.repo, s.file.Name, "pullNeeded ->", s.pullNeeded)
l.Debugln("sharedPullerState", s.folder, s.file.Name, "pullNeeded ->", s.pullNeeded)
}
s.mut.Unlock()
}

0
internal/model/testdata/.stignore vendored Normal file → Executable file
View File