@@ -9,12 +9,15 @@ package db
|
||||
import (
|
||||
"os"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
|
||||
"github.com/syndtr/goleveldb/leveldb"
|
||||
"github.com/syndtr/goleveldb/leveldb/errors"
|
||||
"github.com/syndtr/goleveldb/leveldb/iterator"
|
||||
"github.com/syndtr/goleveldb/leveldb/opt"
|
||||
"github.com/syndtr/goleveldb/leveldb/storage"
|
||||
"github.com/syndtr/goleveldb/leveldb/util"
|
||||
)
|
||||
|
||||
const (
|
||||
@@ -34,6 +37,8 @@ type Lowlevel struct {
|
||||
location string
|
||||
folderIdx *smallIndex
|
||||
deviceIdx *smallIndex
|
||||
closed bool
|
||||
closeMut *sync.RWMutex
|
||||
}
|
||||
|
||||
// Open attempts to open the database at the given location, and runs
|
||||
@@ -103,6 +108,36 @@ func (db *Lowlevel) Delete(key []byte, wo *opt.WriteOptions) error {
|
||||
return db.DB.Delete(key, wo)
|
||||
}
|
||||
|
||||
func (db *Lowlevel) NewIterator(slice *util.Range, ro *opt.ReadOptions) iterator.Iterator {
|
||||
db.closeMut.RLock()
|
||||
defer db.closeMut.RUnlock()
|
||||
if db.closed {
|
||||
return &closedIter{}
|
||||
}
|
||||
return db.DB.NewIterator(slice, ro)
|
||||
}
|
||||
|
||||
func (db *Lowlevel) GetSnapshot() snapshot {
|
||||
snap, err := db.DB.GetSnapshot()
|
||||
if err != nil {
|
||||
if err == leveldb.ErrClosed {
|
||||
return &closedSnap{}
|
||||
}
|
||||
panic(err)
|
||||
}
|
||||
return snap
|
||||
}
|
||||
|
||||
func (db *Lowlevel) Close() {
|
||||
db.closeMut.Lock()
|
||||
defer db.closeMut.Unlock()
|
||||
if db.closed {
|
||||
return
|
||||
}
|
||||
db.closed = true
|
||||
db.DB.Close()
|
||||
}
|
||||
|
||||
// NewLowlevel wraps the given *leveldb.DB into a *lowlevel
|
||||
func NewLowlevel(db *leveldb.DB, location string) *Lowlevel {
|
||||
return &Lowlevel{
|
||||
@@ -110,6 +145,7 @@ func NewLowlevel(db *leveldb.DB, location string) *Lowlevel {
|
||||
location: location,
|
||||
folderIdx: newSmallIndex(db, []byte{KeyTypeFolderIdx}),
|
||||
deviceIdx: newSmallIndex(db, []byte{KeyTypeDeviceIdx}),
|
||||
closeMut: &sync.RWMutex{},
|
||||
}
|
||||
}
|
||||
|
||||
@@ -150,7 +186,37 @@ func (b *batch) checkFlush() {
|
||||
}
|
||||
|
||||
func (b *batch) flush() {
|
||||
if err := b.db.Write(b.Batch, nil); err != nil {
|
||||
if err := b.db.Write(b.Batch, nil); err != nil && err != leveldb.ErrClosed {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
|
||||
type closedIter struct{}
|
||||
|
||||
func (it *closedIter) Release() {}
|
||||
func (it *closedIter) Key() []byte { return nil }
|
||||
func (it *closedIter) Value() []byte { return nil }
|
||||
func (it *closedIter) Next() bool { return false }
|
||||
func (it *closedIter) Prev() bool { return false }
|
||||
func (it *closedIter) First() bool { return false }
|
||||
func (it *closedIter) Last() bool { return false }
|
||||
func (it *closedIter) Seek(key []byte) bool { return false }
|
||||
func (it *closedIter) Valid() bool { return false }
|
||||
func (it *closedIter) Error() error { return leveldb.ErrClosed }
|
||||
func (it *closedIter) SetReleaser(releaser util.Releaser) {}
|
||||
|
||||
type snapshot interface {
|
||||
Get([]byte, *opt.ReadOptions) ([]byte, error)
|
||||
Has([]byte, *opt.ReadOptions) (bool, error)
|
||||
NewIterator(*util.Range, *opt.ReadOptions) iterator.Iterator
|
||||
Release()
|
||||
}
|
||||
|
||||
type closedSnap struct{}
|
||||
|
||||
func (s *closedSnap) Get([]byte, *opt.ReadOptions) ([]byte, error) { return nil, leveldb.ErrClosed }
|
||||
func (s *closedSnap) Has([]byte, *opt.ReadOptions) (bool, error) { return false, leveldb.ErrClosed }
|
||||
func (s *closedSnap) NewIterator(*util.Range, *opt.ReadOptions) iterator.Iterator {
|
||||
return &closedIter{}
|
||||
}
|
||||
func (s *closedSnap) Release() {}
|
||||
|
||||
Reference in New Issue
Block a user