Implement version vectors

This commit is contained in:
Jakob Borg
2015-03-25 22:37:35 +01:00
parent b4f45d1e79
commit 6da7f17c4a
9 changed files with 113 additions and 152 deletions

View File

@@ -17,7 +17,6 @@ import (
"sync"
"github.com/syncthing/protocol"
"github.com/syncthing/syncthing/internal/lamport"
"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/iterator"
"github.com/syndtr/goleveldb/leveldb/opt"
@@ -49,7 +48,7 @@ const (
)
type fileVersion struct {
version int64
version protocol.Vector
device []byte
}
@@ -242,8 +241,7 @@ func ldbGenericReplace(db *leveldb.DB, folder, device []byte, fs []protocol.File
}
var ef FileInfoTruncated
ef.UnmarshalXDR(dbi.Value())
if fs[fsi].Version > ef.Version ||
(fs[fsi].Version == ef.Version && fs[fsi].Flags != ef.Flags) {
if !fs[fsi].Version.Equal(ef.Version) || fs[fsi].Flags != ef.Flags {
if debugDB {
l.Debugln("generic replace; differs - insert")
}
@@ -315,7 +313,7 @@ func ldbReplace(db *leveldb.DB, folder, device []byte, fs []protocol.FileInfo) i
})
}
func ldbReplaceWithDelete(db *leveldb.DB, folder, device []byte, fs []protocol.FileInfo) int64 {
func ldbReplaceWithDelete(db *leveldb.DB, folder, device []byte, fs []protocol.FileInfo, myID uint64) int64 {
return ldbGenericReplace(db, folder, device, fs, func(db dbReader, batch dbWriter, folder, device, name []byte, dbi iterator.Iterator) int64 {
var tf FileInfoTruncated
err := tf.UnmarshalXDR(dbi.Value())
@@ -329,7 +327,7 @@ func ldbReplaceWithDelete(db *leveldb.DB, folder, device []byte, fs []protocol.F
ts := clock(tf.LocalVersion)
f := protocol.FileInfo{
Name: tf.Name,
Version: lamport.Default.Tick(tf.Version),
Version: tf.Version.Update(myID),
LocalVersion: ts,
Flags: tf.Flags | protocol.FlagDeleted,
Modified: tf.Modified,
@@ -394,7 +392,7 @@ func ldbUpdate(db *leveldb.DB, folder, device []byte, fs []protocol.FileInfo) in
}
// Flags might change without the version being bumped when we set the
// invalid flag on an existing file.
if ef.Version != f.Version || ef.Flags != f.Flags {
if !ef.Version.Equal(f.Version) || ef.Flags != f.Flags {
if lv := ldbInsert(batch, folder, device, f); lv > maxLocalVer {
maxLocalVer = lv
}
@@ -454,7 +452,7 @@ func ldbInsert(batch dbWriter, folder, device []byte, file protocol.FileInfo) in
// ldbUpdateGlobal adds this device+version to the version list for the given
// file. If the device is already present in the list, the version is updated.
// If the file does not have an entry in the global list, it is created.
func ldbUpdateGlobal(db dbReader, batch dbWriter, folder, device, file []byte, version int64) bool {
func ldbUpdateGlobal(db dbReader, batch dbWriter, folder, device, file []byte, version protocol.Vector) bool {
if debugDB {
l.Debugf("update global; folder=%q device=%v file=%q version=%d", folder, protocol.DeviceIDFromBytes(device), file, version)
}
@@ -465,10 +463,8 @@ func ldbUpdateGlobal(db dbReader, batch dbWriter, folder, device, file []byte, v
}
var fl versionList
nv := fileVersion{
device: device,
version: version,
}
// Remove the device from the current version list
if svl != nil {
err = fl.UnmarshalXDR(svl)
if err != nil {
@@ -477,7 +473,7 @@ func ldbUpdateGlobal(db dbReader, batch dbWriter, folder, device, file []byte, v
for i := range fl.versions {
if bytes.Compare(fl.versions[i].device, device) == 0 {
if fl.versions[i].version == version {
if fl.versions[i].version.Equal(version) {
// No need to do anything
return false
}
@@ -487,8 +483,15 @@ func ldbUpdateGlobal(db dbReader, batch dbWriter, folder, device, file []byte, v
}
}
nv := fileVersion{
device: device,
version: version,
}
for i := range fl.versions {
if fl.versions[i].version <= version {
// We compare against ConcurrentLesser as well here because we need
// to enforce a consistent ordering of versions even in the case of
// conflicts.
if comp := fl.versions[i].version.Compare(version); comp == protocol.Equal || comp == protocol.Lesser || comp == protocol.ConcurrentLesser {
t := append(fl.versions, fileVersion{})
copy(t[i+1:], t[i:])
t[i] = nv
@@ -776,7 +779,7 @@ func ldbAvailability(db *leveldb.DB, folder, file []byte) []protocol.DeviceID {
var devices []protocol.DeviceID
for _, v := range vl.versions {
if v.version != vl.versions[0].version {
if !v.version.Equal(vl.versions[0].version) {
break
}
n := protocol.DeviceIDFromBytes(v.device)
@@ -808,7 +811,7 @@ func ldbWithNeed(db *leveldb.DB, folder, device []byte, truncate bool, fn Iterat
dbi := snap.NewIterator(&util.Range{Start: start, Limit: limit}, nil)
defer dbi.Release()
outer:
nextFile:
for dbi.Next() {
var vl versionList
err := vl.UnmarshalXDR(dbi.Value())
@@ -822,12 +825,15 @@ outer:
have := false // If we have the file, any version
need := false // If we have a lower version of the file
var haveVersion int64
var haveVersion protocol.Vector
for _, v := range vl.versions {
if bytes.Compare(v.device, device) == 0 {
have = true
haveVersion = v.version
need = v.version < vl.versions[0].version
// XXX: This marks Concurrent (i.e. conflicting) changes as
// needs. Maybe we should do that, but it needs special
// handling in the puller.
need = !v.version.GreaterEqual(vl.versions[0].version)
break
}
}
@@ -835,11 +841,12 @@ outer:
if need || !have {
name := globalKeyName(dbi.Key())
needVersion := vl.versions[0].version
inner:
nextVersion:
for i := range vl.versions {
if vl.versions[i].version != needVersion {
if !vl.versions[i].version.Equal(needVersion) {
// We haven't found a valid copy of the file with the needed version.
continue outer
continue nextFile
}
fk := deviceKey(folder, vl.versions[i].device, name)
if debugDB {
@@ -866,12 +873,12 @@ outer:
if gf.IsInvalid() {
// The file is marked invalid for whatever reason, don't use it.
continue inner
continue nextVersion
}
if gf.IsDeleted() && !have {
// We don't need deleted files that we don't have
continue outer
continue nextFile
}
if debugDB {
@@ -883,7 +890,7 @@ outer:
}
// This file is handled, no need to look further in the version list
continue outer
continue nextFile
}
}
}

View File

@@ -18,9 +18,9 @@ fileVersion Structure:
0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| |
+ version (64 bits) +
| |
/ /
\ Vector Structure \
/ /
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| Length of device |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
@@ -31,7 +31,7 @@ fileVersion Structure:
struct fileVersion {
hyper version;
Vector version;
opaque device<>;
}
@@ -39,7 +39,7 @@ struct fileVersion {
func (o fileVersion) EncodeXDR(w io.Writer) (int, error) {
var xw = xdr.NewWriter(w)
return o.encodeXDR(xw)
return o.EncodeXDRInto(xw)
}
func (o fileVersion) MarshalXDR() ([]byte, error) {
@@ -57,29 +57,32 @@ func (o fileVersion) MustMarshalXDR() []byte {
func (o fileVersion) AppendXDR(bs []byte) ([]byte, error) {
var aw = xdr.AppendWriter(bs)
var xw = xdr.NewWriter(&aw)
_, err := o.encodeXDR(xw)
_, err := o.EncodeXDRInto(xw)
return []byte(aw), err
}
func (o fileVersion) encodeXDR(xw *xdr.Writer) (int, error) {
xw.WriteUint64(uint64(o.version))
func (o fileVersion) EncodeXDRInto(xw *xdr.Writer) (int, error) {
_, err := o.version.EncodeXDRInto(xw)
if err != nil {
return xw.Tot(), err
}
xw.WriteBytes(o.device)
return xw.Tot(), xw.Error()
}
func (o *fileVersion) DecodeXDR(r io.Reader) error {
xr := xdr.NewReader(r)
return o.decodeXDR(xr)
return o.DecodeXDRFrom(xr)
}
func (o *fileVersion) UnmarshalXDR(bs []byte) error {
var br = bytes.NewReader(bs)
var xr = xdr.NewReader(br)
return o.decodeXDR(xr)
return o.DecodeXDRFrom(xr)
}
func (o *fileVersion) decodeXDR(xr *xdr.Reader) error {
o.version = int64(xr.ReadUint64())
func (o *fileVersion) DecodeXDRFrom(xr *xdr.Reader) error {
(&o.version).DecodeXDRFrom(xr)
o.device = xr.ReadBytes()
return xr.Error()
}
@@ -107,7 +110,7 @@ struct versionList {
func (o versionList) EncodeXDR(w io.Writer) (int, error) {
var xw = xdr.NewWriter(w)
return o.encodeXDR(xw)
return o.EncodeXDRInto(xw)
}
func (o versionList) MarshalXDR() ([]byte, error) {
@@ -125,14 +128,14 @@ func (o versionList) MustMarshalXDR() []byte {
func (o versionList) AppendXDR(bs []byte) ([]byte, error) {
var aw = xdr.AppendWriter(bs)
var xw = xdr.NewWriter(&aw)
_, err := o.encodeXDR(xw)
_, err := o.EncodeXDRInto(xw)
return []byte(aw), err
}
func (o versionList) encodeXDR(xw *xdr.Writer) (int, error) {
func (o versionList) EncodeXDRInto(xw *xdr.Writer) (int, error) {
xw.WriteUint32(uint32(len(o.versions)))
for i := range o.versions {
_, err := o.versions[i].encodeXDR(xw)
_, err := o.versions[i].EncodeXDRInto(xw)
if err != nil {
return xw.Tot(), err
}
@@ -142,20 +145,20 @@ func (o versionList) encodeXDR(xw *xdr.Writer) (int, error) {
func (o *versionList) DecodeXDR(r io.Reader) error {
xr := xdr.NewReader(r)
return o.decodeXDR(xr)
return o.DecodeXDRFrom(xr)
}
func (o *versionList) UnmarshalXDR(bs []byte) error {
var br = bytes.NewReader(bs)
var xr = xdr.NewReader(br)
return o.decodeXDR(xr)
return o.DecodeXDRFrom(xr)
}
func (o *versionList) decodeXDR(xr *xdr.Reader) error {
func (o *versionList) DecodeXDRFrom(xr *xdr.Reader) error {
_versionsSize := int(xr.ReadUint32())
o.versions = make([]fileVersion, _versionsSize)
for i := range o.versions {
(&o.versions[i]).decodeXDR(xr)
(&o.versions[i]).DecodeXDRFrom(xr)
}
return xr.Error()
}

View File

@@ -16,7 +16,6 @@ import (
"sync"
"github.com/syncthing/protocol"
"github.com/syncthing/syncthing/internal/lamport"
"github.com/syncthing/syncthing/internal/osutil"
"github.com/syndtr/goleveldb/leveldb"
)
@@ -61,7 +60,6 @@ func NewFileSet(folder string, db *leveldb.DB) *FileSet {
if f.LocalVersion > s.localVersion[deviceID] {
s.localVersion[deviceID] = f.LocalVersion
}
lamport.Default.Tick(f.Version)
return true
})
if debug {
@@ -90,14 +88,14 @@ func (s *FileSet) Replace(device protocol.DeviceID, fs []protocol.FileInfo) {
}
}
func (s *FileSet) ReplaceWithDelete(device protocol.DeviceID, fs []protocol.FileInfo) {
func (s *FileSet) ReplaceWithDelete(device protocol.DeviceID, fs []protocol.FileInfo, myID uint64) {
if debug {
l.Debugf("%s ReplaceWithDelete(%v, [%d])", s.folder, device, len(fs))
}
normalizeFilenames(fs)
s.mutex.Lock()
defer s.mutex.Unlock()
if lv := ldbReplaceWithDelete(s.db, []byte(s.folder), device[:], fs); lv > s.localVersion[device] {
if lv := ldbReplaceWithDelete(s.db, []byte(s.folder), device[:], fs, myID); lv > s.localVersion[device] {
s.localVersion[device] = lv
}
if device == protocol.LocalDeviceID {
@@ -118,7 +116,7 @@ func (s *FileSet) Update(device protocol.DeviceID, fs []protocol.FileInfo) {
updates := make([]protocol.FileInfo, 0, len(fs))
for _, newFile := range fs {
existingFile, ok := ldbGet(s.db, []byte(s.folder), device[:], []byte(newFile.Name))
if !ok || existingFile.Version <= newFile.Version {
if !ok || !existingFile.Version.Equal(newFile.Version) {
discards = append(discards, existingFile)
updates = append(updates, newFile)
}