Refactor out file scanner into separate package
This commit is contained in:
@@ -13,16 +13,17 @@ import (
|
||||
|
||||
"github.com/calmh/syncthing/buffers"
|
||||
"github.com/calmh/syncthing/protocol"
|
||||
"github.com/calmh/syncthing/scanner"
|
||||
)
|
||||
|
||||
type Model struct {
|
||||
dir string
|
||||
|
||||
global map[string]File // the latest version of each file as it exists in the cluster
|
||||
gmut sync.RWMutex // protects global
|
||||
local map[string]File // the files we currently have locally on disk
|
||||
lmut sync.RWMutex // protects local
|
||||
remote map[string]map[string]File
|
||||
global map[string]scanner.File // the latest version of each file as it exists in the cluster
|
||||
gmut sync.RWMutex // protects global
|
||||
local map[string]scanner.File // the files we currently have locally on disk
|
||||
lmut sync.RWMutex // protects local
|
||||
remote map[string]map[string]scanner.File
|
||||
rmut sync.RWMutex // protects remote
|
||||
protoConn map[string]Connection
|
||||
rawConn map[string]io.Closer
|
||||
@@ -31,7 +32,7 @@ type Model struct {
|
||||
// Queue for files to fetch. fq can call back into the model, so we must ensure
|
||||
// to hold no locks when calling methods on fq.
|
||||
fq *FileQueue
|
||||
dq chan File // queue for files to delete
|
||||
dq chan scanner.File // queue for files to delete
|
||||
|
||||
updatedLocal int64 // timestamp of last update to local
|
||||
updateGlobal int64 // timestamp of last update to remote
|
||||
@@ -77,16 +78,16 @@ var (
|
||||
func NewModel(dir string, maxChangeBw int) *Model {
|
||||
m := &Model{
|
||||
dir: dir,
|
||||
global: make(map[string]File),
|
||||
local: make(map[string]File),
|
||||
remote: make(map[string]map[string]File),
|
||||
global: make(map[string]scanner.File),
|
||||
local: make(map[string]scanner.File),
|
||||
remote: make(map[string]map[string]scanner.File),
|
||||
protoConn: make(map[string]Connection),
|
||||
rawConn: make(map[string]io.Closer),
|
||||
lastIdxBcast: time.Now(),
|
||||
trace: make(map[string]bool),
|
||||
sup: suppressor{threshold: int64(maxChangeBw)},
|
||||
fq: NewFileQueue(),
|
||||
dq: make(chan File),
|
||||
dq: make(chan scanner.File),
|
||||
}
|
||||
|
||||
go m.broadcastIndexLoop()
|
||||
@@ -128,7 +129,6 @@ func (m *Model) StartRW(del bool, threads int) {
|
||||
m.delete = del
|
||||
m.parallelRequests = threads
|
||||
|
||||
go m.cleanTempFiles()
|
||||
if del {
|
||||
go m.deleteLoop()
|
||||
}
|
||||
@@ -260,7 +260,7 @@ func (m *Model) InSyncSize() (files, bytes int64) {
|
||||
}
|
||||
|
||||
// NeedFiles returns the list of currently needed files and the total size.
|
||||
func (m *Model) NeedFiles() (files []File, bytes int64) {
|
||||
func (m *Model) NeedFiles() (files []scanner.File, bytes int64) {
|
||||
qf := m.fq.QueuedFiles()
|
||||
|
||||
m.gmut.RLock()
|
||||
@@ -278,7 +278,7 @@ func (m *Model) NeedFiles() (files []File, bytes int64) {
|
||||
// Index is called when a new node is connected and we receive their full index.
|
||||
// Implements the protocol.Model interface.
|
||||
func (m *Model) Index(nodeID string, fs []protocol.FileInfo) {
|
||||
var files = make([]File, len(fs))
|
||||
var files = make([]scanner.File, len(fs))
|
||||
for i := range fs {
|
||||
files[i] = fileFromFileInfo(fs[i])
|
||||
}
|
||||
@@ -290,7 +290,7 @@ func (m *Model) Index(nodeID string, fs []protocol.FileInfo) {
|
||||
debugf("NET IDX(in): %s: %d files", nodeID, len(fs))
|
||||
}
|
||||
|
||||
repo := make(map[string]File)
|
||||
repo := make(map[string]scanner.File)
|
||||
for _, f := range files {
|
||||
m.indexUpdate(repo, f)
|
||||
}
|
||||
@@ -306,7 +306,7 @@ func (m *Model) Index(nodeID string, fs []protocol.FileInfo) {
|
||||
// IndexUpdate is called for incremental updates to connected nodes' indexes.
|
||||
// Implements the protocol.Model interface.
|
||||
func (m *Model) IndexUpdate(nodeID string, fs []protocol.FileInfo) {
|
||||
var files = make([]File, len(fs))
|
||||
var files = make([]scanner.File, len(fs))
|
||||
for i := range fs {
|
||||
files[i] = fileFromFileInfo(fs[i])
|
||||
}
|
||||
@@ -335,7 +335,7 @@ func (m *Model) IndexUpdate(nodeID string, fs []protocol.FileInfo) {
|
||||
m.recomputeNeedForFiles(files)
|
||||
}
|
||||
|
||||
func (m *Model) indexUpdate(repo map[string]File, f File) {
|
||||
func (m *Model) indexUpdate(repo map[string]scanner.File, f scanner.File) {
|
||||
if m.trace["idx"] {
|
||||
var flagComment string
|
||||
if f.Flags&protocol.FlagDeleted != 0 {
|
||||
@@ -431,9 +431,9 @@ func (m *Model) Request(nodeID, repo, name string, offset int64, size int) ([]by
|
||||
}
|
||||
|
||||
// ReplaceLocal replaces the local repository index with the given list of files.
|
||||
func (m *Model) ReplaceLocal(fs []File) {
|
||||
func (m *Model) ReplaceLocal(fs []scanner.File) {
|
||||
var updated bool
|
||||
var newLocal = make(map[string]File)
|
||||
var newLocal = make(map[string]scanner.File)
|
||||
|
||||
m.lmut.RLock()
|
||||
for _, f := range fs {
|
||||
@@ -474,7 +474,7 @@ func (m *Model) ReplaceLocal(fs []File) {
|
||||
// the local index from a cache file at startup.
|
||||
func (m *Model) SeedLocal(fs []protocol.FileInfo) {
|
||||
m.lmut.Lock()
|
||||
m.local = make(map[string]File)
|
||||
m.local = make(map[string]scanner.File)
|
||||
for _, f := range fs {
|
||||
m.local[f.Name] = fileFromFileInfo(f)
|
||||
}
|
||||
@@ -628,7 +628,7 @@ func (m *Model) broadcastIndexLoop() {
|
||||
}
|
||||
|
||||
// markDeletedLocals sets the deleted flag on files that have gone missing locally.
|
||||
func (m *Model) markDeletedLocals(newLocal map[string]File) bool {
|
||||
func (m *Model) markDeletedLocals(newLocal map[string]scanner.File) bool {
|
||||
// For every file in the existing local table, check if they are also
|
||||
// present in the new local table. If they are not, check that we already
|
||||
// had the newest version available according to the global table and if so
|
||||
@@ -658,7 +658,7 @@ func (m *Model) markDeletedLocals(newLocal map[string]File) bool {
|
||||
return updated
|
||||
}
|
||||
|
||||
func (m *Model) updateLocal(f File) {
|
||||
func (m *Model) updateLocal(f scanner.File) {
|
||||
var updated bool
|
||||
|
||||
m.lmut.Lock()
|
||||
@@ -685,7 +685,7 @@ func (m *Model) updateLocal(f File) {
|
||||
/*
|
||||
XXX: Not done, needs elegant handling of availability
|
||||
|
||||
func (m *Model) recomputeGlobalFor(files []File) bool {
|
||||
func (m *Model) recomputeGlobalFor(files []scanner.File) bool {
|
||||
m.gmut.Lock()
|
||||
defer m.gmut.Unlock()
|
||||
|
||||
@@ -702,7 +702,7 @@ func (m *Model) recomputeGlobalFor(files []File) bool {
|
||||
*/
|
||||
|
||||
func (m *Model) recomputeGlobal() {
|
||||
var newGlobal = make(map[string]File)
|
||||
var newGlobal = make(map[string]scanner.File)
|
||||
|
||||
m.lmut.RLock()
|
||||
for n, f := range m.local {
|
||||
@@ -761,12 +761,12 @@ func (m *Model) recomputeGlobal() {
|
||||
|
||||
type addOrder struct {
|
||||
n string
|
||||
remote []Block
|
||||
remote []scanner.Block
|
||||
fm *fileMonitor
|
||||
}
|
||||
|
||||
func (m *Model) recomputeNeedForGlobal() {
|
||||
var toDelete []File
|
||||
var toDelete []scanner.File
|
||||
var toAdd []addOrder
|
||||
|
||||
m.gmut.RLock()
|
||||
@@ -785,8 +785,8 @@ func (m *Model) recomputeNeedForGlobal() {
|
||||
}
|
||||
}
|
||||
|
||||
func (m *Model) recomputeNeedForFiles(files []File) {
|
||||
var toDelete []File
|
||||
func (m *Model) recomputeNeedForFiles(files []scanner.File) {
|
||||
var toDelete []scanner.File
|
||||
var toAdd []addOrder
|
||||
|
||||
m.gmut.RLock()
|
||||
@@ -805,7 +805,7 @@ func (m *Model) recomputeNeedForFiles(files []File) {
|
||||
}
|
||||
}
|
||||
|
||||
func (m *Model) recomputeNeedForFile(gf File, toAdd []addOrder, toDelete []File) ([]addOrder, []File) {
|
||||
func (m *Model) recomputeNeedForFile(gf scanner.File, toAdd []addOrder, toDelete []scanner.File) ([]addOrder, []scanner.File) {
|
||||
m.lmut.RLock()
|
||||
lf, ok := m.local[gf.Name]
|
||||
m.lmut.RUnlock()
|
||||
@@ -830,7 +830,7 @@ func (m *Model) recomputeNeedForFile(gf File, toAdd []addOrder, toDelete []File)
|
||||
if gf.Flags&protocol.FlagDeleted != 0 {
|
||||
toDelete = append(toDelete, gf)
|
||||
} else {
|
||||
local, remote := BlockDiff(lf.Blocks, gf.Blocks)
|
||||
local, remote := scanner.BlockDiff(lf.Blocks, gf.Blocks)
|
||||
fm := fileMonitor{
|
||||
name: gf.Name,
|
||||
path: path.Clean(path.Join(m.dir, gf.Name)),
|
||||
@@ -878,18 +878,18 @@ func (m *Model) deleteLoop() {
|
||||
}
|
||||
}
|
||||
|
||||
func fileFromFileInfo(f protocol.FileInfo) File {
|
||||
var blocks = make([]Block, len(f.Blocks))
|
||||
func fileFromFileInfo(f protocol.FileInfo) scanner.File {
|
||||
var blocks = make([]scanner.Block, len(f.Blocks))
|
||||
var offset int64
|
||||
for i, b := range f.Blocks {
|
||||
blocks[i] = Block{
|
||||
blocks[i] = scanner.Block{
|
||||
Offset: offset,
|
||||
Size: b.Size,
|
||||
Hash: b.Hash,
|
||||
}
|
||||
offset += int64(b.Size)
|
||||
}
|
||||
return File{
|
||||
return scanner.File{
|
||||
Name: f.Name,
|
||||
Size: offset,
|
||||
Flags: f.Flags,
|
||||
@@ -899,7 +899,7 @@ func fileFromFileInfo(f protocol.FileInfo) File {
|
||||
}
|
||||
}
|
||||
|
||||
func fileInfoFromFile(f File) protocol.FileInfo {
|
||||
func fileInfoFromFile(f scanner.File) protocol.FileInfo {
|
||||
var blocks = make([]protocol.BlockInfo, len(f.Blocks))
|
||||
for i, b := range f.Blocks {
|
||||
blocks[i] = protocol.BlockInfo{
|
||||
|
||||
Reference in New Issue
Block a user