diff --git a/cmd/stindex/main.go b/cmd/stindex/main.go index e18bb412..29b85a0f 100644 --- a/cmd/stindex/main.go +++ b/cmd/stindex/main.go @@ -32,7 +32,8 @@ func main() { if *node == "" { log.Printf("*** Global index for repo %q", *repo) - fs.WithGlobal(func(f protocol.FileInfo) bool { + fs.WithGlobalTruncated(func(fi protocol.FileIntf) bool { + f := fi.(protocol.FileInfoTruncated) fmt.Println(f) fmt.Println("\t", fs.Availability(f.Name)) return true @@ -43,7 +44,8 @@ func main() { log.Fatal(err) } log.Printf("*** Have index for repo %q node %q", *repo, n) - fs.WithHave(n, func(f protocol.FileInfo) bool { + fs.WithHaveTruncated(n, func(fi protocol.FileIntf) bool { + f := fi.(protocol.FileInfoTruncated) fmt.Println(f) return true }) diff --git a/files/leveldb.go b/files/leveldb.go index 912964ed..2e9fcf76 100644 --- a/files/leveldb.go +++ b/files/leveldb.go @@ -119,7 +119,7 @@ func globalKeyName(key []byte) []byte { type deletionHandler func(db dbReader, batch dbWriter, repo, node, name []byte, dbi iterator.Iterator) uint64 -type fileIterator func(f protocol.FileInfo) bool +type fileIterator func(f protocol.FileIntf) bool func ldbGenericReplace(db *leveldb.DB, repo, node []byte, fs []protocol.FileInfo, deleteFn deletionHandler) uint64 { defer runtime.GC() @@ -181,7 +181,7 @@ func ldbGenericReplace(db *leveldb.DB, repo, node []byte, fs []protocol.FileInfo case moreFs && moreDb && cmp == 0: // File exists on both sides - compare versions. 
- var ef protocol.FileInfo + var ef protocol.FileInfoTruncated ef.UnmarshalXDR(dbi.Value()) if fs[fsi].Version > ef.Version { if lv := ldbInsert(batch, repo, node, newName, fs[fsi]); lv > maxLocalVer { @@ -226,20 +226,23 @@ func ldbReplace(db *leveldb.DB, repo, node []byte, fs []protocol.FileInfo) uint6 func ldbReplaceWithDelete(db *leveldb.DB, repo, node []byte, fs []protocol.FileInfo) uint64 { return ldbGenericReplace(db, repo, node, fs, func(db dbReader, batch dbWriter, repo, node, name []byte, dbi iterator.Iterator) uint64 { - var f protocol.FileInfo - err := f.UnmarshalXDR(dbi.Value()) + var tf protocol.FileInfoTruncated + err := tf.UnmarshalXDR(dbi.Value()) if err != nil { panic(err) } - if !protocol.IsDeleted(f.Flags) { + if !tf.IsDeleted() { if debug { l.Debugf("mark deleted; repo=%q node=%v name=%q", repo, protocol.NodeIDFromBytes(node), name) } - ts := clock(f.LocalVersion) - f.Blocks = nil - f.Version = lamport.Default.Tick(f.Version) - f.Flags |= protocol.FlagDeleted - f.LocalVersion = ts + ts := clock(tf.LocalVersion) + f := protocol.FileInfo{ + Name: tf.Name, + Version: lamport.Default.Tick(tf.Version), + LocalVersion: ts, + Flags: tf.Flags | protocol.FlagDeleted, + Modified: tf.Modified, + } batch.Put(dbi.Key(), f.MarshalXDR()) ldbUpdateGlobal(db, batch, repo, node, nodeKeyName(dbi.Key()), f.Version) return ts @@ -271,7 +274,7 @@ func ldbUpdate(db *leveldb.DB, repo, node []byte, fs []protocol.FileInfo) uint64 continue } - var ef protocol.FileInfo + var ef protocol.FileInfoTruncated err = ef.UnmarshalXDR(bs) if err != nil { panic(err) @@ -395,7 +398,7 @@ func ldbRemoveFromGlobal(db dbReader, batch dbWriter, repo, node, file []byte) { } } -func ldbWithHave(db *leveldb.DB, repo, node []byte, fn fileIterator) { +func ldbWithHave(db *leveldb.DB, repo, node []byte, truncate bool, fn fileIterator) { start := nodeKey(repo, node, nil) // before all repo/node files limit := nodeKey(repo, node, []byte{0xff, 0xff, 0xff, 0xff}) // after all repo/node files snap, 
err := db.GetSnapshot() @@ -407,8 +410,7 @@ func ldbWithHave(db *leveldb.DB, repo, node []byte, fn fileIterator) { defer dbi.Release() for dbi.Next() { - var f protocol.FileInfo - err := f.UnmarshalXDR(dbi.Value()) + f, err := unmarshalTrunc(dbi.Value(), truncate) if err != nil { panic(err) } @@ -418,7 +420,7 @@ func ldbWithHave(db *leveldb.DB, repo, node []byte, fn fileIterator) { } } -func ldbWithAllRepo(db *leveldb.DB, repo []byte, fn func(node []byte, f protocol.FileInfo) bool) { +func ldbWithAllRepoTruncated(db *leveldb.DB, repo []byte, fn func(node []byte, f protocol.FileInfoTruncated) bool) { defer runtime.GC() start := nodeKey(repo, nil, nil) // before all repo/node files @@ -433,7 +435,7 @@ func ldbWithAllRepo(db *leveldb.DB, repo []byte, fn func(node []byte, f protocol for dbi.Next() { node := nodeKeyNode(dbi.Key()) - var f protocol.FileInfo + var f protocol.FileInfoTruncated err := f.UnmarshalXDR(dbi.Value()) if err != nil { panic(err) @@ -444,40 +446,6 @@ func ldbWithAllRepo(db *leveldb.DB, repo []byte, fn func(node []byte, f protocol } } -/* -func ldbCheckGlobalConsistency(db *leveldb.DB, repo []byte) { - l.Debugf("Checking global consistency for %q", repo) - start := nodeKey(repo, nil, nil) // before all repo/node files - limit := nodeKey(repo, protocol.LocalNodeID[:], []byte{0xff, 0xff, 0xff, 0xff}) // after all repo/node files - snap, err := db.GetSnapshot() - if err != nil { - panic(err) - } - defer snap.Release() - dbi := snap.NewIterator(&util.Range{Start: start, Limit: limit}, nil) - defer dbi.Release() - - batch := new(leveldb.Batch) - i := 0 - for dbi.Next() { - repo := nodeKeyRepo(dbi.Key()) - node := nodeKeyNode(dbi.Key()) - var f protocol.FileInfo - err := f.UnmarshalXDR(dbi.Value()) - if err != nil { - panic(err) - } - if ldbUpdateGlobal(snap, batch, repo, node, []byte(f.Name), f.Version) { - var nodeID protocol.NodeID - copy(nodeID[:], node) - l.Debugf("fixed global for %q %s %q", repo, nodeID, f.Name) - } - i++ - } - l.Debugln("Done", i) 
-} -*/ - func ldbGet(db *leveldb.DB, repo, node, file []byte) protocol.FileInfo { nk := nodeKey(repo, node, file) bs, err := db.Get(nk, nil) @@ -536,7 +504,7 @@ func ldbGetGlobal(db *leveldb.DB, repo, file []byte) protocol.FileInfo { return f } -func ldbWithGlobal(db *leveldb.DB, repo []byte, fn fileIterator) { +func ldbWithGlobal(db *leveldb.DB, repo []byte, truncate bool, fn fileIterator) { defer runtime.GC() start := globalKey(repo, nil) @@ -565,8 +533,7 @@ func ldbWithGlobal(db *leveldb.DB, repo []byte, fn fileIterator) { panic(err) } - var f protocol.FileInfo - err = f.UnmarshalXDR(bs) + f, err := unmarshalTrunc(bs, truncate) if err != nil { panic(err) } @@ -605,7 +572,7 @@ func ldbAvailability(db *leveldb.DB, repo, file []byte) []protocol.NodeID { return nodes } -func ldbWithNeed(db *leveldb.DB, repo, node []byte, fn fileIterator) { +func ldbWithNeed(db *leveldb.DB, repo, node []byte, truncate bool, fn fileIterator) { defer runtime.GC() start := globalKey(repo, nil) @@ -649,13 +616,12 @@ func ldbWithNeed(db *leveldb.DB, repo, node []byte, fn fileIterator) { panic(err) } - var gf protocol.FileInfo - err = gf.UnmarshalXDR(bs) + gf, err := unmarshalTrunc(bs, truncate) if err != nil { panic(err) } - if protocol.IsDeleted(gf.Flags) && !have { + if gf.IsDeleted() && !have { // We don't need deleted files that we don't have continue } @@ -670,3 +636,15 @@ func ldbWithNeed(db *leveldb.DB, repo, node []byte, fn fileIterator) { } } } + +func unmarshalTrunc(bs []byte, truncate bool) (protocol.FileIntf, error) { + if truncate { + var tf protocol.FileInfoTruncated + err := tf.UnmarshalXDR(bs) + return tf, err + } else { + var tf protocol.FileInfo + err := tf.UnmarshalXDR(bs) + return tf, err + } +} diff --git a/files/set.go b/files/set.go index d49615a7..9065425e 100644 --- a/files/set.go +++ b/files/set.go @@ -36,7 +36,7 @@ func NewSet(repo string, db *leveldb.DB) *Set { } var nodeID protocol.NodeID - ldbWithAllRepo(db, []byte(repo), func(node []byte, f 
protocol.FileInfo) bool { + ldbWithAllRepoTruncated(db, []byte(repo), func(node []byte, f protocol.FileInfoTruncated) bool { copy(nodeID[:], node) if f.LocalVersion > s.localVersion[nodeID] { s.localVersion[nodeID] = f.LocalVersion @@ -87,21 +87,35 @@ func (s *Set) WithNeed(node protocol.NodeID, fn fileIterator) { if debug { l.Debugf("%s WithNeed(%v)", s.repo, node) } - ldbWithNeed(s.db, []byte(s.repo), node[:], fn) + ldbWithNeed(s.db, []byte(s.repo), node[:], false, fn) +} + +func (s *Set) WithNeedTruncated(node protocol.NodeID, fn fileIterator) { + if debug { + l.Debugf("%s WithNeedTruncated(%v)", s.repo, node) + } + ldbWithNeed(s.db, []byte(s.repo), node[:], true, fn) } func (s *Set) WithHave(node protocol.NodeID, fn fileIterator) { if debug { l.Debugf("%s WithHave(%v)", s.repo, node) } - ldbWithHave(s.db, []byte(s.repo), node[:], fn) + ldbWithHave(s.db, []byte(s.repo), node[:], false, fn) } -func (s *Set) WithGlobal(fn fileIterator) { +func (s *Set) WithHaveTruncated(node protocol.NodeID, fn fileIterator) { if debug { - l.Debugf("%s WithGlobal()", s.repo) + l.Debugf("%s WithHaveTruncated(%v)", s.repo, node) } - ldbWithGlobal(s.db, []byte(s.repo), fn) + ldbWithHave(s.db, []byte(s.repo), node[:], true, fn) +} + +func (s *Set) WithGlobalTruncated(fn fileIterator) { + if debug { + l.Debugf("%s WithGlobalTruncated()", s.repo) + } + ldbWithGlobal(s.db, []byte(s.repo), true, fn) } func (s *Set) Get(node protocol.NodeID, file string) protocol.FileInfo { diff --git a/model/model.go b/model/model.go index 0f8803ff..c15f875e 100644 --- a/model/model.go +++ b/model/model.go @@ -208,15 +208,9 @@ func (m *Model) Completion(node protocol.NodeID, repo string) float64 { return 0 // Repo doesn't exist, so we hardly have any of it } - rf.WithGlobal(func(f protocol.FileInfo) bool { - if !protocol.IsDeleted(f.Flags) { - var size int64 - if protocol.IsDirectory(f.Flags) { - size = zeroEntrySize - } else { - size = f.Size() - } - tot += size + rf.WithGlobalTruncated(func(f 
protocol.FileIntf) bool { + if !f.IsDeleted() { + tot += f.Size() } return true }) @@ -226,20 +220,19 @@ func (m *Model) Completion(node protocol.NodeID, repo string) float64 { } var need int64 - rf.WithNeed(node, func(f protocol.FileInfo) bool { - if !protocol.IsDeleted(f.Flags) { - var size int64 - if protocol.IsDirectory(f.Flags) { - size = zeroEntrySize - } else { - size = f.Size() - } - need += size + rf.WithNeedTruncated(node, func(f protocol.FileIntf) bool { + if !f.IsDeleted() { + need += f.Size() } return true }) - return 100 * (1 - float64(need)/float64(tot)) + res := 100 * (1 - float64(need)/float64(tot)) + if debug { + l.Debugf("Completion(%s, %q): %f (%d / %d)", node, repo, res, need, tot) + } + + return res } func sizeOf(fs []protocol.FileInfo) (files, deleted int, bytes int64) { @@ -252,18 +245,13 @@ func sizeOf(fs []protocol.FileInfo) (files, deleted int, bytes int64) { return } -func sizeOfFile(f protocol.FileInfo) (files, deleted int, bytes int64) { - if !protocol.IsDeleted(f.Flags) { +func sizeOfFile(f protocol.FileIntf) (files, deleted int, bytes int64) { + if !f.IsDeleted() { files++ - if !protocol.IsDirectory(f.Flags) { - bytes += f.Size() - } else { - bytes += zeroEntrySize - } } else { deleted++ - bytes += zeroEntrySize } + bytes += f.Size() return } @@ -273,7 +261,7 @@ func (m *Model) GlobalSize(repo string) (files, deleted int, bytes int64) { m.rmut.RLock() defer m.rmut.RUnlock() if rf, ok := m.repoFiles[repo]; ok { - rf.WithGlobal(func(f protocol.FileInfo) bool { + rf.WithGlobalTruncated(func(f protocol.FileIntf) bool { fs, de, by := sizeOfFile(f) files += fs deleted += de @@ -290,7 +278,7 @@ func (m *Model) LocalSize(repo string) (files, deleted int, bytes int64) { m.rmut.RLock() defer m.rmut.RUnlock() if rf, ok := m.repoFiles[repo]; ok { - rf.WithHave(protocol.LocalNodeID, func(f protocol.FileInfo) bool { + rf.WithHaveTruncated(protocol.LocalNodeID, func(f protocol.FileIntf) bool { fs, de, by := sizeOfFile(f) files += fs deleted += de 
@@ -306,13 +294,16 @@ func (m *Model) NeedSize(repo string) (files int, bytes int64) { m.rmut.RLock() defer m.rmut.RUnlock() if rf, ok := m.repoFiles[repo]; ok { - rf.WithNeed(protocol.LocalNodeID, func(f protocol.FileInfo) bool { + rf.WithNeedTruncated(protocol.LocalNodeID, func(f protocol.FileIntf) bool { fs, de, by := sizeOfFile(f) files += fs + de bytes += by return true }) } + if debug { + l.Debugf("NeedSize(%q): %d %d", repo, files, bytes) + } return } @@ -322,8 +313,8 @@ func (m *Model) NeedFilesRepo(repo string) []protocol.FileInfo { defer m.rmut.RUnlock() if rf, ok := m.repoFiles[repo]; ok { fs := make([]protocol.FileInfo, 0, indexBatchSize) - rf.WithNeed(protocol.LocalNodeID, func(f protocol.FileInfo) bool { - fs = append(fs, f) + rf.WithNeed(protocol.LocalNodeID, func(f protocol.FileIntf) bool { + fs = append(fs, f.(protocol.FileInfo)) return len(fs) < indexBatchSize }) return fs @@ -597,7 +588,8 @@ func sendIndexTo(initial bool, minLocalVer uint64, conn protocol.Connection, rep maxLocalVer := uint64(0) var err error - fs.WithHave(protocol.LocalNodeID, func(f protocol.FileInfo) bool { + fs.WithHave(protocol.LocalNodeID, func(fi protocol.FileIntf) bool { + f := fi.(protocol.FileInfo) if f.LocalVersion <= minLocalVer { return true } @@ -802,7 +794,8 @@ func (m *Model) ScanRepoSub(repo, sub string) error { batch = batch[:0] // TODO: We should limit the Have scanning to start at sub seenPrefix := false - fs.WithHave(protocol.LocalNodeID, func(f protocol.FileInfo) bool { + fs.WithHaveTruncated(protocol.LocalNodeID, func(fi protocol.FileIntf) bool { + f := fi.(protocol.FileInfoTruncated) if !strings.HasPrefix(f.Name, sub) { return !seenPrefix } @@ -814,10 +807,12 @@ func (m *Model) ScanRepoSub(repo, sub string) error { } if _, err := os.Stat(filepath.Join(dir, f.Name)); err != nil && os.IsNotExist(err) { // File has been deleted - f.Blocks = nil - f.Flags |= protocol.FlagDeleted - f.Version = lamport.Default.Tick(f.Version) - f.LocalVersion = 0 + nf := 
protocol.FileInfo{ + Name: f.Name, + Flags: f.Flags | protocol.FlagDeleted, + Modified: f.Modified, + Version: lamport.Default.Tick(f.Version), + } events.Default.Log(events.LocalIndexUpdated, map[string]interface{}{ "repo": repo, "name": f.Name, @@ -825,7 +820,7 @@ func (m *Model) ScanRepoSub(repo, sub string) error { - "flags": fmt.Sprintf("0%o", f.Flags), - "size": f.Size(), + "flags": fmt.Sprintf("0%o", nf.Flags), + "size": nf.Size(), }) - batch = append(batch, f) + batch = append(batch, nf) } } return true @@ -898,7 +893,8 @@ func (m *Model) Override(repo string) { m.rmut.RUnlock() batch := make([]protocol.FileInfo, 0, indexBatchSize) - fs.WithNeed(protocol.LocalNodeID, func(need protocol.FileInfo) bool { + fs.WithNeed(protocol.LocalNodeID, func(fi protocol.FileIntf) bool { + need := fi.(protocol.FileInfo) if len(batch) == indexBatchSize { fs.Update(protocol.LocalNodeID, batch) batch = batch[:0] diff --git a/protocol/message.go b/protocol/message.go index 4f089943..2eb5020f 100644 --- a/protocol/message.go +++ b/protocol/message.go @@ -26,12 +26,46 @@ func (f FileInfo) String() string { } func (f FileInfo) Size() (bytes int64) { + if IsDeleted(f.Flags) || IsDirectory(f.Flags) { + return 128 + } for _, b := range f.Blocks { bytes += int64(b.Size) } return } +func (f FileInfo) IsDeleted() bool { + return IsDeleted(f.Flags) +} + +// FileInfoTruncated is used for unmarshalling a FileInfo structure but skipping the actual block list. +type FileInfoTruncated struct { + Name string // max:1024 + Flags uint32 + Modified int64 + Version uint64 + LocalVersion uint64 + NumBlocks uint32 +} + +// Size returns an upper bound on the size, not the exact figure: deleted entries and directories count as 128 bytes, anything else as NumBlocks full blocks. +func (f FileInfoTruncated) Size() int64 { + if IsDeleted(f.Flags) || IsDirectory(f.Flags) { + return 128 + } + return int64(f.NumBlocks) * BlockSize +} + +func (f FileInfoTruncated) IsDeleted() bool { + return IsDeleted(f.Flags) +} + +type FileIntf interface { + Size() int64 + IsDeleted() bool +} + type BlockInfo struct { Offset int64 // noencode (cache only) Size uint32 diff --git a/protocol/message_xdr.go 
b/protocol/message_xdr.go index 329bec89..3f1c0631 100644 --- a/protocol/message_xdr.go +++ b/protocol/message_xdr.go @@ -199,6 +199,98 @@ func (o *FileInfo) decodeXDR(xr *xdr.Reader) error { /* +FileInfoTruncated Structure: + + 0 1 2 3 + 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| Length of Name | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +/ / +\ Name (variable length) \ +/ / ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| Flags | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| | ++ Modified (64 bits) + +| | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| | ++ Version (64 bits) + +| | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| | ++ Local Version (64 bits) + +| | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| Num Blocks | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + + +struct FileInfoTruncated { + string Name<1024>; + unsigned int Flags; + hyper Modified; + unsigned hyper Version; + unsigned hyper LocalVersion; + unsigned int NumBlocks; +} + +*/ + +func (o FileInfoTruncated) EncodeXDR(w io.Writer) (int, error) { + var xw = xdr.NewWriter(w) + return o.encodeXDR(xw) +} + +func (o FileInfoTruncated) MarshalXDR() []byte { + return o.AppendXDR(make([]byte, 0, 128)) +} + +func (o FileInfoTruncated) AppendXDR(bs []byte) []byte { + var aw = xdr.AppendWriter(bs) + var xw = xdr.NewWriter(&aw) + o.encodeXDR(xw) + return []byte(aw) +} + +func (o FileInfoTruncated) encodeXDR(xw *xdr.Writer) (int, error) { + if len(o.Name) > 1024 { + return xw.Tot(), xdr.ErrElementSizeExceeded + } + xw.WriteString(o.Name) + xw.WriteUint32(o.Flags) + xw.WriteUint64(uint64(o.Modified)) + xw.WriteUint64(o.Version) + xw.WriteUint64(o.LocalVersion) + xw.WriteUint32(o.NumBlocks) + return xw.Tot(), xw.Error() +} + +func (o *FileInfoTruncated) 
DecodeXDR(r io.Reader) error { + xr := xdr.NewReader(r) + return o.decodeXDR(xr) +} + +func (o *FileInfoTruncated) UnmarshalXDR(bs []byte) error { + var br = bytes.NewReader(bs) + var xr = xdr.NewReader(br) + return o.decodeXDR(xr) +} + +func (o *FileInfoTruncated) decodeXDR(xr *xdr.Reader) error { + o.Name = xr.ReadStringMax(1024) + o.Flags = xr.ReadUint32() + o.Modified = int64(xr.ReadUint64()) + o.Version = xr.ReadUint64() + o.LocalVersion = xr.ReadUint64() + o.NumBlocks = xr.ReadUint32() + return xr.Error() +} + +/* + BlockInfo Structure: 0 1 2 3