diff --git a/lib/db/leveldb.go b/lib/db/leveldb.go index e6ee43e6..732d6146 100644 --- a/lib/db/leveldb.go +++ b/lib/db/leveldb.go @@ -11,11 +11,9 @@ import ( "fmt" "github.com/syncthing/syncthing/lib/protocol" - "github.com/syndtr/goleveldb/leveldb" - "github.com/syndtr/goleveldb/leveldb/opt" ) -const dbVersion = 1 +const dbVersion = 2 const ( KeyTypeDevice = iota @@ -29,6 +27,7 @@ const ( KeyTypeIndexID KeyTypeFolderMeta KeyTypeMiscData + KeyTypeSequence ) func (l VersionList) String() string { @@ -60,28 +59,5 @@ func (l fileList) Less(a, b int) bool { return l[a].Name < l[b].Name } -type dbReader interface { - Get([]byte, *opt.ReadOptions) ([]byte, error) -} - // Flush batches to disk when they contain this many records. const batchFlushSize = 64 - -func getFile(db dbReader, key []byte) (protocol.FileInfo, bool) { - bs, err := db.Get(key, nil) - if err == leveldb.ErrNotFound { - return protocol.FileInfo{}, false - } - if err != nil { - l.Debugln("surprise error:", err) - return protocol.FileInfo{}, false - } - - var f protocol.FileInfo - err = f.Unmarshal(bs) - if err != nil { - l.Debugln("unmarshal error:", err) - return protocol.FileInfo{}, false - } - return f, true -} diff --git a/lib/db/leveldb_dbinstance.go b/lib/db/leveldb_dbinstance.go index 39136d9c..fa581b82 100644 --- a/lib/db/leveldb_dbinstance.go +++ b/lib/db/leveldb_dbinstance.go @@ -35,10 +35,13 @@ type Instance struct { } const ( - keyPrefixLen = 1 - keyFolderLen = 4 // indexed - keyDeviceLen = 4 // indexed - keyHashLen = 32 + keyPrefixLen = 1 + keyFolderLen = 4 // indexed + keyDeviceLen = 4 // indexed + keySequenceLen = 8 + keyHashLen = 32 + + maxInt64 int64 = 1<<63 - 1 ) func Open(file string) (*Instance, error) { @@ -88,13 +91,21 @@ func (db *Instance) UpdateSchema() { miscDB := NewNamespacedKV(db, string(KeyTypeMiscData)) prevVersion, _ := miscDB.Int64("dbVersion") + if prevVersion >= dbVersion { + return + } + + l.Infof("Updating database schema version from %v to %v...", prevVersion, dbVersion) + if prevVersion == 0 { db.updateSchema0to1() } - - if prevVersion != dbVersion { - miscDB.PutInt64("dbVersion", dbVersion) + if prevVersion <= 1 { + db.updateSchema1to2() } + + l.Infof("Finished updating database schema version from %v to %v", prevVersion, dbVersion) + miscDB.PutInt64("dbVersion", dbVersion) } // Committed returns the number of items committed to the database since startup @@ -112,9 +123,10 @@ func (db *Instance) updateFiles(folder, device []byte, fs []protocol.FileInfo, m defer t.close() var fk []byte + var gk []byte for _, f := range fs { name := []byte(f.Name) - fk = db.deviceKeyInto(fk[:cap(fk)], folder, device, name) + fk = db.deviceKeyInto(fk, folder, device, name) // Get and unmarshal the file entry. If it doesn't exist or can't be // unmarshalled we'll add it as a new entry. @@ -135,8 +147,10 @@ func (db *Instance) updateFiles(folder, device []byte, fs []protocol.FileInfo, m } meta.addFile(devID, f) - t.insertFile(folder, device, f) - t.updateGlobal(folder, device, f, meta) + t.insertFile(fk, folder, device, f) + + gk = db.globalKeyInto(gk, folder, name) + t.updateGlobal(gk, folder, device, f, meta) // Write out and reuse the batch every few records, to avoid the batch // growing too large and thus allocating unnecessarily much memory. @@ -144,6 +158,33 @@ func (db *Instance) updateFiles(folder, device []byte, fs []protocol.FileInfo, m } } +func (db *Instance) addSequences(folder []byte, fs []protocol.FileInfo) { + t := db.newReadWriteTransaction() + defer t.close() + + var sk []byte + var dk []byte + for _, f := range fs { + sk = db.sequenceKeyInto(sk, folder, f.Sequence) + dk = db.deviceKeyInto(dk, folder, protocol.LocalDeviceID[:], []byte(f.Name)) + t.Put(sk, dk) + l.Debugf("adding sequence; folder=%q sequence=%v %v", folder, f.Sequence, f.Name) + t.checkFlush() + } +} + +func (db *Instance) removeSequences(folder []byte, fs []protocol.FileInfo) { + t := db.newReadWriteTransaction() + defer t.close() + + var sk []byte + for _, f := range fs { + t.Delete(db.sequenceKeyInto(sk, folder, f.Sequence)) + l.Debugf("removing sequence; folder=%q sequence=%v %v", folder, f.Sequence, f.Name) + t.checkFlush() + } +} + func (db *Instance) withHave(folder, device, prefix []byte, truncate bool, fn Iterator) { t := db.newReadOnlyTransaction() defer t.close() @@ -171,7 +212,26 @@ func (db *Instance) withHave(folder, device, prefix []byte, truncate bool, fn It l.Debugln("unmarshal error:", err) continue } - if cont := fn(f); !cont { + if !fn(f) { + return + } + } +} + +func (db *Instance) withHaveSequence(folder []byte, startSeq int64, fn Iterator) { + t := db.newReadOnlyTransaction() + defer t.close() + + dbi := t.NewIterator(&util.Range{Start: db.sequenceKey(folder, startSeq), Limit: db.sequenceKey(folder, maxInt64)}, nil) + defer dbi.Release() + + for dbi.Next() { + f, ok := db.getFile(dbi.Value()) + if !ok { + l.Debugln("missing file for sequence number", db.sequenceKeySequence(dbi.Key())) + continue + } + if !fn(f) { return } } @@ -184,6 +244,8 @@ func (db *Instance) withAllFolderTruncated(folder []byte, fn func(device []byte, dbi := t.NewIterator(util.BytesPrefix(db.deviceKey(folder, nil, nil)[:keyPrefixLen+keyFolderLen]), nil) defer dbi.Release() + var gk []byte + for dbi.Next() { device := db.deviceKeyDevice(dbi.Key()) var f FileInfoTruncated @@ -200,20 +262,37 @@ func (db *Instance) withAllFolderTruncated(folder []byte, fn func(device []byte, switch f.Name { case "", ".", "..", "/": // A few obviously invalid filenames l.Infof("Dropping invalid filename %q from database", f.Name) - t.removeFromGlobal(folder, device, nil, nil) + name := []byte(f.Name) + gk = db.globalKeyInto(gk, folder, name) + t.removeFromGlobal(gk, folder, device, name, nil) t.Delete(dbi.Key()) t.checkFlush() continue } - if cont := fn(device, f); !cont { + if !fn(device, f) { return } } } -func (db *Instance) getFile(folder, device, file []byte) (protocol.FileInfo, bool) { - return getFile(db, db.deviceKey(folder, device, file)) +func (db *Instance) getFile(key []byte) (protocol.FileInfo, bool) { + bs, err := db.Get(key, nil) + if err == leveldb.ErrNotFound { + return protocol.FileInfo{}, false + } + if err != nil { + l.Debugln("surprise error:", err) + return protocol.FileInfo{}, false + } + + var f protocol.FileInfo + err = f.Unmarshal(bs) + if err != nil { + l.Debugln("unmarshal error:", err) + return protocol.FileInfo{}, false + } + return f, true } func (db *Instance) getGlobal(folder, file []byte, truncate bool) (FileIntf, bool) { @@ -286,7 +365,7 @@ func (db *Instance) withGlobal(folder, prefix []byte, truncate bool, fn Iterator return } - fk = db.deviceKeyInto(fk[:cap(fk)], folder, vl.Versions[0].Device, name) + fk = db.deviceKeyInto(fk, folder, vl.Versions[0].Device, name) bs, err := t.Get(fk, nil) if err != nil { l.Debugln("surprise error:", err) @@ -299,7 +378,7 @@ func (db *Instance) withGlobal(folder, prefix []byte, truncate bool, fn Iterator continue } - if cont := fn(f); !cont { + if !fn(f) { return } } @@ -392,7 +471,7 @@ func (db *Instance) withNeed(folder, device []byte, truncate bool, fn Iterator) continue } - fk = db.deviceKeyInto(fk[:cap(fk)], folder, vl.Versions[i].Device, name) + fk = db.deviceKeyInto(fk, folder, vl.Versions[i].Device, name) bs, err := t.Get(fk, nil) if err != nil { l.Debugln("surprise error:", err) @@ -412,7 +491,7 @@ func (db *Instance) withNeed(folder, device []byte, truncate bool, fn Iterator) l.Debugf("need folder=%q device=%v name=%q need=%v have=%v invalid=%v haveV=%v globalV=%v globalDev=%v", folder, protocol.DeviceIDFromBytes(device), name, need, have, haveFileVersion.Invalid, haveFileVersion.Version, needVersion, needDevice) - if cont := fn(gf); !cont { + if !fn(gf) { return } @@ -478,10 +557,13 @@ func (db *Instance) dropDeviceFolder(device, folder []byte, meta *metadataTracke dbi := t.NewIterator(util.BytesPrefix(db.deviceKey(folder, device, nil)), nil) defer dbi.Release() + var gk []byte + for dbi.Next() { key := dbi.Key() name := db.deviceKeyName(key) - t.removeFromGlobal(folder, device, name, meta) + gk = db.globalKeyInto(gk, folder, name) + t.removeFromGlobal(gk, folder, device, name, meta) t.Delete(key) t.checkFlush() } @@ -512,8 +594,7 @@ func (db *Instance) checkGlobals(folder []byte, meta *metadataTracker) { name := db.globalKeyName(gk) var newVL VersionList for i, version := range vl.Versions { - fk = db.deviceKeyInto(fk[:cap(fk)], folder, version.Device, name) - + fk = db.deviceKeyInto(fk, folder, version.Device, name) _, err := t.Get(fk, nil) if err == leveldb.ErrNotFound { continue @@ -525,7 +606,7 @@ func (db *Instance) checkGlobals(folder []byte, meta *metadataTracker) { newVL.Versions = append(newVL.Versions, version) if i == 0 { - if fi, ok := t.getFile(folder, version.Device, name); ok { + if fi, ok := db.getFile(fk); ok { meta.addFile(globalDeviceID, fi) } } @@ -550,18 +631,20 @@ func (db *Instance) updateSchema0to1() { changedFolders := make(map[string]struct{}) ignAdded := 0 meta := newMetadataTracker() // dummy metadata tracker + var gk []byte for dbi.Next() { folder := db.deviceKeyFolder(dbi.Key()) device := db.deviceKeyDevice(dbi.Key()) - name := string(db.deviceKeyName(dbi.Key())) + name := db.deviceKeyName(dbi.Key()) // Remove files with absolute path (see #4799) - if strings.HasPrefix(name, "/") { + if strings.HasPrefix(string(name), "/") { if _, ok := changedFolders[string(folder)]; !ok { changedFolders[string(folder)] = struct{}{} } - t.removeFromGlobal(folder, device, nil, nil) + gk = db.globalKeyInto(gk, folder, name) + t.removeFromGlobal(gk, folder, device, nil, nil) t.Delete(dbi.Key()) t.checkFlush() continue @@ -590,7 +673,8 @@ func (db *Instance) updateSchema0to1() { // Add invalid files to global list if f.Invalid { - if t.updateGlobal(folder, device, f, meta) { + gk = db.globalKeyInto(gk, folder, name) + if t.updateGlobal(gk, folder, device, f, meta) { if _, ok := changedFolders[string(folder)]; !ok { changedFolders[string(folder)] = struct{}{} } @@ -606,6 +690,25 @@ func (db *Instance) updateSchema0to1() { l.Infof("Updated symlink type for %d index entries and added %d invalid files to global list", symlinkConv, ignAdded) } +func (db *Instance) updateSchema1to2() { + t := db.newReadWriteTransaction() + defer t.close() + + var sk []byte + var dk []byte + + for _, folderStr := range db.ListFolders() { + folder := []byte(folderStr) + db.withHave(folder, protocol.LocalDeviceID[:], nil, true, func(f FileIntf) bool { + sk = db.sequenceKeyInto(sk, folder, f.SequenceNo()) + dk = db.deviceKeyInto(dk, folder, protocol.LocalDeviceID[:], []byte(f.FileName())) + t.Put(sk, dk) + t.checkFlush() + return true + }) + } +} + // deviceKey returns a byte slice encoding the following information: // keyTypeDevice (1 byte) // folder (4 bytes) @@ -615,16 +718,14 @@ func (db *Instance) deviceKey(folder, device, file []byte) []byte { return db.deviceKeyInto(nil, folder, device, file) } -func (db *Instance) deviceKeyInto(k []byte, folder, device, file []byte) []byte { +func (db *Instance) deviceKeyInto(k, folder, device, file []byte) []byte { reqLen := keyPrefixLen + keyFolderLen + keyDeviceLen + len(file) - if len(k) < reqLen { - k = make([]byte, reqLen) - } + k = resize(k, reqLen) k[0] = KeyTypeDevice binary.BigEndian.PutUint32(k[keyPrefixLen:], db.folderIdx.ID(folder)) binary.BigEndian.PutUint32(k[keyPrefixLen+keyFolderLen:], db.deviceIdx.ID(device)) copy(k[keyPrefixLen+keyFolderLen+keyDeviceLen:], file) - return k[:reqLen] + return k } // deviceKeyName returns the device ID from the key @@ -655,11 +756,16 @@ func (db *Instance) deviceKeyDevice(key []byte) []byte { // folder (4 bytes) // name (variable size) func (db *Instance) globalKey(folder, file []byte) []byte { - k := make([]byte, keyPrefixLen+keyFolderLen+len(file)) - k[0] = KeyTypeGlobal - binary.BigEndian.PutUint32(k[keyPrefixLen:], db.folderIdx.ID(folder)) - copy(k[keyPrefixLen+keyFolderLen:], file) - return k + return db.globalKeyInto(nil, folder, file) +} + +func (db *Instance) globalKeyInto(gk, folder, file []byte) []byte { + reqLen := keyPrefixLen + keyFolderLen + len(file) + gk = resize(gk, reqLen) + gk[0] = KeyTypeGlobal + binary.BigEndian.PutUint32(gk[keyPrefixLen:], db.folderIdx.ID(folder)) + copy(gk[keyPrefixLen+keyFolderLen:], file) + return gk[:reqLen] } // globalKeyName returns the filename from the key @@ -672,6 +778,28 @@ func (db *Instance) globalKeyFolder(key []byte) ([]byte, bool) { return db.folderIdx.Val(binary.BigEndian.Uint32(key[keyPrefixLen:])) } +// sequenceKey returns a byte slice encoding the following information: +// KeyTypeSequence (1 byte) +// folder (4 bytes) +// sequence number (8 bytes) +func (db *Instance) sequenceKey(folder []byte, seq int64) []byte { + return db.sequenceKeyInto(nil, folder, seq) +} + +func (db *Instance) sequenceKeyInto(k []byte, folder []byte, seq int64) []byte { + reqLen := keyPrefixLen + keyFolderLen + keySequenceLen + k = resize(k, reqLen) + k[0] = KeyTypeSequence + binary.BigEndian.PutUint32(k[keyPrefixLen:], db.folderIdx.ID(folder)) + binary.BigEndian.PutUint64(k[keyPrefixLen+keyFolderLen:], uint64(seq)) + return k[:reqLen] +} + +// sequenceKeySequence returns the sequence number from the key +func (db *Instance) sequenceKeySequence(key []byte) int64 { + return int64(binary.BigEndian.Uint64(key[keyPrefixLen+keyFolderLen:])) +} + func (db *Instance) getIndexID(device, folder []byte) protocol.IndexID { key := db.indexIDKey(device, folder) cur, err := db.Get(key, nil) @@ -887,3 +1015,11 @@ func (i *smallIndex) Val(id uint32) ([]byte, bool) { return []byte(val), true } + +// resize returns a byte array of length reqLen, reusing k if possible +func resize(k []byte, reqLen int) []byte { + if cap(k) < reqLen { + return make([]byte, reqLen) + } + return k[:reqLen] +} diff --git a/lib/db/leveldb_transactions.go b/lib/db/leveldb_transactions.go index f34a159f..8838bc1a 100644 --- a/lib/db/leveldb_transactions.go +++ b/lib/db/leveldb_transactions.go @@ -36,7 +36,7 @@ func (t readOnlyTransaction) close() { } func (t readOnlyTransaction) getFile(folder, device, file []byte) (protocol.FileInfo, bool) { - return getFile(t, t.db.deviceKey(folder, device, file)) + return t.db.getFile(t.db.deviceKey(folder, device, file)) } // A readWriteTransaction is a readOnlyTransaction plus a batch for writes. @@ -74,21 +74,18 @@ func (t readWriteTransaction) flush() { atomic.AddInt64(&t.db.committed, int64(t.Batch.Len())) } -func (t readWriteTransaction) insertFile(folder, device []byte, file protocol.FileInfo) { +func (t readWriteTransaction) insertFile(fk, folder, device []byte, file protocol.FileInfo) { l.Debugf("insert; folder=%q device=%v %v", folder, protocol.DeviceIDFromBytes(device), file) - name := []byte(file.Name) - nk := t.db.deviceKey(folder, device, name) - t.Put(nk, mustMarshal(&file)) + t.Put(fk, mustMarshal(&file)) } // updateGlobal adds this device+version to the version list for the given // file. If the device is already present in the list, the version is updated. // If the file does not have an entry in the global list, it is created. -func (t readWriteTransaction) updateGlobal(folder, device []byte, file protocol.FileInfo, meta *metadataTracker) bool { +func (t readWriteTransaction) updateGlobal(gk, folder, device []byte, file protocol.FileInfo, meta *metadataTracker) bool { l.Debugf("update global; folder=%q device=%v file=%q version=%v invalid=%v", folder, protocol.DeviceIDFromBytes(device), file.Name, file.Version, file.Invalid) name := []byte(file.Name) - gk := t.db.globalKey(folder, name) svl, _ := t.Get(gk, nil) // skip error, we check len(svl) != 0 later var fl VersionList @@ -150,8 +147,7 @@ insert: // to determine the winner.) // // A surprise missing file entry here is counted as a win for us. - of, ok := t.getFile(folder, fl.Versions[i].Device, name) - if !ok || file.WinsConflict(of) { + if of, ok := t.getFile(folder, fl.Versions[i].Device, name); !ok || file.WinsConflict(of) { fl.Versions = insertVersion(fl.Versions, i, nv) insertedAt = i break insert @@ -193,10 +189,9 @@ insert: // removeFromGlobal removes the device from the global version list for the // given file. If the version list is empty after this, the file entry is // removed entirely. -func (t readWriteTransaction) removeFromGlobal(folder, device, file []byte, meta *metadataTracker) { +func (t readWriteTransaction) removeFromGlobal(gk, folder, device, file []byte, meta *metadataTracker) { l.Debugf("remove from global; folder=%q device=%v file=%q", folder, protocol.DeviceIDFromBytes(device), file) - gk := t.db.globalKey(folder, file) svl, err := t.Get(gk, nil) if err != nil { // We might be called to "remove" a global version that doesn't exist diff --git a/lib/db/set.go b/lib/db/set.go index d6cf2630..61c58386 100644 --- a/lib/db/set.go +++ b/lib/db/set.go @@ -141,8 +141,11 @@ func (s *FileSet) Update(device protocol.DeviceID, fs []protocol.FileInfo) { // filter slice according to https://github.com/golang/go/wiki/SliceTricks#filtering-without-allocating oldFs := fs fs = fs[:0] + var dk []byte + folder := []byte(s.folder) for _, nf := range oldFs { - ef, ok := s.db.getFile([]byte(s.folder), device[:], []byte(nf.Name)) + dk = s.db.deviceKeyInto(dk, folder, device[:], []byte(osutil.NormalizedFilename(nf.Name))) + ef, ok := s.db.getFile(dk) if ok && ef.Version.Equal(nf.Version) && ef.Invalid == nf.Invalid { continue } @@ -157,6 +160,8 @@ func (s *FileSet) Update(device protocol.DeviceID, fs []protocol.FileInfo) { } s.blockmap.Discard(discards) s.blockmap.Update(updates) + s.db.removeSequences(folder, discards) + s.db.addSequences(folder, updates) } s.db.updateFiles([]byte(s.folder), device[:], fs, s.meta) @@ -183,6 +188,11 @@ func (s *FileSet) WithHaveTruncated(device protocol.DeviceID, fn Iterator) { s.db.withHave([]byte(s.folder), device[:], nil, true, nativeFileIterator(fn)) } +func (s *FileSet) WithHaveSequence(startSeq int64, fn Iterator) { + l.Debugf("%s WithHaveSequence(%v)", s.folder, startSeq) + s.db.withHaveSequence([]byte(s.folder), startSeq, nativeFileIterator(fn)) +} + func (s *FileSet) WithPrefixedHaveTruncated(device protocol.DeviceID, prefix string, fn Iterator) { l.Debugf("%s WithPrefixedHaveTruncated(%v)", s.folder, device) s.db.withHave([]byte(s.folder), device[:], []byte(osutil.NormalizedFilename(prefix)), true, nativeFileIterator(fn)) @@ -203,7 +213,7 @@ func (s *FileSet) WithPrefixedGlobalTruncated(prefix string, fn Iterator) { } func (s *FileSet) Get(device protocol.DeviceID, file string) (protocol.FileInfo, bool) { - f, ok := s.db.getFile([]byte(s.folder), device[:], []byte(osutil.NormalizedFilename(file))) + f, ok := s.db.getFile(s.db.deviceKey([]byte(s.folder), device[:], []byte(osutil.NormalizedFilename(file)))) f.Name = osutil.NativeFilename(f.Name) return f, ok } diff --git a/lib/db/set_test.go b/lib/db/set_test.go index 5bf7bd9d..fd64e5e1 100644 --- a/lib/db/set_test.go +++ b/lib/db/set_test.go @@ -865,6 +865,33 @@ func TestIssue4701(t *testing.T) { } } +func TestWithHaveSequence(t *testing.T) { + ldb := db.OpenMemory() + + folder := "test)" + s := db.NewFileSet(folder, fs.NewFilesystem(fs.FilesystemTypeBasic, "."), ldb) + + // The files must not be in alphabetical order + localHave := fileList{ + protocol.FileInfo{Name: "e", Version: protocol.Vector{Counters: []protocol.Counter{{ID: myID, Value: 1003}}}, Invalid: true}, + protocol.FileInfo{Name: "b", Version: protocol.Vector{Counters: []protocol.Counter{{ID: myID, Value: 1001}}}, Blocks: genBlocks(2)}, + protocol.FileInfo{Name: "d", Version: protocol.Vector{Counters: []protocol.Counter{{ID: myID, Value: 1003}}}, Blocks: genBlocks(7)}, + protocol.FileInfo{Name: "a", Version: protocol.Vector{Counters: []protocol.Counter{{ID: myID, Value: 1000}}}, Blocks: genBlocks(1)}, + protocol.FileInfo{Name: "c", Version: protocol.Vector{Counters: []protocol.Counter{{ID: myID, Value: 1002}}}, Blocks: genBlocks(5), Invalid: true}, + } + + replace(s, protocol.LocalDeviceID, localHave) + + i := 2 + s.WithHaveSequence(int64(i), func(fi db.FileIntf) bool { + if f := fi.(protocol.FileInfo); !f.IsEquivalent(localHave[i-1], false, false) { + t.Fatalf("Got %v\nExpected %v", f, localHave[i-1]) + } + i++ + return true + }) +} + func replace(fs *db.FileSet, device protocol.DeviceID, files []protocol.FileInfo) { fs.Drop(device) fs.Update(device, files) diff --git a/lib/model/model.go b/lib/model/model.go index c80149af..a6d232a8 100644 --- a/lib/model/model.go +++ b/lib/model/model.go @@ -1639,14 +1639,15 @@ func (m *Model) receivedFile(folder string, file protocol.FileInfo) { m.folderStatRef(folder).ReceivedFile(file.Name, file.IsDeleted()) } -func sendIndexes(conn protocol.Connection, folder string, fs *db.FileSet, ignores *ignore.Matcher, startSequence int64, dbLocation string, dropSymlinks bool) { +func sendIndexes(conn protocol.Connection, folder string, fs *db.FileSet, ignores *ignore.Matcher, prevSequence int64, dbLocation string, dropSymlinks bool) { deviceID := conn.ID() var err error - l.Debugf("Starting sendIndexes for %s to %s at %s (slv=%d)", folder, deviceID, conn, startSequence) + l.Debugf("Starting sendIndexes for %s to %s at %s (slv=%d)", folder, deviceID, conn, prevSequence) defer l.Debugf("Exiting sendIndexes for %s to %s at %s: %v", folder, deviceID, conn, err) - minSequence, err := sendIndexTo(startSequence, conn, folder, fs, ignores, dbLocation, dropSymlinks) + // We need to send one index, regardless of whether there is something to send or not + prevSequence, err = sendIndexTo(prevSequence, conn, folder, fs, ignores, dbLocation, dropSymlinks) // Subscribe to LocalIndexUpdated (we have new information to send) and // DeviceDisconnected (it might be us who disconnected, so we should @@ -1664,12 +1665,12 @@ func sendIndexes(conn protocol.Connection, folder string, fs *db.FileSet, ignore // currently in the database, wait for the local index to update. The // local index may update for other folders than the one we are // sending for. - if fs.Sequence(protocol.LocalDeviceID) <= minSequence { + if fs.Sequence(protocol.LocalDeviceID) <= prevSequence { sub.Poll(time.Minute) continue } - minSequence, err = sendIndexTo(minSequence, conn, folder, fs, ignores, dbLocation, dropSymlinks) + prevSequence, err = sendIndexTo(prevSequence, conn, folder, fs, ignores, dbLocation, dropSymlinks) // Wait a short amount of time before entering the next loop. If there // are continuous changes happening to the local index, this gives us @@ -1678,43 +1679,20 @@ func sendIndexes(conn protocol.Connection, folder string, fs *db.FileSet, ignore } } -func sendIndexTo(minSequence int64, conn protocol.Connection, folder string, fs *db.FileSet, ignores *ignore.Matcher, dbLocation string, dropSymlinks bool) (int64, error) { +// sendIndexTo sends file infos with a sequence number higher than prevSequence and +// returns the highest sent sequence number. +func sendIndexTo(prevSequence int64, conn protocol.Connection, folder string, fs *db.FileSet, ignores *ignore.Matcher, dbLocation string, dropSymlinks bool) (int64, error) { deviceID := conn.ID() batch := make([]protocol.FileInfo, 0, maxBatchSizeFiles) batchSizeBytes := 0 - initial := minSequence == 0 - maxSequence := minSequence + initial := prevSequence == 0 var err error + var f protocol.FileInfo debugMsg := func(t string) string { return fmt.Sprintf("Sending indexes for %s to %s at %s: %d files (<%d bytes) (%s)", folder, deviceID, conn, len(batch), batchSizeBytes, t) } - sorter := NewIndexSorter(dbLocation) - defer sorter.Close() - - fs.WithHave(protocol.LocalDeviceID, func(fi db.FileIntf) bool { - f := fi.(protocol.FileInfo) - if f.Sequence <= minSequence { - return true - } - - if f.Sequence > maxSequence { - maxSequence = f.Sequence - } - - if dropSymlinks && f.IsSymlink() { - // Do not send index entries with symlinks to clients that can't - // handle it. Fixes issue #3802. Once both sides are upgraded, a - // rescan (i.e., change) of the symlink is required for it to - // sync again, due to delta indexes. - return true - } - - sorter.Append(f) - return true - }) - - sorter.Sorted(func(f protocol.FileInfo) bool { + fs.WithHaveSequence(prevSequence+1, func(fi db.FileIntf) bool { if len(batch) == maxBatchSizeFiles || batchSizeBytes > maxBatchSizeBytes { if initial { if err = conn.Index(folder, batch); err != nil { @@ -1733,24 +1711,43 @@ func sendIndexTo(minSequence int64, conn protocol.Connection, folder string, fs batchSizeBytes = 0 } + f = fi.(protocol.FileInfo) + + if dropSymlinks && f.IsSymlink() { + // Do not send index entries with symlinks to clients that can't + // handle it. Fixes issue #3802. Once both sides are upgraded, a + // rescan (i.e., change) of the symlink is required for it to + // sync again, due to delta indexes. + return true + } + batch = append(batch, f) batchSizeBytes += f.ProtoSize() return true }) - if initial && err == nil { + if err != nil { + return prevSequence, err + } + + if initial { err = conn.Index(folder, batch) if err == nil { l.Debugln(debugMsg("small initial index")) } - } else if len(batch) > 0 && err == nil { + } else if len(batch) > 0 { err = conn.IndexUpdate(folder, batch) if err == nil { l.Debugln(debugMsg("last batch")) } } - return maxSequence, err + // True if there was nothing to be sent + if f.Sequence == 0 { + return prevSequence, err + } + + return f.Sequence, err } func (m *Model) updateLocalsFromScanning(folder string, fs []protocol.FileInfo) { diff --git a/lib/model/sorter.go b/lib/model/sorter.go deleted file mode 100644 index 695fddef..00000000 --- a/lib/model/sorter.go +++ /dev/null @@ -1,199 +0,0 @@ -// Copyright (C) 2016 The Syncthing Authors. -// -// This Source Code Form is subject to the terms of the Mozilla Public -// License, v. 2.0. If a copy of the MPL was not distributed with this file, -// You can obtain one at https://mozilla.org/MPL/2.0/. - -package model - -import ( - "encoding/binary" - "io/ioutil" - "os" - "sort" - - "github.com/syncthing/syncthing/lib/protocol" - "github.com/syndtr/goleveldb/leveldb" - "github.com/syndtr/goleveldb/leveldb/opt" -) - -const ( - maxBytesInMemory = 512 << 10 -) - -// The IndexSorter sorts FileInfos based on their Sequence. You use it -// by first Append()ing all entries to be sorted, then calling Sorted() -// which will iterate over all the items in correctly sorted order. -type IndexSorter interface { - Append(f protocol.FileInfo) - Sorted(fn func(f protocol.FileInfo) bool) - Close() -} - -type internalIndexSorter interface { - IndexSorter - full() bool - copyTo(to IndexSorter) -} - -// NewIndexSorter returns a new IndexSorter that will start out in memory -// for efficiency but switch to on disk storage once the amount of data -// becomes large. -func NewIndexSorter(location string) IndexSorter { - return &autoSwitchingIndexSorter{ - internalIndexSorter: newInMemoryIndexSorter(), - location: location, - } -} - -// An autoSwitchingSorter starts out as an inMemorySorter but becomes an -// onDiskSorter when the in memory sorter is full(). -type autoSwitchingIndexSorter struct { - internalIndexSorter - location string -} - -func (s *autoSwitchingIndexSorter) Append(f protocol.FileInfo) { - if s.internalIndexSorter.full() { - // We spill before adding a file instead of after, to handle the - // case where we're over max size but won't add any more files, in - // which case we *don't* need to spill. An example of this would be - // an index containing just a single large file. - l.Debugf("sorter %p spills to disk", s) - next := newOnDiskIndexSorter(s.location) - s.internalIndexSorter.copyTo(next) - s.internalIndexSorter = next - } - s.internalIndexSorter.Append(f) -} - -// An inMemoryIndexSorter is simply a slice of FileInfos. The full() method -// returns true when the number of files exceeds maxFiles or the total -// number of blocks exceeds maxBlocks. -type inMemoryIndexSorter struct { - files []protocol.FileInfo - bytes int - maxBytes int -} - -func newInMemoryIndexSorter() *inMemoryIndexSorter { - return &inMemoryIndexSorter{ - maxBytes: maxBytesInMemory, - } -} - -func (s *inMemoryIndexSorter) Append(f protocol.FileInfo) { - s.files = append(s.files, f) - s.bytes += f.ProtoSize() -} - -func (s *inMemoryIndexSorter) Sorted(fn func(protocol.FileInfo) bool) { - sort.Sort(bySequence(s.files)) - for _, f := range s.files { - if !fn(f) { - break - } - } -} - -func (s *inMemoryIndexSorter) Close() { -} - -func (s *inMemoryIndexSorter) full() bool { - return s.bytes >= s.maxBytes -} - -func (s *inMemoryIndexSorter) copyTo(dst IndexSorter) { - for _, f := range s.files { - dst.Append(f) - } -} - -// bySequence sorts FileInfos by Sequence -type bySequence []protocol.FileInfo - -func (l bySequence) Len() int { - return len(l) -} -func (l bySequence) Swap(a, b int) { - l[a], l[b] = l[b], l[a] -} -func (l bySequence) Less(a, b int) bool { - return l[a].Sequence < l[b].Sequence -} - -// An onDiskIndexSorter is backed by a LevelDB database in the temporary -// directory. It relies on the fact that iterating over the database is done -// in key order and uses the Sequence as key. When done with an -// onDiskIndexSorter you must call Close() to remove the temporary database. -type onDiskIndexSorter struct { - db *leveldb.DB - dir string -} - -func newOnDiskIndexSorter(location string) *onDiskIndexSorter { - // Set options to minimize resource usage. - opts := &opt.Options{ - OpenFilesCacheCapacity: 10, - WriteBuffer: 512 << 10, - } - - // Use a temporary database directory. - tmp, err := ioutil.TempDir(location, "tmp-index-sorter.") - if err != nil { - panic("creating temporary directory: " + err.Error()) - } - db, err := leveldb.OpenFile(tmp, opts) - if err != nil { - panic("creating temporary database: " + err.Error()) - } - - s := &onDiskIndexSorter{ - db: db, - dir: tmp, - } - l.Debugf("onDiskIndexSorter %p created at %s", s, tmp) - return s -} - -func (s *onDiskIndexSorter) Append(f protocol.FileInfo) { - key := make([]byte, 8) - binary.BigEndian.PutUint64(key[:], uint64(f.Sequence)) - data, err := f.Marshal() - if err != nil { - panic("bug: marshalling FileInfo should never fail: " + err.Error()) - } - err = s.db.Put(key, data, nil) - if err != nil { - panic("writing to temporary database: " + err.Error()) - } -} - -func (s *onDiskIndexSorter) Sorted(fn func(protocol.FileInfo) bool) { - it := s.db.NewIterator(nil, nil) - defer it.Release() - for it.Next() { - var f protocol.FileInfo - if err := f.Unmarshal(it.Value()); err != nil { - panic("unmarshal failed: " + err.Error()) - } - if !fn(f) { - break - } - } -} - -func (s *onDiskIndexSorter) Close() { - l.Debugf("onDiskIndexSorter %p closes", s) - s.db.Close() - os.RemoveAll(s.dir) -} - -func (s *onDiskIndexSorter) full() bool { - return false -} - -func (s *onDiskIndexSorter) copyTo(dst IndexSorter) { - // Just wrap Sorted() if we need to support this in the future. - panic("unsupported") -} diff --git a/lib/model/sorter_test.go b/lib/model/sorter_test.go deleted file mode 100644 index 59759983..00000000 --- a/lib/model/sorter_test.go +++ /dev/null @@ -1,157 +0,0 @@ -// Copyright (C) 2016 The Syncthing Authors. -// -// This Source Code Form is subject to the terms of the Mozilla Public -// License, v. 2.0. If a copy of the MPL was not distributed with this file, -// You can obtain one at https://mozilla.org/MPL/2.0/. - -package model - -import ( - "fmt" - "os" - "testing" - - "github.com/syncthing/syncthing/lib/protocol" - "github.com/syncthing/syncthing/lib/rand" -) - -func TestInMemoryIndexSorter(t *testing.T) { - // An inMemorySorter should be able to absorb a few files in unsorted - // order, and return them sorted. - - s := newInMemoryIndexSorter() - addFiles(50, s) - verifySorted(t, s, 50) - verifyBreak(t, s, 50) - s.Close() -} - -func TestOnDiskIndexSorter(t *testing.T) { - // An onDiskSorter should be able to absorb a few files in unsorted - // order, and return them sorted. - - s := newOnDiskIndexSorter("testdata") - addFiles(50, s) - verifySorted(t, s, 50) - verifyBreak(t, s, 50) - - // The temporary database should exist on disk. When Close()d, it should - // be removed. - - info, err := os.Stat(s.dir) - if err != nil { - t.Fatal("temp database should exist on disk:", err) - } - if !info.IsDir() { - t.Fatal("temp database should be a directory") - } - - s.Close() - - _, err = os.Stat(s.dir) - if !os.IsNotExist(err) { - t.Fatal("temp database should have been removed") - } -} - -func TestIndexSorter(t *testing.T) { - // An default IndexSorter should be able to absorb files, have them in - // memory, and at some point switch to an on disk database. - - s := NewIndexSorter("testdata") - defer s.Close() - - // We should start out as an in memory store. - - nFiles := 1 - addFiles(1, s) - verifySorted(t, s, nFiles) - - as := s.(*autoSwitchingIndexSorter) - if _, ok := as.internalIndexSorter.(*inMemoryIndexSorter); !ok { - t.Fatalf("the sorter should be in memory after only one file") - } - - // At some point, for sure with less than maxBytesInMemory files, we - // should switch over to an on disk sorter. - for i := 0; i < maxBytesInMemory; i++ { - addFiles(1, s) - nFiles++ - if _, ok := as.internalIndexSorter.(*onDiskIndexSorter); ok { - break - } - } - - if _, ok := as.internalIndexSorter.(*onDiskIndexSorter); !ok { - t.Fatalf("the sorter should be on disk after %d files", nFiles) - } - - verifySorted(t, s, nFiles) - - // For test coverage, as some methods are called on the onDiskSorter - // only after switching to it. - - addFiles(1, s) - verifySorted(t, s, nFiles+1) -} - -// addFiles adds files with random Sequence to the Sorter. -func addFiles(n int, s IndexSorter) { - for i := 0; i < n; i++ { - rnd := rand.Int63() - f := protocol.FileInfo{ - Name: fmt.Sprintf("file-%d", rnd), - Size: rand.Int63(), - Permissions: uint32(rand.Intn(0777)), - ModifiedS: rand.Int63(), - ModifiedNs: int32(rand.Int63()), - Sequence: rnd, - Version: protocol.Vector{Counters: []protocol.Counter{{ID: 42, Value: uint64(rand.Int63())}}}, - Blocks: []protocol.BlockInfo{{ - Size: int32(rand.Intn(128 << 10)), - Hash: []byte(rand.String(32)), - }}, - } - s.Append(f) - } -} - -// verifySorted checks that the files are returned sorted by Sequence. -func verifySorted(t *testing.T, s IndexSorter, expected int) { - prevSequence := int64(-1) - seen := 0 - s.Sorted(func(f protocol.FileInfo) bool { - if f.Sequence <= prevSequence { - t.Fatalf("Unsorted Sequence, %d <= %d", f.Sequence, prevSequence) - } - prevSequence = f.Sequence - seen++ - return true - }) - if seen != expected { - t.Fatalf("expected %d files returned, got %d", expected, seen) - } -} - -// verifyBreak checks that the Sorter stops iteration once we return false. -func verifyBreak(t *testing.T, s IndexSorter, expected int) { - prevSequence := int64(-1) - seen := 0 - s.Sorted(func(f protocol.FileInfo) bool { - if f.Sequence <= prevSequence { - t.Fatalf("Unsorted Sequence, %d <= %d", f.Sequence, prevSequence) - } - if len(f.Blocks) != 1 { - t.Fatalf("incorrect number of blocks %d != 1", len(f.Blocks)) - } - if len(f.Version.Counters) != 1 { - t.Fatalf("incorrect number of version counters %d != 1", len(f.Version.Counters)) - } - prevSequence = f.Sequence - seen++ - return seen < expected/2 - }) - if seen != expected/2 { - t.Fatalf("expected %d files iterated over, got %d", expected, seen) - } -}