diff --git a/cmd/syncthing/main.go b/cmd/syncthing/main.go index a6f13671..f075b0b0 100644 --- a/cmd/syncthing/main.go +++ b/cmd/syncthing/main.go @@ -928,7 +928,17 @@ func syncthingMain(runtimeOptions RuntimeOptions) { code := exit.waitForExit() mainService.Stop() - ldb.Close() + + done := make(chan struct{}) + go func() { + ldb.Close() + close(done) + }() + select { + case <-done: + case <-time.After(10 * time.Second): + l.Warnln("Database failed to stop within 10s") + } l.Infoln("Exiting") diff --git a/lib/db/lowlevel.go b/lib/db/lowlevel.go index 4c77ed74..eda38af6 100644 --- a/lib/db/lowlevel.go +++ b/lib/db/lowlevel.go @@ -39,6 +39,7 @@ type Lowlevel struct { deviceIdx *smallIndex closed bool closeMut *sync.RWMutex + iterWG sync.WaitGroup } // Open attempts to open the database at the given location, and runs @@ -99,42 +100,76 @@ func (db *Lowlevel) Committed() int64 { } func (db *Lowlevel) Put(key, val []byte, wo *opt.WriteOptions) error { + db.closeMut.RLock() + defer db.closeMut.RUnlock() + if db.closed { + return leveldb.ErrClosed + } atomic.AddInt64(&db.committed, 1) return db.DB.Put(key, val, wo) } +func (db *Lowlevel) Write(batch *leveldb.Batch, wo *opt.WriteOptions) error { + db.closeMut.RLock() + defer db.closeMut.RUnlock() + if db.closed { + return leveldb.ErrClosed + } + return db.DB.Write(batch, wo) +} + func (db *Lowlevel) Delete(key []byte, wo *opt.WriteOptions) error { + db.closeMut.RLock() + defer db.closeMut.RUnlock() + if db.closed { + return leveldb.ErrClosed + } atomic.AddInt64(&db.committed, 1) return db.DB.Delete(key, wo) } func (db *Lowlevel) NewIterator(slice *util.Range, ro *opt.ReadOptions) iterator.Iterator { + return db.newIterator(func() iterator.Iterator { return db.DB.NewIterator(slice, ro) }) +} + +// newIterator returns an iterator created with the given constructor only if db +// is not yet closed. If it is closed, a closedIter is returned instead. +func (db *Lowlevel) newIterator(constr func() iterator.Iterator) iterator.Iterator { db.closeMut.RLock() defer db.closeMut.RUnlock() if db.closed { return &closedIter{} } - return db.DB.NewIterator(slice, ro) + db.iterWG.Add(1) + return &iter{ + Iterator: constr(), + db: db, + } } func (db *Lowlevel) GetSnapshot() snapshot { - snap, err := db.DB.GetSnapshot() + s, err := db.DB.GetSnapshot() if err != nil { if err == leveldb.ErrClosed { return &closedSnap{} } panic(err) } - return snap + return &snap{ + Snapshot: s, + db: db, + } } func (db *Lowlevel) Close() { db.closeMut.Lock() - defer db.closeMut.Unlock() if db.closed { + db.closeMut.Unlock() return } db.closed = true + db.closeMut.Unlock() + db.iterWG.Wait() db.DB.Close() } @@ -146,6 +181,7 @@ func NewLowlevel(db *leveldb.DB, location string) *Lowlevel { folderIdx: newSmallIndex(db, []byte{KeyTypeFolderIdx}), deviceIdx: newSmallIndex(db, []byte{KeyTypeDeviceIdx}), closeMut: &sync.RWMutex{}, + iterWG: sync.WaitGroup{}, } } @@ -220,3 +256,51 @@ func (s *closedSnap) NewIterator(*util.Range, *opt.ReadOptions) iterator.Iterato return &closedIter{} } func (s *closedSnap) Release() {} + +type snap struct { + *leveldb.Snapshot + db *Lowlevel +} + +func (s *snap) NewIterator(slice *util.Range, ro *opt.ReadOptions) iterator.Iterator { + return s.db.newIterator(func() iterator.Iterator { return s.Snapshot.NewIterator(slice, ro) }) +} + +// iter implements iterator.Iterator which allows tracking active iterators +// and aborts if the underlying database is being closed. +type iter struct { + iterator.Iterator + db *Lowlevel +} + +func (it *iter) Release() { + it.db.iterWG.Done() + it.Iterator.Release() +} + +func (it *iter) Next() bool { + return it.execIfNotClosed(it.Iterator.Next) +} +func (it *iter) Prev() bool { + return it.execIfNotClosed(it.Iterator.Prev) +} +func (it *iter) First() bool { + return it.execIfNotClosed(it.Iterator.First) +} +func (it *iter) Last() bool { + return it.execIfNotClosed(it.Iterator.Last) +} +func (it *iter) Seek(key []byte) bool { + return it.execIfNotClosed(func() bool { + return it.Iterator.Seek(key) + }) +} + +func (it *iter) execIfNotClosed(fn func() bool) bool { + it.db.closeMut.RLock() + defer it.db.closeMut.RUnlock() + if it.db.closed { + return false + } + return fn() +}