lib/db: Add index to track locally needed files (#4958)
This optimizes WithNeed, which is called for the local device whenever an index update is received. Needs are not tracked for remote devices, to conserve db space, as WithNeed is only queried for completion there.
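The idea, sketched below in a small self-contained Go example (not the actual Syncthing code): a need entry is a global key with a different type prefix, written only for files the local device still needs, so enumerating local needs becomes a short prefix scan instead of a pass over every global record. The key-type value, helper names, and the in-memory map standing in for LevelDB are assumptions made purely for illustration.

// Minimal sketch of the need-index idea, not the Syncthing implementation.
// Assumed key layout: one type byte, a 4-byte folder ID, then the file name,
// mirroring the globalKey layout in the diff below.
package main

import (
	"encoding/binary"
	"fmt"
	"sort"
	"strings"
)

const keyTypeNeed byte = 0x08 // assumed value, for illustration only

// needKey builds prefix + folder ID + name, i.e. a global key with a
// different type prefix.
func needKey(folderID uint32, name string) string {
	k := make([]byte, 1+4+len(name))
	k[0] = keyTypeNeed
	binary.BigEndian.PutUint32(k[1:], folderID)
	copy(k[5:], name)
	return string(k)
}

func main() {
	// Stand-in for the key-value store: only locally needed files get an
	// entry (the value is irrelevant, nil in the real database).
	db := map[string]struct{}{
		needKey(1, "docs/readme.txt"): {},
		needKey(1, "photos/a.jpg"):    {},
	}

	// Listing local needs is now a scan over this small prefix range
	// instead of over every global entry in the folder.
	prefix := needKey(1, "")
	var needed []string
	for k := range db {
		if strings.HasPrefix(k, prefix) {
			needed = append(needed, k[5:]) // strip type byte and folder ID
		}
	}
	sort.Strings(needed)
	fmt.Println("locally needed:", needed)
}

For remote devices, withNeed keeps the existing full scan over the global list, which is why needs are tracked only for the local device.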
@@ -103,6 +103,9 @@ func (db *Instance) UpdateSchema() {
 	if prevVersion <= 1 {
 		db.updateSchema1to2()
 	}
+	if prevVersion <= 2 {
+		db.updateSchema2to3()
+	}

 	l.Infof("Finished updating database schema version from %v to %v", prevVersion, dbVersion)
 	miscDB.PutInt64("dbVersion", dbVersion)
@@ -310,26 +313,32 @@ func (db *Instance) getFileTrunc(key []byte, trunc bool) (FileIntf, bool) {
 }

 func (db *Instance) getGlobal(folder, file []byte, truncate bool) (FileIntf, bool) {
-	k := db.globalKey(folder, file)
-
 	t := db.newReadOnlyTransaction()
 	defer t.close()

-	bs, err := t.Get(k, nil)
+	_, _, f, ok := db.getGlobalInto(t, nil, nil, folder, file, truncate)
+	return f, ok
+}
+
+func (db *Instance) getGlobalInto(t readOnlyTransaction, gk, dk, folder, file []byte, truncate bool) ([]byte, []byte, FileIntf, bool) {
+	gk = db.globalKeyInto(gk, folder, file)
+
+	bs, err := t.Get(gk, nil)
 	if err != nil {
-		return nil, false
+		return gk, dk, nil, false
 	}

 	vl, ok := unmarshalVersionList(bs)
 	if !ok {
-		return nil, false
+		return gk, dk, nil, false
 	}

-	if fi, ok := db.getFileTrunc(db.deviceKey(folder, vl.Versions[0].Device, file), truncate); ok {
-		return fi, true
+	dk = db.deviceKeyInto(dk, folder, vl.Versions[0].Device, file)
+	if fi, ok := db.getFileTrunc(dk, truncate); ok {
+		return gk, dk, fi, true
 	}

-	return nil, false
+	return gk, dk, nil, false
 }

 func (db *Instance) withGlobal(folder, prefix []byte, truncate bool, fn Iterator) {
@@ -409,6 +418,11 @@ func (db *Instance) availability(folder, file []byte) []protocol.DeviceID {
 }

 func (db *Instance) withNeed(folder, device []byte, truncate bool, fn Iterator) {
+	if bytes.Equal(device, protocol.LocalDeviceID[:]) {
+		db.withNeedLocal(folder, truncate, fn)
+		return
+	}
+
 	t := db.newReadOnlyTransaction()
 	defer t.close()

@@ -422,22 +436,11 @@ func (db *Instance) withNeed(folder, device []byte, truncate bool, fn Iterator)
 			continue
 		}

-		have := false // If we have the file, any version
-		need := false // If we have a lower version of the file
-		var haveFileVersion FileVersion
-		for _, v := range vl.Versions {
-			if bytes.Equal(v.Device, device) {
-				have = true
-				haveFileVersion = v
-				// XXX: This marks Concurrent (i.e. conflicting) changes as
-				// needs. Maybe we should do that, but it needs special
-				// handling in the puller.
-				need = !v.Version.GreaterEqual(vl.Versions[0].Version)
-				break
-			}
-		}
-
-		if have && !need {
+		haveFV, have := vl.Get(device)
+		// XXX: This marks Concurrent (i.e. conflicting) changes as
+		// needs. Maybe we should do that, but it needs special
+		// handling in the puller.
+		if have && haveFV.Version.GreaterEqual(vl.Versions[0].Version) {
 			continue
 		}

@@ -474,7 +477,7 @@ func (db *Instance) withNeed(folder, device []byte, truncate bool, fn Iterator)
 				break
 			}

-			l.Debugf("need folder=%q device=%v name=%q need=%v have=%v invalid=%v haveV=%v globalV=%v globalDev=%v", folder, protocol.DeviceIDFromBytes(device), name, need, have, haveFileVersion.Invalid, haveFileVersion.Version, needVersion, needDevice)
+			l.Debugf("need folder=%q device=%v name=%q have=%v invalid=%v haveV=%v globalV=%v globalDev=%v", folder, protocol.DeviceIDFromBytes(device), name, have, haveFV.Invalid, haveFV.Version, needVersion, needDevice)

 			if !fn(gf) {
 				return
@@ -486,6 +489,28 @@ func (db *Instance) withNeed(folder, device []byte, truncate bool, fn Iterator)
 	}
 }

+func (db *Instance) withNeedLocal(folder []byte, truncate bool, fn Iterator) {
+	t := db.newReadOnlyTransaction()
+	defer t.close()
+
+	dbi := t.NewIterator(util.BytesPrefix(db.needKey(folder, nil)[:keyPrefixLen+keyFolderLen]), nil)
+	defer dbi.Release()
+
+	var dk []byte
+	var gk []byte
+	var f FileIntf
+	var ok bool
+	for dbi.Next() {
+		gk, dk, f, ok = db.getGlobalInto(t, gk, dk, folder, db.globalKeyName(dbi.Key()), truncate)
+		if !ok {
+			continue
+		}
+		if !fn(f) {
+			return
+		}
+	}
+}
+
 func (db *Instance) ListFolders() []string {
 	t := db.newReadOnlyTransaction()
 	defer t.close()
@@ -511,36 +536,21 @@ func (db *Instance) ListFolders() []string {
 }

 func (db *Instance) dropFolder(folder []byte) {
-	t := db.newReadOnlyTransaction()
+	t := db.newReadWriteTransaction()
 	defer t.close()

-	// Remove all items related to the given folder from the device->file bucket
-	dbi := t.NewIterator(util.BytesPrefix([]byte{KeyTypeDevice}), nil)
-	for dbi.Next() {
-		itemFolder := db.deviceKeyFolder(dbi.Key())
-		if bytes.Equal(folder, itemFolder) {
-			db.Delete(dbi.Key(), nil)
-		}
+	for _, key := range [][]byte{
+		// Remove all items related to the given folder from the device->file bucket
+		db.deviceKey(folder, nil, nil)[:keyPrefixLen+keyFolderLen],
+		// Remove all sequences related to the folder
+		db.sequenceKey([]byte(folder), 0)[:keyPrefixLen+keyFolderLen],
+		// Remove all items related to the given folder from the global bucket
+		db.globalKey(folder, nil)[:keyPrefixLen+keyFolderLen],
+		// Remove all needs related to the folder
+		db.needKey(folder, nil)[:keyPrefixLen+keyFolderLen],
+	} {
+		t.deleteKeyPrefix(key)
 	}
-	dbi.Release()
-
-	// Remove all sequences related to the folder
-	sequenceKey := db.sequenceKey([]byte(folder), 0)
-	dbi = t.NewIterator(util.BytesPrefix(sequenceKey[:keyPrefixLen+keyFolderLen]), nil)
-	for dbi.Next() {
-		db.Delete(dbi.Key(), nil)
-	}
-	dbi.Release()
-
-	// Remove all items related to the given folder from the global bucket
-	dbi = t.NewIterator(util.BytesPrefix([]byte{KeyTypeGlobal}), nil)
-	for dbi.Next() {
-		itemFolder, ok := db.globalKeyFolder(dbi.Key())
-		if ok && bytes.Equal(folder, itemFolder) {
-			db.Delete(dbi.Key(), nil)
-		}
-	}
-	dbi.Release()
 }

 func (db *Instance) dropDeviceFolder(device, folder []byte, meta *metadataTracker) {
@@ -680,13 +690,14 @@ func (db *Instance) updateSchema0to1() {
 	l.Infof("Updated symlink type for %d index entries and added %d invalid files to global list", symlinkConv, ignAdded)
 }

+// updateSchema1to2 introduces a sequenceKey->deviceKey bucket for local items
+// to allow iteration in sequence order (simplifies sending indexes).
 func (db *Instance) updateSchema1to2() {
 	t := db.newReadWriteTransaction()
 	defer t.close()

 	var sk []byte
 	var dk []byte
-
 	for _, folderStr := range db.ListFolders() {
 		folder := []byte(folderStr)
 		db.withHave(folder, protocol.LocalDeviceID[:], nil, true, func(f FileIntf) bool {
@@ -699,6 +710,34 @@ func (db *Instance) updateSchema1to2() {
 	}
 }

+// updateSchema2to3 introduces a needKey->nil bucket for locally needed files.
+func (db *Instance) updateSchema2to3() {
+	t := db.newReadWriteTransaction()
+	defer t.close()
+
+	var nk []byte
+	var dk []byte
+	for _, folderStr := range db.ListFolders() {
+		folder := []byte(folderStr)
+		db.withGlobal(folder, nil, true, func(f FileIntf) bool {
+			name := []byte(f.FileName())
+			dk = db.deviceKeyInto(dk, folder, protocol.LocalDeviceID[:], name)
+			var v protocol.Vector
+			haveFile, ok := db.getFileTrunc(dk, true)
+			if ok {
+				v = haveFile.FileVersion()
+			}
+			if !need(f, ok, v) {
+				return true
+			}
+			nk = t.db.needKeyInto(nk, folder, []byte(f.FileName()))
+			t.Put(nk, nil)
+			t.checkFlush()
+			return true
+		})
+	}
+}
+
 // deviceKey returns a byte slice encoding the following information:
 // keyTypeDevice (1 byte)
 // folder (4 bytes)
@@ -755,7 +794,7 @@ func (db *Instance) globalKeyInto(gk, folder, file []byte) []byte {
 	gk[0] = KeyTypeGlobal
 	binary.BigEndian.PutUint32(gk[keyPrefixLen:], db.folderIdx.ID(folder))
 	copy(gk[keyPrefixLen+keyFolderLen:], file)
-	return gk[:reqLen]
+	return gk
 }

 // globalKeyName returns the filename from the key
@@ -768,6 +807,17 @@ func (db *Instance) globalKeyFolder(key []byte) ([]byte, bool) {
 	return db.folderIdx.Val(binary.BigEndian.Uint32(key[keyPrefixLen:]))
 }

+// needKey is a globalKey with a different prefix
+func (db *Instance) needKey(folder, file []byte) []byte {
+	return db.needKeyInto(nil, folder, file)
+}
+
+func (db *Instance) needKeyInto(k, folder, file []byte) []byte {
+	k = db.globalKeyInto(k, folder, file)
+	k[0] = KeyTypeNeed
+	return k
+}
+
 // sequenceKey returns a byte slice encoding the following information:
 // KeyTypeSequence (1 byte)
 // folder (4 bytes)
@@ -782,7 +832,7 @@ func (db *Instance) sequenceKeyInto(k []byte, folder []byte, seq int64) []byte {
 	k[0] = KeyTypeSequence
 	binary.BigEndian.PutUint32(k[keyPrefixLen:], db.folderIdx.ID(folder))
 	binary.BigEndian.PutUint64(k[keyPrefixLen+keyFolderLen:], uint64(seq))
-	return k[:reqLen]
+	return k
 }

 // sequenceKeySequence returns the sequence number from the key
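Both updateSchema2to3 and the need dispatch rely on a need helper defined elsewhere in lib/db and not shown in this diff; it decides whether the global version of a file must still be fetched locally. The sketch below is an assumption about what such a check roughly does, with the signature inferred from the call need(f, ok, v) above; the real rules may differ.

// Simplified sketch only, not the need() implementation from this commit.
// Written in the context of package db, where FileIntf and protocol.Vector
// are available.
package db

import "github.com/syncthing/syncthing/lib/protocol"

// global is the folder-wide newest version of the file, haveLocal says
// whether the local device has any version, and localVersion is that
// version if so.
func need(global FileIntf, haveLocal bool, localVersion protocol.Vector) bool {
	if global.IsInvalid() {
		// An invalid (ignored) global file is never fetched.
		return false
	}
	if global.IsDeleted() && !haveLocal {
		// A deletion of a file we never had requires no action.
		return false
	}
	if haveLocal && localVersion.GreaterEqual(global.FileVersion()) {
		// The local copy is already at least as new as the global version.
		return false
	}
	return true
}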