From c9cce9613e12f2305234deb19342a3aa8ff95817 Mon Sep 17 00:00:00 2001 From: Jakob Borg Date: Mon, 6 Jan 2014 11:11:18 +0100 Subject: [PATCH] Refactor out the model into a subpackage --- build.sh | 1 + gui.go | 32 ++- main.go | 56 ++--- blocks.go => model/blocks.go | 14 +- blocks_test.go => model/blocks_test.go | 4 +- model.go => model/model.go | 304 ++++++++++++----------- model_puller.go => model/model_puller.go | 30 ++- model_test.go => model/model_test.go | 32 +-- {testdata => model/testdata}/bar | 0 {testdata => model/testdata}/baz/quux | 0 {testdata => model/testdata}/foo | 0 walk.go => model/walk.go | 63 ++--- walk_test.go => model/walk_test.go | 6 +- 13 files changed, 279 insertions(+), 263 deletions(-) rename blocks.go => model/blocks.go (73%) rename blocks_test.go => model/blocks_test.go (98%) rename model.go => model/model.go (71%) rename model_puller.go => model/model_puller.go (89%) rename model_test.go => model/model_test.go (95%) rename {testdata => model/testdata}/bar (100%) rename {testdata => model/testdata}/baz/quux (100%) rename {testdata => model/testdata}/foo (100%) rename walk.go => model/walk.go (61%) rename walk_test.go => model/walk_test.go (94%) diff --git a/build.sh b/build.sh index ed3e618e..bf7c3836 100755 --- a/build.sh +++ b/build.sh @@ -4,6 +4,7 @@ version=$(git describe --always) buildDir=dist if [[ -z $1 ]] ; then + go test ./... || exit 1 go build -ldflags "-X main.Version $version" \ && nrsc syncthing gui else diff --git a/gui.go b/gui.go index 77cb8c82..1ea75b1f 100644 --- a/gui.go +++ b/gui.go @@ -8,12 +8,13 @@ import ( "mime" "net/http" "path/filepath" - "bitbucket.org/tebeka/nrsc" + "bitbucket.org/tebeka/nrsc" + "github.com/calmh/syncthing/model" "github.com/codegangsta/martini" ) -func startGUI(addr string, m *Model) { +func startGUI(addr string, m *model.Model) { router := martini.NewRouter() router.Get("/", getRoot) router.Get("/rest/version", restGetVersion) @@ -40,7 +41,7 @@ func restGetVersion() string { return Version } -func restGetModel(m *Model, w http.ResponseWriter) { +func restGetModel(m *model.Model, w http.ResponseWriter) { var res = make(map[string]interface{}) globalFiles, globalDeleted, globalBytes := m.GlobalSize() @@ -59,7 +60,7 @@ func restGetModel(m *Model, w http.ResponseWriter) { json.NewEncoder(w).Encode(res) } -func restGetConnections(m *Model, w http.ResponseWriter) { +func restGetConnections(m *model.Model, w http.ResponseWriter) { var res = m.ConnectionStats() w.Header().Set("Content-Type", "application/json") json.NewEncoder(w).Encode(res) @@ -73,14 +74,27 @@ func restGetConfig(w http.ResponseWriter) { json.NewEncoder(w).Encode(res) } -func restGetNeed(m *Model, w http.ResponseWriter) { +type guiFile model.File + +func (f guiFile) MarshalJSON() ([]byte, error) { + type t struct { + Name string + Size int + } + return json.Marshal(t{ + Name: f.Name, + Size: model.File(f).Size(), + }) +} + +func restGetNeed(m *model.Model, w http.ResponseWriter) { files, _ := m.NeedFiles() - if files == nil { - // We don't want the empty list to serialize as "null\n" - files = make([]FileInfo, 0) + gfs := make([]guiFile, len(files)) + for i, f := range files { + gfs[i] = guiFile(f) } w.Header().Set("Content-Type", "application/json") - json.NewEncoder(w).Encode(files) + json.NewEncoder(w).Encode(gfs) } func nrscStatic(path string) interface{} { diff --git a/main.go b/main.go index ee429e4f..284efc4c 100644 --- a/main.go +++ b/main.go @@ -2,9 +2,7 @@ package main import ( "compress/gzip" - "crypto/sha1" "crypto/tls" - "fmt" "log" "net" "net/http" @@ -18,6 +16,7 @@ import ( "github.com/calmh/ini" "github.com/calmh/syncthing/discover" flags "github.com/calmh/syncthing/github.com/jessevdk/go-flags" + "github.com/calmh/syncthing/model" "github.com/calmh/syncthing/protocol" ) @@ -36,12 +35,10 @@ type Options struct { } type DebugOptions struct { - LogSource bool `long:"log-source"` - TraceFile bool `long:"trace-file"` - TraceNet bool `long:"trace-net"` - TraceIdx bool `long:"trace-idx"` - TraceNeed bool `long:"trace-need"` - Profiler string `long:"profiler" value-name:"ADDR"` + LogSource bool `long:"log-source"` + TraceModel []string `long:"trace-model" value-name:"TRACE" description:"idx, net, file, need"` + TraceConnect bool `long:"trace-connect"` + Profiler string `long:"profiler" value-name:"ADDR"` } type DiscoveryOptions struct { @@ -76,7 +73,7 @@ func main() { if err != nil { os.Exit(0) } - if opts.Debug.TraceFile || opts.Debug.TraceIdx || opts.Debug.TraceNet || opts.Debug.LogSource { + if len(opts.Debug.TraceModel) > 0 || opts.Debug.LogSource { logger = log.New(os.Stderr, "", log.Lshortfile|log.Ldate|log.Ltime|log.Lmicroseconds) } if strings.HasPrefix(opts.ConfDir, "~/") { @@ -139,7 +136,10 @@ func main() { } ensureDir(dir, -1) - m := NewModel(dir) + m := model.NewModel(dir) + for _, t := range opts.Debug.TraceModel { + m.Trace(t) + } // GUI if opts.GUIAddr != "" { @@ -167,10 +167,8 @@ func main() { // Routine to pull blocks from other nodes to synchronize the local // repository. Does not run when we are in read only (publish only) mode. if !opts.ReadOnly { - infoln("Cleaning out incomplete synchronizations") - CleanTempFiles(dir) okln("Ready to synchronize") - m.Start() + m.StartRW(opts.Delete, opts.Advanced.FilesInFlight, opts.Advanced.RequestsInFlight) } // Periodically scan the repository and update the local model. @@ -190,9 +188,9 @@ func main() { select {} } -func printStatsLoop(m *Model) { +func printStatsLoop(m *model.Model) { var lastUpdated int64 - var lastStats = make(map[string]ConnectionInfo) + var lastStats = make(map[string]model.ConnectionInfo) for { time.Sleep(60 * time.Second) @@ -216,12 +214,12 @@ func printStatsLoop(m *Model) { files, _, bytes = m.LocalSize() infof("%6d files, %9sB in local repo", files, BinaryPrefix(bytes)) needFiles, bytes := m.NeedFiles() - infof("%6d files, %9sB in to synchronize", len(needFiles), BinaryPrefix(bytes)) + infof("%6d files, %9sB to synchronize", len(needFiles), BinaryPrefix(bytes)) } } } -func listen(myID string, addr string, m *Model, cfg *tls.Config) { +func listen(myID string, addr string, m *model.Model, cfg *tls.Config) { l, err := tls.Listen("tcp", addr, cfg) fatalErr(err) @@ -233,7 +231,7 @@ listen: continue } - if opts.Debug.TraceNet { + if opts.Debug.TraceConnect { debugln("NET: Connect from", conn.RemoteAddr()) } @@ -267,7 +265,7 @@ listen: } } -func connect(myID string, addr string, nodeAddrs map[string][]string, m *Model, cfg *tls.Config) { +func connect(myID string, addr string, nodeAddrs map[string][]string, m *model.Model, cfg *tls.Config) { _, portstr, err := net.SplitHostPort(addr) fatalErr(err) port, _ := strconv.Atoi(portstr) @@ -310,12 +308,12 @@ func connect(myID string, addr string, nodeAddrs map[string][]string, m *Model, } } - if opts.Debug.TraceNet { + if opts.Debug.TraceConnect { debugln("NET: Dial", nodeID, addr) } conn, err := tls.Dial("tcp", addr, cfg) if err != nil { - if opts.Debug.TraceNet { + if opts.Debug.TraceConnect { debugln("NET:", err) } continue @@ -337,14 +335,14 @@ func connect(myID string, addr string, nodeAddrs map[string][]string, m *Model, } } -func updateLocalModel(m *Model) { - files := Walk(m.Dir(), m, !opts.NoSymlinks) +func updateLocalModel(m *model.Model) { + files := m.Walk(!opts.NoSymlinks) m.ReplaceLocal(files) saveIndex(m) } -func saveIndex(m *Model) { - name := fmt.Sprintf("%x.idx.gz", sha1.Sum([]byte(m.Dir()))) +func saveIndex(m *model.Model) { + name := m.RepoID() + ".idx.gz" fullName := path.Join(opts.ConfDir, name) idxf, err := os.Create(fullName + ".tmp") if err != nil { @@ -359,9 +357,9 @@ func saveIndex(m *Model) { os.Rename(fullName+".tmp", fullName) } -func loadIndex(m *Model) { - fname := fmt.Sprintf("%x.idx.gz", sha1.Sum([]byte(m.Dir()))) - idxf, err := os.Open(path.Join(opts.ConfDir, fname)) +func loadIndex(m *model.Model) { + name := m.RepoID() + ".idx.gz" + idxf, err := os.Open(path.Join(opts.ConfDir, name)) if err != nil { return } @@ -377,7 +375,7 @@ func loadIndex(m *Model) { if err != nil { return } - m.SeedIndex(idx) + m.SeedLocal(idx) } func ensureDir(dir string, mode int) { diff --git a/blocks.go b/model/blocks.go similarity index 73% rename from blocks.go rename to model/blocks.go index 46504d2c..ecc0bb16 100644 --- a/blocks.go +++ b/model/blocks.go @@ -1,4 +1,4 @@ -package main +package model import ( "bytes" @@ -6,8 +6,6 @@ import ( "io" ) -type BlockList []Block - type Block struct { Offset uint64 Length uint32 @@ -15,8 +13,8 @@ type Block struct { } // Blocks returns the blockwise hash of the reader. -func Blocks(r io.Reader, blocksize int) (BlockList, error) { - var blocks BlockList +func Blocks(r io.Reader, blocksize int) ([]Block, error) { + var blocks []Block var offset uint64 for { lr := &io.LimitedReader{r, int64(blocksize)} @@ -42,9 +40,9 @@ func Blocks(r io.Reader, blocksize int) (BlockList, error) { return blocks, nil } -// To returns the list of blocks necessary to transform src into dst. -// Both block lists must have been created with the same block size. -func (src BlockList) To(tgt BlockList) (have, need BlockList) { +// BlockDiff returns lists of common and missing (to transform src into tgt) +// blocks. Both block lists must have been created with the same block size. +func BlockDiff(src, tgt []Block) (have, need []Block) { if len(tgt) == 0 && len(src) != 0 { return nil, nil } diff --git a/blocks_test.go b/model/blocks_test.go similarity index 98% rename from blocks_test.go rename to model/blocks_test.go index 256db278..cc56549c 100644 --- a/blocks_test.go +++ b/model/blocks_test.go @@ -1,4 +1,4 @@ -package main +package model import ( "bytes" @@ -98,7 +98,7 @@ func TestDiff(t *testing.T) { for i, test := range diffTestData { a, _ := Blocks(bytes.NewBufferString(test.a), test.s) b, _ := Blocks(bytes.NewBufferString(test.b), test.s) - _, d := a.To(b) + _, d := BlockDiff(a, b) if len(d) != len(test.d) { t.Fatalf("Incorrect length for diff %d; %d != %d", i, len(d), len(test.d)) } else { diff --git a/model.go b/model/model.go similarity index 71% rename from model.go rename to model/model.go index be4a0980..5aed28eb 100644 --- a/model.go +++ b/model/model.go @@ -1,4 +1,4 @@ -package main +package model /* @@ -12,8 +12,10 @@ acquire locks, but document what locks they require. */ import ( + "crypto/sha1" "fmt" "io" + "log" "net" "os" "path" @@ -40,6 +42,13 @@ type Model struct { lastIdxBcast time.Time lastIdxBcastRequest time.Time + + rwRunning bool + parallellFiles int + paralllelReqs int + delete bool + + trace map[string]bool } const ( @@ -49,6 +58,9 @@ const ( idxBcastMaxDelay = 120 * time.Second // Unless we've already waited this long ) +// NewModel creates and starts a new model. The model starts in read-only mode, +// where it sends index information to connected peers and responds to requests +// for file data without altering the local repository in any way. func NewModel(dir string) *Model { m := &Model{ dir: dir, @@ -59,16 +71,42 @@ func NewModel(dir string) *Model { nodes: make(map[string]*protocol.Connection), rawConn: make(map[string]io.ReadWriteCloser), lastIdxBcast: time.Now(), + trace: make(map[string]bool), } go m.broadcastIndexLoop() return m } -func (m *Model) Start() { +// Trace enables trace logging of the given facility. This is a debugging function; grep for m.trace. +func (m *Model) Trace(t string) { + m.Lock() + defer m.Unlock() + m.trace[t] = true +} + +// StartRW starts read/write processing on the current model. When in +// read/write mode the model will attempt to keep in sync with the cluster by +// pulling needed files from peer nodes. +func (m *Model) StartRW(del bool, pfiles, preqs int) { + m.Lock() + defer m.Unlock() + + if m.rwRunning { + panic("starting started model") + } + + m.rwRunning = true + m.delete = del + m.parallellFiles = pfiles + m.paralllelReqs = preqs + + go m.cleanTempFiles() go m.puller() } +// Generation returns an opaque integer that is guaranteed to increment on +// every change to the local repository or global model. func (m *Model) Generation() int64 { m.RLock() defer m.RUnlock() @@ -81,6 +119,7 @@ type ConnectionInfo struct { Address string } +// ConnectionStats returns a map with connection statistics for each connected node. func (m *Model) ConnectionStats() map[string]ConnectionInfo { type remoteAddrer interface { RemoteAddr() net.Addr @@ -102,6 +141,8 @@ func (m *Model) ConnectionStats() map[string]ConnectionInfo { return res } +// LocalSize returns the number of files, deleted files and total bytes for all +// files in the global model. func (m *Model) GlobalSize() (files, deleted, bytes int) { m.RLock() defer m.RUnlock() @@ -117,6 +158,8 @@ func (m *Model) GlobalSize() (files, deleted, bytes int) { return } +// LocalSize returns the number of files, deleted files and total bytes for all +// files in the local repository. func (m *Model) LocalSize() (files, deleted, bytes int) { m.RLock() defer m.RUnlock() @@ -132,6 +175,8 @@ func (m *Model) LocalSize() (files, deleted, bytes int) { return } +// InSyncSize returns the number and total byte size of the local files that +// are in sync with the global model. func (m *Model) InSyncSize() (files, bytes int) { m.RLock() defer m.RUnlock() @@ -145,31 +190,27 @@ func (m *Model) InSyncSize() (files, bytes int) { return } -type FileInfo struct { - Name string - Size int -} - -func (m *Model) NeedFiles() (files []FileInfo, bytes int) { +// NeedFiles returns the list of currently needed files and the total size. +func (m *Model) NeedFiles() (files []File, bytes int) { m.RLock() defer m.RUnlock() for n := range m.need { f := m.global[n] - s := f.Size() - files = append(files, FileInfo{f.Name, s}) - bytes += s + files = append(files, f) + bytes += f.Size() } return } // Index is called when a new node is connected and we receive their full index. +// Implements the protocol.Model interface. func (m *Model) Index(nodeID string, fs []protocol.FileInfo) { m.Lock() defer m.Unlock() - if opts.Debug.TraceNet { - debugf("NET IDX(in): %s: %d files", nodeID, len(fs)) + if m.trace["net"] { + log.Printf("NET IDX(in): %s: %d files", nodeID, len(fs)) } m.remote[nodeID] = make(map[string]File) @@ -182,12 +223,13 @@ func (m *Model) Index(nodeID string, fs []protocol.FileInfo) { } // IndexUpdate is called for incremental updates to connected nodes' indexes. +// Implements the protocol.Model interface. func (m *Model) IndexUpdate(nodeID string, fs []protocol.FileInfo) { m.Lock() defer m.Unlock() - if opts.Debug.TraceNet { - debugf("NET IDXUP(in): %s: %d files", nodeID, len(fs)) + if m.trace["net"] { + log.Printf("NET IDXUP(in): %s: %d files", nodeID, len(fs)) } repo, ok := m.remote[nodeID] @@ -196,7 +238,7 @@ func (m *Model) IndexUpdate(nodeID string, fs []protocol.FileInfo) { } for _, f := range fs { - if f.Flags&FlagDeleted != 0 && !opts.Delete { + if f.Flags&FlagDeleted != 0 && !m.delete { // Files marked as deleted do not even enter the model continue } @@ -207,20 +249,8 @@ func (m *Model) IndexUpdate(nodeID string, fs []protocol.FileInfo) { m.recomputeNeed() } -// SeedIndex is called when our previously cached index is loaded from disk at startup. -func (m *Model) SeedIndex(fs []protocol.FileInfo) { - m.Lock() - defer m.Unlock() - - m.local = make(map[string]File) - for _, f := range fs { - m.local[f.Name] = fileFromFileInfo(f) - } - - m.recomputeGlobal() - m.recomputeNeed() -} - +// Close removes the peer from the model and closes the underlyign connection if possible. +// Implements the protocol.Model interface. func (m *Model) Close(node string, err error) { m.Lock() defer m.Unlock() @@ -228,14 +258,6 @@ func (m *Model) Close(node string, err error) { conn, ok := m.rawConn[node] if ok { conn.Close() - } else { - warnln("Close on unknown connection for node", node) - } - - if err != nil { - warnf("Disconnected from node %s: %v", node, err) - } else { - infoln("Disconnected from node", node) } delete(m.remote, node) @@ -246,9 +268,11 @@ func (m *Model) Close(node string, err error) { m.recomputeNeed() } +// Request returns the specified data segment by reading it from local disk. +// Implements the protocol.Model interface. func (m *Model) Request(nodeID, name string, offset uint64, size uint32, hash []byte) ([]byte, error) { - if opts.Debug.TraceNet && nodeID != "" { - debugf("NET REQ(in): %s: %q o=%d s=%d h=%x", nodeID, name, offset, size, hash) + if m.trace["net"] && nodeID != "" { + log.Printf("NET REQ(in): %s: %q o=%d s=%d h=%x", nodeID, name, offset, size, hash) } fn := path.Join(m.dir, name) fd, err := os.Open(fn) // XXX: Inefficient, should cache fd? @@ -266,21 +290,7 @@ func (m *Model) Request(nodeID, name string, offset uint64, size uint32, hash [] return buf, nil } -func (m *Model) RequestGlobal(nodeID, name string, offset uint64, size uint32, hash []byte) ([]byte, error) { - m.RLock() - nc, ok := m.nodes[nodeID] - m.RUnlock() - if !ok { - return nil, fmt.Errorf("RequestGlobal: no such node: %s", nodeID) - } - - if opts.Debug.TraceNet { - debugf("NET REQ(out): %s: %q o=%d s=%d h=%x", nodeID, name, offset, size, hash) - } - - return nc.Request(name, offset, size, hash) -} - +// ReplaceLocal replaces the local repository index with the given list of files. func (m *Model) ReplaceLocal(fs []File) { m.Lock() defer m.Unlock() @@ -312,6 +322,95 @@ func (m *Model) ReplaceLocal(fs []File) { } } +// SeedLocal replaces the local repository index with the given list of files, +// in protocol data types. Does not track deletes, should only be used to seed +// the local index from a cache file at startup. +func (m *Model) SeedLocal(fs []protocol.FileInfo) { + m.Lock() + defer m.Unlock() + + m.local = make(map[string]File) + for _, f := range fs { + m.local[f.Name] = fileFromFileInfo(f) + } + + m.recomputeGlobal() + m.recomputeNeed() +} + +// ConnectedTo returns true if we are connected to the named node. +func (m *Model) ConnectedTo(nodeID string) bool { + m.RLock() + defer m.RUnlock() + _, ok := m.nodes[nodeID] + return ok +} + +// ProtocolIndex returns the current local index in protocol data types. +func (m *Model) ProtocolIndex() []protocol.FileInfo { + m.RLock() + defer m.RUnlock() + return m.protocolIndex() +} + +// RepoID returns a unique ID representing the current repository location. +func (m *Model) RepoID() string { + return fmt.Sprintf("%x", sha1.Sum([]byte(m.dir))) +} + +// AddConnection adds a new peer connection to the model. An initial index will +// be sent to the connected peer, thereafter index updates whenever the local +// repository changes. +func (m *Model) AddConnection(conn io.ReadWriteCloser, nodeID string) { + node := protocol.NewConnection(nodeID, conn, conn, m) + + m.Lock() + m.nodes[nodeID] = node + m.rawConn[nodeID] = conn + m.Unlock() + + m.RLock() + idx := m.protocolIndex() + m.RUnlock() + + go func() { + node.Index(idx) + }() +} + +// protocolIndex returns the current local index in protocol data types. +// Must be called with the read lock held. +func (m *Model) protocolIndex() []protocol.FileInfo { + var index []protocol.FileInfo + for _, f := range m.local { + mf := fileInfoFromFile(f) + if m.trace["idx"] { + var flagComment string + if mf.Flags&FlagDeleted != 0 { + flagComment = " (deleted)" + } + log.Printf("IDX: %q m=%d f=%o%s (%d blocks)", mf.Name, mf.Modified, mf.Flags, flagComment, len(mf.Blocks)) + } + index = append(index, mf) + } + return index +} + +func (m *Model) requestGlobal(nodeID, name string, offset uint64, size uint32, hash []byte) ([]byte, error) { + m.RLock() + nc, ok := m.nodes[nodeID] + m.RUnlock() + if !ok { + return nil, fmt.Errorf("requestGlobal: no such node: %s", nodeID) + } + + if m.trace["net"] { + log.Printf("NET REQ(out): %s: %q o=%d s=%d h=%x", nodeID, name, offset, size, hash) + } + + return nc.Request(name, offset, size, hash) +} + func (m *Model) broadcastIndexLoop() { for { m.RLock() @@ -328,8 +427,8 @@ func (m *Model) broadcastIndexLoop() { m.lastIdxBcast = time.Now() for _, node := range m.nodes { node := node - if opts.Debug.TraceNet { - debugf("NET IDX(out/loop): %s: %d files", node.ID, len(idx)) + if m.trace["net"] { + log.Printf("NET IDX(out/loop): %s: %d files", node.ID, len(idx)) } go func() { node.Index(idx) @@ -367,10 +466,7 @@ func (m *Model) markDeletedLocals(newLocal map[string]File) bool { return updated } -func (m *Model) UpdateLocal(f File) { - m.Lock() - defer m.Unlock() - +func (m *Model) updateLocal(f File) { if ef, ok := m.local[f.Name]; !ok || ef.Modified != f.Modified { m.local[f.Name] = f m.recomputeGlobal() @@ -380,36 +476,6 @@ func (m *Model) UpdateLocal(f File) { } } -func (m *Model) Dir() string { - m.RLock() - defer m.RUnlock() - return m.dir -} - -func (m *Model) HaveFiles() []File { - m.RLock() - defer m.RUnlock() - var files []File - for _, file := range m.local { - files = append(files, file) - } - return files -} - -func (m *Model) LocalFile(name string) (File, bool) { - m.RLock() - defer m.RUnlock() - f, ok := m.local[name] - return f, ok -} - -func (m *Model) GlobalFile(name string) (File, bool) { - m.RLock() - defer m.RUnlock() - f, ok := m.global[name] - return f, ok -} - // Must be called with the write lock held. func (m *Model) recomputeGlobal() { var newGlobal = make(map[string]File) @@ -452,7 +518,7 @@ func (m *Model) recomputeNeed() { for n, f := range m.global { hf, ok := m.local[n] if !ok || f.Modified > hf.Modified { - if f.Flags&FlagDeleted != 0 && !opts.Delete { + if f.Flags&FlagDeleted != 0 && !m.delete { // Don't want to delete files, so forget this need continue } @@ -460,8 +526,8 @@ func (m *Model) recomputeNeed() { // Don't have the file, so don't need to delete it continue } - if opts.Debug.TraceNeed { - debugln("NEED:", ok, hf, f) + if m.trace["need"] { + log.Println("NEED:", ok, hf, f) } m.need[n] = true } @@ -482,56 +548,6 @@ func (m *Model) whoHas(name string) []string { return remote } -func (m *Model) ConnectedTo(nodeID string) bool { - m.RLock() - defer m.RUnlock() - _, ok := m.nodes[nodeID] - return ok -} - -func (m *Model) ProtocolIndex() []protocol.FileInfo { - m.RLock() - defer m.RUnlock() - return m.protocolIndex() -} - -// Must be called with the read lock held. -func (m *Model) protocolIndex() []protocol.FileInfo { - var index []protocol.FileInfo - for _, f := range m.local { - mf := fileInfoFromFile(f) - if opts.Debug.TraceIdx { - var flagComment string - if mf.Flags&FlagDeleted != 0 { - flagComment = " (deleted)" - } - debugf("IDX: %q m=%d f=%o%s (%d blocks)", mf.Name, mf.Modified, mf.Flags, flagComment, len(mf.Blocks)) - } - index = append(index, mf) - } - return index -} - -func (m *Model) AddConnection(conn io.ReadWriteCloser, nodeID string) { - node := protocol.NewConnection(nodeID, conn, conn, m) - - m.Lock() - m.nodes[nodeID] = node - m.rawConn[nodeID] = conn - m.Unlock() - - infoln("Connected to node", nodeID) - - m.RLock() - idx := m.protocolIndex() - m.RUnlock() - - go func() { - node.Index(idx) - infoln("Sent initial index to node", nodeID) - }() -} - func fileFromFileInfo(f protocol.FileInfo) File { var blocks []Block var offset uint64 diff --git a/model_puller.go b/model/model_puller.go similarity index 89% rename from model_puller.go rename to model/model_puller.go index 6d5cb0a6..b3a7dd9b 100644 --- a/model_puller.go +++ b/model/model_puller.go @@ -1,4 +1,4 @@ -package main +package model /* @@ -18,6 +18,7 @@ import ( "errors" "fmt" "io" + "log" "os" "path" "sync" @@ -60,7 +61,7 @@ func (m *Model) pullFile(name string) error { applyDone.Done() }() - local, remote := localFile.Blocks.To(globalFile.Blocks) + local, remote := BlockDiff(localFile.Blocks, globalFile.Blocks) var fetchDone sync.WaitGroup // One local copy routine @@ -83,7 +84,7 @@ func (m *Model) pullFile(name string) error { // N remote copy routines var remoteBlocks = blockIterator{blocks: remote} - for i := 0; i < opts.Advanced.RequestsInFlight; i++ { + for i := 0; i < m.paralllelReqs; i++ { curNode := nodeIDs[i%len(nodeIDs)] fetchDone.Add(1) @@ -93,7 +94,7 @@ func (m *Model) pullFile(name string) error { if !ok { break } - data, err := m.RequestGlobal(nodeID, name, block.Offset, block.Length, block.Hash) + data, err := m.requestGlobal(nodeID, name, block.Offset, block.Length, block.Hash) if err != nil { break } @@ -143,7 +144,7 @@ func (m *Model) puller() { continue } - var limiter = make(chan bool, opts.Advanced.FilesInFlight) + var limiter = make(chan bool, m.parallellFiles) var allDone sync.WaitGroup for _, n := range ns { @@ -156,28 +157,31 @@ func (m *Model) puller() { <-limiter }() - f, ok := m.GlobalFile(n) + m.RLock() + f, ok := m.global[n] + m.RUnlock() + if !ok { return } var err error if f.Flags&FlagDeleted == 0 { - if opts.Debug.TraceFile { - debugf("FILE: Pull %q", n) + if m.trace["file"] { + log.Printf("FILE: Pull %q", n) } err = m.pullFile(n) } else { - if opts.Debug.TraceFile { - debugf("FILE: Remove %q", n) + if m.trace["file"] { + log.Printf("FILE: Remove %q", n) } // Cheerfully ignore errors here _ = os.Remove(path.Join(m.dir, n)) } if err == nil { - m.UpdateLocal(f) - } else { - warnln(err) + m.Lock() + m.updateLocal(f) + m.Unlock() } }(n) } diff --git a/model_test.go b/model/model_test.go similarity index 95% rename from model_test.go rename to model/model_test.go index 31b6a750..45e2bc5e 100644 --- a/model_test.go +++ b/model/model_test.go @@ -1,4 +1,4 @@ -package main +package model import ( "reflect" @@ -46,8 +46,8 @@ var testDataExpected = map[string]File{ } func TestUpdateLocal(t *testing.T) { - m := NewModel("foo") - fs := Walk("testdata", m, false) + m := NewModel("testdata") + fs := m.Walk(false) m.ReplaceLocal(fs) if len(m.need) > 0 { @@ -88,8 +88,8 @@ func TestUpdateLocal(t *testing.T) { } func TestRemoteUpdateExisting(t *testing.T) { - m := NewModel("foo") - fs := Walk("testdata", m, false) + m := NewModel("testdata") + fs := m.Walk(false) m.ReplaceLocal(fs) newFile := protocol.FileInfo{ @@ -105,8 +105,8 @@ func TestRemoteUpdateExisting(t *testing.T) { } func TestRemoteAddNew(t *testing.T) { - m := NewModel("foo") - fs := Walk("testdata", m, false) + m := NewModel("testdata") + fs := m.Walk(false) m.ReplaceLocal(fs) newFile := protocol.FileInfo{ @@ -122,8 +122,8 @@ func TestRemoteAddNew(t *testing.T) { } func TestRemoteUpdateOld(t *testing.T) { - m := NewModel("foo") - fs := Walk("testdata", m, false) + m := NewModel("testdata") + fs := m.Walk(false) m.ReplaceLocal(fs) oldTimeStamp := int64(1234) @@ -140,8 +140,8 @@ func TestRemoteUpdateOld(t *testing.T) { } func TestRemoteIndexUpdate(t *testing.T) { - m := NewModel("foo") - fs := Walk("testdata", m, false) + m := NewModel("testdata") + fs := m.Walk(false) m.ReplaceLocal(fs) foo := protocol.FileInfo{ @@ -173,8 +173,8 @@ func TestRemoteIndexUpdate(t *testing.T) { } func TestDelete(t *testing.T) { - m := NewModel("foo") - fs := Walk("testdata", m, false) + m := NewModel("testdata") + fs := m.Walk(false) m.ReplaceLocal(fs) if l1, l2 := len(m.local), len(fs); l1 != l2 { @@ -190,7 +190,7 @@ func TestDelete(t *testing.T) { Modified: ot, Blocks: []Block{{0, 100, []byte("some hash bytes")}}, } - m.UpdateLocal(newFile) + m.updateLocal(newFile) if l1, l2 := len(m.local), len(fs)+1; l1 != l2 { t.Errorf("Model len(local) incorrect (%d != %d)", l1, l2) @@ -263,8 +263,8 @@ func TestDelete(t *testing.T) { } func TestForgetNode(t *testing.T) { - m := NewModel("foo") - fs := Walk("testdata", m, false) + m := NewModel("testdata") + fs := m.Walk(false) m.ReplaceLocal(fs) if l1, l2 := len(m.local), len(fs); l1 != l2 { diff --git a/testdata/bar b/model/testdata/bar similarity index 100% rename from testdata/bar rename to model/testdata/bar diff --git a/testdata/baz/quux b/model/testdata/baz/quux similarity index 100% rename from testdata/baz/quux rename to model/testdata/baz/quux diff --git a/testdata/foo b/model/testdata/foo similarity index 100% rename from testdata/foo rename to model/testdata/foo diff --git a/walk.go b/model/walk.go similarity index 61% rename from walk.go rename to model/walk.go index cdd93871..7377c62f 100644 --- a/walk.go +++ b/model/walk.go @@ -1,7 +1,8 @@ -package main +package model import ( "fmt" + "log" "os" "path" "path/filepath" @@ -14,15 +15,7 @@ type File struct { Name string Flags uint32 Modified int64 - Blocks BlockList -} - -func (f File) Dump() { - fmt.Printf("%s\n", f.Name) - for _, b := range f.Blocks { - fmt.Printf(" %dB @ %d: %x\n", b.Length, b.Offset, b.Hash) - } - fmt.Println() + Blocks []Block } func (f File) Size() (bytes int) { @@ -42,10 +35,9 @@ func tempName(name string, modified int64) string { return path.Join(tdir, tname) } -func genWalker(base string, res *[]File, model *Model) filepath.WalkFunc { +func (m *Model) genWalker(res *[]File) filepath.WalkFunc { return func(p string, info os.FileInfo, err error) error { if err != nil { - warnln(err) return nil } @@ -54,37 +46,36 @@ func genWalker(base string, res *[]File, model *Model) filepath.WalkFunc { } if info.Mode()&os.ModeType == 0 { - rn, err := filepath.Rel(base, p) + rn, err := filepath.Rel(m.dir, p) if err != nil { - warnln(err) return nil } fi, err := os.Stat(p) if err != nil { - warnln(err) return nil } modified := fi.ModTime().Unix() - hf, ok := model.LocalFile(rn) + m.RLock() + hf, ok := m.local[rn] + m.RUnlock() + if ok && hf.Modified == modified { // No change *res = append(*res, hf) } else { - if opts.Debug.TraceFile { - debugf("FILE: Hash %q", p) + if m.trace["file"] { + log.Printf("FILE: Hash %q", p) } fd, err := os.Open(p) if err != nil { - warnln(err) return nil } defer fd.Close() blocks, err := Blocks(fd, BlockSize) if err != nil { - warnln(err) return nil } f := File{ @@ -101,34 +92,28 @@ func genWalker(base string, res *[]File, model *Model) filepath.WalkFunc { } } -func Walk(dir string, model *Model, followSymlinks bool) []File { +// Walk returns the list of files found in the local repository by scanning the +// file system. Files are blockwise hashed. +func (m *Model) Walk(followSymlinks bool) []File { var files []File - fn := genWalker(dir, &files, model) - err := filepath.Walk(dir, fn) - if err != nil { - warnln(err) - } + fn := m.genWalker(&files) + filepath.Walk(m.dir, fn) - if !opts.NoSymlinks { - d, err := os.Open(dir) + if followSymlinks { + d, err := os.Open(m.dir) if err != nil { - warnln(err) return files } defer d.Close() fis, err := d.Readdir(-1) if err != nil { - warnln(err) return files } for _, fi := range fis { if fi.Mode()&os.ModeSymlink != 0 { - err := filepath.Walk(path.Join(dir, fi.Name())+"/", fn) - if err != nil { - warnln(err) - } + filepath.Walk(path.Join(m.dir, fi.Name())+"/", fn) } } } @@ -136,19 +121,19 @@ func Walk(dir string, model *Model, followSymlinks bool) []File { return files } -func cleanTempFile(path string, info os.FileInfo, err error) error { +func (m *Model) cleanTempFile(path string, info os.FileInfo, err error) error { if err != nil { return err } if info.Mode()&os.ModeType == 0 && isTempName(path) { - if opts.Debug.TraceFile { - debugf("FILE: Remove %q", path) + if m.trace["file"] { + log.Printf("FILE: Remove %q", path) } os.Remove(path) } return nil } -func CleanTempFiles(dir string) { - filepath.Walk(dir, cleanTempFile) +func (m *Model) cleanTempFiles() { + filepath.Walk(m.dir, m.cleanTempFile) } diff --git a/walk_test.go b/model/walk_test.go similarity index 94% rename from walk_test.go rename to model/walk_test.go index e88ab085..b48bd432 100644 --- a/walk_test.go +++ b/model/walk_test.go @@ -1,4 +1,4 @@ -package main +package model import ( "fmt" @@ -17,8 +17,8 @@ var testdata = []struct { } func TestWalk(t *testing.T) { - m := new(Model) - files := Walk("testdata", m, false) + m := NewModel("testdata") + files := m.Walk(false) if l1, l2 := len(files), len(testdata); l1 != l2 { t.Fatalf("Incorrect number of walked files %d != %d", l1, l2)