diff --git a/cmd/.gitignore b/cmd/.gitignore new file mode 100644 index 00000000..3e1db14f --- /dev/null +++ b/cmd/.gitignore @@ -0,0 +1 @@ +!syncthing diff --git a/cmd/syncthing/blocks.go b/cmd/syncthing/blocks.go new file mode 100644 index 00000000..b0f84cd9 --- /dev/null +++ b/cmd/syncthing/blocks.go @@ -0,0 +1,74 @@ +package main + +import ( + "bytes" + "crypto/sha256" + "io" +) + +type Block struct { + Offset int64 + Size uint32 + Hash []byte +} + +// Blocks returns the blockwise hash of the reader. +func Blocks(r io.Reader, blocksize int) ([]Block, error) { + var blocks []Block + var offset int64 + for { + lr := &io.LimitedReader{R: r, N: int64(blocksize)} + hf := sha256.New() + n, err := io.Copy(hf, lr) + if err != nil { + return nil, err + } + + if n == 0 { + break + } + + b := Block{ + Offset: offset, + Size: uint32(n), + Hash: hf.Sum(nil), + } + blocks = append(blocks, b) + offset += int64(n) + } + + if len(blocks) == 0 { + // Empty file + blocks = append(blocks, Block{ + Offset: 0, + Size: 0, + Hash: []uint8{0xe3, 0xb0, 0xc4, 0x42, 0x98, 0xfc, 0x1c, 0x14, 0x9a, 0xfb, 0xf4, 0xc8, 0x99, 0x6f, 0xb9, 0x24, 0x27, 0xae, 0x41, 0xe4, 0x64, 0x9b, 0x93, 0x4c, 0xa4, 0x95, 0x99, 0x1b, 0x78, 0x52, 0xb8, 0x55}, + }) + } + + return blocks, nil +} + +// BlockDiff returns lists of common and missing (to transform src into tgt) +// blocks. Both block lists must have been created with the same block size. +func BlockDiff(src, tgt []Block) (have, need []Block) { + if len(tgt) == 0 && len(src) != 0 { + return nil, nil + } + + if len(tgt) != 0 && len(src) == 0 { + // Copy the entire file + return nil, tgt + } + + for i := range tgt { + if i >= len(src) || bytes.Compare(tgt[i].Hash, src[i].Hash) != 0 { + // Copy differing block + need = append(need, tgt[i]) + } else { + have = append(have, tgt[i]) + } + } + + return have, need +} diff --git a/cmd/syncthing/blocks_test.go b/cmd/syncthing/blocks_test.go new file mode 100644 index 00000000..be791493 --- /dev/null +++ b/cmd/syncthing/blocks_test.go @@ -0,0 +1,116 @@ +package main + +import ( + "bytes" + "fmt" + "testing" +) + +var blocksTestData = []struct { + data []byte + blocksize int + hash []string +}{ + {[]byte(""), 1024, []string{ + "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"}}, + {[]byte("contents"), 1024, []string{ + "d1b2a59fbea7e20077af9f91b27e95e865061b270be03ff539ab3b73587882e8"}}, + {[]byte("contents"), 9, []string{ + "d1b2a59fbea7e20077af9f91b27e95e865061b270be03ff539ab3b73587882e8"}}, + {[]byte("contents"), 8, []string{ + "d1b2a59fbea7e20077af9f91b27e95e865061b270be03ff539ab3b73587882e8"}}, + {[]byte("contents"), 7, []string{ + "ed7002b439e9ac845f22357d822bac1444730fbdb6016d3ec9432297b9ec9f73", + "043a718774c572bd8a25adbeb1bfcd5c0256ae11cecf9f9c3f925d0e52beaf89"}, + }, + {[]byte("contents"), 3, []string{ + "1143da2bc54c495c4be31d3868785d39ffdfd56df5668f0645d8f14d47647952", + "e4432baa90819aaef51d2a7f8e148bf7e679610f3173752fabb4dcb2d0f418d3", + "44ad63f60af0f6db6fdde6d5186ef78176367df261fa06be3079b6c80c8adba4"}, + }, + {[]byte("conconts"), 3, []string{ + "1143da2bc54c495c4be31d3868785d39ffdfd56df5668f0645d8f14d47647952", + "1143da2bc54c495c4be31d3868785d39ffdfd56df5668f0645d8f14d47647952", + "44ad63f60af0f6db6fdde6d5186ef78176367df261fa06be3079b6c80c8adba4"}, + }, + {[]byte("contenten"), 3, []string{ + "1143da2bc54c495c4be31d3868785d39ffdfd56df5668f0645d8f14d47647952", + "e4432baa90819aaef51d2a7f8e148bf7e679610f3173752fabb4dcb2d0f418d3", + "e4432baa90819aaef51d2a7f8e148bf7e679610f3173752fabb4dcb2d0f418d3"}, + }, +} + +func TestBlocks(t *testing.T) { + for _, test := range blocksTestData { + buf := bytes.NewBuffer(test.data) + blocks, err := Blocks(buf, test.blocksize) + + if err != nil { + t.Fatal(err) + } + + if l := len(blocks); l != len(test.hash) { + t.Fatalf("Incorrect number of blocks %d != %d", l, len(test.hash)) + } else { + i := 0 + for off := int64(0); off < int64(len(test.data)); off += int64(test.blocksize) { + if blocks[i].Offset != off { + t.Errorf("Incorrect offset for block %d: %d != %d", i, blocks[i].Offset, off) + } + + bs := test.blocksize + if rem := len(test.data) - int(off); bs > rem { + bs = rem + } + if int(blocks[i].Size) != bs { + t.Errorf("Incorrect length for block %d: %d != %d", i, blocks[i].Size, bs) + } + if h := fmt.Sprintf("%x", blocks[i].Hash); h != test.hash[i] { + t.Errorf("Incorrect block hash %q != %q", h, test.hash[i]) + } + + i++ + } + } + } +} + +var diffTestData = []struct { + a string + b string + s int + d []Block +}{ + {"contents", "contents", 1024, []Block{}}, + {"", "", 1024, []Block{}}, + {"contents", "contents", 3, []Block{}}, + {"contents", "cantents", 3, []Block{{0, 3, nil}}}, + {"contents", "contants", 3, []Block{{3, 3, nil}}}, + {"contents", "cantants", 3, []Block{{0, 3, nil}, {3, 3, nil}}}, + {"contents", "", 3, []Block{{0, 0, nil}}}, + {"", "contents", 3, []Block{{0, 3, nil}, {3, 3, nil}, {6, 2, nil}}}, + {"con", "contents", 3, []Block{{3, 3, nil}, {6, 2, nil}}}, + {"contents", "con", 3, nil}, + {"contents", "cont", 3, []Block{{3, 1, nil}}}, + {"cont", "contents", 3, []Block{{3, 3, nil}, {6, 2, nil}}}, +} + +func TestDiff(t *testing.T) { + for i, test := range diffTestData { + a, _ := Blocks(bytes.NewBufferString(test.a), test.s) + b, _ := Blocks(bytes.NewBufferString(test.b), test.s) + _, d := BlockDiff(a, b) + if len(d) != len(test.d) { + t.Fatalf("Incorrect length for diff %d; %d != %d", i, len(d), len(test.d)) + } else { + for j := range test.d { + if d[j].Offset != test.d[j].Offset { + t.Errorf("Incorrect offset for diff %d block %d; %d != %d", i, j, d[j].Offset, test.d[j].Offset) + } + if d[j].Size != test.d[j].Size { + t.Errorf("Incorrect length for diff %d block %d; %d != %d", i, j, d[j].Size, test.d[j].Size) + } + } + } + } +} diff --git a/cmd/syncthing/config.go b/cmd/syncthing/config.go new file mode 100644 index 00000000..df250600 --- /dev/null +++ b/cmd/syncthing/config.go @@ -0,0 +1,202 @@ +package main + +import ( + "crypto/sha256" + "encoding/xml" + "fmt" + "io" + "reflect" + "sort" + "strconv" + "strings" +) + +type Configuration struct { + Version int `xml:"version,attr" default:"1"` + Repositories []RepositoryConfiguration `xml:"repository"` + Options OptionsConfiguration `xml:"options"` + XMLName xml.Name `xml:"configuration" json:"-"` +} + +type RepositoryConfiguration struct { + Directory string `xml:"directory,attr"` + Nodes []NodeConfiguration `xml:"node"` +} + +type NodeConfiguration struct { + NodeID string `xml:"id,attr"` + Name string `xml:"name,attr"` + Addresses []string `xml:"address"` +} + +type OptionsConfiguration struct { + ListenAddress []string `xml:"listenAddress" default:":22000" ini:"listen-address"` + ReadOnly bool `xml:"readOnly" ini:"read-only"` + AllowDelete bool `xml:"allowDelete" default:"true" ini:"allow-delete"` + FollowSymlinks bool `xml:"followSymlinks" default:"true" ini:"follow-symlinks"` + GUIEnabled bool `xml:"guiEnabled" default:"true" ini:"gui-enabled"` + GUIAddress string `xml:"guiAddress" default:"127.0.0.1:8080" ini:"gui-address"` + GlobalAnnServer string `xml:"globalAnnounceServer" default:"announce.syncthing.net:22025" ini:"global-announce-server"` + GlobalAnnEnabled bool `xml:"globalAnnounceEnabled" default:"true" ini:"global-announce-enabled"` + LocalAnnEnabled bool `xml:"localAnnounceEnabled" default:"true" ini:"local-announce-enabled"` + ParallelRequests int `xml:"parallelRequests" default:"16" ini:"parallel-requests"` + MaxSendKbps int `xml:"maxSendKbps" ini:"max-send-kbps"` + RescanIntervalS int `xml:"rescanIntervalS" default:"60" ini:"rescan-interval"` + ReconnectIntervalS int `xml:"reconnectionIntervalS" default:"60" ini:"reconnection-interval"` + MaxChangeKbps int `xml:"maxChangeKbps" default:"1000" ini:"max-change-bw"` +} + +func setDefaults(data interface{}) error { + s := reflect.ValueOf(data).Elem() + t := s.Type() + + for i := 0; i < s.NumField(); i++ { + f := s.Field(i) + tag := t.Field(i).Tag + + v := tag.Get("default") + if len(v) > 0 { + switch f.Interface().(type) { + case string: + f.SetString(v) + + case []string: + rv := reflect.MakeSlice(reflect.TypeOf([]string{}), 1, 1) + rv.Index(0).SetString(v) + f.Set(rv) + + case int: + i, err := strconv.ParseInt(v, 10, 64) + if err != nil { + return err + } + f.SetInt(i) + + case bool: + f.SetBool(v == "true") + + default: + panic(f.Type()) + } + } + } + return nil +} + +func readConfigINI(m map[string]string, data interface{}) error { + s := reflect.ValueOf(data).Elem() + t := s.Type() + + for i := 0; i < s.NumField(); i++ { + f := s.Field(i) + tag := t.Field(i).Tag + + name := tag.Get("ini") + if len(name) == 0 { + name = strings.ToLower(t.Field(i).Name) + } + + if v, ok := m[name]; ok { + switch f.Interface().(type) { + case string: + f.SetString(v) + + case int: + i, err := strconv.ParseInt(v, 10, 64) + if err == nil { + f.SetInt(i) + } + + case bool: + f.SetBool(v == "true") + + default: + panic(f.Type()) + } + } + } + return nil +} + +func writeConfigXML(wr io.Writer, cfg Configuration) error { + e := xml.NewEncoder(wr) + e.Indent("", " ") + err := e.Encode(cfg) + if err != nil { + return err + } + _, err = wr.Write([]byte("\n")) + return err +} + +func uniqueStrings(ss []string) []string { + var m = make(map[string]bool, len(ss)) + for _, s := range ss { + m[s] = true + } + + var us = make([]string, 0, len(m)) + for k := range m { + us = append(us, k) + } + + return us +} + +func readConfigXML(rd io.Reader) (Configuration, error) { + var cfg Configuration + + setDefaults(&cfg) + setDefaults(&cfg.Options) + + var err error + if rd != nil { + err = xml.NewDecoder(rd).Decode(&cfg) + } + + cfg.Options.ListenAddress = uniqueStrings(cfg.Options.ListenAddress) + return cfg, err +} + +type NodeConfigurationList []NodeConfiguration + +func (l NodeConfigurationList) Less(a, b int) bool { + return l[a].NodeID < l[b].NodeID +} +func (l NodeConfigurationList) Swap(a, b int) { + l[a], l[b] = l[b], l[a] +} +func (l NodeConfigurationList) Len() int { + return len(l) +} + +func clusterHash(nodes []NodeConfiguration) string { + sort.Sort(NodeConfigurationList(nodes)) + h := sha256.New() + for _, n := range nodes { + h.Write([]byte(n.NodeID)) + } + return fmt.Sprintf("%x", h.Sum(nil)) +} + +func cleanNodeList(nodes []NodeConfiguration, myID string) []NodeConfiguration { + var myIDExists bool + for _, node := range nodes { + if node.NodeID == myID { + myIDExists = true + break + } + } + + if !myIDExists { + nodes = append(nodes, NodeConfiguration{ + NodeID: myID, + Addresses: []string{"dynamic"}, + Name: "", + }) + } + + sort.Sort(NodeConfigurationList(nodes)) + + return nodes +} diff --git a/cmd/syncthing/filemonitor.go b/cmd/syncthing/filemonitor.go new file mode 100644 index 00000000..2aa7df8e --- /dev/null +++ b/cmd/syncthing/filemonitor.go @@ -0,0 +1,173 @@ +package main + +import ( + "bytes" + "errors" + "fmt" + "log" + "os" + "path" + "sync" + "time" + + "github.com/calmh/syncthing/buffers" +) + +type fileMonitor struct { + name string // in-repo name + path string // full path + writeDone sync.WaitGroup + model *Model + global File + localBlocks []Block + copyError error + writeError error +} + +func (m *fileMonitor) FileBegins(cc <-chan content) error { + if m.model.trace["file"] { + log.Printf("FILE: FileBegins: " + m.name) + } + + tmp := tempName(m.path, m.global.Modified) + + dir := path.Dir(tmp) + _, err := os.Stat(dir) + if err != nil && os.IsNotExist(err) { + err = os.MkdirAll(dir, 0777) + if err != nil { + return err + } + } + + outFile, err := os.Create(tmp) + if err != nil { + return err + } + + m.writeDone.Add(1) + + var writeWg sync.WaitGroup + if len(m.localBlocks) > 0 { + writeWg.Add(1) + inFile, err := os.Open(m.path) + if err != nil { + return err + } + + // Copy local blocks, close infile when done + go m.copyLocalBlocks(inFile, outFile, &writeWg) + } + + // Write remote blocks, + writeWg.Add(1) + go m.copyRemoteBlocks(cc, outFile, &writeWg) + + // Wait for both writing routines, then close the outfile + go func() { + writeWg.Wait() + outFile.Close() + m.writeDone.Done() + }() + + return nil +} + +func (m *fileMonitor) copyLocalBlocks(inFile, outFile *os.File, writeWg *sync.WaitGroup) { + defer inFile.Close() + defer writeWg.Done() + + var buf = buffers.Get(BlockSize) + defer buffers.Put(buf) + + for _, lb := range m.localBlocks { + buf = buf[:lb.Size] + _, err := inFile.ReadAt(buf, lb.Offset) + if err != nil { + m.copyError = err + return + } + _, err = outFile.WriteAt(buf, lb.Offset) + if err != nil { + m.copyError = err + return + } + } +} + +func (m *fileMonitor) copyRemoteBlocks(cc <-chan content, outFile *os.File, writeWg *sync.WaitGroup) { + defer writeWg.Done() + + for content := range cc { + _, err := outFile.WriteAt(content.data, content.offset) + buffers.Put(content.data) + if err != nil { + m.writeError = err + return + } + } +} + +func (m *fileMonitor) FileDone() error { + if m.model.trace["file"] { + log.Printf("FILE: FileDone: " + m.name) + } + + m.writeDone.Wait() + + tmp := tempName(m.path, m.global.Modified) + defer os.Remove(tmp) + + if m.copyError != nil { + return m.copyError + } + if m.writeError != nil { + return m.writeError + } + + err := hashCheck(tmp, m.global.Blocks) + if err != nil { + return err + } + + err = os.Chtimes(tmp, time.Unix(m.global.Modified, 0), time.Unix(m.global.Modified, 0)) + if err != nil { + return err + } + + err = os.Chmod(tmp, os.FileMode(m.global.Flags&0777)) + if err != nil { + return err + } + + err = os.Rename(tmp, m.path) + if err != nil { + return err + } + + m.model.updateLocal(m.global) + return nil +} + +func hashCheck(name string, correct []Block) error { + rf, err := os.Open(name) + if err != nil { + return err + } + defer rf.Close() + + current, err := Blocks(rf, BlockSize) + if err != nil { + return err + } + if len(current) != len(correct) { + return errors.New("incorrect number of blocks") + } + for i := range current { + if bytes.Compare(current[i].Hash, correct[i].Hash) != 0 { + return fmt.Errorf("hash mismatch: %x != %x", current[i], correct[i]) + } + } + + return nil +} diff --git a/cmd/syncthing/filequeue.go b/cmd/syncthing/filequeue.go new file mode 100644 index 00000000..94432671 --- /dev/null +++ b/cmd/syncthing/filequeue.go @@ -0,0 +1,239 @@ +package main + +import ( + "log" + "sort" + "sync" + "time" +) + +type Monitor interface { + FileBegins(<-chan content) error + FileDone() error +} + +type FileQueue struct { + files queuedFileList + sorted bool + fmut sync.Mutex // protects files and sorted + availability map[string][]string + amut sync.Mutex // protects availability + queued map[string]bool +} + +type queuedFile struct { + name string + blocks []Block + activeBlocks []bool + given int + remaining int + channel chan content + nodes []string + nodesChecked time.Time + monitor Monitor +} + +type content struct { + offset int64 + data []byte +} + +type queuedFileList []queuedFile + +func (l queuedFileList) Len() int { return len(l) } + +func (l queuedFileList) Swap(a, b int) { l[a], l[b] = l[b], l[a] } + +func (l queuedFileList) Less(a, b int) bool { + // Sort by most blocks already given out, then alphabetically + if l[a].given != l[b].given { + return l[a].given > l[b].given + } + return l[a].name < l[b].name +} + +type queuedBlock struct { + name string + block Block + index int +} + +func NewFileQueue() *FileQueue { + return &FileQueue{ + availability: make(map[string][]string), + queued: make(map[string]bool), + } +} + +func (q *FileQueue) Add(name string, blocks []Block, monitor Monitor) { + q.fmut.Lock() + defer q.fmut.Unlock() + + if q.queued[name] { + return + } + + q.files = append(q.files, queuedFile{ + name: name, + blocks: blocks, + activeBlocks: make([]bool, len(blocks)), + remaining: len(blocks), + channel: make(chan content), + monitor: monitor, + }) + q.queued[name] = true + q.sorted = false +} + +func (q *FileQueue) Len() int { + q.fmut.Lock() + defer q.fmut.Unlock() + + return len(q.files) +} + +func (q *FileQueue) Get(nodeID string) (queuedBlock, bool) { + q.fmut.Lock() + defer q.fmut.Unlock() + + if !q.sorted { + sort.Sort(q.files) + q.sorted = true + } + + for i := range q.files { + qf := &q.files[i] + + q.amut.Lock() + av := q.availability[qf.name] + q.amut.Unlock() + + if len(av) == 0 { + // Noone has the file we want; abort. + if qf.remaining != len(qf.blocks) { + // We have already started on this file; close it down + close(qf.channel) + if mon := qf.monitor; mon != nil { + mon.FileDone() + } + } + delete(q.queued, qf.name) + q.deleteAt(i) + return queuedBlock{}, false + } + + for _, ni := range av { + // Find and return the next block in the queue + if ni == nodeID { + for j, b := range qf.blocks { + if !qf.activeBlocks[j] { + qf.activeBlocks[j] = true + qf.given++ + return queuedBlock{ + name: qf.name, + block: b, + index: j, + }, true + } + } + break + } + } + } + + // We found nothing to do + return queuedBlock{}, false +} + +func (q *FileQueue) Done(file string, offset int64, data []byte) { + q.fmut.Lock() + defer q.fmut.Unlock() + + c := content{ + offset: offset, + data: data, + } + for i := range q.files { + qf := &q.files[i] + + if qf.name == file { + if qf.monitor != nil && qf.remaining == len(qf.blocks) { + err := qf.monitor.FileBegins(qf.channel) + if err != nil { + log.Printf("WARNING: %s: %v (not synced)", qf.name, err) + delete(q.queued, qf.name) + q.deleteAt(i) + return + } + } + + qf.channel <- c + qf.remaining-- + + if qf.remaining == 0 { + close(qf.channel) + if qf.monitor != nil { + err := qf.monitor.FileDone() + if err != nil { + log.Printf("WARNING: %s: %v", qf.name, err) + } + } + delete(q.queued, qf.name) + q.deleteAt(i) + } + return + } + } + + // We found nothing, might have errored out already +} + +func (q *FileQueue) QueuedFiles() (files []string) { + q.fmut.Lock() + defer q.fmut.Unlock() + + for _, qf := range q.files { + files = append(files, qf.name) + } + return +} + +func (q *FileQueue) deleteAt(i int) { + q.files = append(q.files[:i], q.files[i+1:]...) +} + +func (q *FileQueue) deleteFile(n string) { + for i, file := range q.files { + if n == file.name { + q.deleteAt(i) + delete(q.queued, file.name) + return + } + } +} + +func (q *FileQueue) SetAvailable(file string, nodes []string) { + q.amut.Lock() + defer q.amut.Unlock() + + q.availability[file] = nodes +} + +func (q *FileQueue) RemoveAvailable(toRemove string) { + q.fmut.Lock() + q.amut.Lock() + defer q.amut.Unlock() + defer q.fmut.Unlock() + + for file, nodes := range q.availability { + for i, node := range nodes { + if node == toRemove { + q.availability[file] = nodes[:i+copy(nodes[i:], nodes[i+1:])] + if len(q.availability[file]) == 0 { + q.deleteFile(file) + } + } + break + } + } +} diff --git a/cmd/syncthing/filequeue_test.go b/cmd/syncthing/filequeue_test.go new file mode 100644 index 00000000..bf0add74 --- /dev/null +++ b/cmd/syncthing/filequeue_test.go @@ -0,0 +1,295 @@ +package main + +import ( + "reflect" + "sync" + "sync/atomic" + "testing" +) + +func TestFileQueueAdd(t *testing.T) { + q := NewFileQueue() + q.Add("foo", nil, nil) +} + +func TestFileQueueAddSorting(t *testing.T) { + q := NewFileQueue() + q.SetAvailable("zzz", []string{"nodeID"}) + q.SetAvailable("aaa", []string{"nodeID"}) + + q.Add("zzz", []Block{{Offset: 0, Size: 128}, {Offset: 128, Size: 128}}, nil) + q.Add("aaa", []Block{{Offset: 0, Size: 128}, {Offset: 128, Size: 128}}, nil) + b, _ := q.Get("nodeID") + if b.name != "aaa" { + t.Errorf("Incorrectly sorted get: %+v", b) + } + + q = NewFileQueue() + q.SetAvailable("zzz", []string{"nodeID"}) + q.SetAvailable("aaa", []string{"nodeID"}) + + q.Add("zzz", []Block{{Offset: 0, Size: 128}, {Offset: 128, Size: 128}}, nil) + b, _ = q.Get("nodeID") // Start on zzzz + if b.name != "zzz" { + t.Errorf("Incorrectly sorted get: %+v", b) + } + q.Add("aaa", []Block{{Offset: 0, Size: 128}, {Offset: 128, Size: 128}}, nil) + b, _ = q.Get("nodeID") + if b.name != "zzz" { + // Continue rather than starting a new file + t.Errorf("Incorrectly sorted get: %+v", b) + } +} + +func TestFileQueueLen(t *testing.T) { + q := NewFileQueue() + q.Add("foo", nil, nil) + q.Add("bar", nil, nil) + + if l := q.Len(); l != 2 { + t.Errorf("Incorrect len %d != 2 after adds", l) + } +} + +func TestFileQueueGet(t *testing.T) { + q := NewFileQueue() + q.SetAvailable("foo", []string{"nodeID"}) + q.SetAvailable("bar", []string{"nodeID"}) + + q.Add("foo", []Block{ + {Offset: 0, Size: 128, Hash: []byte("some foo hash bytes")}, + {Offset: 128, Size: 128, Hash: []byte("some other foo hash bytes")}, + {Offset: 256, Size: 128, Hash: []byte("more foo hash bytes")}, + }, nil) + q.Add("bar", []Block{ + {Offset: 0, Size: 128, Hash: []byte("some bar hash bytes")}, + {Offset: 128, Size: 128, Hash: []byte("some other bar hash bytes")}, + }, nil) + + // First get should return the first block of the first file + + expected := queuedBlock{ + name: "bar", + block: Block{ + Offset: 0, + Size: 128, + Hash: []byte("some bar hash bytes"), + }, + } + actual, ok := q.Get("nodeID") + + if !ok { + t.Error("Unexpected non-OK Get()") + } + if !reflect.DeepEqual(expected, actual) { + t.Errorf("Incorrect block returned (first)\n E: %+v\n A: %+v", expected, actual) + } + + // Second get should return the next block of the first file + + expected = queuedBlock{ + name: "bar", + block: Block{ + Offset: 128, + Size: 128, + Hash: []byte("some other bar hash bytes"), + }, + index: 1, + } + actual, ok = q.Get("nodeID") + + if !ok { + t.Error("Unexpected non-OK Get()") + } + if !reflect.DeepEqual(expected, actual) { + t.Errorf("Incorrect block returned (second)\n E: %+v\n A: %+v", expected, actual) + } + + // Third get should return the first block of the second file + + expected = queuedBlock{ + name: "foo", + block: Block{ + Offset: 0, + Size: 128, + Hash: []byte("some foo hash bytes"), + }, + } + actual, ok = q.Get("nodeID") + + if !ok { + t.Error("Unexpected non-OK Get()") + } + if !reflect.DeepEqual(expected, actual) { + t.Errorf("Incorrect block returned (third)\n E: %+v\n A: %+v", expected, actual) + } +} + +/* +func TestFileQueueDone(t *testing.T) { + ch := make(chan content) + var recv sync.WaitGroup + recv.Add(1) + go func() { + content := <-ch + if bytes.Compare(content.data, []byte("first block bytes")) != 0 { + t.Error("Incorrect data in first content block") + } + + content = <-ch + if bytes.Compare(content.data, []byte("second block bytes")) != 0 { + t.Error("Incorrect data in second content block") + } + + _, ok := <-ch + if ok { + t.Error("Content channel not closed") + } + + recv.Done() + }() + + q := FileQueue{resolver: fakeResolver{}} + q.Add("foo", []Block{ + {Offset: 0, Length: 128, Hash: []byte("some foo hash bytes")}, + {Offset: 128, Length: 128, Hash: []byte("some other foo hash bytes")}, + }, ch) + + b0, _ := q.Get("nodeID") + b1, _ := q.Get("nodeID") + + q.Done(b0.name, b0.block.Offset, []byte("first block bytes")) + q.Done(b1.name, b1.block.Offset, []byte("second block bytes")) + + recv.Wait() + + // Queue should now have one file less + + if l := q.Len(); l != 0 { + t.Error("Queue not empty") + } + + _, ok := q.Get("nodeID") + if ok { + t.Error("Unexpected OK Get()") + } +} +*/ + +func TestFileQueueGetNodeIDs(t *testing.T) { + q := NewFileQueue() + q.SetAvailable("a-foo", []string{"nodeID", "a"}) + q.SetAvailable("b-bar", []string{"nodeID", "b"}) + + q.Add("a-foo", []Block{ + {Offset: 0, Size: 128, Hash: []byte("some foo hash bytes")}, + {Offset: 128, Size: 128, Hash: []byte("some other foo hash bytes")}, + {Offset: 256, Size: 128, Hash: []byte("more foo hash bytes")}, + }, nil) + q.Add("b-bar", []Block{ + {Offset: 0, Size: 128, Hash: []byte("some bar hash bytes")}, + {Offset: 128, Size: 128, Hash: []byte("some other bar hash bytes")}, + }, nil) + + expected := queuedBlock{ + name: "b-bar", + block: Block{ + Offset: 0, + Size: 128, + Hash: []byte("some bar hash bytes"), + }, + } + actual, ok := q.Get("b") + if !ok { + t.Error("Unexpected non-OK Get()") + } + if !reflect.DeepEqual(expected, actual) { + t.Errorf("Incorrect block returned\n E: %+v\n A: %+v", expected, actual) + } + + expected = queuedBlock{ + name: "a-foo", + block: Block{ + Offset: 0, + Size: 128, + Hash: []byte("some foo hash bytes"), + }, + } + actual, ok = q.Get("a") + if !ok { + t.Error("Unexpected non-OK Get()") + } + if !reflect.DeepEqual(expected, actual) { + t.Errorf("Incorrect block returned\n E: %+v\n A: %+v", expected, actual) + } + + expected = queuedBlock{ + name: "a-foo", + block: Block{ + Offset: 128, + Size: 128, + Hash: []byte("some other foo hash bytes"), + }, + index: 1, + } + actual, ok = q.Get("nodeID") + if !ok { + t.Error("Unexpected non-OK Get()") + } + if !reflect.DeepEqual(expected, actual) { + t.Errorf("Incorrect block returned\n E: %+v\n A: %+v", expected, actual) + } +} + +func TestFileQueueThreadHandling(t *testing.T) { + // This should pass with go test -race + + const n = 100 + var total int + var blocks []Block + for i := 1; i <= n; i++ { + blocks = append(blocks, Block{Offset: int64(i), Size: 1}) + total += i + } + + q := NewFileQueue() + q.Add("foo", blocks, nil) + q.SetAvailable("foo", []string{"nodeID"}) + + var start = make(chan bool) + var gotTot uint32 + var wg sync.WaitGroup + wg.Add(n) + for i := 1; i <= n; i++ { + go func() { + <-start + b, _ := q.Get("nodeID") + atomic.AddUint32(&gotTot, uint32(b.block.Offset)) + wg.Done() + }() + } + + close(start) + wg.Wait() + if int(gotTot) != total { + t.Errorf("Total mismatch; %d != %d", gotTot, total) + } +} + +func TestDeleteAt(t *testing.T) { + q := FileQueue{} + + for i := 0; i < 4; i++ { + q.files = queuedFileList{{name: "a"}, {name: "b"}, {name: "c"}, {name: "d"}} + q.deleteAt(i) + if l := len(q.files); l != 3 { + t.Fatalf("deleteAt(%d) failed; %d != 3", i, l) + } + } + + q.files = queuedFileList{{name: "a"}} + q.deleteAt(0) + if l := len(q.files); l != 0 { + t.Fatalf("deleteAt(only) failed; %d != 0", l) + } +} diff --git a/cmd/syncthing/gui.go b/cmd/syncthing/gui.go new file mode 100644 index 00000000..93bbbed1 --- /dev/null +++ b/cmd/syncthing/gui.go @@ -0,0 +1,172 @@ +package main + +import ( + "encoding/json" + "io/ioutil" + "log" + "net/http" + "runtime" + "sync" + "time" + + "github.com/codegangsta/martini" +) + +type guiError struct { + Time time.Time + Error string +} + +var ( + configInSync = true + guiErrors = []guiError{} + guiErrorsMut sync.Mutex +) + +func startGUI(addr string, m *Model) { + router := martini.NewRouter() + router.Get("/", getRoot) + router.Get("/rest/version", restGetVersion) + router.Get("/rest/model", restGetModel) + router.Get("/rest/connections", restGetConnections) + router.Get("/rest/config", restGetConfig) + router.Get("/rest/config/sync", restGetConfigInSync) + router.Get("/rest/need", restGetNeed) + router.Get("/rest/system", restGetSystem) + router.Get("/rest/errors", restGetErrors) + + router.Post("/rest/config", restPostConfig) + router.Post("/rest/restart", restPostRestart) + router.Post("/rest/error", restPostError) + + go func() { + mr := martini.New() + mr.Use(embeddedStatic()) + mr.Use(martini.Recovery()) + mr.Action(router.Handle) + mr.Map(m) + err := http.ListenAndServe(addr, mr) + if err != nil { + warnln("GUI not possible:", err) + } + }() +} + +func getRoot(w http.ResponseWriter, r *http.Request) { + http.Redirect(w, r, "/index.html", 302) +} + +func restGetVersion() string { + return Version +} + +func restGetModel(m *Model, w http.ResponseWriter) { + var res = make(map[string]interface{}) + + globalFiles, globalDeleted, globalBytes := m.GlobalSize() + res["globalFiles"], res["globalDeleted"], res["globalBytes"] = globalFiles, globalDeleted, globalBytes + + localFiles, localDeleted, localBytes := m.LocalSize() + res["localFiles"], res["localDeleted"], res["localBytes"] = localFiles, localDeleted, localBytes + + inSyncFiles, inSyncBytes := m.InSyncSize() + res["inSyncFiles"], res["inSyncBytes"] = inSyncFiles, inSyncBytes + + files, total := m.NeedFiles() + res["needFiles"], res["needBytes"] = len(files), total + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(res) +} + +func restGetConnections(m *Model, w http.ResponseWriter) { + var res = m.ConnectionStats() + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(res) +} + +func restGetConfig(w http.ResponseWriter) { + json.NewEncoder(w).Encode(cfg) +} + +func restPostConfig(req *http.Request) { + err := json.NewDecoder(req.Body).Decode(&cfg) + if err != nil { + log.Println(err) + } else { + saveConfig() + configInSync = false + } +} + +func restGetConfigInSync(w http.ResponseWriter) { + json.NewEncoder(w).Encode(map[string]bool{"configInSync": configInSync}) +} + +func restPostRestart(req *http.Request) { + restart() +} + +type guiFile File + +func (f guiFile) MarshalJSON() ([]byte, error) { + type t struct { + Name string + Size int64 + } + return json.Marshal(t{ + Name: f.Name, + Size: File(f).Size, + }) +} + +func restGetNeed(m *Model, w http.ResponseWriter) { + files, _ := m.NeedFiles() + gfs := make([]guiFile, len(files)) + for i, f := range files { + gfs[i] = guiFile(f) + } + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(gfs) +} + +var cpuUsagePercent float64 +var cpuUsageLock sync.RWMutex + +func restGetSystem(w http.ResponseWriter) { + var m runtime.MemStats + runtime.ReadMemStats(&m) + + res := make(map[string]interface{}) + res["myID"] = myID + res["goroutines"] = runtime.NumGoroutine() + res["alloc"] = m.Alloc + res["sys"] = m.Sys + cpuUsageLock.RLock() + res["cpuPercent"] = cpuUsagePercent + cpuUsageLock.RUnlock() + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(res) +} + +func restGetErrors(w http.ResponseWriter) { + guiErrorsMut.Lock() + json.NewEncoder(w).Encode(guiErrors) + guiErrorsMut.Unlock() +} + +func restPostError(req *http.Request) { + bs, _ := ioutil.ReadAll(req.Body) + req.Body.Close() + showGuiError(string(bs)) +} + +func showGuiError(err string) { + guiErrorsMut.Lock() + guiErrors = append(guiErrors, guiError{time.Now(), err}) + if len(guiErrors) > 5 { + guiErrors = guiErrors[len(guiErrors)-5:] + } + guiErrorsMut.Unlock() +} diff --git a/cmd/syncthing/gui_development.go b/cmd/syncthing/gui_development.go new file mode 100644 index 00000000..6c495247 --- /dev/null +++ b/cmd/syncthing/gui_development.go @@ -0,0 +1,9 @@ +//+build guidev + +package main + +import "github.com/codegangsta/martini" + +func embeddedStatic() interface{} { + return martini.Static("gui") +} diff --git a/cmd/syncthing/gui_embedded.go b/cmd/syncthing/gui_embedded.go new file mode 100644 index 00000000..04cc5af8 --- /dev/null +++ b/cmd/syncthing/gui_embedded.go @@ -0,0 +1,40 @@ +//+build !guidev + +package main + +import ( + "fmt" + "log" + "mime" + "net/http" + "path/filepath" + "time" + + "github.com/calmh/syncthing/auto" +) + +func embeddedStatic() interface{} { + var modt = time.Now().UTC().Format(http.TimeFormat) + + return func(res http.ResponseWriter, req *http.Request, log *log.Logger) { + file := req.URL.Path + + if file[0] == '/' { + file = file[1:] + } + + bs, ok := auto.Assets[file] + if !ok { + return + } + + mtype := mime.TypeByExtension(filepath.Ext(req.URL.Path)) + if len(mtype) != 0 { + res.Header().Set("Content-Type", mtype) + } + res.Header().Set("Content-Size", fmt.Sprintf("%d", len(bs))) + res.Header().Set("Last-Modified", modt) + + res.Write(bs) + } +} diff --git a/cmd/syncthing/gui_unix.go b/cmd/syncthing/gui_unix.go new file mode 100644 index 00000000..f9d8e52a --- /dev/null +++ b/cmd/syncthing/gui_unix.go @@ -0,0 +1,31 @@ +//+build !windows,!solaris + +package main + +import ( + "syscall" + "time" +) + +func init() { + go trackCPUUsage() +} + +func trackCPUUsage() { + var prevUsage int64 + var prevTime = time.Now().UnixNano() + var rusage syscall.Rusage + for { + time.Sleep(10 * time.Second) + syscall.Getrusage(syscall.RUSAGE_SELF, &rusage) + curTime := time.Now().UnixNano() + timeDiff := curTime - prevTime + curUsage := rusage.Utime.Nano() + rusage.Stime.Nano() + usageDiff := curUsage - prevUsage + cpuUsageLock.Lock() + cpuUsagePercent = 100 * float64(usageDiff) / float64(timeDiff) + cpuUsageLock.Unlock() + prevTime = curTime + prevUsage = curUsage + } +} diff --git a/cmd/syncthing/locktrace.go b/cmd/syncthing/locktrace.go new file mode 100644 index 00000000..4b662a20 --- /dev/null +++ b/cmd/syncthing/locktrace.go @@ -0,0 +1,43 @@ +//+build locktrace + +package main + +import ( + "log" + "path" + "runtime" + "time" +) + +var ( + lockTime time.Time +) + +func (m *Model) Lock() { + _, file, line, _ := runtime.Caller(1) + log.Printf("%s:%d: Lock()...", path.Base(file), line) + blockTime := time.Now() + m.RWMutex.Lock() + lockTime = time.Now() + log.Printf("%s:%d: ...Lock() [%.04f ms]", path.Base(file), line, time.Since(blockTime).Seconds()*1000) +} + +func (m *Model) Unlock() { + _, file, line, _ := runtime.Caller(1) + m.RWMutex.Unlock() + log.Printf("%s:%d: Unlock() [%.04f ms]", path.Base(file), line, time.Since(lockTime).Seconds()*1000) +} + +func (m *Model) RLock() { + _, file, line, _ := runtime.Caller(1) + log.Printf("%s:%d: RLock()...", path.Base(file), line) + blockTime := time.Now() + m.RWMutex.RLock() + log.Printf("%s:%d: ...RLock() [%.04f ms]", path.Base(file), line, time.Since(blockTime).Seconds()*1000) +} + +func (m *Model) RUnlock() { + _, file, line, _ := runtime.Caller(1) + m.RWMutex.RUnlock() + log.Printf("%s:%d: RUnlock()", path.Base(file), line) +} diff --git a/cmd/syncthing/logger.go b/cmd/syncthing/logger.go new file mode 100644 index 00000000..d2f03d6e --- /dev/null +++ b/cmd/syncthing/logger.go @@ -0,0 +1,74 @@ +package main + +import ( + "fmt" + "log" + "os" +) + +var logger *log.Logger + +func init() { + log.SetOutput(os.Stderr) + logger = log.New(os.Stderr, "", log.Flags()) +} + +func debugln(vals ...interface{}) { + s := fmt.Sprintln(vals...) + logger.Output(2, "DEBUG: "+s) +} + +func debugf(format string, vals ...interface{}) { + s := fmt.Sprintf(format, vals...) + logger.Output(2, "DEBUG: "+s) +} + +func infoln(vals ...interface{}) { + s := fmt.Sprintln(vals...) + logger.Output(2, "INFO: "+s) +} + +func infof(format string, vals ...interface{}) { + s := fmt.Sprintf(format, vals...) + logger.Output(2, "INFO: "+s) +} + +func okln(vals ...interface{}) { + s := fmt.Sprintln(vals...) + logger.Output(2, "OK: "+s) +} + +func okf(format string, vals ...interface{}) { + s := fmt.Sprintf(format, vals...) + logger.Output(2, "OK: "+s) +} + +func warnln(vals ...interface{}) { + s := fmt.Sprintln(vals...) + showGuiError(s) + logger.Output(2, "WARNING: "+s) +} + +func warnf(format string, vals ...interface{}) { + s := fmt.Sprintf(format, vals...) + showGuiError(s) + logger.Output(2, "WARNING: "+s) +} + +func fatalln(vals ...interface{}) { + s := fmt.Sprintln(vals...) + logger.Output(2, "FATAL: "+s) + os.Exit(3) +} + +func fatalf(format string, vals ...interface{}) { + s := fmt.Sprintf(format, vals...) + logger.Output(2, "FATAL: "+s) + os.Exit(3) +} + +func fatalErr(err error) { + if err != nil { + fatalf(err.Error()) + } +} diff --git a/cmd/syncthing/main.go b/cmd/syncthing/main.go new file mode 100644 index 00000000..069c6dd6 --- /dev/null +++ b/cmd/syncthing/main.go @@ -0,0 +1,569 @@ +package main + +import ( + "compress/gzip" + "crypto/tls" + "flag" + "fmt" + "log" + "net" + "net/http" + _ "net/http/pprof" + "os" + "os/exec" + "path" + "runtime" + "runtime/debug" + "strconv" + "strings" + "time" + + "github.com/calmh/ini" + "github.com/calmh/syncthing/discover" + "github.com/calmh/syncthing/protocol" +) + +var cfg Configuration +var Version = "unknown-dev" + +var ( + myID string +) + +var ( + showVersion bool + confDir string + trace string + profiler string + verbose bool + startupDelay int +) + +func main() { + flag.StringVar(&confDir, "home", "~/.syncthing", "Set configuration directory") + flag.StringVar(&trace, "debug.trace", "", "(connect,net,idx,file,pull)") + flag.StringVar(&profiler, "debug.profiler", "", "(addr)") + flag.BoolVar(&showVersion, "version", false, "Show version") + flag.BoolVar(&verbose, "v", false, "Be more verbose") + flag.IntVar(&startupDelay, "delay", 0, "Startup delay (s)") + flag.Usage = usageFor(flag.CommandLine, "syncthing [options]") + flag.Parse() + + if startupDelay > 0 { + time.Sleep(time.Duration(startupDelay) * time.Second) + } + + if showVersion { + fmt.Println(Version) + os.Exit(0) + } + + if len(os.Getenv("GOGC")) == 0 { + debug.SetGCPercent(25) + } + + if len(os.Getenv("GOMAXPROCS")) == 0 { + runtime.GOMAXPROCS(runtime.NumCPU()) + } + + if len(trace) > 0 { + log.SetFlags(log.Lshortfile | log.Ldate | log.Ltime | log.Lmicroseconds) + logger.SetFlags(log.Lshortfile | log.Ldate | log.Ltime | log.Lmicroseconds) + } + confDir = expandTilde(confDir) + + // Ensure that our home directory exists and that we have a certificate and key. + + ensureDir(confDir, 0700) + cert, err := loadCert(confDir) + if err != nil { + newCertificate(confDir) + cert, err = loadCert(confDir) + fatalErr(err) + } + + myID = string(certID(cert.Certificate[0])) + log.SetPrefix("[" + myID[0:5] + "] ") + logger.SetPrefix("[" + myID[0:5] + "] ") + + infoln("Version", Version) + infoln("My ID:", myID) + + // Prepare to be able to save configuration + + cfgFile := path.Join(confDir, "config.xml") + go saveConfigLoop(cfgFile) + + // Load the configuration file, if it exists. + // If it does not, create a template. + + cf, err := os.Open(cfgFile) + if err == nil { + // Read config.xml + cfg, err = readConfigXML(cf) + if err != nil { + fatalln(err) + } + cf.Close() + } else { + // No config.xml, let's try the old syncthing.ini + iniFile := path.Join(confDir, "syncthing.ini") + cf, err := os.Open(iniFile) + if err == nil { + infoln("Migrating syncthing.ini to config.xml") + iniCfg := ini.Parse(cf) + cf.Close() + os.Rename(iniFile, path.Join(confDir, "migrated_syncthing.ini")) + + cfg, _ = readConfigXML(nil) + cfg.Repositories = []RepositoryConfiguration{ + {Directory: iniCfg.Get("repository", "dir")}, + } + readConfigINI(iniCfg.OptionMap("settings"), &cfg.Options) + for name, addrs := range iniCfg.OptionMap("nodes") { + n := NodeConfiguration{ + NodeID: name, + Addresses: strings.Fields(addrs), + } + cfg.Repositories[0].Nodes = append(cfg.Repositories[0].Nodes, n) + } + + saveConfig() + } + } + + if len(cfg.Repositories) == 0 { + infoln("No config file; starting with empty defaults") + + cfg, err = readConfigXML(nil) + cfg.Repositories = []RepositoryConfiguration{ + { + Directory: "~/Sync", + Nodes: []NodeConfiguration{ + {NodeID: myID, Addresses: []string{"dynamic"}}, + }, + }, + } + + saveConfig() + infof("Edit %s to taste or use the GUI\n", cfgFile) + } + + // Make sure the local node is in the node list. + cfg.Repositories[0].Nodes = cleanNodeList(cfg.Repositories[0].Nodes, myID) + + var dir = expandTilde(cfg.Repositories[0].Directory) + + if len(profiler) > 0 { + go func() { + err := http.ListenAndServe(profiler, nil) + if err != nil { + warnln(err) + } + }() + } + + // The TLS configuration is used for both the listening socket and outgoing + // connections. + + tlsCfg := &tls.Config{ + Certificates: []tls.Certificate{cert}, + NextProtos: []string{"bep/1.0"}, + ServerName: myID, + ClientAuth: tls.RequestClientCert, + SessionTicketsDisabled: true, + InsecureSkipVerify: true, + MinVersion: tls.VersionTLS12, + } + + ensureDir(dir, -1) + m := NewModel(dir, cfg.Options.MaxChangeKbps*1000) + for _, t := range strings.Split(trace, ",") { + m.Trace(t) + } + if cfg.Options.MaxSendKbps > 0 { + m.LimitRate(cfg.Options.MaxSendKbps) + } + + // GUI + if cfg.Options.GUIEnabled && cfg.Options.GUIAddress != "" { + addr, err := net.ResolveTCPAddr("tcp", cfg.Options.GUIAddress) + if err != nil { + warnf("Cannot start GUI on %q: %v", cfg.Options.GUIAddress, err) + } else { + var hostOpen, hostShow string + switch { + case addr.IP == nil: + hostOpen = "localhost" + hostShow = "0.0.0.0" + case addr.IP.IsUnspecified(): + hostOpen = "localhost" + hostShow = addr.IP.String() + default: + hostOpen = addr.IP.String() + hostShow = hostOpen + } + + infof("Starting web GUI on http://%s:%d/", hostShow, addr.Port) + startGUI(cfg.Options.GUIAddress, m) + openURL(fmt.Sprintf("http://%s:%d", hostOpen, addr.Port)) + } + } + + // Walk the repository and update the local model before establishing any + // connections to other nodes. + + if verbose { + infoln("Populating repository index") + } + loadIndex(m) + updateLocalModel(m) + + connOpts := map[string]string{ + "clientId": "syncthing", + "clientVersion": Version, + "clusterHash": clusterHash(cfg.Repositories[0].Nodes), + } + + // Routine to listen for incoming connections + if verbose { + infoln("Listening for incoming connections") + } + for _, addr := range cfg.Options.ListenAddress { + go listen(myID, addr, m, tlsCfg, connOpts) + } + + // Routine to connect out to configured nodes + if verbose { + infoln("Attempting to connect to other nodes") + } + disc := discovery(cfg.Options.ListenAddress[0]) + go connect(myID, disc, m, tlsCfg, connOpts) + + // Routine to pull blocks from other nodes to synchronize the local + // repository. Does not run when we are in read only (publish only) mode. + if !cfg.Options.ReadOnly { + if verbose { + if cfg.Options.AllowDelete { + infoln("Deletes from peer nodes are allowed") + } else { + infoln("Deletes from peer nodes will be ignored") + } + okln("Ready to synchronize (read-write)") + } + m.StartRW(cfg.Options.AllowDelete, cfg.Options.ParallelRequests) + } else if verbose { + okln("Ready to synchronize (read only; no external updates accepted)") + } + + // Periodically scan the repository and update the local + // XXX: Should use some fsnotify mechanism. + go func() { + td := time.Duration(cfg.Options.RescanIntervalS) * time.Second + for { + time.Sleep(td) + if m.LocalAge() > (td / 2).Seconds() { + updateLocalModel(m) + } + } + }() + + if verbose { + // Periodically print statistics + go printStatsLoop(m) + } + + select {} +} + +func restart() { + infoln("Restarting") + args := os.Args + doAppend := true + for _, arg := range args { + if arg == "-delay" { + doAppend = false + break + } + } + if doAppend { + args = append(args, "-delay", "2") + } + pgm, err := exec.LookPath(os.Args[0]) + if err != nil { + warnln(err) + return + } + proc, err := os.StartProcess(pgm, args, &os.ProcAttr{ + Env: os.Environ(), + Files: []*os.File{os.Stdin, os.Stdout, os.Stderr}, + }) + if err != nil { + fatalln(err) + } + proc.Release() + os.Exit(0) +} + +var saveConfigCh = make(chan struct{}) + +func saveConfigLoop(cfgFile string) { + for _ = range saveConfigCh { + fd, err := os.Create(cfgFile + ".tmp") + if err != nil { + warnln(err) + continue + } + + err = writeConfigXML(fd, cfg) + if err != nil { + warnln(err) + fd.Close() + continue + } + + err = fd.Close() + if err != nil { + warnln(err) + continue + } + + err = os.Rename(cfgFile+".tmp", cfgFile) + if err != nil { + warnln(err) + } + } +} + +func saveConfig() { + saveConfigCh <- struct{}{} +} + +func printStatsLoop(m *Model) { + var lastUpdated int64 + var lastStats = make(map[string]ConnectionInfo) + + for { + time.Sleep(60 * time.Second) + + for node, stats := range m.ConnectionStats() { + secs := time.Since(lastStats[node].At).Seconds() + inbps := 8 * int(float64(stats.InBytesTotal-lastStats[node].InBytesTotal)/secs) + outbps := 8 * int(float64(stats.OutBytesTotal-lastStats[node].OutBytesTotal)/secs) + + if inbps+outbps > 0 { + infof("%s: %sb/s in, %sb/s out", node[0:5], MetricPrefix(int64(inbps)), MetricPrefix(int64(outbps))) + } + + lastStats[node] = stats + } + + if lu := m.Generation(); lu > lastUpdated { + lastUpdated = lu + files, _, bytes := m.GlobalSize() + infof("%6d files, %9sB in cluster", files, BinaryPrefix(bytes)) + files, _, bytes = m.LocalSize() + infof("%6d files, %9sB in local repo", files, BinaryPrefix(bytes)) + needFiles, bytes := m.NeedFiles() + infof("%6d files, %9sB to synchronize", len(needFiles), BinaryPrefix(bytes)) + } + } +} + +func listen(myID string, addr string, m *Model, tlsCfg *tls.Config, connOpts map[string]string) { + if strings.Contains(trace, "connect") { + debugln("NET: Listening on", addr) + } + l, err := tls.Listen("tcp", addr, tlsCfg) + fatalErr(err) + +listen: + for { + conn, err := l.Accept() + if err != nil { + warnln(err) + continue + } + + if strings.Contains(trace, "connect") { + debugln("NET: Connect from", conn.RemoteAddr()) + } + + tc := conn.(*tls.Conn) + err = tc.Handshake() + if err != nil { + warnln(err) + tc.Close() + continue + } + + remoteID := certID(tc.ConnectionState().PeerCertificates[0].Raw) + + if remoteID == myID { + warnf("Connect from myself (%s) - should not happen", remoteID) + conn.Close() + continue + } + + if m.ConnectedTo(remoteID) { + warnf("Connect from connected node (%s)", remoteID) + } + + for _, nodeCfg := range cfg.Repositories[0].Nodes { + if nodeCfg.NodeID == remoteID { + protoConn := protocol.NewConnection(remoteID, conn, conn, m, connOpts) + m.AddConnection(conn, protoConn) + continue listen + } + } + conn.Close() + } +} + +func discovery(addr string) *discover.Discoverer { + _, portstr, err := net.SplitHostPort(addr) + fatalErr(err) + port, _ := strconv.Atoi(portstr) + + if !cfg.Options.LocalAnnEnabled { + port = -1 + } else if verbose { + infoln("Sending local discovery announcements") + } + + if !cfg.Options.GlobalAnnEnabled { + cfg.Options.GlobalAnnServer = "" + } else if verbose { + infoln("Sending external discovery announcements") + } + + disc, err := discover.NewDiscoverer(myID, port, cfg.Options.GlobalAnnServer) + + if err != nil { + warnf("No discovery possible (%v)", err) + } + + return disc +} + +func connect(myID string, disc *discover.Discoverer, m *Model, tlsCfg *tls.Config, connOpts map[string]string) { + for { + nextNode: + for _, nodeCfg := range cfg.Repositories[0].Nodes { + if nodeCfg.NodeID == myID { + continue + } + if m.ConnectedTo(nodeCfg.NodeID) { + continue + } + for _, addr := range nodeCfg.Addresses { + if addr == "dynamic" { + if disc != nil { + t := disc.Lookup(nodeCfg.NodeID) + if len(t) == 0 { + continue + } + addr = t[0] //XXX: Handle all of them + } + } + + if strings.Contains(trace, "connect") { + debugln("NET: Dial", nodeCfg.NodeID, addr) + } + conn, err := tls.Dial("tcp", addr, tlsCfg) + if err != nil { + if strings.Contains(trace, "connect") { + debugln("NET:", err) + } + continue + } + + remoteID := certID(conn.ConnectionState().PeerCertificates[0].Raw) + if remoteID != nodeCfg.NodeID { + warnln("Unexpected nodeID", remoteID, "!=", nodeCfg.NodeID) + conn.Close() + continue + } + + protoConn := protocol.NewConnection(remoteID, conn, conn, m, connOpts) + m.AddConnection(conn, protoConn) + continue nextNode + } + } + + time.Sleep(time.Duration(cfg.Options.ReconnectIntervalS) * time.Second) + } +} + +func updateLocalModel(m *Model) { + files, _ := m.Walk(cfg.Options.FollowSymlinks) + m.ReplaceLocal(files) + saveIndex(m) +} + +func saveIndex(m *Model) { + name := m.RepoID() + ".idx.gz" + fullName := path.Join(confDir, name) + idxf, err := os.Create(fullName + ".tmp") + if err != nil { + return + } + + gzw := gzip.NewWriter(idxf) + + protocol.IndexMessage{ + Repository: "local", + Files: m.ProtocolIndex(), + }.EncodeXDR(gzw) + gzw.Close() + idxf.Close() + os.Rename(fullName+".tmp", fullName) +} + +func loadIndex(m *Model) { + name := m.RepoID() + ".idx.gz" + idxf, err := os.Open(path.Join(confDir, name)) + if err != nil { + return + } + defer idxf.Close() + + gzr, err := gzip.NewReader(idxf) + if err != nil { + return + } + defer gzr.Close() + + var im protocol.IndexMessage + err = im.DecodeXDR(gzr) + if err != nil || im.Repository != "local" { + return + } + m.SeedLocal(im.Files) +} + +func ensureDir(dir string, mode int) { + fi, err := os.Stat(dir) + if os.IsNotExist(err) { + err := os.MkdirAll(dir, 0700) + fatalErr(err) + } else if mode >= 0 && err == nil && int(fi.Mode()&0777) != mode { + err := os.Chmod(dir, os.FileMode(mode)) + fatalErr(err) + } +} + +func expandTilde(p string) string { + if strings.HasPrefix(p, "~/") { + return strings.Replace(p, "~", getHomeDir(), 1) + } + return p +} + +func getHomeDir() string { + home := os.Getenv("HOME") + if home == "" { + fatalln("No home directory?") + } + return home +} diff --git a/cmd/syncthing/model.go b/cmd/syncthing/model.go new file mode 100644 index 00000000..2617fb42 --- /dev/null +++ b/cmd/syncthing/model.go @@ -0,0 +1,914 @@ +package main + +import ( + "crypto/sha1" + "errors" + "fmt" + "io" + "net" + "os" + "path" + "sync" + "time" + + "github.com/calmh/syncthing/buffers" + "github.com/calmh/syncthing/protocol" +) + +type Model struct { + dir string + + global map[string]File // the latest version of each file as it exists in the cluster + gmut sync.RWMutex // protects global + local map[string]File // the files we currently have locally on disk + lmut sync.RWMutex // protects local + remote map[string]map[string]File + rmut sync.RWMutex // protects remote + protoConn map[string]Connection + rawConn map[string]io.Closer + pmut sync.RWMutex // protects protoConn and rawConn + + // Queue for files to fetch. fq can call back into the model, so we must ensure + // to hold no locks when calling methods on fq. + fq *FileQueue + dq chan File // queue for files to delete + + updatedLocal int64 // timestamp of last update to local + updateGlobal int64 // timestamp of last update to remote + lastIdxBcast time.Time + lastIdxBcastRequest time.Time + umut sync.RWMutex // provides updated* and lastIdx* + + rwRunning bool + delete bool + initmut sync.Mutex // protects rwRunning and delete + + trace map[string]bool + + sup suppressor + + parallelRequests int + limitRequestRate chan struct{} + + imut sync.Mutex // protects Index +} + +type Connection interface { + ID() string + Index(string, []protocol.FileInfo) + Request(repo, name string, offset int64, size int) ([]byte, error) + Statistics() protocol.Statistics + Option(key string) string +} + +const ( + idxBcastHoldtime = 15 * time.Second // Wait at least this long after the last index modification + idxBcastMaxDelay = 120 * time.Second // Unless we've already waited this long +) + +var ( + ErrNoSuchFile = errors.New("no such file") + ErrInvalid = errors.New("file is invalid") +) + +// NewModel creates and starts a new model. The model starts in read-only mode, +// where it sends index information to connected peers and responds to requests +// for file data without altering the local repository in any way. +func NewModel(dir string, maxChangeBw int) *Model { + m := &Model{ + dir: dir, + global: make(map[string]File), + local: make(map[string]File), + remote: make(map[string]map[string]File), + protoConn: make(map[string]Connection), + rawConn: make(map[string]io.Closer), + lastIdxBcast: time.Now(), + trace: make(map[string]bool), + sup: suppressor{threshold: int64(maxChangeBw)}, + fq: NewFileQueue(), + dq: make(chan File), + } + + go m.broadcastIndexLoop() + return m +} + +func (m *Model) LimitRate(kbps int) { + m.limitRequestRate = make(chan struct{}, kbps) + n := kbps/10 + 1 + go func() { + for { + time.Sleep(100 * time.Millisecond) + for i := 0; i < n; i++ { + select { + case m.limitRequestRate <- struct{}{}: + } + } + } + }() +} + +// Trace enables trace logging of the given facility. This is a debugging function; grep for m.trace. +func (m *Model) Trace(t string) { + m.trace[t] = true +} + +// StartRW starts read/write processing on the current model. When in +// read/write mode the model will attempt to keep in sync with the cluster by +// pulling needed files from peer nodes. +func (m *Model) StartRW(del bool, threads int) { + m.initmut.Lock() + defer m.initmut.Unlock() + + if m.rwRunning { + panic("starting started model") + } + + m.rwRunning = true + m.delete = del + m.parallelRequests = threads + + go m.cleanTempFiles() + if del { + go m.deleteLoop() + } +} + +// Generation returns an opaque integer that is guaranteed to increment on +// every change to the local repository or global model. +func (m *Model) Generation() int64 { + m.umut.RLock() + defer m.umut.RUnlock() + + return m.updatedLocal + m.updateGlobal +} + +func (m *Model) LocalAge() float64 { + m.umut.RLock() + defer m.umut.RUnlock() + + return time.Since(time.Unix(m.updatedLocal, 0)).Seconds() +} + +type ConnectionInfo struct { + protocol.Statistics + Address string + ClientID string + ClientVersion string + Completion int +} + +// ConnectionStats returns a map with connection statistics for each connected node. +func (m *Model) ConnectionStats() map[string]ConnectionInfo { + type remoteAddrer interface { + RemoteAddr() net.Addr + } + + m.gmut.RLock() + m.pmut.RLock() + m.rmut.RLock() + + var tot int64 + for _, f := range m.global { + tot += f.Size + } + + var res = make(map[string]ConnectionInfo) + for node, conn := range m.protoConn { + ci := ConnectionInfo{ + Statistics: conn.Statistics(), + ClientID: conn.Option("clientId"), + ClientVersion: conn.Option("clientVersion"), + } + if nc, ok := m.rawConn[node].(remoteAddrer); ok { + ci.Address = nc.RemoteAddr().String() + } + + var have int64 + for _, f := range m.remote[node] { + if f.Equals(m.global[f.Name]) { + have += f.Size + } + } + + ci.Completion = int(100 * have / tot) + + res[node] = ci + } + + m.rmut.RUnlock() + m.pmut.RUnlock() + m.gmut.RUnlock() + return res +} + +// GlobalSize returns the number of files, deleted files and total bytes for all +// files in the global model. +func (m *Model) GlobalSize() (files, deleted int, bytes int64) { + m.gmut.RLock() + + for _, f := range m.global { + if f.Flags&protocol.FlagDeleted == 0 { + files++ + bytes += f.Size + } else { + deleted++ + } + } + + m.gmut.RUnlock() + return +} + +// LocalSize returns the number of files, deleted files and total bytes for all +// files in the local repository. +func (m *Model) LocalSize() (files, deleted int, bytes int64) { + m.lmut.RLock() + + for _, f := range m.local { + if f.Flags&protocol.FlagDeleted == 0 { + files++ + bytes += f.Size + } else { + deleted++ + } + } + + m.lmut.RUnlock() + return +} + +// InSyncSize returns the number and total byte size of the local files that +// are in sync with the global model. +func (m *Model) InSyncSize() (files, bytes int64) { + m.gmut.RLock() + m.lmut.RLock() + + for n, f := range m.local { + if gf, ok := m.global[n]; ok && f.Equals(gf) { + files++ + bytes += f.Size + } + } + + m.lmut.RUnlock() + m.gmut.RUnlock() + return +} + +// NeedFiles returns the list of currently needed files and the total size. +func (m *Model) NeedFiles() (files []File, bytes int64) { + qf := m.fq.QueuedFiles() + + m.gmut.RLock() + + for _, n := range qf { + f := m.global[n] + files = append(files, f) + bytes += f.Size + } + + m.gmut.RUnlock() + return +} + +// Index is called when a new node is connected and we receive their full index. +// Implements the protocol.Model interface. +func (m *Model) Index(nodeID string, fs []protocol.FileInfo) { + var files = make([]File, len(fs)) + for i := range fs { + files[i] = fileFromFileInfo(fs[i]) + } + + m.imut.Lock() + defer m.imut.Unlock() + + if m.trace["net"] { + debugf("NET IDX(in): %s: %d files", nodeID, len(fs)) + } + + repo := make(map[string]File) + for _, f := range files { + m.indexUpdate(repo, f) + } + + m.rmut.Lock() + m.remote[nodeID] = repo + m.rmut.Unlock() + + m.recomputeGlobal() + m.recomputeNeedForFiles(files) +} + +// IndexUpdate is called for incremental updates to connected nodes' indexes. +// Implements the protocol.Model interface. +func (m *Model) IndexUpdate(nodeID string, fs []protocol.FileInfo) { + var files = make([]File, len(fs)) + for i := range fs { + files[i] = fileFromFileInfo(fs[i]) + } + + m.imut.Lock() + defer m.imut.Unlock() + + if m.trace["net"] { + debugf("NET IDXUP(in): %s: %d files", nodeID, len(files)) + } + + m.rmut.Lock() + repo, ok := m.remote[nodeID] + if !ok { + warnf("Index update from node %s that does not have an index", nodeID) + m.rmut.Unlock() + return + } + + for _, f := range files { + m.indexUpdate(repo, f) + } + m.rmut.Unlock() + + m.recomputeGlobal() + m.recomputeNeedForFiles(files) +} + +func (m *Model) indexUpdate(repo map[string]File, f File) { + if m.trace["idx"] { + var flagComment string + if f.Flags&protocol.FlagDeleted != 0 { + flagComment = " (deleted)" + } + debugf("IDX(in): %q m=%d f=%o%s v=%d (%d blocks)", f.Name, f.Modified, f.Flags, flagComment, f.Version, len(f.Blocks)) + } + + if extraFlags := f.Flags &^ (protocol.FlagInvalid | protocol.FlagDeleted | 0xfff); extraFlags != 0 { + warnf("IDX(in): Unknown flags 0x%x in index record %+v", extraFlags, f) + return + } + + repo[f.Name] = f +} + +// Close removes the peer from the model and closes the underlying connection if possible. +// Implements the protocol.Model interface. +func (m *Model) Close(node string, err error) { + if m.trace["net"] { + debugf("NET: %s: %v", node, err) + } + if err == protocol.ErrClusterHash { + warnf("Connection to %s closed due to mismatched cluster hash. Ensure that the configured cluster members are identical on both nodes.", node) + } else if err != io.EOF { + warnf("Connection to %s closed: %v", node, err) + } + + m.fq.RemoveAvailable(node) + + m.pmut.Lock() + m.rmut.Lock() + + conn, ok := m.rawConn[node] + if ok { + conn.Close() + } + + delete(m.remote, node) + delete(m.protoConn, node) + delete(m.rawConn, node) + + m.rmut.Unlock() + m.pmut.Unlock() + + m.recomputeGlobal() + m.recomputeNeedForGlobal() +} + +// Request returns the specified data segment by reading it from local disk. +// Implements the protocol.Model interface. +func (m *Model) Request(nodeID, repo, name string, offset int64, size int) ([]byte, error) { + // Verify that the requested file exists in the local and global model. + m.lmut.RLock() + lf, localOk := m.local[name] + m.lmut.RUnlock() + + m.gmut.RLock() + _, globalOk := m.global[name] + m.gmut.RUnlock() + + if !localOk || !globalOk { + warnf("SECURITY (nonexistent file) REQ(in): %s: %q o=%d s=%d", nodeID, name, offset, size) + return nil, ErrNoSuchFile + } + if lf.Flags&protocol.FlagInvalid != 0 { + return nil, ErrInvalid + } + + if m.trace["net"] && nodeID != "" { + debugf("NET REQ(in): %s: %q o=%d s=%d", nodeID, name, offset, size) + } + fn := path.Join(m.dir, name) + fd, err := os.Open(fn) // XXX: Inefficient, should cache fd? + if err != nil { + return nil, err + } + defer fd.Close() + + buf := buffers.Get(int(size)) + _, err = fd.ReadAt(buf, offset) + if err != nil { + return nil, err + } + + if m.limitRequestRate != nil { + for s := 0; s < len(buf); s += 1024 { + <-m.limitRequestRate + } + } + + return buf, nil +} + +// ReplaceLocal replaces the local repository index with the given list of files. +func (m *Model) ReplaceLocal(fs []File) { + var updated bool + var newLocal = make(map[string]File) + + m.lmut.RLock() + for _, f := range fs { + newLocal[f.Name] = f + if ef := m.local[f.Name]; !ef.Equals(f) { + updated = true + } + } + m.lmut.RUnlock() + + if m.markDeletedLocals(newLocal) { + updated = true + } + + m.lmut.RLock() + if len(newLocal) != len(m.local) { + updated = true + } + m.lmut.RUnlock() + + if updated { + m.lmut.Lock() + m.local = newLocal + m.lmut.Unlock() + + m.recomputeGlobal() + m.recomputeNeedForGlobal() + + m.umut.Lock() + m.updatedLocal = time.Now().Unix() + m.lastIdxBcastRequest = time.Now() + m.umut.Unlock() + } +} + +// SeedLocal replaces the local repository index with the given list of files, +// in protocol data types. Does not track deletes, should only be used to seed +// the local index from a cache file at startup. +func (m *Model) SeedLocal(fs []protocol.FileInfo) { + m.lmut.Lock() + m.local = make(map[string]File) + for _, f := range fs { + m.local[f.Name] = fileFromFileInfo(f) + } + m.lmut.Unlock() + + m.recomputeGlobal() + m.recomputeNeedForGlobal() +} + +// ConnectedTo returns true if we are connected to the named node. +func (m *Model) ConnectedTo(nodeID string) bool { + m.pmut.RLock() + _, ok := m.protoConn[nodeID] + m.pmut.RUnlock() + return ok +} + +// RepoID returns a unique ID representing the current repository location. +func (m *Model) RepoID() string { + return fmt.Sprintf("%x", sha1.Sum([]byte(m.dir))) +} + +// AddConnection adds a new peer connection to the model. An initial index will +// be sent to the connected peer, thereafter index updates whenever the local +// repository changes. +func (m *Model) AddConnection(rawConn io.Closer, protoConn Connection) { + nodeID := protoConn.ID() + m.pmut.Lock() + m.protoConn[nodeID] = protoConn + m.rawConn[nodeID] = rawConn + m.pmut.Unlock() + + go func() { + idx := m.ProtocolIndex() + protoConn.Index("default", idx) + }() + + m.initmut.Lock() + rw := m.rwRunning + m.initmut.Unlock() + if !rw { + return + } + + for i := 0; i < m.parallelRequests; i++ { + i := i + go func() { + if m.trace["pull"] { + debugln("PULL: Starting", nodeID, i) + } + for { + m.pmut.RLock() + if _, ok := m.protoConn[nodeID]; !ok { + if m.trace["pull"] { + debugln("PULL: Exiting", nodeID, i) + } + m.pmut.RUnlock() + return + } + m.pmut.RUnlock() + + qb, ok := m.fq.Get(nodeID) + if ok { + if m.trace["pull"] { + debugln("PULL: Request", nodeID, i, qb.name, qb.block.Offset) + } + data, _ := protoConn.Request("default", qb.name, qb.block.Offset, int(qb.block.Size)) + m.fq.Done(qb.name, qb.block.Offset, data) + } else { + time.Sleep(1 * time.Second) + } + } + }() + } +} + +// ProtocolIndex returns the current local index in protocol data types. +// Must be called with the read lock held. +func (m *Model) ProtocolIndex() []protocol.FileInfo { + var index []protocol.FileInfo + + m.lmut.RLock() + + for _, f := range m.local { + mf := fileInfoFromFile(f) + if m.trace["idx"] { + var flagComment string + if mf.Flags&protocol.FlagDeleted != 0 { + flagComment = " (deleted)" + } + debugf("IDX(out): %q m=%d f=%o%s v=%d (%d blocks)", mf.Name, mf.Modified, mf.Flags, flagComment, mf.Version, len(mf.Blocks)) + } + index = append(index, mf) + } + + m.lmut.RUnlock() + return index +} + +func (m *Model) requestGlobal(nodeID, name string, offset int64, size int, hash []byte) ([]byte, error) { + m.pmut.RLock() + nc, ok := m.protoConn[nodeID] + m.pmut.RUnlock() + + if !ok { + return nil, fmt.Errorf("requestGlobal: no such node: %s", nodeID) + } + + if m.trace["net"] { + debugf("NET REQ(out): %s: %q o=%d s=%d h=%x", nodeID, name, offset, size, hash) + } + + return nc.Request("default", name, offset, size) +} + +func (m *Model) broadcastIndexLoop() { + for { + m.umut.RLock() + bcastRequested := m.lastIdxBcastRequest.After(m.lastIdxBcast) + holdtimeExceeded := time.Since(m.lastIdxBcastRequest) > idxBcastHoldtime + m.umut.RUnlock() + + maxDelayExceeded := time.Since(m.lastIdxBcast) > idxBcastMaxDelay + if bcastRequested && (holdtimeExceeded || maxDelayExceeded) { + idx := m.ProtocolIndex() + + var indexWg sync.WaitGroup + indexWg.Add(len(m.protoConn)) + + m.umut.Lock() + m.lastIdxBcast = time.Now() + m.umut.Unlock() + + m.pmut.RLock() + for _, node := range m.protoConn { + node := node + if m.trace["net"] { + debugf("NET IDX(out/loop): %s: %d files", node.ID(), len(idx)) + } + go func() { + node.Index("default", idx) + indexWg.Done() + }() + } + m.pmut.RUnlock() + + indexWg.Wait() + } + time.Sleep(idxBcastHoldtime) + } +} + +// markDeletedLocals sets the deleted flag on files that have gone missing locally. +func (m *Model) markDeletedLocals(newLocal map[string]File) bool { + // For every file in the existing local table, check if they are also + // present in the new local table. If they are not, check that we already + // had the newest version available according to the global table and if so + // note the file as having been deleted. + var updated bool + + m.gmut.RLock() + m.lmut.RLock() + + for n, f := range m.local { + if _, ok := newLocal[n]; !ok { + if gf := m.global[n]; !gf.NewerThan(f) { + if f.Flags&protocol.FlagDeleted == 0 { + f.Flags = protocol.FlagDeleted + f.Version++ + f.Blocks = nil + updated = true + } + newLocal[n] = f + } + } + } + + m.lmut.RUnlock() + m.gmut.RUnlock() + + return updated +} + +func (m *Model) updateLocal(f File) { + var updated bool + + m.lmut.Lock() + if ef, ok := m.local[f.Name]; !ok || !ef.Equals(f) { + m.local[f.Name] = f + updated = true + } + m.lmut.Unlock() + + if updated { + m.recomputeGlobal() + // We don't recomputeNeed here for two reasons: + // - a need shouldn't have arisen due to having a newer local file + // - recomputeNeed might call into fq.Add but we might have been called by + // fq which would be a deadlock on fq + + m.umut.Lock() + m.updatedLocal = time.Now().Unix() + m.lastIdxBcastRequest = time.Now() + m.umut.Unlock() + } +} + +/* +XXX: Not done, needs elegant handling of availability + +func (m *Model) recomputeGlobalFor(files []File) bool { + m.gmut.Lock() + defer m.gmut.Unlock() + + var updated bool + for _, f := range files { + if gf, ok := m.global[f.Name]; !ok || f.NewerThan(gf) { + m.global[f.Name] = f + updated = true + // Fix availability + } + } + return updated +} +*/ + +func (m *Model) recomputeGlobal() { + var newGlobal = make(map[string]File) + + m.lmut.RLock() + for n, f := range m.local { + newGlobal[n] = f + } + m.lmut.RUnlock() + + var available = make(map[string][]string) + + m.rmut.RLock() + var highestMod int64 + for nodeID, fs := range m.remote { + for n, nf := range fs { + if lf, ok := newGlobal[n]; !ok || nf.NewerThan(lf) { + newGlobal[n] = nf + available[n] = []string{nodeID} + if nf.Modified > highestMod { + highestMod = nf.Modified + } + } else if lf.Equals(nf) { + available[n] = append(available[n], nodeID) + } + } + } + m.rmut.RUnlock() + + for f, ns := range available { + m.fq.SetAvailable(f, ns) + } + + // Figure out if anything actually changed + + m.gmut.RLock() + var updated bool + if highestMod > m.updateGlobal || len(newGlobal) != len(m.global) { + updated = true + } else { + for n, f0 := range newGlobal { + if f1, ok := m.global[n]; !ok || !f0.Equals(f1) { + updated = true + break + } + } + } + m.gmut.RUnlock() + + if updated { + m.gmut.Lock() + m.umut.Lock() + m.global = newGlobal + m.updateGlobal = time.Now().Unix() + m.umut.Unlock() + m.gmut.Unlock() + } +} + +type addOrder struct { + n string + remote []Block + fm *fileMonitor +} + +func (m *Model) recomputeNeedForGlobal() { + var toDelete []File + var toAdd []addOrder + + m.gmut.RLock() + + for _, gf := range m.global { + toAdd, toDelete = m.recomputeNeedForFile(gf, toAdd, toDelete) + } + + m.gmut.RUnlock() + + for _, ao := range toAdd { + m.fq.Add(ao.n, ao.remote, ao.fm) + } + for _, gf := range toDelete { + m.dq <- gf + } +} + +func (m *Model) recomputeNeedForFiles(files []File) { + var toDelete []File + var toAdd []addOrder + + m.gmut.RLock() + + for _, gf := range files { + toAdd, toDelete = m.recomputeNeedForFile(gf, toAdd, toDelete) + } + + m.gmut.RUnlock() + + for _, ao := range toAdd { + m.fq.Add(ao.n, ao.remote, ao.fm) + } + for _, gf := range toDelete { + m.dq <- gf + } +} + +func (m *Model) recomputeNeedForFile(gf File, toAdd []addOrder, toDelete []File) ([]addOrder, []File) { + m.lmut.RLock() + lf, ok := m.local[gf.Name] + m.lmut.RUnlock() + + if !ok || gf.NewerThan(lf) { + if gf.Flags&protocol.FlagInvalid != 0 { + // Never attempt to sync invalid files + return toAdd, toDelete + } + if gf.Flags&protocol.FlagDeleted != 0 && !m.delete { + // Don't want to delete files, so forget this need + return toAdd, toDelete + } + if gf.Flags&protocol.FlagDeleted != 0 && !ok { + // Don't have the file, so don't need to delete it + return toAdd, toDelete + } + if m.trace["need"] { + debugf("NEED: lf:%v gf:%v", lf, gf) + } + + if gf.Flags&protocol.FlagDeleted != 0 { + toDelete = append(toDelete, gf) + } else { + local, remote := BlockDiff(lf.Blocks, gf.Blocks) + fm := fileMonitor{ + name: gf.Name, + path: path.Clean(path.Join(m.dir, gf.Name)), + global: gf, + model: m, + localBlocks: local, + } + toAdd = append(toAdd, addOrder{gf.Name, remote, &fm}) + } + } + + return toAdd, toDelete +} + +func (m *Model) WhoHas(name string) []string { + var remote []string + + m.gmut.RLock() + m.rmut.RLock() + + gf := m.global[name] + for node, files := range m.remote { + if file, ok := files[name]; ok && file.Equals(gf) { + remote = append(remote, node) + } + } + + m.rmut.RUnlock() + m.gmut.RUnlock() + return remote +} + +func (m *Model) deleteLoop() { + for file := range m.dq { + if m.trace["file"] { + debugln("FILE: Delete", file.Name) + } + path := path.Clean(path.Join(m.dir, file.Name)) + err := os.Remove(path) + if err != nil { + warnf("%s: %v", file.Name, err) + } + + m.updateLocal(file) + } +} + +func fileFromFileInfo(f protocol.FileInfo) File { + var blocks = make([]Block, len(f.Blocks)) + var offset int64 + for i, b := range f.Blocks { + blocks[i] = Block{ + Offset: offset, + Size: b.Size, + Hash: b.Hash, + } + offset += int64(b.Size) + } + return File{ + Name: f.Name, + Size: offset, + Flags: f.Flags, + Modified: f.Modified, + Version: f.Version, + Blocks: blocks, + } +} + +func fileInfoFromFile(f File) protocol.FileInfo { + var blocks = make([]protocol.BlockInfo, len(f.Blocks)) + for i, b := range f.Blocks { + blocks[i] = protocol.BlockInfo{ + Size: b.Size, + Hash: b.Hash, + } + } + return protocol.FileInfo{ + Name: f.Name, + Flags: f.Flags, + Modified: f.Modified, + Version: f.Version, + Blocks: blocks, + } +} diff --git a/cmd/syncthing/model_test.go b/cmd/syncthing/model_test.go new file mode 100644 index 00000000..8be7cfe6 --- /dev/null +++ b/cmd/syncthing/model_test.go @@ -0,0 +1,540 @@ +package main + +import ( + "bytes" + "fmt" + "os" + "reflect" + "testing" + "time" + + "github.com/calmh/syncthing/protocol" +) + +func TestNewModel(t *testing.T) { + m := NewModel("foo", 1e6) + + if m == nil { + t.Fatalf("NewModel returned nil") + } + + if fs, _ := m.NeedFiles(); len(fs) > 0 { + t.Errorf("New model should have no Need") + } + + if len(m.local) > 0 { + t.Errorf("New model should have no Have") + } +} + +var testDataExpected = map[string]File{ + "foo": File{ + Name: "foo", + Flags: 0, + Modified: 0, + Size: 7, + Blocks: []Block{{Offset: 0x0, Size: 0x7, Hash: []uint8{0xae, 0xc0, 0x70, 0x64, 0x5f, 0xe5, 0x3e, 0xe3, 0xb3, 0x76, 0x30, 0x59, 0x37, 0x61, 0x34, 0xf0, 0x58, 0xcc, 0x33, 0x72, 0x47, 0xc9, 0x78, 0xad, 0xd1, 0x78, 0xb6, 0xcc, 0xdf, 0xb0, 0x1, 0x9f}}}, + }, + "empty": File{ + Name: "empty", + Flags: 0, + Modified: 0, + Size: 0, + Blocks: []Block{{Offset: 0x0, Size: 0x0, Hash: []uint8{0xe3, 0xb0, 0xc4, 0x42, 0x98, 0xfc, 0x1c, 0x14, 0x9a, 0xfb, 0xf4, 0xc8, 0x99, 0x6f, 0xb9, 0x24, 0x27, 0xae, 0x41, 0xe4, 0x64, 0x9b, 0x93, 0x4c, 0xa4, 0x95, 0x99, 0x1b, 0x78, 0x52, 0xb8, 0x55}}}, + }, + "bar": File{ + Name: "bar", + Flags: 0, + Modified: 0, + Size: 10, + Blocks: []Block{{Offset: 0x0, Size: 0xa, Hash: []uint8{0x2f, 0x72, 0xcc, 0x11, 0xa6, 0xfc, 0xd0, 0x27, 0x1e, 0xce, 0xf8, 0xc6, 0x10, 0x56, 0xee, 0x1e, 0xb1, 0x24, 0x3b, 0xe3, 0x80, 0x5b, 0xf9, 0xa9, 0xdf, 0x98, 0xf9, 0x2f, 0x76, 0x36, 0xb0, 0x5c}}}, + }, +} + +func init() { + // Fix expected test data to match reality + for n, f := range testDataExpected { + fi, _ := os.Stat("testdata/" + n) + f.Flags = uint32(fi.Mode()) + f.Modified = fi.ModTime().Unix() + testDataExpected[n] = f + } +} + +func TestUpdateLocal(t *testing.T) { + m := NewModel("testdata", 1e6) + fs, _ := m.Walk(false) + m.ReplaceLocal(fs) + + if fs, _ := m.NeedFiles(); len(fs) > 0 { + t.Fatalf("Model with only local data should have no need") + } + + if l1, l2 := len(m.local), len(testDataExpected); l1 != l2 { + t.Fatalf("Model len(local) incorrect, %d != %d", l1, l2) + } + if l1, l2 := len(m.global), len(testDataExpected); l1 != l2 { + t.Fatalf("Model len(global) incorrect, %d != %d", l1, l2) + } + for name, file := range testDataExpected { + if f, ok := m.local[name]; ok { + if !reflect.DeepEqual(f, file) { + t.Errorf("Incorrect local\n%v !=\n%v\nfor file %q", f, file, name) + } + } else { + t.Errorf("Missing file %q in local table", name) + } + if f, ok := m.global[name]; ok { + if !reflect.DeepEqual(f, file) { + t.Errorf("Incorrect global\n%v !=\n%v\nfor file %q", f, file, name) + } + } else { + t.Errorf("Missing file %q in global table", name) + } + } + + for _, f := range fs { + if hf, ok := m.local[f.Name]; !ok || hf.Modified != f.Modified { + t.Fatalf("Incorrect local for %q", f.Name) + } + if cf, ok := m.global[f.Name]; !ok || cf.Modified != f.Modified { + t.Fatalf("Incorrect global for %q", f.Name) + } + } +} + +func TestRemoteUpdateExisting(t *testing.T) { + m := NewModel("testdata", 1e6) + fs, _ := m.Walk(false) + m.ReplaceLocal(fs) + + newFile := protocol.FileInfo{ + Name: "foo", + Modified: time.Now().Unix(), + Blocks: []protocol.BlockInfo{{100, []byte("some hash bytes")}}, + } + m.Index("42", []protocol.FileInfo{newFile}) + + if fs, _ := m.NeedFiles(); len(fs) != 1 { + t.Errorf("Model missing Need for one file (%d != 1)", len(fs)) + } +} + +func TestRemoteAddNew(t *testing.T) { + m := NewModel("testdata", 1e6) + fs, _ := m.Walk(false) + m.ReplaceLocal(fs) + + newFile := protocol.FileInfo{ + Name: "a new file", + Modified: time.Now().Unix(), + Blocks: []protocol.BlockInfo{{100, []byte("some hash bytes")}}, + } + m.Index("42", []protocol.FileInfo{newFile}) + + if fs, _ := m.NeedFiles(); len(fs) != 1 { + t.Errorf("Model len(m.need) incorrect (%d != 1)", len(fs)) + } +} + +func TestRemoteUpdateOld(t *testing.T) { + m := NewModel("testdata", 1e6) + fs, _ := m.Walk(false) + m.ReplaceLocal(fs) + + oldTimeStamp := int64(1234) + newFile := protocol.FileInfo{ + Name: "foo", + Modified: oldTimeStamp, + Blocks: []protocol.BlockInfo{{100, []byte("some hash bytes")}}, + } + m.Index("42", []protocol.FileInfo{newFile}) + + if fs, _ := m.NeedFiles(); len(fs) != 0 { + t.Errorf("Model len(need) incorrect (%d != 0)", len(fs)) + } +} + +func TestRemoteIndexUpdate(t *testing.T) { + m := NewModel("testdata", 1e6) + fs, _ := m.Walk(false) + m.ReplaceLocal(fs) + + foo := protocol.FileInfo{ + Name: "foo", + Modified: time.Now().Unix(), + Blocks: []protocol.BlockInfo{{100, []byte("some hash bytes")}}, + } + + bar := protocol.FileInfo{ + Name: "bar", + Modified: time.Now().Unix(), + Blocks: []protocol.BlockInfo{{100, []byte("some hash bytes")}}, + } + + m.Index("42", []protocol.FileInfo{foo}) + + if fs, _ := m.NeedFiles(); fs[0].Name != "foo" { + t.Error("Model doesn't need 'foo'") + } + + m.IndexUpdate("42", []protocol.FileInfo{bar}) + + if fs, _ := m.NeedFiles(); fs[0].Name != "foo" { + t.Error("Model doesn't need 'foo'") + } + if fs, _ := m.NeedFiles(); fs[1].Name != "bar" { + t.Error("Model doesn't need 'bar'") + } +} + +func TestDelete(t *testing.T) { + m := NewModel("testdata", 1e6) + fs, _ := m.Walk(false) + m.ReplaceLocal(fs) + + if l1, l2 := len(m.local), len(fs); l1 != l2 { + t.Errorf("Model len(local) incorrect (%d != %d)", l1, l2) + } + if l1, l2 := len(m.global), len(fs); l1 != l2 { + t.Errorf("Model len(global) incorrect (%d != %d)", l1, l2) + } + + ot := time.Now().Unix() + newFile := File{ + Name: "a new file", + Modified: ot, + Blocks: []Block{{0, 100, []byte("some hash bytes")}}, + } + m.updateLocal(newFile) + + if l1, l2 := len(m.local), len(fs)+1; l1 != l2 { + t.Errorf("Model len(local) incorrect (%d != %d)", l1, l2) + } + if l1, l2 := len(m.global), len(fs)+1; l1 != l2 { + t.Errorf("Model len(global) incorrect (%d != %d)", l1, l2) + } + + // The deleted file is kept in the local and global tables and marked as deleted. + + m.ReplaceLocal(fs) + + if l1, l2 := len(m.local), len(fs)+1; l1 != l2 { + t.Errorf("Model len(local) incorrect (%d != %d)", l1, l2) + } + if l1, l2 := len(m.global), len(fs)+1; l1 != l2 { + t.Errorf("Model len(global) incorrect (%d != %d)", l1, l2) + } + + if m.local["a new file"].Flags&(1<<12) == 0 { + t.Error("Unexpected deleted flag = 0 in local table") + } + if len(m.local["a new file"].Blocks) != 0 { + t.Error("Unexpected non-zero blocks for deleted file in local") + } + if ft := m.local["a new file"].Modified; ft != ot { + t.Errorf("Unexpected time %d != %d for deleted file in local", ft, ot+1) + } + if fv := m.local["a new file"].Version; fv != 1 { + t.Errorf("Unexpected version %d != 1 for deleted file in local", fv) + } + + if m.global["a new file"].Flags&(1<<12) == 0 { + t.Error("Unexpected deleted flag = 0 in global table") + } + if len(m.global["a new file"].Blocks) != 0 { + t.Error("Unexpected non-zero blocks for deleted file in global") + } + if ft := m.global["a new file"].Modified; ft != ot { + t.Errorf("Unexpected time %d != %d for deleted file in global", ft, ot+1) + } + if fv := m.local["a new file"].Version; fv != 1 { + t.Errorf("Unexpected version %d != 1 for deleted file in global", fv) + } + + // Another update should change nothing + + m.ReplaceLocal(fs) + + if l1, l2 := len(m.local), len(fs)+1; l1 != l2 { + t.Errorf("Model len(local) incorrect (%d != %d)", l1, l2) + } + if l1, l2 := len(m.global), len(fs)+1; l1 != l2 { + t.Errorf("Model len(global) incorrect (%d != %d)", l1, l2) + } + + if m.local["a new file"].Flags&(1<<12) == 0 { + t.Error("Unexpected deleted flag = 0 in local table") + } + if len(m.local["a new file"].Blocks) != 0 { + t.Error("Unexpected non-zero blocks for deleted file in local") + } + if ft := m.local["a new file"].Modified; ft != ot { + t.Errorf("Unexpected time %d != %d for deleted file in local", ft, ot) + } + if fv := m.local["a new file"].Version; fv != 1 { + t.Errorf("Unexpected version %d != 1 for deleted file in local", fv) + } + + if m.global["a new file"].Flags&(1<<12) == 0 { + t.Error("Unexpected deleted flag = 0 in global table") + } + if len(m.global["a new file"].Blocks) != 0 { + t.Error("Unexpected non-zero blocks for deleted file in global") + } + if ft := m.global["a new file"].Modified; ft != ot { + t.Errorf("Unexpected time %d != %d for deleted file in global", ft, ot) + } + if fv := m.local["a new file"].Version; fv != 1 { + t.Errorf("Unexpected version %d != 1 for deleted file in global", fv) + } +} + +func TestForgetNode(t *testing.T) { + m := NewModel("testdata", 1e6) + fs, _ := m.Walk(false) + m.ReplaceLocal(fs) + + if l1, l2 := len(m.local), len(fs); l1 != l2 { + t.Errorf("Model len(local) incorrect (%d != %d)", l1, l2) + } + if l1, l2 := len(m.global), len(fs); l1 != l2 { + t.Errorf("Model len(global) incorrect (%d != %d)", l1, l2) + } + if fs, _ := m.NeedFiles(); len(fs) != 0 { + t.Errorf("Model len(need) incorrect (%d != 0)", len(fs)) + } + + newFile := protocol.FileInfo{ + Name: "new file", + Modified: time.Now().Unix(), + Blocks: []protocol.BlockInfo{{100, []byte("some hash bytes")}}, + } + m.Index("42", []protocol.FileInfo{newFile}) + + newFile = protocol.FileInfo{ + Name: "new file 2", + Modified: time.Now().Unix(), + Blocks: []protocol.BlockInfo{{100, []byte("some hash bytes")}}, + } + m.Index("43", []protocol.FileInfo{newFile}) + + if l1, l2 := len(m.local), len(fs); l1 != l2 { + t.Errorf("Model len(local) incorrect (%d != %d)", l1, l2) + } + if l1, l2 := len(m.global), len(fs)+2; l1 != l2 { + t.Errorf("Model len(global) incorrect (%d != %d)", l1, l2) + } + if fs, _ := m.NeedFiles(); len(fs) != 2 { + t.Errorf("Model len(need) incorrect (%d != 2)", len(fs)) + } + + m.Close("42", nil) + + if l1, l2 := len(m.local), len(fs); l1 != l2 { + t.Errorf("Model len(local) incorrect (%d != %d)", l1, l2) + } + if l1, l2 := len(m.global), len(fs)+1; l1 != l2 { + t.Errorf("Model len(global) incorrect (%d != %d)", l1, l2) + } + + if fs, _ := m.NeedFiles(); len(fs) != 1 { + t.Errorf("Model len(need) incorrect (%d != 1)", len(fs)) + } +} + +func TestRequest(t *testing.T) { + m := NewModel("testdata", 1e6) + fs, _ := m.Walk(false) + m.ReplaceLocal(fs) + + bs, err := m.Request("some node", "default", "foo", 0, 6) + if err != nil { + t.Fatal(err) + } + if bytes.Compare(bs, []byte("foobar")) != 0 { + t.Errorf("Incorrect data from request: %q", string(bs)) + } + + bs, err = m.Request("some node", "default", "../walk.go", 0, 6) + if err == nil { + t.Error("Unexpected nil error on insecure file read") + } + if bs != nil { + t.Errorf("Unexpected non nil data on insecure file read: %q", string(bs)) + } +} + +func TestIgnoreWithUnknownFlags(t *testing.T) { + m := NewModel("testdata", 1e6) + fs, _ := m.Walk(false) + m.ReplaceLocal(fs) + + valid := protocol.FileInfo{ + Name: "valid", + Modified: time.Now().Unix(), + Blocks: []protocol.BlockInfo{{100, []byte("some hash bytes")}}, + Flags: protocol.FlagDeleted | 0755, + } + + invalid := protocol.FileInfo{ + Name: "invalid", + Modified: time.Now().Unix(), + Blocks: []protocol.BlockInfo{{100, []byte("some hash bytes")}}, + Flags: 1<<27 | protocol.FlagDeleted | 0755, + } + + m.Index("42", []protocol.FileInfo{valid, invalid}) + + if _, ok := m.global[valid.Name]; !ok { + t.Error("Model should include", valid) + } + if _, ok := m.global[invalid.Name]; ok { + t.Error("Model not should include", invalid) + } +} + +func genFiles(n int) []protocol.FileInfo { + files := make([]protocol.FileInfo, n) + t := time.Now().Unix() + for i := 0; i < n; i++ { + files[i] = protocol.FileInfo{ + Name: fmt.Sprintf("file%d", i), + Modified: t, + Blocks: []protocol.BlockInfo{{100, []byte("some hash bytes")}}, + } + } + + return files +} + +func BenchmarkIndex10000(b *testing.B) { + m := NewModel("testdata", 1e6) + fs, _ := m.Walk(false) + m.ReplaceLocal(fs) + files := genFiles(10000) + + b.ResetTimer() + for i := 0; i < b.N; i++ { + m.Index("42", files) + } +} + +func BenchmarkIndex00100(b *testing.B) { + m := NewModel("testdata", 1e6) + fs, _ := m.Walk(false) + m.ReplaceLocal(fs) + files := genFiles(100) + + b.ResetTimer() + for i := 0; i < b.N; i++ { + m.Index("42", files) + } +} + +func BenchmarkIndexUpdate10000f10000(b *testing.B) { + m := NewModel("testdata", 1e6) + fs, _ := m.Walk(false) + m.ReplaceLocal(fs) + files := genFiles(10000) + m.Index("42", files) + + b.ResetTimer() + for i := 0; i < b.N; i++ { + m.IndexUpdate("42", files) + } +} + +func BenchmarkIndexUpdate10000f00100(b *testing.B) { + m := NewModel("testdata", 1e6) + fs, _ := m.Walk(false) + m.ReplaceLocal(fs) + files := genFiles(10000) + m.Index("42", files) + + ufiles := genFiles(100) + b.ResetTimer() + for i := 0; i < b.N; i++ { + m.IndexUpdate("42", ufiles) + } +} + +func BenchmarkIndexUpdate10000f00001(b *testing.B) { + m := NewModel("testdata", 1e6) + fs, _ := m.Walk(false) + m.ReplaceLocal(fs) + files := genFiles(10000) + m.Index("42", files) + + ufiles := genFiles(1) + b.ResetTimer() + for i := 0; i < b.N; i++ { + m.IndexUpdate("42", ufiles) + } +} + +type FakeConnection struct { + id string + requestData []byte +} + +func (FakeConnection) Close() error { + return nil +} + +func (f FakeConnection) ID() string { + return string(f.id) +} + +func (f FakeConnection) Option(string) string { + return "" +} + +func (FakeConnection) Index(string, []protocol.FileInfo) {} + +func (f FakeConnection) Request(repo, name string, offset int64, size int) ([]byte, error) { + return f.requestData, nil +} + +func (FakeConnection) Ping() bool { + return true +} + +func (FakeConnection) Statistics() protocol.Statistics { + return protocol.Statistics{} +} + +func BenchmarkRequest(b *testing.B) { + m := NewModel("testdata", 1e6) + fs, _ := m.Walk(false) + m.ReplaceLocal(fs) + + const n = 1000 + files := make([]protocol.FileInfo, n) + t := time.Now().Unix() + for i := 0; i < n; i++ { + files[i] = protocol.FileInfo{ + Name: fmt.Sprintf("file%d", i), + Modified: t, + Blocks: []protocol.BlockInfo{{100, []byte("some hash bytes")}}, + } + } + + fc := FakeConnection{ + id: "42", + requestData: []byte("some data to return"), + } + m.AddConnection(fc, fc) + m.Index("42", files) + + b.ResetTimer() + for i := 0; i < b.N; i++ { + data, err := m.requestGlobal("42", files[i%n].Name, 0, 32, nil) + if err != nil { + b.Error(err) + } + if data == nil { + b.Error("nil data") + } + } +} diff --git a/cmd/syncthing/openurl.go b/cmd/syncthing/openurl.go new file mode 100644 index 00000000..b75c1d38 --- /dev/null +++ b/cmd/syncthing/openurl.go @@ -0,0 +1,34 @@ +/* +Copyright 2011 Google Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package main + +import ( + "os/exec" + "runtime" +) + +func openURL(url string) error { + if runtime.GOOS == "windows" { + return exec.Command("cmd.exe", "/C", "start "+url).Run() + } + + if runtime.GOOS == "darwin" { + return exec.Command("open", url).Run() + } + + return exec.Command("xdg-open", url).Run() +} diff --git a/cmd/syncthing/suppressor.go b/cmd/syncthing/suppressor.go new file mode 100644 index 00000000..0b1bdc5e --- /dev/null +++ b/cmd/syncthing/suppressor.go @@ -0,0 +1,72 @@ +package main + +import ( + "sync" + "time" +) + +const ( + MaxChangeHistory = 4 +) + +type change struct { + size int64 + when time.Time +} + +type changeHistory struct { + changes []change + next int64 + prevSup bool +} + +type suppressor struct { + sync.Mutex + changes map[string]changeHistory + threshold int64 // bytes/s +} + +func (h changeHistory) bandwidth(t time.Time) int64 { + if len(h.changes) == 0 { + return 0 + } + + var t0 = h.changes[0].when + if t == t0 { + return 0 + } + + var bw float64 + for _, c := range h.changes { + bw += float64(c.size) + } + return int64(bw / t.Sub(t0).Seconds()) +} + +func (h *changeHistory) append(size int64, t time.Time) { + c := change{size, t} + if len(h.changes) == MaxChangeHistory { + h.changes = h.changes[1:MaxChangeHistory] + } + h.changes = append(h.changes, c) +} + +func (s *suppressor) suppress(name string, size int64, t time.Time) (bool, bool) { + s.Lock() + + if s.changes == nil { + s.changes = make(map[string]changeHistory) + } + h := s.changes[name] + sup := h.bandwidth(t) > s.threshold + prevSup := h.prevSup + h.prevSup = sup + if !sup { + h.append(size, t) + } + s.changes[name] = h + + s.Unlock() + + return sup, prevSup +} diff --git a/cmd/syncthing/suppressor_test.go b/cmd/syncthing/suppressor_test.go new file mode 100644 index 00000000..80f3d824 --- /dev/null +++ b/cmd/syncthing/suppressor_test.go @@ -0,0 +1,113 @@ +package main + +import ( + "testing" + "time" +) + +func TestSuppressor(t *testing.T) { + s := suppressor{threshold: 10000} + t0 := time.Now() + + t1 := t0 + sup, prev := s.suppress("foo", 10000, t1) + if sup { + t.Fatal("Never suppress first change") + } + if prev { + t.Fatal("Incorrect prev status") + } + + // bw is 10000 / 10 = 1000 + t1 = t0.Add(10 * time.Second) + if bw := s.changes["foo"].bandwidth(t1); bw != 1000 { + t.Errorf("Incorrect bw %d", bw) + } + sup, prev = s.suppress("foo", 10000, t1) + if sup { + t.Fatal("Should still be fine") + } + if prev { + t.Fatal("Incorrect prev status") + } + + // bw is (10000 + 10000) / 11 = 1818 + t1 = t0.Add(11 * time.Second) + if bw := s.changes["foo"].bandwidth(t1); bw != 1818 { + t.Errorf("Incorrect bw %d", bw) + } + sup, prev = s.suppress("foo", 100500, t1) + if sup { + t.Fatal("Should still be fine") + } + if prev { + t.Fatal("Incorrect prev status") + } + + // bw is (10000 + 10000 + 100500) / 12 = 10041 + t1 = t0.Add(12 * time.Second) + if bw := s.changes["foo"].bandwidth(t1); bw != 10041 { + t.Errorf("Incorrect bw %d", bw) + } + sup, prev = s.suppress("foo", 10000000, t1) // value will be ignored + if !sup { + t.Fatal("Should be over threshold") + } + if prev { + t.Fatal("Incorrect prev status") + } + + // bw is (10000 + 10000 + 100500) / 15 = 8033 + t1 = t0.Add(15 * time.Second) + if bw := s.changes["foo"].bandwidth(t1); bw != 8033 { + t.Errorf("Incorrect bw %d", bw) + } + sup, prev = s.suppress("foo", 10000000, t1) + if sup { + t.Fatal("Should be Ok") + } + if !prev { + t.Fatal("Incorrect prev status") + } +} + +func TestHistory(t *testing.T) { + h := changeHistory{} + + t0 := time.Now() + h.append(40, t0) + + if l := len(h.changes); l != 1 { + t.Errorf("Incorrect history length %d", l) + } + if s := h.changes[0].size; s != 40 { + t.Errorf("Incorrect first record size %d", s) + } + + for i := 1; i < MaxChangeHistory; i++ { + h.append(int64(40+i), t0.Add(time.Duration(i)*time.Second)) + } + + if l := len(h.changes); l != MaxChangeHistory { + t.Errorf("Incorrect history length %d", l) + } + if s := h.changes[0].size; s != 40 { + t.Errorf("Incorrect first record size %d", s) + } + if s := h.changes[MaxChangeHistory-1].size; s != 40+MaxChangeHistory-1 { + t.Errorf("Incorrect last record size %d", s) + } + + h.append(999, t0.Add(time.Duration(999)*time.Second)) + + if l := len(h.changes); l != MaxChangeHistory { + t.Errorf("Incorrect history length %d", l) + } + if s := h.changes[0].size; s != 41 { + t.Errorf("Incorrect first record size %d", s) + } + if s := h.changes[MaxChangeHistory-1].size; s != 999 { + t.Errorf("Incorrect last record size %d", s) + } + +} diff --git a/cmd/syncthing/testdata/.stignore b/cmd/syncthing/testdata/.stignore new file mode 100644 index 00000000..cf8fb0fe --- /dev/null +++ b/cmd/syncthing/testdata/.stignore @@ -0,0 +1,2 @@ +.* +quux diff --git a/cmd/syncthing/testdata/bar b/cmd/syncthing/testdata/bar new file mode 100644 index 00000000..b33c1389 --- /dev/null +++ b/cmd/syncthing/testdata/bar @@ -0,0 +1 @@ +foobarbaz diff --git a/cmd/syncthing/testdata/baz/quux b/cmd/syncthing/testdata/baz/quux new file mode 100644 index 00000000..55976ea0 --- /dev/null +++ b/cmd/syncthing/testdata/baz/quux @@ -0,0 +1 @@ +baazquux diff --git a/cmd/syncthing/testdata/empty b/cmd/syncthing/testdata/empty new file mode 100644 index 00000000..e69de29b diff --git a/cmd/syncthing/testdata/foo b/cmd/syncthing/testdata/foo new file mode 100644 index 00000000..323fae03 --- /dev/null +++ b/cmd/syncthing/testdata/foo @@ -0,0 +1 @@ +foobar diff --git a/cmd/syncthing/tls.go b/cmd/syncthing/tls.go new file mode 100644 index 00000000..7a848f06 --- /dev/null +++ b/cmd/syncthing/tls.go @@ -0,0 +1,71 @@ +package main + +import ( + "crypto/rand" + "crypto/rsa" + "crypto/sha256" + "crypto/tls" + "crypto/x509" + "crypto/x509/pkix" + "encoding/base32" + "encoding/pem" + "math/big" + "os" + "path" + "strings" + "time" +) + +const ( + tlsRSABits = 3072 + tlsName = "syncthing" +) + +func loadCert(dir string) (tls.Certificate, error) { + return tls.LoadX509KeyPair(path.Join(dir, "cert.pem"), path.Join(dir, "key.pem")) +} + +func certID(bs []byte) string { + hf := sha256.New() + hf.Write(bs) + id := hf.Sum(nil) + return strings.Trim(base32.StdEncoding.EncodeToString(id), "=") +} + +func newCertificate(dir string) { + infoln("Generating RSA certificate and key...") + + priv, err := rsa.GenerateKey(rand.Reader, tlsRSABits) + fatalErr(err) + + notBefore := time.Now() + notAfter := time.Date(2049, 12, 31, 23, 59, 59, 0, time.UTC) + + template := x509.Certificate{ + SerialNumber: new(big.Int).SetInt64(0), + Subject: pkix.Name{ + CommonName: tlsName, + }, + NotBefore: notBefore, + NotAfter: notAfter, + + KeyUsage: x509.KeyUsageKeyEncipherment | x509.KeyUsageDigitalSignature, + ExtKeyUsage: []x509.ExtKeyUsage{x509.ExtKeyUsageServerAuth, x509.ExtKeyUsageClientAuth}, + BasicConstraintsValid: true, + } + + derBytes, err := x509.CreateCertificate(rand.Reader, &template, &template, &priv.PublicKey, priv) + fatalErr(err) + + certOut, err := os.Create(path.Join(dir, "cert.pem")) + fatalErr(err) + pem.Encode(certOut, &pem.Block{Type: "CERTIFICATE", Bytes: derBytes}) + certOut.Close() + okln("Created RSA certificate file") + + keyOut, err := os.OpenFile(path.Join(dir, "key.pem"), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0600) + fatalErr(err) + pem.Encode(keyOut, &pem.Block{Type: "RSA PRIVATE KEY", Bytes: x509.MarshalPKCS1PrivateKey(priv)}) + keyOut.Close() + okln("Created RSA key file") +} diff --git a/cmd/syncthing/usage.go b/cmd/syncthing/usage.go new file mode 100644 index 00000000..cfd1f303 --- /dev/null +++ b/cmd/syncthing/usage.go @@ -0,0 +1,52 @@ +package main + +import ( + "bytes" + "flag" + "fmt" + "io" + "text/tabwriter" +) + +func optionTable(w io.Writer, rows [][]string) { + tw := tabwriter.NewWriter(w, 2, 4, 2, ' ', 0) + for _, row := range rows { + for i, cell := range row { + if i > 0 { + tw.Write([]byte("\t")) + } + tw.Write([]byte(cell)) + } + tw.Write([]byte("\n")) + } + tw.Flush() +} + +func usageFor(fs *flag.FlagSet, usage string) func() { + return func() { + var b bytes.Buffer + b.WriteString("Usage:\n " + usage + "\n") + + var options [][]string + fs.VisitAll(func(f *flag.Flag) { + var dash = "-" + if len(f.Name) > 1 { + dash = "--" + } + var opt = " " + dash + f.Name + + if f.DefValue != "false" { + opt += "=" + f.DefValue + } + + options = append(options, []string{opt, f.Usage}) + }) + + if len(options) > 0 { + b.WriteString("\nOptions:\n") + optionTable(&b, options) + } + + fmt.Println(b.String()) + } +} diff --git a/cmd/syncthing/util.go b/cmd/syncthing/util.go new file mode 100644 index 00000000..4e14adc6 --- /dev/null +++ b/cmd/syncthing/util.go @@ -0,0 +1,29 @@ +package main + +import "fmt" + +func MetricPrefix(n int64) string { + if n > 1e9 { + return fmt.Sprintf("%.02f G", float64(n)/1e9) + } + if n > 1e6 { + return fmt.Sprintf("%.02f M", float64(n)/1e6) + } + if n > 1e3 { + return fmt.Sprintf("%.01f k", float64(n)/1e3) + } + return fmt.Sprintf("%d ", n) +} + +func BinaryPrefix(n int64) string { + if n > 1<<30 { + return fmt.Sprintf("%.02f Gi", float64(n)/(1<<30)) + } + if n > 1<<20 { + return fmt.Sprintf("%.02f Mi", float64(n)/(1<<20)) + } + if n > 1<<10 { + return fmt.Sprintf("%.01f Ki", float64(n)/(1<<10)) + } + return fmt.Sprintf("%d ", n) +} diff --git a/cmd/syncthing/walk.go b/cmd/syncthing/walk.go new file mode 100644 index 00000000..3f4c35c5 --- /dev/null +++ b/cmd/syncthing/walk.go @@ -0,0 +1,238 @@ +package main + +import ( + "bytes" + "fmt" + "io/ioutil" + "log" + "os" + "path" + "path/filepath" + "strings" + "time" + + "github.com/calmh/syncthing/protocol" +) + +const BlockSize = 128 * 1024 + +type File struct { + Name string + Flags uint32 + Modified int64 + Version uint32 + Size int64 + Blocks []Block +} + +func (f File) String() string { + return fmt.Sprintf("File{Name:%q, Flags:0x%x, Modified:%d, Version:%d, Size:%d, NumBlocks:%d}", + f.Name, f.Flags, f.Modified, f.Version, f.Size, len(f.Blocks)) +} + +func (f File) Equals(o File) bool { + return f.Modified == o.Modified && f.Version == o.Version +} + +func (f File) NewerThan(o File) bool { + return f.Modified > o.Modified || (f.Modified == o.Modified && f.Version > o.Version) +} + +func isTempName(name string) bool { + return strings.HasPrefix(path.Base(name), ".syncthing.") +} + +func tempName(name string, modified int64) string { + tdir := path.Dir(name) + tname := fmt.Sprintf(".syncthing.%s.%d", path.Base(name), modified) + return path.Join(tdir, tname) +} + +func (m *Model) loadIgnoreFiles(ign map[string][]string) filepath.WalkFunc { + return func(p string, info os.FileInfo, err error) error { + if err != nil { + return nil + } + + rn, err := filepath.Rel(m.dir, p) + if err != nil { + return nil + } + + if pn, sn := path.Split(rn); sn == ".stignore" { + pn := strings.Trim(pn, "/") + bs, _ := ioutil.ReadFile(p) + lines := bytes.Split(bs, []byte("\n")) + var patterns []string + for _, line := range lines { + if len(line) > 0 { + patterns = append(patterns, string(line)) + } + } + ign[pn] = patterns + } + + return nil + } +} + +func (m *Model) walkAndHashFiles(res *[]File, ign map[string][]string) filepath.WalkFunc { + return func(p string, info os.FileInfo, err error) error { + if err != nil { + if m.trace["file"] { + log.Printf("FILE: %q: %v", p, err) + } + return nil + } + + if isTempName(p) { + return nil + } + + rn, err := filepath.Rel(m.dir, p) + if err != nil { + return nil + } + + if _, sn := path.Split(rn); sn == ".stignore" { + // We never sync the .stignore files + return nil + } + + if ignoreFile(ign, rn) { + if m.trace["file"] { + log.Println("FILE: IGNORE:", rn) + } + return nil + } + + if info.Mode()&os.ModeType == 0 { + modified := info.ModTime().Unix() + + m.lmut.RLock() + lf, ok := m.local[rn] + m.lmut.RUnlock() + + if ok && lf.Modified == modified { + if nf := uint32(info.Mode()); nf != lf.Flags { + lf.Flags = nf + lf.Version++ + } + *res = append(*res, lf) + } else { + if cur, prev := m.sup.suppress(rn, info.Size(), time.Now()); cur { + if m.trace["file"] { + log.Printf("FILE: SUPPRESS: %q change bw over threshold", rn) + } + if !prev { + log.Printf("INFO: Changes to %q are being temporarily suppressed because it changes too frequently.", rn) + } + + if ok { + lf.Flags = protocol.FlagInvalid + lf.Version++ + *res = append(*res, lf) + } + return nil + } else if prev && !cur { + log.Printf("INFO: Changes to %q are no longer suppressed.", rn) + } + + if m.trace["file"] { + log.Printf("FILE: Hash %q", p) + } + fd, err := os.Open(p) + if err != nil { + if m.trace["file"] { + log.Printf("FILE: %q: %v", p, err) + } + return nil + } + defer fd.Close() + + blocks, err := Blocks(fd, BlockSize) + if err != nil { + if m.trace["file"] { + log.Printf("FILE: %q: %v", p, err) + } + return nil + } + f := File{ + Name: rn, + Size: info.Size(), + Flags: uint32(info.Mode()), + Modified: modified, + Blocks: blocks, + } + *res = append(*res, f) + } + } + + return nil + } +} + +// Walk returns the list of files found in the local repository by scanning the +// file system. Files are blockwise hashed. +func (m *Model) Walk(followSymlinks bool) (files []File, ignore map[string][]string) { + ignore = make(map[string][]string) + + hashFiles := m.walkAndHashFiles(&files, ignore) + + filepath.Walk(m.dir, m.loadIgnoreFiles(ignore)) + filepath.Walk(m.dir, hashFiles) + + if followSymlinks { + d, err := os.Open(m.dir) + if err != nil { + return + } + defer d.Close() + + fis, err := d.Readdir(-1) + if err != nil { + return + } + + for _, info := range fis { + if info.Mode()&os.ModeSymlink != 0 { + dir := path.Join(m.dir, info.Name()) + "/" + filepath.Walk(dir, m.loadIgnoreFiles(ignore)) + filepath.Walk(dir, hashFiles) + } + } + } + + return +} + +func (m *Model) cleanTempFile(path string, info os.FileInfo, err error) error { + if err != nil { + return err + } + if info.Mode()&os.ModeType == 0 && isTempName(path) { + if m.trace["file"] { + log.Printf("FILE: Remove %q", path) + } + os.Remove(path) + } + return nil +} + +func (m *Model) cleanTempFiles() { + filepath.Walk(m.dir, m.cleanTempFile) +} + +func ignoreFile(patterns map[string][]string, file string) bool { + first, last := path.Split(file) + for prefix, pats := range patterns { + if len(prefix) == 0 || prefix == first || strings.HasPrefix(first, prefix+"/") { + for _, pattern := range pats { + if match, _ := path.Match(pattern, last); match { + return true + } + } + } + } + return false +} diff --git a/cmd/syncthing/walk_test.go b/cmd/syncthing/walk_test.go new file mode 100644 index 00000000..43707e50 --- /dev/null +++ b/cmd/syncthing/walk_test.go @@ -0,0 +1,83 @@ +package main + +import ( + "fmt" + "reflect" + "testing" + "time" +) + +var testdata = []struct { + name string + size int + hash string +}{ + {"bar", 10, "2f72cc11a6fcd0271ecef8c61056ee1eb1243be3805bf9a9df98f92f7636b05c"}, + {"empty", 0, "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"}, + {"foo", 7, "aec070645fe53ee3b3763059376134f058cc337247c978add178b6ccdfb0019f"}, +} + +var correctIgnores = map[string][]string{ + "": {".*", "quux"}, +} + +func TestWalk(t *testing.T) { + m := NewModel("testdata", 1e6) + files, ignores := m.Walk(false) + + if l1, l2 := len(files), len(testdata); l1 != l2 { + t.Fatalf("Incorrect number of walked files %d != %d", l1, l2) + } + + for i := range testdata { + if n1, n2 := testdata[i].name, files[i].Name; n1 != n2 { + t.Errorf("Incorrect file name %q != %q for case #%d", n1, n2, i) + } + + if h1, h2 := fmt.Sprintf("%x", files[i].Blocks[0].Hash), testdata[i].hash; h1 != h2 { + t.Errorf("Incorrect hash %q != %q for case #%d", h1, h2, i) + } + + t0 := time.Date(2010, 1, 1, 0, 0, 0, 0, time.UTC).Unix() + t1 := time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC).Unix() + if mt := files[i].Modified; mt < t0 || mt > t1 { + t.Errorf("Unrealistic modtime %d for test %d", mt, i) + } + } + + if !reflect.DeepEqual(ignores, correctIgnores) { + t.Errorf("Incorrect ignores\n %v\n %v", correctIgnores, ignores) + } +} + +func TestIgnore(t *testing.T) { + var patterns = map[string][]string{ + "": {"t2"}, + "foo": {"bar", "z*"}, + "foo/baz": {"quux", ".*"}, + } + var tests = []struct { + f string + r bool + }{ + {"foo/bar", true}, + {"foo/quux", false}, + {"foo/zuux", true}, + {"foo/qzuux", false}, + {"foo/baz/t1", false}, + {"foo/baz/t2", true}, + {"foo/baz/bar", true}, + {"foo/baz/quuxa", false}, + {"foo/baz/aquux", false}, + {"foo/baz/.quux", true}, + {"foo/baz/zquux", true}, + {"foo/baz/quux", true}, + {"foo/bazz/quux", false}, + } + + for i, tc := range tests { + if r := ignoreFile(patterns, tc.f); r != tc.r { + t.Errorf("Incorrect ignoreFile() #%d; E: %v, A: %v", i, tc.r, r) + } + } +}