From 8ab6b60778c59b700c5e622b007cdd0669a9371b Mon Sep 17 00:00:00 2001 From: Jakob Borg Date: Thu, 21 Jul 2016 17:21:15 +0000 Subject: [PATCH] lib/model: Sort outgoing index updates by LocalVersion GitHub-Pull-Request: https://github.com/syncthing/syncthing/pull/3411 --- lib/model/model.go | 8 ++ lib/model/sorter.go | 197 +++++++++++++++++++++++++++++++++++++++ lib/model/sorter_test.go | 156 +++++++++++++++++++++++++++++++ 3 files changed, 361 insertions(+) create mode 100644 lib/model/sorter.go create mode 100644 lib/model/sorter_test.go diff --git a/lib/model/model.go b/lib/model/model.go index a647cef0..56a50b84 100644 --- a/lib/model/model.go +++ b/lib/model/model.go @@ -1194,6 +1194,9 @@ func sendIndexTo(initial bool, minLocalVer int64, conn protocol.Connection, fold maxLocalVer := int64(0) var err error + sorter := NewIndexSorter() + defer sorter.Close() + fs.WithHave(protocol.LocalDeviceID, func(fi db.FileIntf) bool { f := fi.(protocol.FileInfo) if f.LocalVersion <= minLocalVer { @@ -1209,6 +1212,11 @@ func sendIndexTo(initial bool, minLocalVer int64, conn protocol.Connection, fold return true } + sorter.Append(f) + return true + }) + + sorter.Sorted(func(f protocol.FileInfo) bool { if len(batch) == indexBatchSize || currentBatchSize > indexTargetSize { if initial { if err = conn.Index(folder, batch); err != nil { diff --git a/lib/model/sorter.go b/lib/model/sorter.go new file mode 100644 index 00000000..a25c5743 --- /dev/null +++ b/lib/model/sorter.go @@ -0,0 +1,197 @@ +// Copyright (C) 2016 The Syncthing Authors. +// +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this file, +// You can obtain one at http://mozilla.org/MPL/2.0/. + +package model + +import ( + "encoding/binary" + "io/ioutil" + "sort" + + "github.com/syncthing/syncthing/lib/osutil" + "github.com/syncthing/syncthing/lib/protocol" + "github.com/syndtr/goleveldb/leveldb" + "github.com/syndtr/goleveldb/leveldb/opt" +) + +const ( + maxBytesInMemory = 512 << 10 +) + +// The IndexSorter sorts FileInfos based on their LocalVersion. You use it +// by first Append()ing all entries to be sorted, then calling Sorted() +// which will iterate over all the items in correctly sorted order. +type IndexSorter interface { + Append(f protocol.FileInfo) + Sorted(fn func(f protocol.FileInfo) bool) + Close() +} + +type internalIndexSorter interface { + IndexSorter + full() bool + copyTo(to IndexSorter) +} + +// NewIndexSorter returns a new IndexSorter that will start out in memory +// for efficiency but switch to on disk storage once the amount of data +// becomes large. +func NewIndexSorter() IndexSorter { + return &autoSwitchingIndexSorter{ + internalIndexSorter: newInMemoryIndexSorter(), + } +} + +// An autoSwitchingSorter starts out as an inMemorySorter but becomes an +// onDiskSorter when the in memory sorter is full(). +type autoSwitchingIndexSorter struct { + internalIndexSorter +} + +func (s *autoSwitchingIndexSorter) Append(f protocol.FileInfo) { + if s.internalIndexSorter.full() { + // We spill before adding a file instead of after, to handle the + // case where we're over max size but won't add any more files, in + // which case we *don't* need to spill. An example of this would be + // an index containing just a single large file. + l.Debugf("sorter %p spills to disk", s) + next := newOnDiskIndexSorter() + s.internalIndexSorter.copyTo(next) + s.internalIndexSorter = next + } + s.internalIndexSorter.Append(f) +} + +// An inMemoryIndexSorter is simply a slice of FileInfos. The full() method +// returns true when the number of files exceeds maxFiles or the total +// number of blocks exceeds maxBlocks. +type inMemoryIndexSorter struct { + files []protocol.FileInfo + bytes int + maxBytes int +} + +func newInMemoryIndexSorter() *inMemoryIndexSorter { + return &inMemoryIndexSorter{ + maxBytes: maxBytesInMemory, + } +} + +func (s *inMemoryIndexSorter) Append(f protocol.FileInfo) { + s.files = append(s.files, f) + s.bytes += f.ProtoSize() +} + +func (s *inMemoryIndexSorter) Sorted(fn func(protocol.FileInfo) bool) { + sort.Sort(byLocalVersion(s.files)) + for _, f := range s.files { + if !fn(f) { + break + } + } +} + +func (s *inMemoryIndexSorter) Close() { +} + +func (s *inMemoryIndexSorter) full() bool { + return s.bytes >= s.maxBytes +} + +func (s *inMemoryIndexSorter) copyTo(dst IndexSorter) { + for _, f := range s.files { + dst.Append(f) + } +} + +// byLocalVersion sorts FileInfos by LocalVersion +type byLocalVersion []protocol.FileInfo + +func (l byLocalVersion) Len() int { + return len(l) +} +func (l byLocalVersion) Swap(a, b int) { + l[a], l[b] = l[b], l[a] +} +func (l byLocalVersion) Less(a, b int) bool { + return l[a].LocalVersion < l[b].LocalVersion +} + +// An onDiskIndexSorter is backed by a LevelDB database in the temporary +// directory. It relies on the fact that iterating over the database is done +// in key order and uses the LocalVersion as key. When done with an +// onDiskIndexSorter you must call Close() to remove the temporary database. +type onDiskIndexSorter struct { + db *leveldb.DB + dir string +} + +func newOnDiskIndexSorter() *onDiskIndexSorter { + // Set options to minimize resource usage. + opts := &opt.Options{ + OpenFilesCacheCapacity: 10, + WriteBuffer: 512 << 10, + } + + // Use a temporary database directory. + tmp, err := ioutil.TempDir("", "syncthing-db.") + if err != nil { + panic("creating temporary directory: " + err.Error()) + } + db, err := leveldb.OpenFile(tmp, opts) + if err != nil { + panic("creating temporary database: " + err.Error()) + } + + s := &onDiskIndexSorter{ + db: db, + dir: tmp, + } + l.Debugf("onDiskIndexSorter %p created at %s", s, tmp) + return s +} + +func (s *onDiskIndexSorter) Append(f protocol.FileInfo) { + key := make([]byte, 8) + binary.BigEndian.PutUint64(key[:], uint64(f.LocalVersion)) + data, err := f.Marshal() + if err != nil { + panic("bug: marshalling FileInfo should never fail: " + err.Error()) + } + err = s.db.Put(key, data, nil) + if err != nil { + panic("writing to temporary database: " + err.Error()) + } +} + +func (s *onDiskIndexSorter) Sorted(fn func(protocol.FileInfo) bool) { + it := s.db.NewIterator(nil, nil) + defer it.Release() + for it.Next() { + var f protocol.FileInfo + if err := f.Unmarshal(it.Value()); err != nil { + panic("unmarshal failed: " + err.Error()) + } + if !fn(f) { + break + } + } +} + +func (s *onDiskIndexSorter) Close() { + l.Debugf("onDiskIndexSorter %p closes", s) + s.db.Close() + osutil.RemoveAll(s.dir) +} + +func (s *onDiskIndexSorter) full() bool { + return false +} + +func (s *onDiskIndexSorter) copyTo(dst IndexSorter) { + // Just wrap Sorted() if we need to support this in the future. + panic("unsupported") +} diff --git a/lib/model/sorter_test.go b/lib/model/sorter_test.go new file mode 100644 index 00000000..e5613898 --- /dev/null +++ b/lib/model/sorter_test.go @@ -0,0 +1,156 @@ +// Copyright (C) 2016 The Syncthing Authors. +// +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this file, +// You can obtain one at http://mozilla.org/MPL/2.0/. + +package model + +import ( + "fmt" + "os" + "testing" + + "github.com/syncthing/syncthing/lib/protocol" + "github.com/syncthing/syncthing/lib/rand" +) + +func TestInMemoryIndexSorter(t *testing.T) { + // An inMemorySorter should be able to absorb a few files in unsorted + // order, and return them sorted. + + s := newInMemoryIndexSorter() + addFiles(50, s) + verifySorted(t, s, 50) + verifyBreak(t, s, 50) + s.Close() +} + +func TestOnDiskIndexSorter(t *testing.T) { + // An onDiskSorter should be able to absorb a few files in unsorted + // order, and return them sorted. + + s := newOnDiskIndexSorter() + addFiles(50, s) + verifySorted(t, s, 50) + verifyBreak(t, s, 50) + + // The temporary database should exist on disk. When Close()d, it should + // be removed. + + info, err := os.Stat(s.dir) + if err != nil { + t.Fatal("temp database should exist on disk:", err) + } + if !info.IsDir() { + t.Fatal("temp database should be a directory") + } + + s.Close() + + _, err = os.Stat(s.dir) + if !os.IsNotExist(err) { + t.Fatal("temp database should have been removed") + } +} + +func TestIndexSorter(t *testing.T) { + // An default IndexSorter should be able to absorb files, have them in + // memory, and at some point switch to an on disk database. + + s := NewIndexSorter() + defer s.Close() + + // We should start out as an in memory store. + + nFiles := 1 + addFiles(1, s) + verifySorted(t, s, nFiles) + + as := s.(*autoSwitchingIndexSorter) + if _, ok := as.internalIndexSorter.(*inMemoryIndexSorter); !ok { + t.Fatalf("the sorter should be in memory after only one file") + } + + // At some point, for sure with less than maxBytesInMemory files, we + // should switch over to an on disk sorter. + for i := 0; i < maxBytesInMemory; i++ { + addFiles(1, s) + nFiles++ + if _, ok := as.internalIndexSorter.(*onDiskIndexSorter); ok { + break + } + } + + if _, ok := as.internalIndexSorter.(*onDiskIndexSorter); !ok { + t.Fatalf("the sorter should be on disk after %d files", nFiles) + } + + verifySorted(t, s, nFiles) + + // For test coverage, as some methods are called on the onDiskSorter + // only after switching to it. + + addFiles(1, s) + verifySorted(t, s, nFiles+1) +} + +// addFiles adds files with random LocalVersion to the Sorter. +func addFiles(n int, s IndexSorter) { + for i := 0; i < n; i++ { + rnd := rand.Int63() + f := protocol.FileInfo{ + Name: fmt.Sprintf("file-%d", rnd), + Size: rand.Int63(), + Permissions: uint32(rand.Intn(0777)), + Modified: rand.Int63(), + LocalVersion: rnd, + Version: protocol.Vector{Counters: []protocol.Counter{{ID: 42, Value: uint64(rand.Int63())}}}, + Blocks: []protocol.BlockInfo{{ + Size: int32(rand.Intn(128 << 10)), + Hash: []byte(rand.String(32)), + }}, + } + s.Append(f) + } +} + +// verifySorted checks that the files are returned sorted by LocalVersion. +func verifySorted(t *testing.T, s IndexSorter, expected int) { + prevLocalVer := int64(-1) + seen := 0 + s.Sorted(func(f protocol.FileInfo) bool { + if f.LocalVersion <= prevLocalVer { + t.Fatalf("Unsorted LocalVer, %d <= %d", f.LocalVersion, prevLocalVer) + } + prevLocalVer = f.LocalVersion + seen++ + return true + }) + if seen != expected { + t.Fatalf("expected %d files returned, got %d", expected, seen) + } +} + +// verifyBreak checks that the Sorter stops iteration once we return false. +func verifyBreak(t *testing.T, s IndexSorter, expected int) { + prevLocalVer := int64(-1) + seen := 0 + s.Sorted(func(f protocol.FileInfo) bool { + if f.LocalVersion <= prevLocalVer { + t.Fatalf("Unsorted LocalVer, %d <= %d", f.LocalVersion, prevLocalVer) + } + if len(f.Blocks) != 1 { + t.Fatalf("incorrect number of blocks %d != 1", len(f.Blocks)) + } + if len(f.Version.Counters) != 1 { + t.Fatalf("incorrect number of version counters %d != 1", len(f.Version.Counters)) + } + prevLocalVer = f.LocalVersion + seen++ + return seen < expected/2 + }) + if seen != expected/2 { + t.Fatalf("expected %d files iterated over, got %d", expected, seen) + } +}