Improve puller somewhat

This commit is contained in:
Jakob Borg 2013-12-29 12:18:59 -05:00
parent c70fef1208
commit 704e0fa6b8
2 changed files with 104 additions and 71 deletions

16
main.go
View File

@ -24,11 +24,10 @@ type Options struct {
ConfDir string `short:"c" long:"cfg" description:"Configuration directory" default:"~/.syncthing" value-name:"DIR"` ConfDir string `short:"c" long:"cfg" description:"Configuration directory" default:"~/.syncthing" value-name:"DIR"`
Listen string `short:"l" long:"listen" description:"Listen address" default:":22000" value-name:"ADDR"` Listen string `short:"l" long:"listen" description:"Listen address" default:":22000" value-name:"ADDR"`
ReadOnly bool `short:"r" long:"ro" description:"Repository is read only"` ReadOnly bool `short:"r" long:"ro" description:"Repository is read only"`
Delete bool `short:"d" long:"delete" description:"Delete files from repo when deleted from cluster"` Delete bool `short:"d" long:"delete" description:"Delete files deleted from cluster"`
NoSymlinks bool `long:"no-symlinks" description:"Don't follow first level symlinks in the repo"` NoSymlinks bool `long:"no-symlinks" description:"Don't follow first level symlinks in the repo"`
ScanInterval time.Duration `long:"scan-intv" description:"Repository scan interval" default:"60s" value-name:"INTV"`
ConnInterval time.Duration `long:"conn-intv" description:"Node reconnect interval" default:"60s" value-name:"INTV"`
Discovery DiscoveryOptions `group:"Discovery Options"` Discovery DiscoveryOptions `group:"Discovery Options"`
Advanced AdvancedOptions `group:"Advanced Options"`
Debug DebugOptions `group:"Debugging Options"` Debug DebugOptions `group:"Debugging Options"`
} }
@ -46,6 +45,13 @@ type DiscoveryOptions struct {
NoLocalDiscovery bool `short:"N" long:"no-local-announce" description:"Do not announce presence locally"` NoLocalDiscovery bool `short:"N" long:"no-local-announce" description:"Do not announce presence locally"`
} }
type AdvancedOptions struct {
RequestsInFlight int `long:"reqs-in-flight" description:"Parallell in flight requests per file" default:"8" value-name:"REQS"`
FilesInFlight int `long:"files-in-flight" description:"Parallell in flight file pulls" default:"4" value-name:"FILES"`
ScanInterval time.Duration `long:"scan-intv" description:"Repository scan interval" default:"60s" value-name:"INTV"`
ConnInterval time.Duration `long:"conn-intv" description:"Node reconnect interval" default:"60s" value-name:"INTV"`
}
var opts Options var opts Options
var Version string var Version string
@ -162,7 +168,7 @@ func main() {
// XXX: Should use some fsnotify mechanism. // XXX: Should use some fsnotify mechanism.
go func() { go func() {
for { for {
time.Sleep(opts.ScanInterval) time.Sleep(opts.Advanced.ScanInterval)
updateLocalModel(m) updateLocalModel(m)
} }
}() }()
@ -286,7 +292,7 @@ func connect(myID string, addr string, nodeAddrs map[string][]string, m *Model,
} }
} }
time.Sleep(opts.ConnInterval) time.Sleep(opts.Advanced.ConnInterval)
} }
} }

View File

