lib/scanner: Use fs.Filesystem for all operations

One more step on the path of the great refactoring. Touches rwfolder a
little bit since it uses the Lstat from fs as well, but mostly this is
just on the scanner as rwfolder is scheduled for a later refactor.

There are a couple of usages of fs.DefaultFilesystem that will in the
end become a filesystem injected from the top, but that comes later.

GitHub-Pull-Request: https://github.com/syncthing/syncthing/pull/4070
LGTM: AudriusButkevicius, imsodin
This commit is contained in:
Jakob Borg
2017-04-01 09:04:11 +00:00
committed by Simon Frei
parent bdb56d91b9
commit 4253f22680
13 changed files with 191 additions and 103 deletions

View File

@@ -8,41 +8,16 @@ package scanner
import (
"errors"
"os"
"path/filepath"
"github.com/syncthing/syncthing/lib/fs"
"github.com/syncthing/syncthing/lib/protocol"
"github.com/syncthing/syncthing/lib/sync"
)
// The parallel hasher reads FileInfo structures from the inbox, hashes the
// file to populate the Blocks element and sends it to the outbox. A number of
// workers are used in parallel. The outbox will become closed when the inbox
// is closed and all items handled.
func newParallelHasher(dir string, blockSize, workers int, outbox, inbox chan protocol.FileInfo, counter Counter, done, cancel chan struct{}, useWeakHashes bool) {
wg := sync.NewWaitGroup()
wg.Add(workers)
for i := 0; i < workers; i++ {
go func() {
hashFiles(dir, blockSize, outbox, inbox, counter, cancel, useWeakHashes)
wg.Done()
}()
}
go func() {
wg.Wait()
if done != nil {
close(done)
}
close(outbox)
}()
}
// HashFile hashes the files and returns a list of blocks representing the file.
func HashFile(path string, blockSize int, counter Counter, useWeakHashes bool) ([]protocol.BlockInfo, error) {
fd, err := os.Open(path)
func HashFile(fs fs.Filesystem, path string, blockSize int, counter Counter, useWeakHashes bool) ([]protocol.BlockInfo, error) {
fd, err := fs.Open(path)
if err != nil {
l.Debugln("open:", err)
return nil, err
@@ -82,10 +57,53 @@ func HashFile(path string, blockSize int, counter Counter, useWeakHashes bool) (
return blocks, nil
}
func hashFiles(dir string, blockSize int, outbox, inbox chan protocol.FileInfo, counter Counter, cancel chan struct{}, useWeakHashes bool) {
// The parallel hasher reads FileInfo structures from the inbox, hashes the
// file to populate the Blocks element and sends it to the outbox. A number of
// workers are used in parallel. The outbox will become closed when the inbox
// is closed and all items handled.
type parallelHasher struct {
fs fs.Filesystem
dir string
blockSize int
workers int
outbox chan<- protocol.FileInfo
inbox <-chan protocol.FileInfo
counter Counter
done chan<- struct{}
cancel <-chan struct{}
useWeakHashes bool
wg sync.WaitGroup
}
func newParallelHasher(fs fs.Filesystem, dir string, blockSize, workers int, outbox chan<- protocol.FileInfo, inbox <-chan protocol.FileInfo, counter Counter, done chan<- struct{}, cancel <-chan struct{}, useWeakHashes bool) {
ph := &parallelHasher{
fs: fs,
dir: dir,
blockSize: blockSize,
workers: workers,
outbox: outbox,
inbox: inbox,
counter: counter,
done: done,
cancel: cancel,
useWeakHashes: useWeakHashes,
wg: sync.NewWaitGroup(),
}
for i := 0; i < workers; i++ {
ph.wg.Add(1)
go ph.hashFiles()
}
go ph.closeWhenDone()
}
func (ph *parallelHasher) hashFiles() {
defer ph.wg.Done()
for {
select {
case f, ok := <-inbox:
case f, ok := <-ph.inbox:
if !ok {
return
}
@@ -94,7 +112,7 @@ func hashFiles(dir string, blockSize int, outbox, inbox chan protocol.FileInfo,
panic("Bug. Asked to hash a directory or a deleted file.")
}
blocks, err := HashFile(filepath.Join(dir, f.Name), blockSize, counter, useWeakHashes)
blocks, err := HashFile(ph.fs, filepath.Join(ph.dir, f.Name), ph.blockSize, ph.counter, ph.useWeakHashes)
if err != nil {
l.Debugln("hash error:", f.Name, err)
continue
@@ -112,13 +130,21 @@ func hashFiles(dir string, blockSize int, outbox, inbox chan protocol.FileInfo,
}
select {
case outbox <- f:
case <-cancel:
case ph.outbox <- f:
case <-ph.cancel:
return
}
case <-cancel:
case <-ph.cancel:
return
}
}
}
func (ph *parallelHasher) closeWhenDone() {
ph.wg.Wait()
if ph.done != nil {
close(ph.done)
}
close(ph.outbox)
}