There was a problem in iterating the sequence index that could result in missing updates. The issue is that while the index was (correctly) iterated in a snapshot, the actual file infos were read dirty outside of the snapshot. This fixes this by doing the reads inside the snapshot, and also updates a couple of other places that did the same thing more or less harmfully (I didn't investigate). To avoid similar issues in the future I did some renaming of the getFile* methods - the ones in a transaction are just getFile, while the ones directly on the database are variants of getFileDirty to highlight what's going on.
This commit is contained in:
parent
76af0cf07b
commit
1e69997ecd
@ -149,7 +149,7 @@ func TestUpdate0to3(t *testing.T) {
|
|||||||
|
|
||||||
updater.updateSchema0to1()
|
updater.updateSchema0to1()
|
||||||
|
|
||||||
if _, ok := db.getFile(db.keyer.GenerateDeviceFileKey(nil, folder, protocol.LocalDeviceID[:], []byte(slashPrefixed))); ok {
|
if _, ok := db.getFileDirty(folder, protocol.LocalDeviceID[:], []byte(slashPrefixed)); ok {
|
||||||
t.Error("File prefixed by '/' was not removed during transition to schema 1")
|
t.Error("File prefixed by '/' was not removed during transition to schema 1")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -99,6 +99,9 @@ func (db *instance) removeSequences(folder []byte, fs []protocol.FileInfo) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (db *instance) withHave(folder, device, prefix []byte, truncate bool, fn Iterator) {
|
func (db *instance) withHave(folder, device, prefix []byte, truncate bool, fn Iterator) {
|
||||||
|
t := db.newReadOnlyTransaction()
|
||||||
|
defer t.close()
|
||||||
|
|
||||||
if len(prefix) > 0 {
|
if len(prefix) > 0 {
|
||||||
unslashedPrefix := prefix
|
unslashedPrefix := prefix
|
||||||
if bytes.HasSuffix(prefix, []byte{'/'}) {
|
if bytes.HasSuffix(prefix, []byte{'/'}) {
|
||||||
@ -107,14 +110,11 @@ func (db *instance) withHave(folder, device, prefix []byte, truncate bool, fn It
|
|||||||
prefix = append(prefix, '/')
|
prefix = append(prefix, '/')
|
||||||
}
|
}
|
||||||
|
|
||||||
if f, ok := db.getFileTrunc(db.keyer.GenerateDeviceFileKey(nil, folder, device, unslashedPrefix), true); ok && !fn(f) {
|
if f, ok := t.getFileTrunc(db.keyer.GenerateDeviceFileKey(nil, folder, device, unslashedPrefix), true); ok && !fn(f) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
t := db.newReadOnlyTransaction()
|
|
||||||
defer t.close()
|
|
||||||
|
|
||||||
dbi := t.NewIterator(util.BytesPrefix(db.keyer.GenerateDeviceFileKey(nil, folder, device, prefix)), nil)
|
dbi := t.NewIterator(util.BytesPrefix(db.keyer.GenerateDeviceFileKey(nil, folder, device, prefix)), nil)
|
||||||
defer dbi.Release()
|
defer dbi.Release()
|
||||||
|
|
||||||
@ -124,11 +124,7 @@ func (db *instance) withHave(folder, device, prefix []byte, truncate bool, fn It
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// The iterator function may keep a reference to the unmarshalled
|
f, err := unmarshalTrunc(dbi.Value(), truncate)
|
||||||
// struct, which in turn references the buffer it was unmarshalled
|
|
||||||
// from. dbi.Value() just returns an internal slice that it reuses, so
|
|
||||||
// we need to copy it.
|
|
||||||
f, err := unmarshalTrunc(append([]byte{}, dbi.Value()...), truncate)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
l.Debugln("unmarshal error:", err)
|
l.Debugln("unmarshal error:", err)
|
||||||
continue
|
continue
|
||||||
@ -147,7 +143,7 @@ func (db *instance) withHaveSequence(folder []byte, startSeq int64, fn Iterator)
|
|||||||
defer dbi.Release()
|
defer dbi.Release()
|
||||||
|
|
||||||
for dbi.Next() {
|
for dbi.Next() {
|
||||||
f, ok := db.getFile(dbi.Value())
|
f, ok := t.getFileByKey(dbi.Value())
|
||||||
if !ok {
|
if !ok {
|
||||||
l.Debugln("missing file for sequence number", db.keyer.SequenceFromSequenceKey(dbi.Key()))
|
l.Debugln("missing file for sequence number", db.keyer.SequenceFromSequenceKey(dbi.Key()))
|
||||||
continue
|
continue
|
||||||
@ -209,27 +205,21 @@ func (db *instance) withAllFolderTruncated(folder []byte, fn func(device []byte,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *instance) getFile(key []byte) (protocol.FileInfo, bool) {
|
func (db *instance) getFileDirty(folder, device, file []byte) (protocol.FileInfo, bool) {
|
||||||
if f, ok := db.getFileTrunc(key, false); ok {
|
key := db.keyer.GenerateDeviceFileKey(nil, folder, device, file)
|
||||||
return f.(protocol.FileInfo), true
|
|
||||||
}
|
|
||||||
return protocol.FileInfo{}, false
|
|
||||||
}
|
|
||||||
|
|
||||||
func (db *instance) getFileTrunc(key []byte, trunc bool) (FileIntf, bool) {
|
|
||||||
bs, err := db.Get(key, nil)
|
bs, err := db.Get(key, nil)
|
||||||
if err == leveldb.ErrNotFound {
|
if err == leveldb.ErrNotFound {
|
||||||
return nil, false
|
return protocol.FileInfo{}, false
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
l.Debugln("surprise error:", err)
|
l.Debugln("surprise error:", err)
|
||||||
return nil, false
|
return protocol.FileInfo{}, false
|
||||||
}
|
}
|
||||||
|
|
||||||
f, err := unmarshalTrunc(bs, trunc)
|
var f protocol.FileInfo
|
||||||
if err != nil {
|
if err := f.Unmarshal(bs); err != nil {
|
||||||
l.Debugln("unmarshal error:", err)
|
l.Debugln("unmarshal error:", err)
|
||||||
return nil, false
|
return protocol.FileInfo{}, false
|
||||||
}
|
}
|
||||||
return f, true
|
return f, true
|
||||||
}
|
}
|
||||||
@ -256,7 +246,7 @@ func (db *instance) getGlobalInto(t readOnlyTransaction, gk, dk, folder, file []
|
|||||||
}
|
}
|
||||||
|
|
||||||
dk = db.keyer.GenerateDeviceFileKey(dk, folder, vl.Versions[0].Device, file)
|
dk = db.keyer.GenerateDeviceFileKey(dk, folder, vl.Versions[0].Device, file)
|
||||||
if fi, ok := db.getFileTrunc(dk, truncate); ok {
|
if fi, ok := t.getFileTrunc(dk, truncate); ok {
|
||||||
return gk, dk, fi, true
|
return gk, dk, fi, true
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -264,6 +254,9 @@ func (db *instance) getGlobalInto(t readOnlyTransaction, gk, dk, folder, file []
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (db *instance) withGlobal(folder, prefix []byte, truncate bool, fn Iterator) {
|
func (db *instance) withGlobal(folder, prefix []byte, truncate bool, fn Iterator) {
|
||||||
|
t := db.newReadOnlyTransaction()
|
||||||
|
defer t.close()
|
||||||
|
|
||||||
if len(prefix) > 0 {
|
if len(prefix) > 0 {
|
||||||
unslashedPrefix := prefix
|
unslashedPrefix := prefix
|
||||||
if bytes.HasSuffix(prefix, []byte{'/'}) {
|
if bytes.HasSuffix(prefix, []byte{'/'}) {
|
||||||
@ -272,14 +265,11 @@ func (db *instance) withGlobal(folder, prefix []byte, truncate bool, fn Iterator
|
|||||||
prefix = append(prefix, '/')
|
prefix = append(prefix, '/')
|
||||||
}
|
}
|
||||||
|
|
||||||
if f, ok := db.getGlobal(folder, unslashedPrefix, truncate); ok && !fn(f) {
|
if _, _, f, ok := db.getGlobalInto(t, nil, nil, folder, unslashedPrefix, truncate); ok && !fn(f) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
t := db.newReadOnlyTransaction()
|
|
||||||
defer t.close()
|
|
||||||
|
|
||||||
dbi := t.NewIterator(util.BytesPrefix(db.keyer.GenerateGlobalVersionKey(nil, folder, prefix)), nil)
|
dbi := t.NewIterator(util.BytesPrefix(db.keyer.GenerateGlobalVersionKey(nil, folder, prefix)), nil)
|
||||||
defer dbi.Release()
|
defer dbi.Release()
|
||||||
|
|
||||||
@ -297,7 +287,7 @@ func (db *instance) withGlobal(folder, prefix []byte, truncate bool, fn Iterator
|
|||||||
|
|
||||||
fk = db.keyer.GenerateDeviceFileKey(fk, folder, vl.Versions[0].Device, name)
|
fk = db.keyer.GenerateDeviceFileKey(fk, folder, vl.Versions[0].Device, name)
|
||||||
|
|
||||||
f, ok := db.getFileTrunc(fk, truncate)
|
f, ok := t.getFileTrunc(fk, truncate)
|
||||||
if !ok {
|
if !ok {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
@ -504,7 +494,7 @@ func (db *instance) checkGlobals(folder []byte, meta *metadataTracker) {
|
|||||||
newVL.Versions = append(newVL.Versions, version)
|
newVL.Versions = append(newVL.Versions, version)
|
||||||
|
|
||||||
if i == 0 {
|
if i == 0 {
|
||||||
if fi, ok := db.getFile(fk); ok {
|
if fi, ok := t.getFileByKey(fk); ok {
|
||||||
meta.addFile(protocol.GlobalDeviceID, fi)
|
meta.addFile(protocol.GlobalDeviceID, fi)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -203,7 +203,7 @@ func (db *schemaUpdater) updateSchema2to3() {
|
|||||||
name := []byte(f.FileName())
|
name := []byte(f.FileName())
|
||||||
dk = db.keyer.GenerateDeviceFileKey(dk, folder, protocol.LocalDeviceID[:], name)
|
dk = db.keyer.GenerateDeviceFileKey(dk, folder, protocol.LocalDeviceID[:], name)
|
||||||
var v protocol.Vector
|
var v protocol.Vector
|
||||||
haveFile, ok := db.getFileTrunc(dk, true)
|
haveFile, ok := t.getFileTrunc(dk, true)
|
||||||
if ok {
|
if ok {
|
||||||
v = haveFile.FileVersion()
|
v = haveFile.FileVersion()
|
||||||
}
|
}
|
||||||
|
|||||||
@ -161,11 +161,9 @@ func (s *FileSet) Update(device protocol.DeviceID, fs []protocol.FileInfo) {
|
|||||||
// filter slice according to https://github.com/golang/go/wiki/SliceTricks#filtering-without-allocating
|
// filter slice according to https://github.com/golang/go/wiki/SliceTricks#filtering-without-allocating
|
||||||
oldFs := fs
|
oldFs := fs
|
||||||
fs = fs[:0]
|
fs = fs[:0]
|
||||||
var dk []byte
|
|
||||||
folder := []byte(s.folder)
|
folder := []byte(s.folder)
|
||||||
for _, nf := range oldFs {
|
for _, nf := range oldFs {
|
||||||
dk = s.db.keyer.GenerateDeviceFileKey(dk, folder, device[:], []byte(osutil.NormalizedFilename(nf.Name)))
|
ef, ok := s.db.getFileDirty(folder, device[:], []byte(osutil.NormalizedFilename(nf.Name)))
|
||||||
ef, ok := s.db.getFile(dk)
|
|
||||||
if ok && ef.Version.Equal(nf.Version) && ef.IsInvalid() == nf.IsInvalid() {
|
if ok && ef.Version.Equal(nf.Version) && ef.IsInvalid() == nf.IsInvalid() {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
@ -246,7 +244,7 @@ func (s *FileSet) WithPrefixedGlobalTruncated(prefix string, fn Iterator) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *FileSet) Get(device protocol.DeviceID, file string) (protocol.FileInfo, bool) {
|
func (s *FileSet) Get(device protocol.DeviceID, file string) (protocol.FileInfo, bool) {
|
||||||
f, ok := s.db.getFile(s.db.keyer.GenerateDeviceFileKey(nil, []byte(s.folder), device[:], []byte(osutil.NormalizedFilename(file))))
|
f, ok := s.db.getFileDirty([]byte(s.folder), device[:], []byte(osutil.NormalizedFilename(file)))
|
||||||
f.Name = osutil.NativeFilename(f.Name)
|
f.Name = osutil.NativeFilename(f.Name)
|
||||||
return f, ok
|
return f, ok
|
||||||
}
|
}
|
||||||
|
|||||||
@ -1363,6 +1363,108 @@ func TestCaseSensitive(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestSequenceIndex(t *testing.T) {
|
||||||
|
// This test attempts to verify correct operation of the sequence index.
|
||||||
|
|
||||||
|
// It's a stress test and needs to run for a long time, but we don't
|
||||||
|
// really have time for that in normal builds.
|
||||||
|
runtime := time.Minute
|
||||||
|
if testing.Short() {
|
||||||
|
runtime = time.Second
|
||||||
|
}
|
||||||
|
|
||||||
|
// Set up a db and a few files that we will manipulate.
|
||||||
|
|
||||||
|
ldb := db.OpenMemory()
|
||||||
|
s := db.NewFileSet("test", fs.NewFilesystem(fs.FilesystemTypeBasic, "."), ldb)
|
||||||
|
|
||||||
|
local := []protocol.FileInfo{
|
||||||
|
{Name: filepath.FromSlash("banana"), Version: protocol.Vector{Counters: []protocol.Counter{{ID: myID, Value: 1000}}}},
|
||||||
|
{Name: filepath.FromSlash("pineapple"), Version: protocol.Vector{Counters: []protocol.Counter{{ID: myID, Value: 1000}}}},
|
||||||
|
{Name: filepath.FromSlash("orange"), Version: protocol.Vector{Counters: []protocol.Counter{{ID: myID, Value: 1000}}}},
|
||||||
|
{Name: filepath.FromSlash("apple"), Version: protocol.Vector{Counters: []protocol.Counter{{ID: myID, Value: 1000}}}},
|
||||||
|
{Name: filepath.FromSlash("jackfruit"), Version: protocol.Vector{Counters: []protocol.Counter{{ID: myID, Value: 1000}}}},
|
||||||
|
}
|
||||||
|
|
||||||
|
// Start a background routine that makes updates to these files as fast
|
||||||
|
// as it can. We always update the same files in the same order.
|
||||||
|
|
||||||
|
done := make(chan struct{})
|
||||||
|
defer close(done)
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-done:
|
||||||
|
return
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
|
||||||
|
for i := range local {
|
||||||
|
local[i].Version = local[i].Version.Update(42)
|
||||||
|
}
|
||||||
|
s.Update(protocol.LocalDeviceID, local)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
// Start a routine to walk the sequence index and inspect the result.
|
||||||
|
|
||||||
|
seen := make(map[string]db.FileIntf)
|
||||||
|
latest := make([]db.FileIntf, 0, len(local))
|
||||||
|
var seq int64
|
||||||
|
t0 := time.Now()
|
||||||
|
|
||||||
|
for time.Since(t0) < runtime {
|
||||||
|
// Walk the changes since our last iteration. This should give is
|
||||||
|
// one instance each of the files that are changed all the time, or
|
||||||
|
// a subset of those files if we manage to run before a complete
|
||||||
|
// update has happened since our last iteration.
|
||||||
|
latest = latest[:0]
|
||||||
|
s.WithHaveSequence(seq+1, func(f db.FileIntf) bool {
|
||||||
|
seen[f.FileName()] = f
|
||||||
|
latest = append(latest, f)
|
||||||
|
seq = f.SequenceNo()
|
||||||
|
return true
|
||||||
|
})
|
||||||
|
|
||||||
|
// Calculate the spread in sequence number.
|
||||||
|
var max, min int64
|
||||||
|
for _, v := range seen {
|
||||||
|
s := v.SequenceNo()
|
||||||
|
if max == 0 || max < s {
|
||||||
|
max = s
|
||||||
|
}
|
||||||
|
if min == 0 || min > s {
|
||||||
|
min = s
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// We shouldn't see a spread larger than the number of files, as
|
||||||
|
// that would mean we have missed updates. For example, if we were
|
||||||
|
// to see the following:
|
||||||
|
//
|
||||||
|
// banana N
|
||||||
|
// pineapple N+1
|
||||||
|
// orange N+2
|
||||||
|
// apple N+10
|
||||||
|
// jackfruit N+11
|
||||||
|
//
|
||||||
|
// that would mean that there have been updates to banana, pineapple
|
||||||
|
// and orange that we didn't see in this pass. If those files aren't
|
||||||
|
// updated again, those updates are permanently lost.
|
||||||
|
if max-min > int64(len(local)) {
|
||||||
|
for _, v := range seen {
|
||||||
|
t.Log("seen", v.FileName(), v.SequenceNo())
|
||||||
|
}
|
||||||
|
for _, v := range latest {
|
||||||
|
t.Log("latest", v.FileName(), v.SequenceNo())
|
||||||
|
}
|
||||||
|
t.Fatal("large spread")
|
||||||
|
}
|
||||||
|
time.Sleep(time.Millisecond)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func replace(fs *db.FileSet, device protocol.DeviceID, files []protocol.FileInfo) {
|
func replace(fs *db.FileSet, device protocol.DeviceID, files []protocol.FileInfo) {
|
||||||
fs.Drop(device)
|
fs.Drop(device)
|
||||||
fs.Update(device, files)
|
fs.Update(device, files)
|
||||||
|
|||||||
@ -164,7 +164,7 @@ func (vl VersionList) String() string {
|
|||||||
// update brings the VersionList up to date with file. It returns the updated
|
// update brings the VersionList up to date with file. It returns the updated
|
||||||
// VersionList, a potentially removed old FileVersion and its index, as well as
|
// VersionList, a potentially removed old FileVersion and its index, as well as
|
||||||
// the index where the new FileVersion was inserted.
|
// the index where the new FileVersion was inserted.
|
||||||
func (vl VersionList) update(folder, device []byte, file protocol.FileInfo, db *instance) (_ VersionList, removedFV FileVersion, removedAt int, insertedAt int) {
|
func (vl VersionList) update(folder, device []byte, file protocol.FileInfo, t readOnlyTransaction) (_ VersionList, removedFV FileVersion, removedAt int, insertedAt int) {
|
||||||
vl, removedFV, removedAt = vl.pop(device)
|
vl, removedFV, removedAt = vl.pop(device)
|
||||||
|
|
||||||
nv := FileVersion{
|
nv := FileVersion{
|
||||||
@ -198,7 +198,7 @@ func (vl VersionList) update(folder, device []byte, file protocol.FileInfo, db *
|
|||||||
// to determine the winner.)
|
// to determine the winner.)
|
||||||
//
|
//
|
||||||
// A surprise missing file entry here is counted as a win for us.
|
// A surprise missing file entry here is counted as a win for us.
|
||||||
if of, ok := db.getFile(db.keyer.GenerateDeviceFileKey(nil, folder, vl.Versions[i].Device, []byte(file.Name))); !ok || file.WinsConflict(of) {
|
if of, ok := t.getFile(folder, vl.Versions[i].Device, []byte(file.Name)); !ok || file.WinsConflict(of) {
|
||||||
vl = vl.insertAt(i, nv)
|
vl = vl.insertAt(i, nv)
|
||||||
return vl, removedFV, removedAt, i
|
return vl, removedFV, removedAt, i
|
||||||
}
|
}
|
||||||
|
|||||||
@ -37,7 +37,32 @@ func (t readOnlyTransaction) close() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (t readOnlyTransaction) getFile(folder, device, file []byte) (protocol.FileInfo, bool) {
|
func (t readOnlyTransaction) getFile(folder, device, file []byte) (protocol.FileInfo, bool) {
|
||||||
return t.db.getFile(t.db.keyer.GenerateDeviceFileKey(nil, folder, device, file))
|
return t.getFileByKey(t.db.keyer.GenerateDeviceFileKey(nil, folder, device, file))
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t readOnlyTransaction) getFileByKey(key []byte) (protocol.FileInfo, bool) {
|
||||||
|
if f, ok := t.getFileTrunc(key, false); ok {
|
||||||
|
return f.(protocol.FileInfo), true
|
||||||
|
}
|
||||||
|
return protocol.FileInfo{}, false
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t readOnlyTransaction) getFileTrunc(key []byte, trunc bool) (FileIntf, bool) {
|
||||||
|
bs, err := t.Get(key, nil)
|
||||||
|
if err == leveldb.ErrNotFound {
|
||||||
|
return nil, false
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
l.Debugln("surprise error:", err)
|
||||||
|
return nil, false
|
||||||
|
}
|
||||||
|
|
||||||
|
f, err := unmarshalTrunc(bs, trunc)
|
||||||
|
if err != nil {
|
||||||
|
l.Debugln("unmarshal error:", err)
|
||||||
|
return nil, false
|
||||||
|
}
|
||||||
|
return f, true
|
||||||
}
|
}
|
||||||
|
|
||||||
// A readWriteTransaction is a readOnlyTransaction plus a batch for writes.
|
// A readWriteTransaction is a readOnlyTransaction plus a batch for writes.
|
||||||
@ -90,7 +115,7 @@ func (t readWriteTransaction) updateGlobal(gk, folder, device []byte, file proto
|
|||||||
if svl, err := t.Get(gk, nil); err == nil {
|
if svl, err := t.Get(gk, nil); err == nil {
|
||||||
fl.Unmarshal(svl) // Ignore error, continue with empty fl
|
fl.Unmarshal(svl) // Ignore error, continue with empty fl
|
||||||
}
|
}
|
||||||
fl, removedFV, removedAt, insertedAt := fl.update(folder, device, file, t.db)
|
fl, removedFV, removedAt, insertedAt := fl.update(folder, device, file, t.readOnlyTransaction)
|
||||||
if insertedAt == -1 {
|
if insertedAt == -1 {
|
||||||
l.Debugln("update global; same version, global unchanged")
|
l.Debugln("update global; same version, global unchanged")
|
||||||
return false
|
return false
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user