@ -6,10 +6,10 @@ Locking
======= =======
These methods are never called from the outside so don't follow the locking These methods are never called from the outside so don't follow the locking
policy in model.go. Instead, appropriate locks are acquired when needed and policy in model.go.
held for as short a time as possible.
TODO(jb): Refactor this into smaller and cleaner pieces. TODO(jb): Refactor this into smaller and cleaner pieces.
TODO(jb): Increase performance by taking apparent peer bandwidth into account.
*/ */
@ -25,8 +25,6 @@ import (
"github.com/calmh/syncthing/buffers" "github.com/calmh/syncthing/buffers"
) )
const RemoteFetchers = 8
func (m *Model) pullFile(name string) error { func (m *Model) pullFile(name string) error {
m.RLock() m.RLock()
var localFile = m.local[name] var localFile = m.local[name]
@ -81,22 +79,17 @@ func (m *Model) pullFile(name string) error {
m.RLock() m.RLock()
var nodeIDs = m.whoHas(name) var nodeIDs = m.whoHas(name)
m.RUnlock() m.RUnlock()
var remoteBlocksChan = make(chan Block) var remoteBlocks = blockIterator{blocks: remote}
go func() { for i := 0; i < opts.Advanced.RequestsInFlight; i++ {
for _, block := range remote { curNode := nodeIDs[i%len(nodeIDs)]
remoteBlocksChan <- block
}
close(remoteBlocksChan)
}()
// XXX: This should be rewritten into something nicer that takes differing
// peer performance into account.
for i := 0; i < RemoteFetchers; i++ {
for _, nodeID := range nodeIDs {
fetchDone.Add(1) fetchDone.Add(1)
go func(nodeID string) { go func(nodeID string) {
for block := range remoteBlocksChan { for {
block, ok := remoteBlocks.Next()
if !ok {
break
}
data, err := m.RequestGlobal(nodeID, name, block.Offset, block.Length, block.Hash) data, err := m.RequestGlobal(nodeID, name, block.Offset, block.Length, block.Hash)
if err != nil { if err != nil {
break break
@ -107,32 +100,17 @@ func (m *Model) pullFile(name string) error {
} }
} }
fetchDone.Done() fetchDone.Done()
}(nodeID) }(curNode)
}
} }
fetchDone.Wait() fetchDone.Wait()
close(contentChan) close(contentChan)
applyDone.Wait() applyDone.Wait()
rf, err := os.Open(tmpFilename) err = hashCheck(tmpFilename, globalFile.Blocks)
if err != nil { if err != nil {
return err return err
} }
defer rf.Close()
writtenBlocks, err := Blocks(rf, BlockSize)
if err != nil {
return err
}
if len(writtenBlocks) != len(globalFile.Blocks) {
return fmt.Errorf("%s: incorrect number of blocks after sync", tmpFilename)
}
for i := range writtenBlocks {
if bytes.Compare(writtenBlocks[i].Hash, globalFile.Blocks[i].Hash) != 0 {
return fmt.Errorf("%s: hash mismatch after sync\n %v\n %v", tmpFilename, writtenBlocks[i], globalFile.Blocks[i])
}
}
err = os.Chtimes(tmpFilename, time.Unix(globalFile.Modified, 0), time.Unix(globalFile.Modified, 0)) err = os.Chtimes(tmpFilename, time.Unix(globalFile.Modified, 0), time.Unix(globalFile.Modified, 0))
if err != nil { if err != nil {
@ -148,23 +126,29 @@ func (m *Model) pullFile(name string) error {
} }
func (m *Model) puller() { func (m *Model) puller() {
for {
for {
var n string
var f File
for {
time.Sleep(time.Second)
var ns []string
m.RLock() m.RLock()
for n = range m.need { for n := range m.need {
break // just pick first name ns = append(ns, n)
}
if len(n) != 0 {
f = m.global[n]
} }
m.RUnlock() m.RUnlock()
if len(n) == 0 { if len(ns) == 0 {
// we got nothing continue
break }
var limiter = make(chan bool, opts.Advanced.FilesInFlight)
for _, n := range ns {
limiter <- true
f, ok := m.GlobalFile(n)
if !ok {
continue
} }
var err error var err error
@ -185,8 +169,9 @@ func (m *Model) puller() {
} else { } else {
warnln(err) warnln(err)
} }
<-limiter
} }
time.Sleep(time.Second)
} }
} }
@ -208,3 +193,45 @@ func applyContent(cc <-chan content, dst io.WriterAt) error {
return nil return nil
} }
func hashCheck(name string, correct []Block) error {
rf, err := os.Open(name)
if err != nil {
return err
}
defer rf.Close()
current, err := Blocks(rf, BlockSize)
if err != nil {
return err
}
if len(current) != len(correct) {
return fmt.Errorf("%s: incorrect number of blocks after sync", name)
}
for i := range current {
if bytes.Compare(current[i].Hash, correct[i].Hash) != 0 {
return fmt.Errorf("%s: hash mismatch after sync\n %v\n %v", name, current[i], correct[i])
}
}
return nil
}
type blockIterator struct {
sync.Mutex
blocks []Block
}
func (i *blockIterator) Next() (b Block, ok bool) {
i.Lock()
defer i.Unlock()
if len(i.blocks) == 0 {
return
}
b, i.blocks = i.blocks[0], i.blocks[1:]
ok = true
return
}