Complete rewrite of the puller (fixes #638, fixes #715, fixes #701)

This commit is contained in:
Jakob Borg
2014-09-27 14:44:15 +02:00
parent 7bc4589d4d
commit 99427d649e
16 changed files with 1030 additions and 824 deletions

View File

@@ -28,6 +28,7 @@ import (
"github.com/syncthing/syncthing/internal/protocol"
"github.com/syncthing/syncthing/internal/scanner"
"github.com/syncthing/syncthing/internal/stats"
"github.com/syncthing/syncthing/internal/versioner"
"github.com/syndtr/goleveldb/leveldb"
)
@@ -138,22 +139,54 @@ func NewModel(indexDir string, cfg *config.Configuration, nodeName, clientName,
// StartRW starts read/write processing on the current model. When in
// read/write mode the model will attempt to keep in sync with the cluster by
// pulling needed files from peer nodes.
func (m *Model) StartRepoRW(repo string, threads int) {
m.rmut.RLock()
defer m.rmut.RUnlock()
func (m *Model) StartRepoRW(repo string) {
m.rmut.Lock()
cfg, ok := m.repoCfgs[repo]
m.rmut.Unlock()
if cfg, ok := m.repoCfgs[repo]; !ok {
panic("cannot start without repo")
} else {
newPuller(cfg, m, threads, m.cfg)
if !ok {
panic("cannot start nonexistent repo " + repo)
}
p := Puller{
repo: repo,
dir: cfg.Directory,
scanIntv: time.Duration(cfg.RescanIntervalS) * time.Second,
model: m,
}
if len(cfg.Versioning.Type) > 0 {
factory, ok := versioner.Factories[cfg.Versioning.Type]
if !ok {
l.Fatalf("Requested versioning type %q that does not exist", cfg.Versioning.Type)
}
p.versioner = factory(repo, cfg.Directory, cfg.Versioning.Params)
}
go p.Serve()
}
// StartRO starts read only processing on the current model. When in
// read only mode the model will announce files to the cluster but not
// pull in any external changes.
func (m *Model) StartRepoRO(repo string) {
m.StartRepoRW(repo, 0) // zero threads => read only
intv := time.Duration(m.repoCfgs[repo].RescanIntervalS) * time.Second
go func() {
for {
time.Sleep(intv)
if debug {
l.Debugln(m, "rescan", repo)
}
m.setState(repo, RepoScanning)
if err := m.ScanRepo(repo); err != nil {
invalidateRepo(m.cfg, repo, err)
return
}
m.setState(repo, RepoIdle)
}
}()
}
type ConnectionInfo struct {
@@ -240,7 +273,7 @@ func (m *Model) Completion(node protocol.NodeID, repo string) float64 {
res := 100 * (1 - float64(need)/float64(tot))
if debug {
l.Debugf("Completion(%s, %q): %f (%d / %d)", node, repo, res, need, tot)
l.Debugf("%v Completion(%s, %q): %f (%d / %d)", m, node, repo, res, need, tot)
}
return res
@@ -316,7 +349,7 @@ func (m *Model) NeedSize(repo string) (files int, bytes int64) {
})
}
if debug {
l.Debugf("NeedSize(%q): %d %d", repo, files, bytes)
l.Debugf("%v NeedSize(%q): %d %d", m, repo, files, bytes)
}
return
}
@@ -389,7 +422,7 @@ func (m *Model) Index(nodeID protocol.NodeID, repo string, fs []protocol.FileInf
// Implements the protocol.Model interface.
func (m *Model) IndexUpdate(nodeID protocol.NodeID, repo string, fs []protocol.FileInfo) {
if debug {
l.Debugf("IDXUP(in): %s / %q: %d files", nodeID, repo, len(fs))
l.Debugf("%v IDXUP(in): %s / %q: %d files", m, nodeID, repo, len(fs))
}
if !m.repoSharedWith(repo, nodeID) {
@@ -475,7 +508,7 @@ func (m *Model) ClusterConfig(nodeID protocol.NodeID, cm protocol.ClusterConfigM
var id protocol.NodeID
copy(id[:], node.ID)
if m.cfg.GetNodeConfiguration(id)==nil {
if m.cfg.GetNodeConfiguration(id) == nil {
// The node is currently unknown. Add it to the config.
l.Infof("Adding node %v to config (vouched for by introducer %v)", id, nodeID)
@@ -574,20 +607,20 @@ func (m *Model) Request(nodeID protocol.NodeID, repo, name string, offset int64,
lf := r.Get(protocol.LocalNodeID, name)
if protocol.IsInvalid(lf.Flags) || protocol.IsDeleted(lf.Flags) {
if debug {
l.Debugf("REQ(in): %s: %q / %q o=%d s=%d; invalid: %v", nodeID, repo, name, offset, size, lf)
l.Debugf("%v REQ(in): %s: %q / %q o=%d s=%d; invalid: %v", m, nodeID, repo, name, offset, size, lf)
}
return nil, ErrInvalid
}
if offset > lf.Size() {
if debug {
l.Debugf("REQ(in; nonexistent): %s: %q o=%d s=%d", nodeID, name, offset, size)
l.Debugf("%v REQ(in; nonexistent): %s: %q o=%d s=%d", m, nodeID, name, offset, size)
}
return nil, ErrNoSuchFile
}
if debug && nodeID != protocol.LocalNodeID {
l.Debugf("REQ(in): %s: %q / %q o=%d s=%d", nodeID, repo, name, offset, size)
l.Debugf("%v REQ(in): %s: %q / %q o=%d s=%d", m, nodeID, repo, name, offset, size)
}
m.rmut.RLock()
fn := filepath.Join(m.repoCfgs[repo].Directory, name)
@@ -768,15 +801,9 @@ func sendIndexes(conn protocol.Connection, repo string, fs *files.Set, ignores i
var err error
if debug {
l.Debugf("sendIndexes for %s-%s@/%q starting", nodeID, name, repo)
l.Debugf("sendIndexes for %s-%s/%q starting", nodeID, name, repo)
}
defer func() {
if debug {
l.Debugf("sendIndexes for %s-%s@/%q exiting: %v", nodeID, name, repo, err)
}
}()
minLocalVer, err := sendIndexTo(true, 0, conn, repo, fs, ignores)
for err == nil {
@@ -787,6 +814,10 @@ func sendIndexes(conn protocol.Connection, repo string, fs *files.Set, ignores i
minLocalVer, err = sendIndexTo(false, minLocalVer, conn, repo, fs, ignores)
}
if debug {
l.Debugf("sendIndexes for %s-%s/%q exiting: %v", nodeID, name, repo, err)
}
}
func sendIndexTo(initial bool, minLocalVer uint64, conn protocol.Connection, repo string, fs *files.Set, ignores ignore.Patterns) (uint64, error) {
@@ -877,7 +908,7 @@ func (m *Model) requestGlobal(nodeID protocol.NodeID, repo, name string, offset
}
if debug {
l.Debugf("REQ(out): %s: %q / %q o=%d s=%d h=%x", nodeID, repo, name, offset, size, hash)
l.Debugf("%v REQ(out): %s: %q / %q o=%d s=%d h=%x", m, nodeID, repo, name, offset, size, hash)
}
return nc.Request(repo, name, offset, size)
@@ -1175,10 +1206,10 @@ func (m *Model) Override(repo string) {
m.setState(repo, RepoIdle)
}
// Version returns the change version for the given repository. This is
// guaranteed to increment if the contents of the local or global repository
// has changed.
func (m *Model) LocalVersion(repo string) uint64 {
// CurrentLocalVersion returns the change version for the given repository.
// This is guaranteed to increment if the contents of the local repository has
// changed.
func (m *Model) CurrentLocalVersion(repo string) uint64 {
m.rmut.Lock()
defer m.rmut.Unlock()
@@ -1187,10 +1218,41 @@ func (m *Model) LocalVersion(repo string) uint64 {
panic("bug: LocalVersion called for nonexistent repo " + repo)
}
ver := fs.LocalVersion(protocol.LocalNodeID)
return fs.LocalVersion(protocol.LocalNodeID)
}
// RemoteLocalVersion returns the change version for the given repository, as
// sent by remote peers. This is guaranteed to increment if the contents of
// the remote or global repository has changed.
//
// The value is the sum of the per-node local versions of every node listed
// in m.repoNodes for the repository. It panics if the repository is not
// present in m.repoFiles, since that indicates a programming error in the
// caller.
func (m *Model) RemoteLocalVersion(repo string) uint64 {
	m.rmut.Lock()
	defer m.rmut.Unlock()

	fs, ok := m.repoFiles[repo]
	if !ok {
		// Name this function in the message so the panic points at the
		// actual call site rather than at the unrelated LocalVersion.
		panic("bug: RemoteLocalVersion called for nonexistent repo " + repo)
	}

	var ver uint64
	for _, n := range m.repoNodes[repo] {
		ver += fs.LocalVersion(n)
	}

	return ver
}
// availability returns whatever the repository's file set reports from
// Availability for the given file, or nil when the repository is not present
// in m.repoFiles. The lookup is done while holding m.rmut.
func (m *Model) availability(repo string, file string) []protocol.NodeID {
	m.rmut.Lock()
	defer m.rmut.Unlock()

	if set, known := m.repoFiles[repo]; known {
		return set.Availability(file)
	}
	return nil
}
// String returns a short identifier for the model of the form "model@<ptr>",
// built from the receiver's pointer value. The debug log statements in this
// file print the model with %v, so this is what appears as their prefix.
func (m *Model) String() string {
return fmt.Sprintf("model@%p", m)
}

View File

@@ -241,25 +241,6 @@ func BenchmarkRequest(b *testing.B) {
}
}
func TestActivityMap(t *testing.T) {
isValid := func(protocol.NodeID) bool {
return true
}
m := make(activityMap)
if node := m.leastBusyNode([]protocol.NodeID{node1}, isValid); node != node1 {
t.Errorf("Incorrect least busy node %q", node)
}
if node := m.leastBusyNode([]protocol.NodeID{node2}, isValid); node != node2 {
t.Errorf("Incorrect least busy node %q", node)
}
if node := m.leastBusyNode([]protocol.NodeID{node1, node2}, isValid); node != node1 {
t.Errorf("Incorrect least busy node %q", node)
}
if node := m.leastBusyNode([]protocol.NodeID{node1, node2}, isValid); node != node2 {
t.Errorf("Incorrect least busy node %q", node)
}
}
func TestNodeRename(t *testing.T) {
ccm := protocol.ClusterConfigMessage{
ClientName: "syncthing",

View File

@@ -0,0 +1,51 @@
// Copyright (C) 2014 Jakob Borg and Contributors (see the CONTRIBUTORS file).
// All rights reserved. Use of this source code is governed by an MIT-style
// license that can be found in the LICENSE file.
package model
import (
"sync"
"github.com/syncthing/syncthing/internal/protocol"
)
// nodeActivity tracks the number of outstanding requests per node and can
// answer which node is least busy. It is safe for use from multiple
// goroutines.
//
// NOTE(review): the struct holds a sync.Mutex by value, and a Mutex must not
// be copied after first use. Always share a *nodeActivity (as returned by
// newNodeActivity) rather than copying the struct.
type nodeActivity struct {
// act maps a node to its current count of outstanding requests; a node
// that has never been seen reads as zero.
act map[protocol.NodeID]int
// mut guards act.
mut sync.Mutex
}
// newNodeActivity returns a ready-to-use tracker with an empty, non-nil
// activity map.
func newNodeActivity() *nodeActivity {
	na := new(nodeActivity)
	na.act = make(map[protocol.NodeID]int)
	return na
}
// leastBusy returns the node in availability that currently has the fewest
// outstanding requests. Ties are resolved in favour of the node that appears
// first in availability. When availability is empty the zero NodeID is
// returned.
//
// The receiver is a pointer so that the mutex being locked is the one shared
// by all users of the tracker; with a value receiver every call would lock a
// private copy of the mutex and provide no mutual exclusion at all.
func (m *nodeActivity) leastBusy(availability []protocol.NodeID) protocol.NodeID {
	m.mut.Lock()
	defer m.mut.Unlock()

	// Start above any realistic usage count so the first candidate always wins.
	low := 2<<30 - 1
	var selected protocol.NodeID
	for _, node := range availability {
		if usage := m.act[node]; usage < low {
			low = usage
			selected = node
		}
	}
	return selected
}
// using records that one more request is outstanding towards node.
//
// The receiver is a pointer so that the lock taken here is the tracker's own
// mutex rather than a per-call copy of it, which would leave the map update
// unprotected against concurrent callers.
func (m *nodeActivity) using(node protocol.NodeID) {
	m.mut.Lock()
	defer m.mut.Unlock()
	m.act[node]++
}
// done records that one request towards node has completed. It does not
// clamp at zero, so callers are expected to pair each call with an earlier
// call to using for the same node.
//
// The receiver is a pointer so that the lock taken here is the tracker's own
// mutex rather than a per-call copy of it, which would leave the map update
// unprotected against concurrent callers.
func (m *nodeActivity) done(node protocol.NodeID) {
	m.mut.Lock()
	defer m.mut.Unlock()
	m.act[node]--
}

View File

@@ -0,0 +1,56 @@
// Copyright (C) 2014 Jakob Borg and Contributors (see the CONTRIBUTORS file).
// All rights reserved. Use of this source code is governed by an MIT-style
// license that can be found in the LICENSE file.
package model
import (
"testing"
"github.com/syncthing/syncthing/internal/protocol"
)
// TestNodeActivity walks a nodeActivity through a sequence of using/done
// calls over three nodes and checks after every step which node is reported
// as least busy, including that ties go to the earliest node in the list.
func TestNodeActivity(t *testing.T) {
	n0 := protocol.NodeID{1, 2, 3, 4}
	n1 := protocol.NodeID{5, 6, 7, 8}
	n2 := protocol.NodeID{9, 10, 11, 12}
	nodes := []protocol.NodeID{n0, n1, n2}
	na := newNodeActivity()

	// expect reports an error, using the given format, unless want is
	// currently the least busy of nodes.
	expect := func(format string, want protocol.NodeID) {
		if got := na.leastBusy(nodes); got != want {
			t.Errorf(format, want, got)
		}
	}

	// Nothing is in use yet; asking must not change the answer.
	expect("Least busy node should be n0 (%v) not %v", n0)
	expect("Least busy node should still be n0 (%v) not %v", n0)

	// Load the nodes one at a time, always picking the least busy one.
	na.using(na.leastBusy(nodes))
	expect("Least busy node should be n1 (%v) not %v", n1)

	na.using(na.leastBusy(nodes))
	expect("Least busy node should be n2 (%v) not %v", n2)

	na.using(na.leastBusy(nodes))
	expect("Least busy node should be n0 (%v) not %v", n0)

	// Release them again, out of order.
	na.done(n1)
	expect("Least busy node should be n1 (%v) not %v", n1)

	na.done(n2)
	expect("Least busy node should still be n1 (%v) not %v", n1)

	na.done(n0)
	expect("Least busy node should be n0 (%v) not %v", n0)
}

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,183 @@
// Copyright (C) 2014 Jakob Borg and Contributors (see the CONTRIBUTORS file).
// All rights reserved. Use of this source code is governed by an MIT-style
// license that can be found in the LICENSE file.
package model
import (
"fmt"
"os"
"path/filepath"
"sync"
"github.com/syncthing/syncthing/internal/protocol"
)
// A sharedPullerState is kept for each file that is being synced and is kept
// updated along the way. The methods on it take mut themselves, except
// earlyCloseLocked, which expects the caller to already hold it.
type sharedPullerState struct {
// Immutable, does not require locking
file protocol.FileInfo // The file being synced; its Name is used in log output
repo string // The repository name, used in log output
tempName string // Path of the temp file that tempFile creates
realName string // Path of the existing file that sourceFile opens
// Mutable, must be locked for access
err error // The first error we hit
fd *os.File // The fd of the temp file, nil until tempFile has created it
copyNeeded int // Number of copy actions we expect to happen
pullNeeded int // Number of block pulls we expect to happen
closed bool // Set by finalClose or by an early (error) close
mut sync.Mutex // Protects the above
}
// tempFile returns the fd for the temporary file, reusing an open fd
// or creating the file as necessary.
//
// If the state has already failed, the recorded error is returned. Otherwise
// the parent directory of the temp file is created if it does not exist, and
// the temp file is created exclusively (it is an error if it already
// exists). Any failure along the way marks the state as failed via
// earlyCloseLocked and is returned to the caller.
//
// If the parent directory is not writable by its owner it is temporarily
// made writable for the duration of the call; the original permissions are
// restored before returning, and failure to restore them panics.
func (s *sharedPullerState) tempFile() (*os.File, error) {
	s.mut.Lock()
	defer s.mut.Unlock()

	// If we've already hit an error, return early
	if s.err != nil {
		return nil, s.err
	}

	// If the temp file is already open, return the file descriptor
	if s.fd != nil {
		return s.fd, nil
	}

	// Ensure that the parent directory exists or can be created
	dir := filepath.Dir(s.tempName)
	if info, err := os.Stat(dir); err != nil && os.IsNotExist(err) {
		err = os.MkdirAll(dir, 0755)
		if err != nil {
			s.earlyCloseLocked("dst mkdir", err)
			return nil, err
		}
	} else if err != nil {
		s.earlyCloseLocked("dst stat dir", err)
		return nil, err
	} else if !info.IsDir() {
		err = fmt.Errorf("%q: not a directory", dir)
		s.earlyCloseLocked("dst mkdir", err)
		return nil, err
	} else if info.Mode()&0200 == 0 {
		// The owner write bit is what decides whether we can create the
		// temp file in this directory. Open the directory up for now and
		// put the original permissions back once the file exists. A failed
		// chmod is not fatal here; the create below will report the real
		// problem.
		err := os.Chmod(dir, 0755)
		if err == nil {
			defer func() {
				err := os.Chmod(dir, info.Mode().Perm())
				if err != nil {
					panic(err)
				}
			}()
		}
	}

	// Attempt to create the temp file
	fd, err := os.OpenFile(s.tempName, os.O_CREATE|os.O_WRONLY|os.O_EXCL, 0644)
	if err != nil {
		s.earlyCloseLocked("dst create", err)
		return nil, err
	}

	// Same fd will be used by all writers
	s.fd = fd

	return fd, nil
}
// sourceFile opens the existing file (realName) for reading and hands the
// new fd to the caller. If the state has already failed, the recorded error
// is returned instead. A failure to open marks the state as failed.
func (s *sharedPullerState) sourceFile() (*os.File, error) {
	s.mut.Lock()
	defer s.mut.Unlock()

	// A previous failure wins over anything we could do now.
	if prev := s.err; prev != nil {
		return nil, prev
	}

	src, openErr := os.Open(s.realName)
	if openErr == nil {
		return src, nil
	}

	s.earlyCloseLocked("src open", openErr)
	return nil, openErr
}
// earlyClose logs an informational message composed of the context and
// error, and marks the sharedPullerState as failed and closed. If the temp
// file is open it is closed and removed. Is a no-op when called on an
// already failed state.
//
// It takes s.mut itself; code that already holds the lock must call
// earlyCloseLocked instead.
func (s *sharedPullerState) earlyClose(context string, err error) {
s.mut.Lock()
defer s.mut.Unlock()
s.earlyCloseLocked(context, err)
}
// earlyCloseLocked does the work of earlyClose and must be called with
// s.mut held. Only the first error is recorded and logged; later calls on an
// already failed state return immediately. An open temp file is closed and
// removed from disk.
func (s *sharedPullerState) earlyCloseLocked(context string, err error) {
	if s.err != nil {
		return
	}

	l.Infof("Puller (repo %q, file %q): %s: %v", s.repo, s.file.Name, context, err)
	s.err, s.closed = err, true

	if tmp := s.fd; tmp != nil {
		tmp.Close()
		os.Remove(s.tempName)
	}
}
// failed returns the first error recorded on the state, or nil if no error
// has been recorded.
func (s *sharedPullerState) failed() error {
	s.mut.Lock()
	recorded := s.err
	s.mut.Unlock()

	return recorded
}
// copyDone records that one of the expected copy actions has completed by
// decrementing copyNeeded, and logs the new value when debugging is enabled.
func (s *sharedPullerState) copyDone() {
	s.mut.Lock()
	defer s.mut.Unlock()

	s.copyNeeded--
	if debug {
		// Log the counter we actually changed, not pullNeeded.
		l.Debugln("sharedPullerState", s.repo, s.file.Name, "copyNeeded ->", s.copyNeeded)
	}
}
// pullDone records that one of the expected block pulls has completed by
// decrementing pullNeeded, and logs the new value when debugging is enabled.
func (s *sharedPullerState) pullDone() {
	s.mut.Lock()
	defer s.mut.Unlock()

	s.pullNeeded--
	if !debug {
		return
	}
	l.Debugln("sharedPullerState", s.repo, s.file.Name, "pullNeeded ->", s.pullNeeded)
}
// finalClose atomically closes the file, if it is time to do so, and reports
// whether it did. When the first return value is true the file was closed by
// this call and should now be finished off; the error is the result of the
// close (nil if no temp file was ever opened). When it is false there is
// either still outstanding copy or pull work, or the state was closed
// earlier, and the caller should not finish the file.
func (s *sharedPullerState) finalClose() (bool, error) {
	s.mut.Lock()
	defer s.mut.Unlock()

	outstanding := s.pullNeeded + s.copyNeeded
	if outstanding != 0 || s.closed {
		// Either not done yet, or already handled.
		return false, nil
	}

	s.closed = true

	if s.fd == nil {
		return true, nil
	}

	// Drop our reference before closing so the fd cannot be handed out again.
	tmp := s.fd
	s.fd = nil
	return true, tmp.Close()
}

View File

@@ -0,0 +1,52 @@
// Copyright (C) 2014 Jakob Borg and Contributors (see the CONTRIBUTORS file).
// All rights reserved. Use of this source code is governed by an MIT-style
// license that can be found in the LICENSE file.
package model
import "testing"
// TestSourceFileOK verifies that sourceFile opens an existing file for
// reading, that the returned fd yields the expected contents, and that a
// successful open leaves the state without a recorded error. It relies on
// the fixture testdata/foo starting with the bytes "foobar".
func TestSourceFileOK(t *testing.T) {
	s := sharedPullerState{
		realName: "testdata/foo",
	}

	fd, err := s.sourceFile()
	if err != nil {
		t.Fatal(err)
	}
	if fd == nil {
		t.Fatal("Unexpected nil fd")
	}
	// sourceFile hands ownership of the fd to the caller.
	defer fd.Close()

	bs := make([]byte, 6)
	n, err := fd.Read(bs)
	if err != nil {
		t.Fatal(err)
	}
	if n != len(bs) {
		t.Fatalf("Wrong read length %d != %d", n, len(bs))
	}
	if string(bs) != "foobar" {
		t.Fatalf("Wrong contents %s != foobar", bs)
	}

	if err := s.failed(); err != nil {
		t.Fatal(err)
	}
}
// TestSourceFileBad verifies that sourceFile on a path that does not exist
// returns an error and a nil fd, and that the failure is recorded on the
// state so that failed() reports it afterwards.
func TestSourceFileBad(t *testing.T) {
	state := sharedPullerState{realName: "nonexistent"}

	fd, err := state.sourceFile()
	switch {
	case err == nil:
		t.Fatal("Unexpected nil error")
	case fd != nil:
		t.Fatal("Unexpected non-nil fd")
	}

	if state.failed() == nil {
		t.Fatal("Unexpected nil failed()")
	}
}