We should pass around db.Instance instead of leveldb.DB

We're going to need the db.Instance to keep some state, and for that to
work we need the same one passed around everywhere. Hence this moves the
leveldb-specific file opening stuff into the db package and exports the
dbInstance type.
This commit is contained in:
Jakob Borg
2015-10-31 12:31:25 +01:00
parent 313485e406
commit 2a4fc28318
20 changed files with 145 additions and 241 deletions

View File

@@ -8,27 +8,64 @@ package db
import (
"bytes"
"os"
"sort"
"strings"
"github.com/syncthing/syncthing/lib/protocol"
"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/errors"
"github.com/syndtr/goleveldb/leveldb/iterator"
"github.com/syndtr/goleveldb/leveldb/opt"
"github.com/syndtr/goleveldb/leveldb/storage"
"github.com/syndtr/goleveldb/leveldb/util"
)
type deletionHandler func(t readWriteTransaction, folder, device, name []byte, dbi iterator.Iterator) int64
type dbInstance struct {
type Instance struct {
*leveldb.DB
}
func newDBInstance(db *leveldb.DB) *dbInstance {
return &dbInstance{
func Open(file string) (*Instance, error) {
opts := &opt.Options{
OpenFilesCacheCapacity: 100,
WriteBuffer: 4 << 20,
}
db, err := leveldb.OpenFile(file, opts)
if leveldbIsCorrupted(err) {
db, err = leveldb.RecoverFile(file, opts)
}
if leveldbIsCorrupted(err) {
// The database is corrupted, and we've tried to recover it but it
// didn't work. At this point there isn't much to do beyond dropping
// the database and reindexing...
l.Infoln("Database corruption detected, unable to recover. Reinitializing...")
if err := os.RemoveAll(file); err != nil {
return nil, err
}
db, err = leveldb.OpenFile(file, opts)
}
if err != nil {
return nil, err
}
return newDBInstance(db), nil
}
func OpenMemory() *Instance {
db, _ := leveldb.Open(storage.NewMemStorage(), nil)
return newDBInstance(db)
}
func newDBInstance(db *leveldb.DB) *Instance {
return &Instance{
DB: db,
}
}
func (db *dbInstance) genericReplace(folder, device []byte, fs []protocol.FileInfo, localSize, globalSize *sizeTracker, deleteFn deletionHandler) int64 {
func (db *Instance) genericReplace(folder, device []byte, fs []protocol.FileInfo, localSize, globalSize *sizeTracker, deleteFn deletionHandler) int64 {
sort.Sort(fileList(fs)) // sort list on name, same as in the database
start := db.deviceKey(folder, device, nil) // before all folder/device files
@@ -126,7 +163,7 @@ func (db *dbInstance) genericReplace(folder, device []byte, fs []protocol.FileIn
return maxLocalVer
}
func (db *dbInstance) replace(folder, device []byte, fs []protocol.FileInfo, localSize, globalSize *sizeTracker) int64 {
func (db *Instance) replace(folder, device []byte, fs []protocol.FileInfo, localSize, globalSize *sizeTracker) int64 {
// TODO: Return the remaining maxLocalVer?
return db.genericReplace(folder, device, fs, localSize, globalSize, func(t readWriteTransaction, folder, device, name []byte, dbi iterator.Iterator) int64 {
// Database has a file that we are missing. Remove it.
@@ -137,7 +174,7 @@ func (db *dbInstance) replace(folder, device []byte, fs []protocol.FileInfo, loc
})
}
func (db *dbInstance) updateFiles(folder, device []byte, fs []protocol.FileInfo, localSize, globalSize *sizeTracker) int64 {
func (db *Instance) updateFiles(folder, device []byte, fs []protocol.FileInfo, localSize, globalSize *sizeTracker) int64 {
t := db.newReadWriteTransaction()
defer t.close()
@@ -195,7 +232,7 @@ func (db *dbInstance) updateFiles(folder, device []byte, fs []protocol.FileInfo,
return maxLocalVer
}
func (db *dbInstance) withHave(folder, device []byte, truncate bool, fn Iterator) {
func (db *Instance) withHave(folder, device []byte, truncate bool, fn Iterator) {
start := db.deviceKey(folder, device, nil) // before all folder/device files
limit := db.deviceKey(folder, device, []byte{0xff, 0xff, 0xff, 0xff}) // after all folder/device files
@@ -216,7 +253,7 @@ func (db *dbInstance) withHave(folder, device []byte, truncate bool, fn Iterator
}
}
func (db *dbInstance) withAllFolderTruncated(folder []byte, fn func(device []byte, f FileInfoTruncated) bool) {
func (db *Instance) withAllFolderTruncated(folder []byte, fn func(device []byte, f FileInfoTruncated) bool) {
start := db.deviceKey(folder, nil, nil) // before all folder/device files
limit := db.deviceKey(folder, protocol.LocalDeviceID[:], []byte{0xff, 0xff, 0xff, 0xff}) // after all folder/device files
@@ -249,11 +286,11 @@ func (db *dbInstance) withAllFolderTruncated(folder []byte, fn func(device []byt
}
}
func (db *dbInstance) getFile(folder, device, file []byte) (protocol.FileInfo, bool) {
func (db *Instance) getFile(folder, device, file []byte) (protocol.FileInfo, bool) {
return getFile(db, db.deviceKey(folder, device, file))
}
func (db *dbInstance) getGlobal(folder, file []byte, truncate bool) (FileIntf, bool) {
func (db *Instance) getGlobal(folder, file []byte, truncate bool) (FileIntf, bool) {
k := db.globalKey(folder, file)
t := db.newReadOnlyTransaction()
@@ -290,7 +327,7 @@ func (db *dbInstance) getGlobal(folder, file []byte, truncate bool) (FileIntf, b
return fi, true
}
func (db *dbInstance) withGlobal(folder, prefix []byte, truncate bool, fn Iterator) {
func (db *Instance) withGlobal(folder, prefix []byte, truncate bool, fn Iterator) {
t := db.newReadOnlyTransaction()
defer t.close()
@@ -333,7 +370,7 @@ func (db *dbInstance) withGlobal(folder, prefix []byte, truncate bool, fn Iterat
}
}
func (db *dbInstance) availability(folder, file []byte) []protocol.DeviceID {
func (db *Instance) availability(folder, file []byte) []protocol.DeviceID {
k := db.globalKey(folder, file)
bs, err := db.Get(k, nil)
if err == leveldb.ErrNotFound {
@@ -361,7 +398,7 @@ func (db *dbInstance) availability(folder, file []byte) []protocol.DeviceID {
return devices
}
func (db *dbInstance) withNeed(folder, device []byte, truncate bool, fn Iterator) {
func (db *Instance) withNeed(folder, device []byte, truncate bool, fn Iterator) {
start := db.globalKey(folder, nil)
limit := db.globalKey(folder, []byte{0xff, 0xff, 0xff, 0xff})
@@ -452,7 +489,7 @@ nextFile:
}
}
func (db *dbInstance) listFolders() []string {
func (db *Instance) ListFolders() []string {
t := db.newReadOnlyTransaction()
defer t.close()
@@ -476,7 +513,7 @@ func (db *dbInstance) listFolders() []string {
return folders
}
func (db *dbInstance) dropFolder(folder []byte) {
func (db *Instance) dropFolder(folder []byte) {
t := db.newReadOnlyTransaction()
defer t.close()
@@ -501,7 +538,7 @@ func (db *dbInstance) dropFolder(folder []byte) {
dbi.Release()
}
func (db *dbInstance) checkGlobals(folder []byte, globalSize *sizeTracker) {
func (db *Instance) checkGlobals(folder []byte, globalSize *sizeTracker) {
t := db.newReadWriteTransaction()
defer t.close()
@@ -560,11 +597,11 @@ func (db *dbInstance) checkGlobals(folder []byte, globalSize *sizeTracker) {
// folder (64 bytes)
// device (32 bytes)
// name (variable size)
func (db *dbInstance) deviceKey(folder, device, file []byte) []byte {
func (db *Instance) deviceKey(folder, device, file []byte) []byte {
return db.deviceKeyInto(nil, folder, device, file)
}
func (db *dbInstance) deviceKeyInto(k []byte, folder, device, file []byte) []byte {
func (db *Instance) deviceKeyInto(k []byte, folder, device, file []byte) []byte {
reqLen := 1 + 64 + 32 + len(file)
if len(k) < reqLen {
k = make([]byte, reqLen)
@@ -579,11 +616,11 @@ func (db *dbInstance) deviceKeyInto(k []byte, folder, device, file []byte) []byt
return k[:reqLen]
}
func (db *dbInstance) deviceKeyName(key []byte) []byte {
func (db *Instance) deviceKeyName(key []byte) []byte {
return key[1+64+32:]
}
func (db *dbInstance) deviceKeyFolder(key []byte) []byte {
func (db *Instance) deviceKeyFolder(key []byte) []byte {
folder := key[1 : 1+64]
izero := bytes.IndexByte(folder, 0)
if izero < 0 {
@@ -592,7 +629,7 @@ func (db *dbInstance) deviceKeyFolder(key []byte) []byte {
return folder[:izero]
}
func (db *dbInstance) deviceKeyDevice(key []byte) []byte {
func (db *Instance) deviceKeyDevice(key []byte) []byte {
return key[1+64 : 1+64+32]
}
@@ -600,7 +637,7 @@ func (db *dbInstance) deviceKeyDevice(key []byte) []byte {
// keyTypeGlobal (1 byte)
// folder (64 bytes)
// name (variable size)
func (db *dbInstance) globalKey(folder, file []byte) []byte {
func (db *Instance) globalKey(folder, file []byte) []byte {
k := make([]byte, 1+64+len(file))
k[0] = KeyTypeGlobal
if len(folder) > 64 {
@@ -611,11 +648,11 @@ func (db *dbInstance) globalKey(folder, file []byte) []byte {
return k
}
func (db *dbInstance) globalKeyName(key []byte) []byte {
func (db *Instance) globalKeyName(key []byte) []byte {
return key[1+64:]
}
func (db *dbInstance) globalKeyFolder(key []byte) []byte {
func (db *Instance) globalKeyFolder(key []byte) []byte {
folder := key[1 : 1+64]
izero := bytes.IndexByte(folder, 0)
if izero < 0 {
@@ -635,3 +672,19 @@ func unmarshalTrunc(bs []byte, truncate bool) (FileIntf, error) {
err := tf.UnmarshalXDR(bs)
return tf, err
}
// A "better" version of leveldb's errors.IsCorrupted.
func leveldbIsCorrupted(err error) bool {
switch {
case err == nil:
return false
case errors.IsCorrupted(err):
return true
case strings.Contains(err.Error(), "corrupted"):
return true
}
return false
}