model: Use separate db commit routine (fixes #1558)

This commit is contained in:
Jakob Borg
2015-04-05 15:34:29 +02:00
parent 515f0db5b4
commit ecadf30fe7
3 changed files with 78 additions and 26 deletions

View File

@@ -69,8 +69,9 @@ type rwFolder struct {
copiers int
pullers int
stop chan struct{}
queue *jobQueue
stop chan struct{}
queue *jobQueue
dbUpdates chan protocol.FileInfo
}
func newRWFolder(m *Model, cfg config.FolderConfiguration) *rwFolder {
@@ -276,6 +277,7 @@ func (p *rwFolder) pullerIteration(ignores *ignore.Matcher) int {
copyChan := make(chan copyBlocksState)
finisherChan := make(chan *sharedPullerState)
var updateWg sync.WaitGroup
var copyWg sync.WaitGroup
var pullWg sync.WaitGroup
var doneWg sync.WaitGroup
@@ -284,6 +286,14 @@ func (p *rwFolder) pullerIteration(ignores *ignore.Matcher) int {
l.Debugln(p, "c", p.copiers, "p", p.pullers)
}
p.dbUpdates = make(chan protocol.FileInfo)
updateWg.Add(1)
go func() {
// dbUpdaterRoutine finishes when p.dbUpdates is closed
p.dbUpdaterRoutine()
updateWg.Done()
}()
for i := 0; i < p.copiers; i++ {
copyWg.Add(1)
go func() {
@@ -453,6 +463,10 @@ nextFile:
p.deleteDir(dir)
}
// Wait for db updates to complete
close(p.dbUpdates)
updateWg.Wait()
return changed
}
@@ -510,7 +524,7 @@ func (p *rwFolder) handleDir(file protocol.FileInfo) {
}
if err = osutil.InWritableDir(mkdir, realName); err == nil {
p.model.updateLocal(p.folder, file)
p.dbUpdates <- file
} else {
l.Infof("Puller (folder %q, dir %q): %v", p.folder, file.Name, err)
}
@@ -527,9 +541,9 @@ func (p *rwFolder) handleDir(file protocol.FileInfo) {
// It's OK to change mode bits on stuff within non-writable directories.
if p.ignorePerms {
p.model.updateLocal(p.folder, file)
p.dbUpdates <- file
} else if err := os.Chmod(realName, mode); err == nil {
p.model.updateLocal(p.folder, file)
p.dbUpdates <- file
} else {
l.Infof("Puller (folder %q, dir %q): %v", p.folder, file.Name, err)
}
@@ -564,7 +578,7 @@ func (p *rwFolder) deleteDir(file protocol.FileInfo) {
}
err = osutil.InWritableDir(os.Remove, realName)
if err == nil || os.IsNotExist(err) {
p.model.updateLocal(p.folder, file)
p.dbUpdates <- file
} else {
l.Infof("Puller (folder %q, dir %q): delete: %v", p.folder, file.Name, err)
}
@@ -601,7 +615,7 @@ func (p *rwFolder) deleteFile(file protocol.FileInfo) {
if err != nil && !os.IsNotExist(err) {
l.Infof("Puller (folder %q, file %q): delete: %v", p.folder, file.Name, err)
} else {
p.model.updateLocal(p.folder, file)
p.dbUpdates <- file
}
}
@@ -653,7 +667,7 @@ func (p *rwFolder) renameFile(source, target protocol.FileInfo) {
// of the source and the creation of the target. Fix-up the metadata,
// and update the local index of the target file.
p.model.updateLocal(p.folder, source)
p.dbUpdates <- source
err = p.shortcutFile(target)
if err != nil {
@@ -671,7 +685,7 @@ func (p *rwFolder) renameFile(source, target protocol.FileInfo) {
return
}
p.model.updateLocal(p.folder, source)
p.dbUpdates <- source
}
}
@@ -802,7 +816,7 @@ func (p *rwFolder) shortcutFile(file protocol.FileInfo) (err error) {
}
}
p.model.updateLocal(p.folder, file)
p.dbUpdates <- file
return
}
@@ -810,7 +824,7 @@ func (p *rwFolder) shortcutFile(file protocol.FileInfo) (err error) {
func (p *rwFolder) shortcutSymlink(file protocol.FileInfo) (err error) {
err = symlinks.ChangeType(filepath.Join(p.dir, file.Name), file.Flags)
if err == nil {
p.model.updateLocal(p.folder, file)
p.dbUpdates <- file
} else {
l.Infof("Puller (folder %q, file %q): symlink shortcut: %v", p.folder, file.Name, err)
}
@@ -1048,7 +1062,7 @@ func (p *rwFolder) performFinish(state *sharedPullerState) {
}
// Record the updated file in the index
p.model.updateLocal(p.folder, state.file)
p.dbUpdates <- state.file
}
func (p *rwFolder) finisherRoutine(in <-chan *sharedPullerState) {
@@ -1089,6 +1103,47 @@ func (p *rwFolder) Jobs() ([]string, []string) {
return p.queue.Jobs()
}
// dbUpdaterRoutine aggregates db updates and commits them in batches no
// larger than 1000 items, and no more delayed than 2 seconds.
func (p *rwFolder) dbUpdaterRoutine() {
const (
maxBatchSize = 1000
maxBatchTime = 2 * time.Second
)
batch := make([]protocol.FileInfo, 0, maxBatchSize)
tick := time.NewTicker(maxBatchTime)
defer tick.Stop()
loop:
for {
select {
case file, ok := <-p.dbUpdates:
if !ok {
break loop
}
file.LocalVersion = 0
batch = append(batch, file)
if len(batch) == maxBatchSize {
p.model.updateLocals(p.folder, batch)
batch = batch[:0]
}
case <-tick.C:
if len(batch) > 0 {
p.model.updateLocals(p.folder, batch)
batch = batch[:0]
}
}
}
if len(batch) > 0 {
p.model.updateLocals(p.folder, batch)
}
}
func invalidateFolder(cfg *config.Configuration, folderID string, err error) {
for i := range cfg.Folders {
folder := &cfg.Folders[i]