lib/db: Keep folder meta data persistently in db (fixes #4400)
This keeps the data we need about sequence numbers and object counts persistently in the database. The sizeTracker is expanded into a metadataTracker that handles multiple folders, and the Counts struct is made protobuf serializable. It gains a Sequence field to assist in tracking that as well, and a collection of Counts becomes a CountsSet (for serialization purposes). The initial database scan is also a consistency check of the global entries. This shouldn't strictly be necessary. Nonetheless I added a created timestamp to the metadata and set a variable to compare against that. When the time since the metadata creation is old enough, we drop the metadata and rebuild from scratch like we used to, while also consistency checking. A new environment variable STRECHECKDBEVERY can override this interval, and can for example be set to zero to force the check immediately. GitHub-Pull-Request: https://github.com/syncthing/syncthing/pull/4547 LGTM: imsodin
This commit is contained in:
committed by
Audrius Butkevicius
parent
8c91ced784
commit
d1d967f0cf
209
lib/db/set.go
209
lib/db/set.go
@@ -13,8 +13,8 @@
|
||||
package db
|
||||
|
||||
import (
|
||||
stdsync "sync"
|
||||
"sync/atomic"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
"github.com/syncthing/syncthing/lib/fs"
|
||||
"github.com/syncthing/syncthing/lib/osutil"
|
||||
@@ -23,16 +23,13 @@ import (
|
||||
)
|
||||
|
||||
type FileSet struct {
|
||||
sequence int64 // Our local sequence number
|
||||
folder string
|
||||
fs fs.Filesystem
|
||||
db *Instance
|
||||
blockmap *BlockMap
|
||||
localSize sizeTracker
|
||||
globalSize sizeTracker
|
||||
folder string
|
||||
fs fs.Filesystem
|
||||
db *Instance
|
||||
blockmap *BlockMap
|
||||
meta *metadataTracker
|
||||
|
||||
remoteSequence map[protocol.DeviceID]int64 // Highest seen sequence numbers for other devices
|
||||
updateMutex sync.Mutex // protects remoteSequence and database updates
|
||||
updateMutex sync.Mutex // protects database updates and the corresponding metadata changes
|
||||
}
|
||||
|
||||
// FileIntf is the set of methods implemented by both protocol.FileInfo and
|
||||
@@ -45,6 +42,7 @@ type FileIntf interface {
|
||||
IsDirectory() bool
|
||||
IsSymlink() bool
|
||||
HasPermissionBits() bool
|
||||
SequenceNo() int64
|
||||
}
|
||||
|
||||
// The Iterator is called with either a protocol.FileInfo or a
|
||||
@@ -52,126 +50,76 @@ type FileIntf interface {
|
||||
// continue iteration, false to stop.
|
||||
type Iterator func(f FileIntf) bool
|
||||
|
||||
// Counts holds aggregate statistics for one device's view of a folder:
// how many regular files, directories, symlinks and deleted entries it
// announces, and their total size in bytes. Which bucket an entry falls
// into is decided by sizeTracker.addFile/removeFile.
type Counts struct {
	Files       int   // regular files (not deleted, not dir, not symlink)
	Directories int   // directories that are not symlinks
	Symlinks    int   // symbolic links
	Deleted     int   // deleted entries
	Bytes       int64 // total size of counted entries, in bytes
}
|
||||
// databaseRecheckInterval is how old the stored folder metadata may get
// before it is dropped and recalculated from a full database scan (see
// the age check in NewFileSet). Overridable at startup via the
// STRECHECKDBEVERY environment variable.
var databaseRecheckInterval = 30 * 24 * time.Hour
|
||||
|
||||
// sizeTracker maintains a mutex-protected set of Counts, updated
// incrementally as files are added to and removed from the set.
type sizeTracker struct {
	Counts                // embedded aggregate counters
	mut    stdsync.Mutex  // guards Counts
}
|
||||
|
||||
func (s *sizeTracker) addFile(f FileIntf) {
|
||||
if f.IsInvalid() {
|
||||
return
|
||||
func init() {
|
||||
if dur, err := time.ParseDuration(os.Getenv("STRECHECKDBEVERY")); err == nil {
|
||||
databaseRecheckInterval = dur
|
||||
}
|
||||
|
||||
s.mut.Lock()
|
||||
switch {
|
||||
case f.IsDeleted():
|
||||
s.Deleted++
|
||||
case f.IsDirectory() && !f.IsSymlink():
|
||||
s.Directories++
|
||||
case f.IsSymlink():
|
||||
s.Symlinks++
|
||||
default:
|
||||
s.Files++
|
||||
}
|
||||
s.Bytes += f.FileSize()
|
||||
s.mut.Unlock()
|
||||
}
|
||||
|
||||
// removeFile subtracts f's contribution from the tracked counts,
// mirroring addFile: exactly one of Deleted/Directories/Symlinks/Files
// is decremented and Bytes is reduced by the file size. Invalid files
// are never counted by addFile, so they are ignored here as well.
// Panics if any counter would go negative, which indicates a remove
// without a matching add (a bookkeeping bug).
func (s *sizeTracker) removeFile(f FileIntf) {
	if f.IsInvalid() {
		return
	}

	s.mut.Lock()
	switch {
	case f.IsDeleted():
		s.Deleted--
	case f.IsDirectory() && !f.IsSymlink():
		s.Directories--
	case f.IsSymlink():
		s.Symlinks--
	default:
		s.Files--
	}
	s.Bytes -= f.FileSize()
	if s.Deleted < 0 || s.Files < 0 || s.Directories < 0 || s.Symlinks < 0 {
		panic("bug: removed more than added")
	}
	s.mut.Unlock()
}
|
||||
|
||||
// reset zeroes all counters, leaving the tracker ready for reuse.
func (s *sizeTracker) reset() {
	s.mut.Lock()
	defer s.mut.Unlock()
	s.Counts = Counts{}
}
|
||||
|
||||
// Size returns a consistent snapshot of the current counts.
func (s *sizeTracker) Size() Counts {
	s.mut.Lock()
	defer s.mut.Unlock()
	return s.Counts
}
|
||||
|
||||
func NewFileSet(folder string, fs fs.Filesystem, db *Instance) *FileSet {
|
||||
var s = FileSet{
|
||||
remoteSequence: make(map[protocol.DeviceID]int64),
|
||||
folder: folder,
|
||||
fs: fs,
|
||||
db: db,
|
||||
blockmap: NewBlockMap(db, db.folderIdx.ID([]byte(folder))),
|
||||
updateMutex: sync.NewMutex(),
|
||||
folder: folder,
|
||||
fs: fs,
|
||||
db: db,
|
||||
blockmap: NewBlockMap(db, db.folderIdx.ID([]byte(folder))),
|
||||
meta: newMetadataTracker(),
|
||||
updateMutex: sync.NewMutex(),
|
||||
}
|
||||
|
||||
s.db.checkGlobals([]byte(folder), &s.globalSize)
|
||||
|
||||
var deviceID protocol.DeviceID
|
||||
s.db.withAllFolderTruncated([]byte(folder), func(device []byte, f FileInfoTruncated) bool {
|
||||
copy(deviceID[:], device)
|
||||
if deviceID == protocol.LocalDeviceID {
|
||||
if f.Sequence > s.sequence {
|
||||
s.sequence = f.Sequence
|
||||
}
|
||||
s.localSize.addFile(f)
|
||||
} else if f.Sequence > s.remoteSequence[deviceID] {
|
||||
s.remoteSequence[deviceID] = f.Sequence
|
||||
}
|
||||
return true
|
||||
})
|
||||
l.Debugf("loaded sequence for %q: %#v", folder, s.sequence)
|
||||
if err := s.meta.fromDB(db, []byte(folder)); err != nil {
|
||||
l.Infof("No stored folder metadata for %q: recalculating", folder)
|
||||
s.recalcCounts()
|
||||
} else if age := time.Since(s.meta.Created()); age > databaseRecheckInterval {
|
||||
l.Infof("Stored folder metadata for %q is %v old; recalculating", folder, age)
|
||||
s.recalcCounts()
|
||||
}
|
||||
|
||||
return &s
|
||||
}
|
||||
|
||||
// recalcCounts rebuilds the folder metadata from scratch by scanning
// the database: the global entries are consistency-checked and counted
// via checkGlobals, then every device's files are added to a fresh
// metadataTracker. The Created timestamp is set so the age check in
// NewFileSet knows when this recalculation last ran, and the result is
// persisted back to the database.
func (s *FileSet) recalcCounts() {
	s.meta = newMetadataTracker()

	s.db.checkGlobals([]byte(s.folder), s.meta)

	var deviceID protocol.DeviceID
	s.db.withAllFolderTruncated([]byte(s.folder), func(device []byte, f FileInfoTruncated) bool {
		copy(deviceID[:], device)
		s.meta.addFile(deviceID, f)
		return true
	})

	s.meta.SetCreated()
	s.meta.toDB(s.db, []byte(s.folder))
}
|
||||
|
||||
func (s *FileSet) Drop(device protocol.DeviceID) {
|
||||
l.Debugf("%s Drop(%v)", s.folder, device)
|
||||
|
||||
s.updateMutex.Lock()
|
||||
defer s.updateMutex.Unlock()
|
||||
|
||||
s.db.dropDeviceFolder(device[:], []byte(s.folder), &s.globalSize)
|
||||
s.db.dropDeviceFolder(device[:], []byte(s.folder), s.meta)
|
||||
|
||||
if device == protocol.LocalDeviceID {
|
||||
s.blockmap.Drop()
|
||||
s.localSize.reset()
|
||||
// We deliberately do not reset s.sequence here. Dropping all files
|
||||
// for the local device ID only happens in testing - which expects
|
||||
// the sequence to be retained, like an old Replace() of all files
|
||||
// would do. However, if we ever did it "in production" we would
|
||||
// anyway want to retain the sequence for delta indexes to be happy.
|
||||
s.meta.resetCounts(device)
|
||||
// We deliberately do not reset the sequence number here. Dropping
|
||||
// all files for the local device ID only happens in testing - which
|
||||
// expects the sequence to be retained, like an old Replace() of all
|
||||
// files would do. However, if we ever did it "in production" we
|
||||
// would anyway want to retain the sequence for delta indexes to be
|
||||
// happy.
|
||||
} else {
|
||||
// Here, on the other hand, we want to make sure that any file
|
||||
// announced from the remote is newer than our current sequence
|
||||
// number.
|
||||
s.remoteSequence[device] = 0
|
||||
s.meta.resetAll(device)
|
||||
}
|
||||
|
||||
s.meta.toDB(s.db, []byte(s.folder))
|
||||
}
|
||||
|
||||
func (s *FileSet) Update(device protocol.DeviceID, fs []protocol.FileInfo) {
|
||||
@@ -181,12 +129,6 @@ func (s *FileSet) Update(device protocol.DeviceID, fs []protocol.FileInfo) {
|
||||
s.updateMutex.Lock()
|
||||
defer s.updateMutex.Unlock()
|
||||
|
||||
s.updateLocked(device, fs)
|
||||
}
|
||||
|
||||
func (s *FileSet) updateLocked(device protocol.DeviceID, fs []protocol.FileInfo) {
|
||||
// names must be normalized and the lock held
|
||||
|
||||
if device == protocol.LocalDeviceID {
|
||||
discards := make([]protocol.FileInfo, 0, len(fs))
|
||||
updates := make([]protocol.FileInfo, 0, len(fs))
|
||||
@@ -200,7 +142,7 @@ func (s *FileSet) updateLocked(device protocol.DeviceID, fs []protocol.FileInfo)
|
||||
continue
|
||||
}
|
||||
|
||||
nf.Sequence = atomic.AddInt64(&s.sequence, 1)
|
||||
nf.Sequence = s.meta.nextSeq(protocol.LocalDeviceID)
|
||||
fs = append(fs, nf)
|
||||
|
||||
if ok {
|
||||
@@ -210,10 +152,10 @@ func (s *FileSet) updateLocked(device protocol.DeviceID, fs []protocol.FileInfo)
|
||||
}
|
||||
s.blockmap.Discard(discards)
|
||||
s.blockmap.Update(updates)
|
||||
} else {
|
||||
s.remoteSequence[device] = maxSequence(fs)
|
||||
}
|
||||
s.db.updateFiles([]byte(s.folder), device[:], fs, &s.localSize, &s.globalSize)
|
||||
|
||||
s.db.updateFiles([]byte(s.folder), device[:], fs, s.meta)
|
||||
s.meta.toDB(s.db, []byte(s.folder))
|
||||
}
|
||||
|
||||
func (s *FileSet) WithNeed(device protocol.DeviceID, fn Iterator) {
|
||||
@@ -298,21 +240,15 @@ func (s *FileSet) Availability(file string) []protocol.DeviceID {
|
||||
}
|
||||
|
||||
func (s *FileSet) Sequence(device protocol.DeviceID) int64 {
|
||||
if device == protocol.LocalDeviceID {
|
||||
return atomic.LoadInt64(&s.sequence)
|
||||
}
|
||||
|
||||
s.updateMutex.Lock()
|
||||
defer s.updateMutex.Unlock()
|
||||
return s.remoteSequence[device]
|
||||
return s.meta.Counts(device).Sequence
|
||||
}
|
||||
|
||||
func (s *FileSet) LocalSize() Counts {
|
||||
return s.localSize.Size()
|
||||
return s.meta.Counts(protocol.LocalDeviceID)
|
||||
}
|
||||
|
||||
func (s *FileSet) GlobalSize() Counts {
|
||||
return s.globalSize.Size()
|
||||
return s.meta.Counts(globalDeviceID)
|
||||
}
|
||||
|
||||
func (s *FileSet) IndexID(device protocol.DeviceID) protocol.IndexID {
|
||||
@@ -339,29 +275,7 @@ func (s *FileSet) MtimeFS() *fs.MtimeFS {
|
||||
}
|
||||
|
||||
// ListDevices returns the remote device IDs for which we have recorded
// a sequence number above zero, i.e. devices we have received index
// data from. Only non-local devices are entered into remoteSequence,
// so the local device is never included. Holds updateMutex while
// reading the map.
func (s *FileSet) ListDevices() []protocol.DeviceID {
	s.updateMutex.Lock()
	devices := make([]protocol.DeviceID, 0, len(s.remoteSequence))
	for id, seq := range s.remoteSequence {
		if seq > 0 {
			devices = append(devices, id)
		}
	}
	s.updateMutex.Unlock()
	return devices
}
|
||||
|
||||
// maxSequence returns the highest of the Sequence numbers found in
|
||||
// the given slice of FileInfos. This should really be the Sequence of
|
||||
// the last item, but Syncthing v0.14.0 and other implementations may not
|
||||
// implement update sorting....
|
||||
func maxSequence(fs []protocol.FileInfo) int64 {
|
||||
var max int64
|
||||
for _, f := range fs {
|
||||
if f.Sequence > max {
|
||||
max = f.Sequence
|
||||
}
|
||||
}
|
||||
return max
|
||||
return s.meta.devices()
|
||||
}
|
||||
|
||||
// DropFolder clears out all information related to the given folder from the
|
||||
@@ -369,6 +283,7 @@ func maxSequence(fs []protocol.FileInfo) int64 {
|
||||
func DropFolder(db *Instance, folder string) {
|
||||
db.dropFolder([]byte(folder))
|
||||
db.dropMtimes([]byte(folder))
|
||||
db.dropFolderMeta([]byte(folder))
|
||||
bm := &BlockMap{
|
||||
db: db,
|
||||
folder: db.folderIdx.ID([]byte(folder)),
|
||||
|
||||
Reference in New Issue
Block a user