Use LevelDB storage backend

Jakob Borg
2014-07-06 14:46:48 +02:00
parent 4a88d1244d
commit 31350b4352
114 changed files with 20315 additions and 683 deletions
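
For orientation before the hunks below: callers now open a LevelDB database and hand it to NewModel, which stores it and passes it on to each repository's file set. A minimal sketch of that wiring, assuming it sits in the same package as Model; the database path and repository values are illustrative, and the in-memory variant mirrors what the updated tests do.

// Sketch only. NewModel, config and the goleveldb import are as in the
// hunks below; "index.db" and the repository settings are illustrative.
func openModel() *Model {
	db, err := leveldb.OpenFile("index.db", nil) // tests use leveldb.Open(storage.NewMemStorage(), nil)
	if err != nil {
		l.Fatalf("leveldb: %v", err)
	}
	m := NewModel("/tmp", &config.Configuration{}, "syncthing", "dev", db)
	m.AddRepo(config.RepositoryConfiguration{ID: "default", Directory: "testdata"})
	m.ScanRepo("default")
	return m
}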

View File

@@ -17,13 +17,13 @@ import (
"sync"
"time"
"github.com/calmh/syncthing/cid"
"github.com/calmh/syncthing/config"
"github.com/calmh/syncthing/files"
"github.com/calmh/syncthing/lamport"
"github.com/calmh/syncthing/osutil"
"github.com/calmh/syncthing/protocol"
"github.com/calmh/syncthing/scanner"
"github.com/syndtr/goleveldb/leveldb"
)
type repoState int
@@ -44,6 +44,7 @@ const zeroEntrySize = 128
type Model struct {
indexDir string
cfg *config.Configuration
db *leveldb.DB
clientName string
clientVersion string
@@ -58,8 +59,6 @@ type Model struct {
repoState map[string]repoState // repo -> state
smut sync.RWMutex
cm *cid.Map
protoConn map[protocol.NodeID]protocol.Connection
rawConn map[protocol.NodeID]io.Closer
nodeVer map[protocol.NodeID]string
@@ -79,10 +78,11 @@ var (
// NewModel creates and starts a new model. The model starts in read-only mode,
// where it sends index information to connected peers and responds to requests
// for file data without altering the local repository in any way.
func NewModel(indexDir string, cfg *config.Configuration, clientName, clientVersion string) *Model {
func NewModel(indexDir string, cfg *config.Configuration, clientName, clientVersion string, db *leveldb.DB) *Model {
m := &Model{
indexDir: indexDir,
cfg: cfg,
db: db,
clientName: clientName,
clientVersion: clientVersion,
repoCfgs: make(map[string]config.RepositoryConfiguration),
@@ -91,7 +91,6 @@ func NewModel(indexDir string, cfg *config.Configuration, clientName, clientVers
nodeRepos: make(map[protocol.NodeID][]string),
repoState: make(map[string]repoState),
suppressor: make(map[string]*suppressor),
cm: cid.NewMap(),
protoConn: make(map[protocol.NodeID]protocol.Connection),
rawConn: make(map[protocol.NodeID]io.Closer),
nodeVer: make(map[protocol.NodeID]string),
@@ -163,7 +162,7 @@ func (m *Model) ConnectionStats() map[string]ConnectionInfo {
var have int64
for _, repo := range m.nodeRepos[node] {
for _, f := range m.repoFiles[repo].Global() {
m.repoFiles[repo].WithGlobal(func(f scanner.File) bool {
if !protocol.IsDeleted(f.Flags) {
size := f.Size
if protocol.IsDirectory(f.Flags) {
@@ -172,9 +171,10 @@ func (m *Model) ConnectionStats() map[string]ConnectionInfo {
tot += size
have += size
}
}
return true
})
for _, f := range m.repoFiles[repo].Need(m.cm.Get(node)) {
m.repoFiles[repo].WithNeed(node, func(f scanner.File) bool {
if !protocol.IsDeleted(f.Flags) {
size := f.Size
if protocol.IsDirectory(f.Flags) {
@@ -182,7 +182,8 @@ func (m *Model) ConnectionStats() map[string]ConnectionInfo {
}
have -= size
}
}
return true
})
}
ci.Completion = 100
@@ -210,17 +211,25 @@ func (m *Model) ConnectionStats() map[string]ConnectionInfo {
func sizeOf(fs []scanner.File) (files, deleted int, bytes int64) {
for _, f := range fs {
if !protocol.IsDeleted(f.Flags) {
files++
if !protocol.IsDirectory(f.Flags) {
bytes += f.Size
} else {
bytes += zeroEntrySize
}
fs, de, by := sizeOfFile(f)
files += fs
deleted += de
bytes += by
}
return
}
func sizeOfFile(f scanner.File) (files, deleted int, bytes int64) {
if !protocol.IsDeleted(f.Flags) {
files++
if !protocol.IsDirectory(f.Flags) {
bytes += f.Size
} else {
deleted++
bytes += zeroEntrySize
}
} else {
deleted++
bytes += zeroEntrySize
}
return
}
@@ -231,9 +240,15 @@ func (m *Model) GlobalSize(repo string) (files, deleted int, bytes int64) {
m.rmut.RLock()
defer m.rmut.RUnlock()
if rf, ok := m.repoFiles[repo]; ok {
return sizeOf(rf.Global())
rf.WithGlobal(func(f scanner.File) bool {
fs, de, by := sizeOfFile(f)
files += fs
deleted += de
bytes += by
return true
})
}
return 0, 0, 0
return
}
// LocalSize returns the number of files, deleted files and total bytes for all
@@ -242,7 +257,13 @@ func (m *Model) LocalSize(repo string) (files, deleted int, bytes int64) {
m.rmut.RLock()
defer m.rmut.RUnlock()
if rf, ok := m.repoFiles[repo]; ok {
return sizeOf(rf.Have(cid.LocalID))
rf.WithHave(protocol.LocalNodeID, func(f scanner.File) bool {
fs, de, by := sizeOfFile(f)
files += fs
deleted += de
bytes += by
return true
})
}
return 0, 0, 0
}
@@ -253,16 +274,20 @@ func (m *Model) NeedSize(repo string) (files int, bytes int64) {
return f + d, b
}
// NeedFiles returns the list of currently needed files and the total size.
// NeedFiles returns the list of currently needed files
func (m *Model) NeedFilesRepo(repo string) []scanner.File {
m.rmut.RLock()
defer m.rmut.RUnlock()
if rf, ok := m.repoFiles[repo]; ok {
f := rf.Need(cid.LocalID)
var fs []scanner.File
rf.WithNeed(protocol.LocalNodeID, func(f scanner.File) bool {
fs = append(fs, f)
return true
})
if r := m.repoCfgs[repo].FileRanker(); r != nil {
files.SortBy(r).Sort(f)
files.SortBy(r).Sort(fs)
}
return f
return fs
}
return nil
}
@@ -293,10 +318,9 @@ func (m *Model) Index(nodeID protocol.NodeID, repo string, fs []protocol.FileInf
files[i] = fileFromFileInfo(f)
}
id := m.cm.Get(nodeID)
m.rmut.RLock()
if r, ok := m.repoFiles[repo]; ok {
r.Replace(id, files)
r.Replace(nodeID, files)
} else {
l.Fatalf("Index for nonexistant repo %q", repo)
}
@@ -329,10 +353,9 @@ func (m *Model) IndexUpdate(nodeID protocol.NodeID, repo string, fs []protocol.F
files[i] = fileFromFileInfo(f)
}
id := m.cm.Get(nodeID)
m.rmut.RLock()
if r, ok := m.repoFiles[repo]; ok {
r.Update(id, files)
r.Update(nodeID, files)
} else {
l.Fatalf("IndexUpdate for nonexistant repo %q", repo)
}
@@ -378,13 +401,11 @@ func (m *Model) ClusterConfig(nodeID protocol.NodeID, config protocol.ClusterCon
func (m *Model) Close(node protocol.NodeID, err error) {
l.Infof("Connection to %s closed: %v", node, err)
cid := m.cm.Get(node)
m.rmut.RLock()
for _, repo := range m.nodeRepos[node] {
m.repoFiles[repo].Replace(cid, nil)
m.repoFiles[repo].Replace(node, nil)
}
m.rmut.RUnlock()
m.cm.Clear(node)
m.pmut.Lock()
conn, ok := m.rawConn[node]
@@ -410,7 +431,7 @@ func (m *Model) Request(nodeID protocol.NodeID, repo, name string, offset int64,
return nil, ErrNoSuchFile
}
lf := r.Get(cid.LocalID, name)
lf := r.Get(protocol.LocalNodeID, name)
if lf.Suppressed || protocol.IsDeleted(lf.Flags) {
if debug {
l.Debugf("REQ(in): %s: %q / %q o=%d s=%d; invalid: %v", nodeID, repo, name, offset, size, lf)
@@ -425,7 +446,7 @@ func (m *Model) Request(nodeID protocol.NodeID, repo, name string, offset int64,
return nil, ErrNoSuchFile
}
if debug && nodeID != cid.LocalNodeID {
if debug && nodeID != protocol.LocalNodeID {
l.Debugf("REQ(in): %s: %q / %q o=%d s=%d", nodeID, repo, name, offset, size)
}
m.rmut.RLock()
@@ -449,13 +470,13 @@ func (m *Model) Request(nodeID protocol.NodeID, repo, name string, offset int64,
// ReplaceLocal replaces the local repository index with the given list of files.
func (m *Model) ReplaceLocal(repo string, fs []scanner.File) {
m.rmut.RLock()
m.repoFiles[repo].ReplaceWithDelete(cid.LocalID, fs)
m.repoFiles[repo].ReplaceWithDelete(protocol.LocalNodeID, fs)
m.rmut.RUnlock()
}
func (m *Model) CurrentRepoFile(repo string, file string) scanner.File {
m.rmut.RLock()
f := m.repoFiles[repo].Get(cid.LocalID, file)
f := m.repoFiles[repo].Get(protocol.LocalNodeID, file)
m.rmut.RUnlock()
return f
}
@@ -533,7 +554,11 @@ func (m *Model) AddConnection(rawConn io.Closer, protoConn protocol.Connection)
func (m *Model) protocolIndex(repo string) []protocol.FileInfo {
var index []protocol.FileInfo
fs := m.repoFiles[repo].Have(cid.LocalID)
var fs []scanner.File
m.repoFiles[repo].WithHave(protocol.LocalNodeID, func(f scanner.File) bool {
fs = append(fs, f)
return true
})
for _, f := range fs {
mf := fileInfoFromFile(f)
@@ -552,7 +577,7 @@ func (m *Model) protocolIndex(repo string) []protocol.FileInfo {
func (m *Model) updateLocal(repo string, f scanner.File) {
m.rmut.RLock()
m.repoFiles[repo].Update(cid.LocalID, []scanner.File{f})
m.repoFiles[repo].Update(protocol.LocalNodeID, []scanner.File{f})
m.rmut.RUnlock()
}
@@ -573,6 +598,7 @@ func (m *Model) requestGlobal(nodeID protocol.NodeID, repo, name string, offset
}
func (m *Model) broadcastIndexLoop() {
// TODO: Rewrite to send index in segments
var lastChange = map[string]uint64{}
for {
time.Sleep(5 * time.Second)
@@ -584,21 +610,13 @@ func (m *Model) broadcastIndexLoop() {
for repo, fs := range m.repoFiles {
repo := repo
c := fs.Changes(cid.LocalID)
c := fs.Changes(protocol.LocalNodeID)
if c == lastChange[repo] {
continue
}
lastChange[repo] = c
idx := m.protocolIndex(repo)
indexWg.Add(1)
go func() {
err := m.saveIndex(repo, m.indexDir, idx)
if err != nil {
l.Infof("Saving index for %q: %v", repo, err)
}
indexWg.Done()
}()
for _, nodeID := range m.repoNodes[repo] {
nodeID := nodeID
@@ -632,7 +650,7 @@ func (m *Model) AddRepo(cfg config.RepositoryConfiguration) {
m.rmut.Lock()
m.repoCfgs[cfg.ID] = cfg
m.repoFiles[cfg.ID] = files.NewSet()
m.repoFiles[cfg.ID] = files.NewSet(cfg.ID, m.db)
m.suppressor[cfg.ID] = &suppressor{threshold: int64(m.cfg.Options.MaxChangeKbps)}
m.repoNodes[cfg.ID] = make([]protocol.NodeID, len(cfg.Nodes))
@@ -713,18 +731,6 @@ func (m *Model) ScanRepo(repo string) error {
return nil
}
func (m *Model) SaveIndexes(dir string) {
m.rmut.RLock()
for repo := range m.repoCfgs {
fs := m.protocolIndex(repo)
err := m.saveIndex(repo, dir, fs)
if err != nil {
l.Infof("Saving index for %q: %v", repo, err)
}
}
m.rmut.RUnlock()
}
func (m *Model) LoadIndexes(dir string) {
m.rmut.RLock()
for repo := range m.repoCfgs {
@@ -737,7 +743,7 @@ func (m *Model) LoadIndexes(dir string) {
sfs[i].Suppressed = false // we might have saved an index with files that were suppressed; they should not be on startup
}
m.repoFiles[repo].Replace(cid.LocalID, sfs)
m.repoFiles[repo].Replace(protocol.LocalNodeID, sfs)
}
m.rmut.RUnlock()
}
@@ -867,7 +873,7 @@ func (m *Model) Override(repo string) {
for i := range fs {
f := &fs[i]
h := r.Get(cid.LocalID, f.Name)
h := r.Get(protocol.LocalNodeID, f.Name)
if h.Name != f.Name {
// We are missing the file
f.Flags |= protocol.FlagDeleted
@@ -879,7 +885,7 @@ func (m *Model) Override(repo string) {
f.Version = lamport.Default.Tick(f.Version)
}
r.Update(cid.LocalID, fs)
r.Update(protocol.LocalNodeID, fs)
}
// Version returns the change version for the given repository. This is
@@ -890,7 +896,7 @@ func (m *Model) Version(repo string) uint64 {
m.rmut.Lock()
for _, n := range m.repoNodes[repo] {
ver += m.repoFiles[repo].Changes(m.cm.Get(n))
ver += m.repoFiles[repo].Changes(n)
}
m.rmut.Unlock()

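The recurring change in the hunks above: accessors that returned a whole []scanner.File (Global, Have, Need) are replaced by callback iterators (WithGlobal, WithHave, WithNeed), so the LevelDB-backed set is walked entry by entry instead of being materialized in memory. The typical call shape, using only the signatures visible above; that returning false stops the walk early is an assumption based on the convention.

// Collect the locally held, non-deleted files of one repository.
var local []scanner.File
var total int64
m.repoFiles[repo].WithHave(protocol.LocalNodeID, func(f scanner.File) bool {
	if protocol.IsDeleted(f.Flags) {
		return true // skip deleted entries, keep iterating
	}
	local = append(local, f)
	total += f.Size
	return true // returning false is assumed to abort the iteration
})
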
View File

@@ -11,10 +11,11 @@ import (
"testing"
"time"
"github.com/calmh/syncthing/cid"
"github.com/calmh/syncthing/config"
"github.com/calmh/syncthing/protocol"
"github.com/calmh/syncthing/scanner"
"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/storage"
)
var node1, node2 protocol.NodeID
@@ -59,7 +60,8 @@ func init() {
}
func TestRequest(t *testing.T) {
m := NewModel("/tmp", &config.Configuration{}, "syncthing", "dev")
db, _ := leveldb.Open(storage.NewMemStorage(), nil)
m := NewModel("/tmp", &config.Configuration{}, "syncthing", "dev", db)
m.AddRepo(config.RepositoryConfiguration{ID: "default", Directory: "testdata"})
m.ScanRepo("default")
@@ -95,7 +97,8 @@ func genFiles(n int) []protocol.FileInfo {
}
func BenchmarkIndex10000(b *testing.B) {
m := NewModel("/tmp", nil, "syncthing", "dev")
db, _ := leveldb.Open(storage.NewMemStorage(), nil)
m := NewModel("/tmp", nil, "syncthing", "dev", db)
m.AddRepo(config.RepositoryConfiguration{ID: "default", Directory: "testdata"})
m.ScanRepo("default")
files := genFiles(10000)
@@ -107,7 +110,8 @@ func BenchmarkIndex10000(b *testing.B) {
}
func BenchmarkIndex00100(b *testing.B) {
m := NewModel("/tmp", nil, "syncthing", "dev")
db, _ := leveldb.Open(storage.NewMemStorage(), nil)
m := NewModel("/tmp", nil, "syncthing", "dev", db)
m.AddRepo(config.RepositoryConfiguration{ID: "default", Directory: "testdata"})
m.ScanRepo("default")
files := genFiles(100)
@@ -119,7 +123,8 @@ func BenchmarkIndex00100(b *testing.B) {
}
func BenchmarkIndexUpdate10000f10000(b *testing.B) {
m := NewModel("/tmp", nil, "syncthing", "dev")
db, _ := leveldb.Open(storage.NewMemStorage(), nil)
m := NewModel("/tmp", nil, "syncthing", "dev", db)
m.AddRepo(config.RepositoryConfiguration{ID: "default", Directory: "testdata"})
m.ScanRepo("default")
files := genFiles(10000)
@@ -132,7 +137,8 @@ func BenchmarkIndexUpdate10000f10000(b *testing.B) {
}
func BenchmarkIndexUpdate10000f00100(b *testing.B) {
m := NewModel("/tmp", nil, "syncthing", "dev")
db, _ := leveldb.Open(storage.NewMemStorage(), nil)
m := NewModel("/tmp", nil, "syncthing", "dev", db)
m.AddRepo(config.RepositoryConfiguration{ID: "default", Directory: "testdata"})
m.ScanRepo("default")
files := genFiles(10000)
@@ -146,7 +152,8 @@ func BenchmarkIndexUpdate10000f00100(b *testing.B) {
}
func BenchmarkIndexUpdate10000f00001(b *testing.B) {
m := NewModel("/tmp", nil, "syncthing", "dev")
db, _ := leveldb.Open(storage.NewMemStorage(), nil)
m := NewModel("/tmp", nil, "syncthing", "dev", db)
m.AddRepo(config.RepositoryConfiguration{ID: "default", Directory: "testdata"})
m.ScanRepo("default")
files := genFiles(10000)
@@ -193,7 +200,8 @@ func (FakeConnection) Statistics() protocol.Statistics {
}
func BenchmarkRequest(b *testing.B) {
m := NewModel("/tmp", nil, "syncthing", "dev")
db, _ := leveldb.Open(storage.NewMemStorage(), nil)
m := NewModel("/tmp", nil, "syncthing", "dev", db)
m.AddRepo(config.RepositoryConfiguration{ID: "default", Directory: "testdata"})
m.ScanRepo("default")
@@ -228,27 +236,17 @@ func BenchmarkRequest(b *testing.B) {
}
func TestActivityMap(t *testing.T) {
cm := cid.NewMap()
fooID := cm.Get(node1)
if fooID == 0 {
t.Fatal("ID cannot be zero")
}
barID := cm.Get(node2)
if barID == 0 {
t.Fatal("ID cannot be zero")
}
m := make(activityMap)
if node := m.leastBusyNode(1<<fooID, cm); node != node1 {
if node := m.leastBusyNode([]protocol.NodeID{node1}); node != node1 {
t.Errorf("Incorrect least busy node %q", node)
}
if node := m.leastBusyNode(1<<barID, cm); node != node2 {
if node := m.leastBusyNode([]protocol.NodeID{node2}); node != node2 {
t.Errorf("Incorrect least busy node %q", node)
}
if node := m.leastBusyNode(1<<fooID|1<<barID, cm); node != node1 {
if node := m.leastBusyNode([]protocol.NodeID{node1, node2}); node != node1 {
t.Errorf("Incorrect least busy node %q", node)
}
if node := m.leastBusyNode(1<<fooID|1<<barID, cm); node != node2 {
if node := m.leastBusyNode([]protocol.NodeID{node1, node2}); node != node2 {
t.Errorf("Incorrect least busy node %q", node)
}
}

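The rewritten TestActivityMap above drives the new slice-based availability API directly. Spelled out as a usage sketch (node IDs as in the test; the counter semantics come from the puller.go hunk below):

am := make(activityMap)
// leastBusyNode picks the candidate with the fewest outstanding requests
// and bumps its counter, so successive calls alternate between the nodes.
first := am.leastBusyNode([]protocol.NodeID{node1, node2})  // node1: both counters are zero
second := am.leastBusyNode([]protocol.NodeID{node1, node2}) // node2: node1 is now busier
_, _ = first, second
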
View File

@@ -12,7 +12,6 @@ import (
"runtime"
"time"
"github.com/calmh/syncthing/cid"
"github.com/calmh/syncthing/config"
"github.com/calmh/syncthing/osutil"
"github.com/calmh/syncthing/protocol"
@@ -32,7 +31,7 @@ type requestResult struct {
type openFile struct {
filepath string // full filepath name
temp string // temporary filename
availability uint64 // availability bitset
availability []protocol.NodeID
file *os.File
err error // error when opening or writing to file, all following operations are cancelled
outstanding int // number of requests we still have outstanding
@@ -41,20 +40,14 @@ type openFile struct {
type activityMap map[protocol.NodeID]int
func (m activityMap) leastBusyNode(availability uint64, cm *cid.Map) protocol.NodeID {
func (m activityMap) leastBusyNode(availability []protocol.NodeID) protocol.NodeID {
var low int = 2<<30 - 1
var selected protocol.NodeID
for _, node := range cm.Names() {
id := cm.Get(node)
if id == cid.LocalID {
continue
}
for _, node := range availability {
usage := m[node]
if availability&(1<<id) != 0 {
if usage < low {
low = usage
selected = node
}
if usage < low {
low = usage
selected = node
}
}
m[selected]++
@@ -245,7 +238,7 @@ func (p *puller) fixupDirectories() {
}
if filepath.Base(rn) == ".stversions" {
return nil
return filepath.SkipDir
}
cur := p.model.CurrentRepoFile(p.repoCfg.ID, rn)
@@ -414,7 +407,7 @@ func (p *puller) handleBlock(b bqBlock) bool {
l.Debugf("pull: %q: opening file %q", p.repoCfg.ID, f.Name)
}
of.availability = uint64(p.model.repoFiles[p.repoCfg.ID].Availability(f.Name))
of.availability = p.model.repoFiles[p.repoCfg.ID].Availability(f.Name)
of.filepath = filepath.Join(p.repoCfg.Directory, f.Name)
of.temp = filepath.Join(p.repoCfg.Directory, defTempNamer.TempName(f.Name))
@@ -521,7 +514,7 @@ func (p *puller) handleRequestBlock(b bqBlock) bool {
panic("bug: request for non-open file")
}
node := p.oustandingPerNode.leastBusyNode(of.availability, p.model.cm)
node := p.oustandingPerNode.leastBusyNode(of.availability)
if len(node) == 0 {
of.err = errNoNode
if of.file != nil {
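
The fixupDirectories change above (return nil becomes return filepath.SkipDir for .stversions) relies on filepath.Walk's sentinel error: returning filepath.SkipDir from the walk function skips the directory's entire contents, whereas returning nil only moves past that single entry, so versioned copies were previously still being visited. A standalone illustration:

package main

import (
	"fmt"
	"os"
	"path/filepath"
)

func main() {
	// Walk the tree, printing paths but never descending into .stversions.
	filepath.Walk(".", func(path string, info os.FileInfo, err error) error {
		if err != nil {
			return err
		}
		if info.IsDir() && filepath.Base(path) == ".stversions" {
			return filepath.SkipDir // skip the versions directory and everything under it
		}
		fmt.Println(path)
		return nil
	})
}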