diff --git a/lib/db/blockmap.go b/lib/db/blockmap.go index eab18071..006eeb5a 100644 --- a/lib/db/blockmap.go +++ b/lib/db/blockmap.go @@ -11,125 +11,14 @@ import ( "fmt" "github.com/syncthing/syncthing/lib/osutil" - "github.com/syncthing/syncthing/lib/protocol" - "github.com/syndtr/goleveldb/leveldb" "github.com/syndtr/goleveldb/leveldb/util" ) var blockFinder *BlockFinder -const maxBatchSize = 1000 - -type BlockMap struct { - db *Lowlevel - folder uint32 -} - -func NewBlockMap(db *Lowlevel, folder string) *BlockMap { - return &BlockMap{ - db: db, - folder: db.folderIdx.ID([]byte(folder)), - } -} - -// Add files to the block map, ignoring any deleted or invalid files. -func (m *BlockMap) Add(files []protocol.FileInfo) error { - batch := new(leveldb.Batch) - buf := make([]byte, 4) - var key []byte - for _, file := range files { - m.checkFlush(batch) - - if file.IsDirectory() || file.IsDeleted() || file.IsInvalid() { - continue - } - - for i, block := range file.Blocks { - binary.BigEndian.PutUint32(buf, uint32(i)) - key = m.blockKeyInto(key, block.Hash, file.Name) - batch.Put(key, buf) - } - } - return m.db.Write(batch, nil) -} - -// Update block map state, removing any deleted or invalid files. -func (m *BlockMap) Update(files []protocol.FileInfo) error { - batch := new(leveldb.Batch) - buf := make([]byte, 4) - var key []byte - for _, file := range files { - m.checkFlush(batch) - - switch { - case file.IsDirectory(): - case file.IsDeleted() || file.IsInvalid(): - for _, block := range file.Blocks { - key = m.blockKeyInto(key, block.Hash, file.Name) - batch.Delete(key) - } - default: - for i, block := range file.Blocks { - binary.BigEndian.PutUint32(buf, uint32(i)) - key = m.blockKeyInto(key, block.Hash, file.Name) - batch.Put(key, buf) - } - } - } - return m.db.Write(batch, nil) -} - -// Discard block map state, removing the given files -func (m *BlockMap) Discard(files []protocol.FileInfo) error { - batch := new(leveldb.Batch) - var key []byte - for _, file := range files { - m.checkFlush(batch) - m.discard(file, key, batch) - } - return m.db.Write(batch, nil) -} - -func (m *BlockMap) discard(file protocol.FileInfo, key []byte, batch *leveldb.Batch) { - for _, block := range file.Blocks { - key = m.blockKeyInto(key, block.Hash, file.Name) - batch.Delete(key) - } -} - -func (m *BlockMap) checkFlush(batch *leveldb.Batch) error { - if batch.Len() > maxBatchSize { - if err := m.db.Write(batch, nil); err != nil { - return err - } - batch.Reset() - } - return nil -} - -// Drop block map, removing all entries related to this block map from the db. -func (m *BlockMap) Drop() error { - batch := new(leveldb.Batch) - iter := m.db.NewIterator(util.BytesPrefix(m.blockKeyInto(nil, nil, "")[:keyPrefixLen+keyFolderLen]), nil) - defer iter.Release() - for iter.Next() { - m.checkFlush(batch) - - batch.Delete(iter.Key()) - } - if iter.Error() != nil { - return iter.Error() - } - return m.db.Write(batch, nil) -} - -func (m *BlockMap) blockKeyInto(o, hash []byte, file string) []byte { - return blockKeyInto(o, hash, m.folder, file) -} - type BlockFinder struct { - db *Lowlevel + db *instance } func NewBlockFinder(db *Lowlevel) *BlockFinder { @@ -137,11 +26,9 @@ func NewBlockFinder(db *Lowlevel) *BlockFinder { return blockFinder } - f := &BlockFinder{ - db: db, + return &BlockFinder{ + db: newInstance(db), } - - return f } func (f *BlockFinder) String() string { @@ -154,52 +41,23 @@ func (f *BlockFinder) String() string { // reason. The iterator finally returns the result, whether or not a // satisfying block was eventually found. func (f *BlockFinder) Iterate(folders []string, hash []byte, iterFn func(string, string, int32) bool) bool { + t := f.db.newReadOnlyTransaction() + defer t.close() + var key []byte for _, folder := range folders { - folderID := f.db.folderIdx.ID([]byte(folder)) - key = blockKeyInto(key, hash, folderID, "") - iter := f.db.NewIterator(util.BytesPrefix(key), nil) - defer iter.Release() + key = f.db.keyer.GenerateBlockMapKey(key, []byte(folder), hash, nil) + iter := t.NewIterator(util.BytesPrefix(key), nil) for iter.Next() && iter.Error() == nil { - file := blockKeyName(iter.Key()) + file := string(f.db.keyer.NameFromBlockMapKey(iter.Key())) index := int32(binary.BigEndian.Uint32(iter.Value())) if iterFn(folder, osutil.NativeFilename(file), index) { + iter.Release() return true } } + iter.Release() } return false } - -// m.blockKey returns a byte slice encoding the following information: -// keyTypeBlock (1 byte) -// folder (4 bytes) -// block hash (32 bytes) -// file name (variable size) -func blockKeyInto(o, hash []byte, folder uint32, file string) []byte { - reqLen := keyPrefixLen + keyFolderLen + keyHashLen + len(file) - if cap(o) < reqLen { - o = make([]byte, reqLen) - } else { - o = o[:reqLen] - } - o[0] = KeyTypeBlock - binary.BigEndian.PutUint32(o[keyPrefixLen:], folder) - copy(o[keyPrefixLen+keyFolderLen:], hash) - copy(o[keyPrefixLen+keyFolderLen+keyHashLen:], []byte(file)) - return o -} - -// blockKeyName returns the file name from the block key -func blockKeyName(data []byte) string { - if len(data) < keyPrefixLen+keyFolderLen+keyHashLen+1 { - panic("Incorrect key length") - } - if data[0] != KeyTypeBlock { - panic("Incorrect key type") - } - - file := string(data[keyPrefixLen+keyFolderLen+keyHashLen:]) - return file -} diff --git a/lib/db/blockmap_test.go b/lib/db/blockmap_test.go index b55b6d84..f930a251 100644 --- a/lib/db/blockmap_test.go +++ b/lib/db/blockmap_test.go @@ -7,6 +7,7 @@ package db import ( + "encoding/binary" "testing" "github.com/syncthing/syncthing/lib/protocol" @@ -48,19 +49,53 @@ func init() { } } -func setup() (*Lowlevel, *BlockFinder) { +func setup() (*instance, *BlockFinder) { // Setup db := OpenMemory() - return db, NewBlockFinder(db) + return newInstance(db), NewBlockFinder(db) } -func dbEmpty(db *Lowlevel) bool { +func dbEmpty(db *instance) bool { iter := db.NewIterator(util.BytesPrefix([]byte{KeyTypeBlock}), nil) defer iter.Release() return !iter.Next() } +func addToBlockMap(db *instance, folder []byte, fs []protocol.FileInfo) { + t := db.newReadWriteTransaction() + defer t.close() + + var keyBuf []byte + blockBuf := make([]byte, 4) + for _, f := range fs { + if !f.IsDirectory() && !f.IsDeleted() && !f.IsInvalid() { + name := []byte(f.Name) + for i, block := range f.Blocks { + binary.BigEndian.PutUint32(blockBuf, uint32(i)) + keyBuf = t.db.keyer.GenerateBlockMapKey(keyBuf, folder, block.Hash, name) + t.Put(keyBuf, blockBuf) + } + } + } +} + +func discardFromBlockMap(db *instance, folder []byte, fs []protocol.FileInfo) { + t := db.newReadWriteTransaction() + defer t.close() + + var keyBuf []byte + for _, ef := range fs { + if !ef.IsDirectory() && !ef.IsDeleted() && !ef.IsInvalid() { + name := []byte(ef.Name) + for _, block := range ef.Blocks { + keyBuf = t.db.keyer.GenerateBlockMapKey(keyBuf, folder, block.Hash, name) + t.Delete(keyBuf) + } + } + } +} + func TestBlockMapAddUpdateWipe(t *testing.T) { db, f := setup() @@ -68,14 +103,11 @@ func TestBlockMapAddUpdateWipe(t *testing.T) { t.Fatal("db not empty") } - m := NewBlockMap(db, "folder1") + folder := []byte("folder1") f3.Type = protocol.FileInfoTypeDirectory - err := m.Add([]protocol.FileInfo{f1, f2, f3}) - if err != nil { - t.Fatal(err) - } + addToBlockMap(db, folder, []protocol.FileInfo{f1, f2, f3}) f.Iterate(folders, f1.Blocks[0].Hash, func(folder, file string, index int32) bool { if folder != "folder1" || file != "f1" || index != 0 { @@ -96,14 +128,12 @@ func TestBlockMapAddUpdateWipe(t *testing.T) { return true }) + discardFromBlockMap(db, folder, []protocol.FileInfo{f1, f2, f3}) + f1.Deleted = true f2.LocalFlags = protocol.FlagLocalMustRescan // one of the invalid markers - // Should remove - err = m.Update([]protocol.FileInfo{f1, f2, f3}) - if err != nil { - t.Fatal(err) - } + addToBlockMap(db, folder, []protocol.FileInfo{f1, f2, f3}) f.Iterate(folders, f1.Blocks[0].Hash, func(folder, file string, index int32) bool { t.Fatal("Unexpected block") @@ -122,20 +152,14 @@ func TestBlockMapAddUpdateWipe(t *testing.T) { return true }) - err = m.Drop() - if err != nil { - t.Fatal(err) - } + db.dropFolder(folder) if !dbEmpty(db) { t.Fatal("db not empty") } // Should not add - err = m.Add([]protocol.FileInfo{f1, f2}) - if err != nil { - t.Fatal(err) - } + addToBlockMap(db, folder, []protocol.FileInfo{f1, f2}) if !dbEmpty(db) { t.Fatal("db not empty") @@ -152,17 +176,11 @@ func TestBlockMapAddUpdateWipe(t *testing.T) { func TestBlockFinderLookup(t *testing.T) { db, f := setup() - m1 := NewBlockMap(db, "folder1") - m2 := NewBlockMap(db, "folder2") + folder1 := []byte("folder1") + folder2 := []byte("folder2") - err := m1.Add([]protocol.FileInfo{f1}) - if err != nil { - t.Fatal(err) - } - err = m2.Add([]protocol.FileInfo{f1}) - if err != nil { - t.Fatal(err) - } + addToBlockMap(db, folder1, []protocol.FileInfo{f1}) + addToBlockMap(db, folder2, []protocol.FileInfo{f1}) counter := 0 f.Iterate(folders, f1.Blocks[0].Hash, func(folder, file string, index int32) bool { @@ -186,12 +204,11 @@ func TestBlockFinderLookup(t *testing.T) { t.Fatal("Incorrect count", counter) } + discardFromBlockMap(db, folder1, []protocol.FileInfo{f1}) + f1.Deleted = true - err = m1.Update([]protocol.FileInfo{f1}) - if err != nil { - t.Fatal(err) - } + addToBlockMap(db, folder1, []protocol.FileInfo{f1}) counter = 0 f.Iterate(folders, f1.Blocks[0].Hash, func(folder, file string, index int32) bool { diff --git a/lib/db/instance.go b/lib/db/instance.go index 2d65e844..81544b60 100644 --- a/lib/db/instance.go +++ b/lib/db/instance.go @@ -8,6 +8,7 @@ package db import ( "bytes" + "encoding/binary" "fmt" "github.com/syncthing/syncthing/lib/protocol" @@ -30,7 +31,9 @@ func newInstance(ll *Lowlevel) *instance { } } -func (db *instance) updateFiles(folder, device []byte, fs []protocol.FileInfo, meta *metadataTracker) { +// updateRemoteFiles adds a list of fileinfos to the database and updates the +// global versionlist and metadata. +func (db *instance) updateRemoteFiles(folder, device []byte, fs []protocol.FileInfo, meta *metadataTracker) { t := db.newReadWriteTransaction() defer t.close() @@ -56,34 +59,65 @@ func (db *instance) updateFiles(folder, device []byte, fs []protocol.FileInfo, m gk = db.keyer.GenerateGlobalVersionKey(gk, folder, name) keyBuf, _ = t.updateGlobal(gk, keyBuf, folder, device, f, meta) - // Write out and reuse the batch every few records, to avoid the batch - // growing too large and thus allocating unnecessarily much memory. t.checkFlush() } } -func (db *instance) addSequences(folder []byte, fs []protocol.FileInfo) { +// updateLocalFiles adds fileinfos to the db, and updates the global versionlist, +// metadata, sequence and blockmap buckets. +func (db *instance) updateLocalFiles(folder []byte, fs []protocol.FileInfo, meta *metadataTracker) { t := db.newReadWriteTransaction() defer t.close() - var dk, sk []byte + var dk, gk, keyBuf []byte + blockBuf := make([]byte, 4) for _, f := range fs { - sk = db.keyer.GenerateSequenceKey(sk, folder, f.Sequence) - dk = db.keyer.GenerateDeviceFileKey(dk, folder, protocol.LocalDeviceID[:], []byte(f.Name)) - t.Put(sk, dk) + name := []byte(f.Name) + dk = db.keyer.GenerateDeviceFileKey(dk, folder, protocol.LocalDeviceID[:], name) + + ef, ok := t.getFileByKey(dk) + if ok && unchanged(f, ef) { + continue + } + + if ok { + if !ef.IsDirectory() && !ef.IsDeleted() && !ef.IsInvalid() { + for _, block := range ef.Blocks { + keyBuf = t.db.keyer.GenerateBlockMapKey(keyBuf, folder, block.Hash, name) + t.Delete(keyBuf) + } + } + + keyBuf = db.keyer.GenerateSequenceKey(keyBuf, folder, ef.SequenceNo()) + t.Delete(keyBuf) + l.Debugf("removing sequence; folder=%q sequence=%v %v", folder, ef.SequenceNo(), ef.FileName()) + } + + f.Sequence = meta.nextLocalSeq() + + if ok { + meta.removeFile(protocol.LocalDeviceID, ef) + } + meta.addFile(protocol.LocalDeviceID, f) + + l.Debugf("insert (local); folder=%q %v", folder, f) + t.Put(dk, mustMarshal(&f)) + + gk = t.db.keyer.GenerateGlobalVersionKey(gk, folder, []byte(f.Name)) + keyBuf, _ = t.updateGlobal(gk, keyBuf, folder, protocol.LocalDeviceID[:], f, meta) + + keyBuf = db.keyer.GenerateSequenceKey(keyBuf, folder, f.Sequence) + t.Put(keyBuf, dk) l.Debugf("adding sequence; folder=%q sequence=%v %v", folder, f.Sequence, f.Name) - t.checkFlush() - } -} -func (db *instance) removeSequences(folder []byte, fs []protocol.FileInfo) { - t := db.newReadWriteTransaction() - defer t.close() + if !f.IsDirectory() && !f.IsDeleted() && !f.IsInvalid() { + for i, block := range f.Blocks { + binary.BigEndian.PutUint32(blockBuf, uint32(i)) + keyBuf = t.db.keyer.GenerateBlockMapKey(keyBuf, folder, block.Hash, name) + t.Put(keyBuf, blockBuf) + } + } - var sk []byte - for _, f := range fs { - t.Delete(db.keyer.GenerateSequenceKey(sk, folder, f.Sequence)) - l.Debugf("removing sequence; folder=%q sequence=%v %v", folder, f.Sequence, f.Name) t.checkFlush() } } @@ -383,6 +417,8 @@ func (db *instance) dropFolder(folder []byte) { db.keyer.GenerateGlobalVersionKey(nil, folder, nil).WithoutName(), // Remove all needs related to the folder db.keyer.GenerateNeedFileKey(nil, folder, nil).WithoutName(), + // Remove the blockmap of the folder + db.keyer.GenerateBlockMapKey(nil, folder, nil, nil).WithoutHashAndName(), } { t.deleteKeyPrefix(key) } @@ -403,6 +439,9 @@ func (db *instance) dropDeviceFolder(device, folder []byte, meta *metadataTracke t.Delete(dbi.Key()) t.checkFlush() } + if bytes.Equal(device, protocol.LocalDeviceID[:]) { + t.deleteKeyPrefix(db.keyer.GenerateBlockMapKey(nil, folder, nil, nil).WithoutHashAndName()) + } } func (db *instance) checkGlobals(folder []byte, meta *metadataTracker) { diff --git a/lib/db/keyer.go b/lib/db/keyer.go index e40377e3..b5822642 100644 --- a/lib/db/keyer.go +++ b/lib/db/keyer.go @@ -73,6 +73,10 @@ type keyer interface { NameFromGlobalVersionKey(key []byte) []byte FolderFromGlobalVersionKey(key []byte) ([]byte, bool) + // block map key stuff (former BlockMap) + GenerateBlockMapKey(key, folder, hash, name []byte) blockMapKey + NameFromBlockMapKey(key []byte) []byte + // file need index GenerateNeedFileKey(key, folder, name []byte) needFileKey @@ -154,6 +158,25 @@ func (k defaultKeyer) FolderFromGlobalVersionKey(key []byte) ([]byte, bool) { return k.folderIdx.Val(binary.BigEndian.Uint32(key[keyPrefixLen:])) } +type blockMapKey []byte + +func (k defaultKeyer) GenerateBlockMapKey(key, folder, hash, name []byte) blockMapKey { + key = resize(key, keyPrefixLen+keyFolderLen+keyHashLen+len(name)) + key[0] = KeyTypeBlock + binary.BigEndian.PutUint32(key[keyPrefixLen:], k.folderIdx.ID(folder)) + copy(key[keyPrefixLen+keyFolderLen:], hash) + copy(key[keyPrefixLen+keyFolderLen+keyHashLen:], name) + return key +} + +func (k defaultKeyer) NameFromBlockMapKey(key []byte) []byte { + return key[keyPrefixLen+keyFolderLen+keyHashLen:] +} + +func (k blockMapKey) WithoutHashAndName() []byte { + return k[:keyPrefixLen+keyFolderLen] +} + type needFileKey []byte func (k needFileKey) WithoutName() []byte { diff --git a/lib/db/set.go b/lib/db/set.go index fc89e4e7..cdd409d5 100644 --- a/lib/db/set.go +++ b/lib/db/set.go @@ -14,7 +14,6 @@ package db import ( "os" - "sort" "time" "github.com/syncthing/syncthing/lib/fs" @@ -25,11 +24,10 @@ import ( ) type FileSet struct { - folder string - fs fs.Filesystem - db *instance - blockmap *BlockMap - meta *metadataTracker + folder string + fs fs.Filesystem + db *instance + meta *metadataTracker updateMutex sync.Mutex // protects database updates and the corresponding metadata changes } @@ -75,7 +73,6 @@ func NewFileSet(folder string, fs fs.Filesystem, ll *Lowlevel) *FileSet { folder: folder, fs: fs, db: db, - blockmap: NewBlockMap(ll, folder), meta: newMetadataTracker(), updateMutex: sync.NewMutex(), } @@ -116,7 +113,6 @@ func (s *FileSet) Drop(device protocol.DeviceID) { s.db.dropDeviceFolder(device[:], []byte(s.folder), s.meta) if device == protocol.LocalDeviceID { - s.blockmap.Drop() s.meta.resetCounts(device) // We deliberately do not reset the sequence number here. Dropping // all files for the local device ID only happens in testing - which @@ -147,52 +143,13 @@ func (s *FileSet) Update(device protocol.DeviceID, fs []protocol.FileInfo) { defer s.meta.toDB(s.db, []byte(s.folder)) - if device != protocol.LocalDeviceID { - // Easy case, just update the files and we're done. - s.db.updateFiles([]byte(s.folder), device[:], fs, s.meta) + if device == protocol.LocalDeviceID { + // For the local device we have a bunch of metadata to track. + s.db.updateLocalFiles([]byte(s.folder), fs, s.meta) return } - - // For the local device we have a bunch of metadata to track however... - - discards := make([]protocol.FileInfo, 0, len(fs)) - updates := make([]protocol.FileInfo, 0, len(fs)) - // db.UpdateFiles will sort unchanged files out -> save one db lookup - // filter slice according to https://github.com/golang/go/wiki/SliceTricks#filtering-without-allocating - oldFs := fs - fs = fs[:0] - folder := []byte(s.folder) - for _, nf := range oldFs { - ef, ok := s.db.getFileDirty(folder, device[:], []byte(nf.Name)) - if ok && unchanged(nf, ef) { - continue - } - - nf.Sequence = s.meta.nextLocalSeq() - fs = append(fs, nf) - - if ok { - discards = append(discards, ef) - } - updates = append(updates, nf) - } - - // The ordering here is important. We first remove stuff that point to - // files we are going to update, then update them, then add new index - // pointers etc. In addition, we do the discards in reverse order so - // that a reader traversing the sequence index will get a consistent - // view up until the point they meet the writer. - - sort.Slice(discards, func(a, b int) bool { - // n.b. "b < a" instead of the usual "a < b" - return discards[b].Sequence < discards[a].Sequence - }) - - s.blockmap.Discard(discards) - s.db.removeSequences(folder, discards) - s.db.updateFiles([]byte(s.folder), device[:], fs, s.meta) - s.db.addSequences(folder, updates) - s.blockmap.Update(updates) + // Easy case, just update the files and we're done. + s.db.updateRemoteFiles([]byte(s.folder), device[:], fs, s.meta) } func (s *FileSet) WithNeed(device protocol.DeviceID, fn Iterator) { @@ -327,8 +284,6 @@ func DropFolder(ll *Lowlevel, folder string) { db.dropFolder([]byte(folder)) db.dropMtimes([]byte(folder)) db.dropFolderMeta([]byte(folder)) - bm := NewBlockMap(ll, folder) - bm.Drop() // Also clean out the folder ID mapping. db.folderIdx.Delete([]byte(folder))