lib/db: Fix sequence updating for remote invalid items (#5420)

* lib/db: Fix sequence updating for remote invalid items

* fix for the unit test introduced in the previous commit

* lib/db: Polish blockmap
This commit is contained in:
Simon Frei 2019-01-04 20:19:10 +01:00 committed by Jakob Borg
parent 04070b4848
commit 158559023e
4 changed files with 75 additions and 51 deletions

View File

@ -39,12 +39,7 @@ func (m *BlockMap) Add(files []protocol.FileInfo) error {
buf := make([]byte, 4) buf := make([]byte, 4)
var key []byte var key []byte
for _, file := range files { for _, file := range files {
if batch.Len() > maxBatchSize { m.checkFlush(batch)
if err := m.db.Write(batch, nil); err != nil {
return err
}
batch.Reset()
}
if file.IsDirectory() || file.IsDeleted() || file.IsInvalid() { if file.IsDirectory() || file.IsDeleted() || file.IsInvalid() {
continue continue
@ -65,29 +60,21 @@ func (m *BlockMap) Update(files []protocol.FileInfo) error {
buf := make([]byte, 4) buf := make([]byte, 4)
var key []byte var key []byte
for _, file := range files { for _, file := range files {
if batch.Len() > maxBatchSize { m.checkFlush(batch)
if err := m.db.Write(batch, nil); err != nil {
return err
}
batch.Reset()
}
if file.IsDirectory() { switch {
continue case file.IsDirectory():
} case file.IsDeleted() || file.IsInvalid():
if file.IsDeleted() || file.IsInvalid() {
for _, block := range file.Blocks { for _, block := range file.Blocks {
key = m.blockKeyInto(key, block.Hash, file.Name) key = m.blockKeyInto(key, block.Hash, file.Name)
batch.Delete(key) batch.Delete(key)
} }
continue default:
} for i, block := range file.Blocks {
binary.BigEndian.PutUint32(buf, uint32(i))
for i, block := range file.Blocks { key = m.blockKeyInto(key, block.Hash, file.Name)
binary.BigEndian.PutUint32(buf, uint32(i)) batch.Put(key, buf)
key = m.blockKeyInto(key, block.Hash, file.Name) }
batch.Put(key, buf)
} }
} }
return m.db.Write(batch, nil) return m.db.Write(batch, nil)
@ -98,33 +85,36 @@ func (m *BlockMap) Discard(files []protocol.FileInfo) error {
batch := new(leveldb.Batch) batch := new(leveldb.Batch)
var key []byte var key []byte
for _, file := range files { for _, file := range files {
if batch.Len() > maxBatchSize { m.checkFlush(batch)
if err := m.db.Write(batch, nil); err != nil { m.discard(file, key, batch)
return err
}
batch.Reset()
}
for _, block := range file.Blocks {
key = m.blockKeyInto(key, block.Hash, file.Name)
batch.Delete(key)
}
} }
return m.db.Write(batch, nil) return m.db.Write(batch, nil)
} }
func (m *BlockMap) discard(file protocol.FileInfo, key []byte, batch *leveldb.Batch) {
for _, block := range file.Blocks {
key = m.blockKeyInto(key, block.Hash, file.Name)
batch.Delete(key)
}
}
func (m *BlockMap) checkFlush(batch *leveldb.Batch) error {
if batch.Len() > maxBatchSize {
if err := m.db.Write(batch, nil); err != nil {
return err
}
batch.Reset()
}
return nil
}
// Drop block map, removing all entries related to this block map from the db. // Drop block map, removing all entries related to this block map from the db.
func (m *BlockMap) Drop() error { func (m *BlockMap) Drop() error {
batch := new(leveldb.Batch) batch := new(leveldb.Batch)
iter := m.db.NewIterator(util.BytesPrefix(m.blockKeyInto(nil, nil, "")[:keyPrefixLen+keyFolderLen]), nil) iter := m.db.NewIterator(util.BytesPrefix(m.blockKeyInto(nil, nil, "")[:keyPrefixLen+keyFolderLen]), nil)
defer iter.Release() defer iter.Release()
for iter.Next() { for iter.Next() {
if batch.Len() > maxBatchSize { m.checkFlush(batch)
if err := m.db.Write(batch, nil); err != nil {
return err
}
batch.Reset()
}
batch.Delete(iter.Key()) batch.Delete(iter.Key())
} }

View File

@ -107,16 +107,18 @@ func (m *metadataTracker) countsPtr(dev protocol.DeviceID, flags uint32) *Counts
// addFile adds a file to the counts, adjusting the sequence number as // addFile adds a file to the counts, adjusting the sequence number as
// appropriate // appropriate
func (m *metadataTracker) addFile(dev protocol.DeviceID, f FileIntf) { func (m *metadataTracker) addFile(dev protocol.DeviceID, f FileIntf) {
if f.IsInvalid() && f.FileLocalFlags() == 0 {
// This is a remote invalid file; it does not count.
return
}
m.mut.Lock() m.mut.Lock()
defer m.mut.Unlock() defer m.mut.Unlock()
m.dirty = true m.dirty = true
m.updateSeqLocked(dev, f)
if f.IsInvalid() && f.FileLocalFlags() == 0 {
// This is a remote invalid file; it does not count.
return
}
if flags := f.FileLocalFlags(); flags == 0 { if flags := f.FileLocalFlags(); flags == 0 {
// Account regular files in the zero-flags bucket. // Account regular files in the zero-flags bucket.
m.addFileLocked(dev, 0, f) m.addFileLocked(dev, 0, f)
@ -128,6 +130,21 @@ func (m *metadataTracker) addFile(dev protocol.DeviceID, f FileIntf) {
} }
} }
func (m *metadataTracker) Sequence(dev protocol.DeviceID) int64 {
m.mut.Lock()
defer m.mut.Unlock()
return m.countsPtr(dev, 0).Sequence
}
func (m *metadataTracker) updateSeqLocked(dev protocol.DeviceID, f FileIntf) {
if dev == protocol.GlobalDeviceID {
return
}
if cp := m.countsPtr(dev, 0); f.SequenceNo() > cp.Sequence {
cp.Sequence = f.SequenceNo()
}
}
func (m *metadataTracker) addFileLocked(dev protocol.DeviceID, flags uint32, f FileIntf) { func (m *metadataTracker) addFileLocked(dev protocol.DeviceID, flags uint32, f FileIntf) {
cp := m.countsPtr(dev, flags) cp := m.countsPtr(dev, flags)
@ -142,10 +159,6 @@ func (m *metadataTracker) addFileLocked(dev protocol.DeviceID, flags uint32, f F
cp.Files++ cp.Files++
} }
cp.Bytes += f.FileSize() cp.Bytes += f.FileSize()
if seq := f.SequenceNo(); seq > cp.Sequence {
cp.Sequence = seq
}
} }
// removeFile removes a file from the counts // removeFile removes a file from the counts

View File

@ -80,3 +80,24 @@ func TestMetaDevices(t *testing.T) {
t.Error("second device should be d2") t.Error("second device should be d2")
} }
} }
func TestMetaSequences(t *testing.T) {
d1 := protocol.DeviceID{1}
meta := newMetadataTracker()
meta.addFile(d1, protocol.FileInfo{Sequence: 1})
meta.addFile(d1, protocol.FileInfo{Sequence: 2, RawInvalid: true})
meta.addFile(d1, protocol.FileInfo{Sequence: 3})
meta.addFile(d1, protocol.FileInfo{Sequence: 4, RawInvalid: true})
meta.addFile(protocol.LocalDeviceID, protocol.FileInfo{Sequence: 1})
meta.addFile(protocol.LocalDeviceID, protocol.FileInfo{Sequence: 2})
meta.addFile(protocol.LocalDeviceID, protocol.FileInfo{Sequence: 3, LocalFlags: 1})
meta.addFile(protocol.LocalDeviceID, protocol.FileInfo{Sequence: 4, LocalFlags: 2})
if seq := meta.Sequence(d1); seq != 4 {
t.Error("sequence of first device should be 4, not", seq)
}
if seq := meta.Sequence(protocol.LocalDeviceID); seq != 4 {
t.Error("sequence of first device should be 4, not", seq)
}
}

View File

@ -276,7 +276,7 @@ func (s *FileSet) Availability(file string) []protocol.DeviceID {
} }
func (s *FileSet) Sequence(device protocol.DeviceID) int64 { func (s *FileSet) Sequence(device protocol.DeviceID) int64 {
return s.meta.Counts(device, 0).Sequence return s.meta.Sequence(device)
} }
func (s *FileSet) LocalSize() Counts { func (s *FileSet) LocalSize() Counts {