This commit is contained in:
@@ -1738,32 +1738,22 @@ func sendIndexes(conn protocol.Connection, folder string, fs *db.FileSet, ignore
|
||||
// returns the highest sent sequence number.
|
||||
func sendIndexTo(prevSequence int64, conn protocol.Connection, folder string, fs *db.FileSet, ignores *ignore.Matcher, dbLocation string, dropSymlinks bool) (int64, error) {
|
||||
deviceID := conn.ID()
|
||||
batch := make([]protocol.FileInfo, 0, maxBatchSizeFiles)
|
||||
batchSizeBytes := 0
|
||||
initial := prevSequence == 0
|
||||
var err error
|
||||
var f protocol.FileInfo
|
||||
debugMsg := func(t string) string {
|
||||
return fmt.Sprintf("Sending indexes for %s to %s at %s: %d files (<%d bytes) (%s)", folder, deviceID, conn, len(batch), batchSizeBytes, t)
|
||||
batch := newFileInfoBatch(nil)
|
||||
batch.flushFn = func(fs []protocol.FileInfo) error {
|
||||
l.Debugf("Sending indexes for %s to %s at %s: %d files (<%d bytes)", folder, deviceID, conn, len(batch.infos), batch.size)
|
||||
if initial {
|
||||
initial = false
|
||||
return conn.Index(folder, fs)
|
||||
}
|
||||
return conn.IndexUpdate(folder, fs)
|
||||
}
|
||||
|
||||
var err error
|
||||
var f protocol.FileInfo
|
||||
fs.WithHaveSequence(prevSequence+1, func(fi db.FileIntf) bool {
|
||||
if len(batch) == maxBatchSizeFiles || batchSizeBytes > maxBatchSizeBytes {
|
||||
if initial {
|
||||
if err = conn.Index(folder, batch); err != nil {
|
||||
return false
|
||||
}
|
||||
l.Debugln(debugMsg("initial index"))
|
||||
initial = false
|
||||
} else {
|
||||
if err = conn.IndexUpdate(folder, batch); err != nil {
|
||||
return false
|
||||
}
|
||||
l.Debugln(debugMsg("batched update"))
|
||||
}
|
||||
|
||||
batch = make([]protocol.FileInfo, 0, maxBatchSizeFiles)
|
||||
batchSizeBytes = 0
|
||||
if err = batch.flushIfFull(); err != nil {
|
||||
return false
|
||||
}
|
||||
|
||||
f = fi.(protocol.FileInfo)
|
||||
@@ -1786,26 +1776,14 @@ func sendIndexTo(prevSequence int64, conn protocol.Connection, folder string, fs
|
||||
return true
|
||||
}
|
||||
|
||||
batch = append(batch, f)
|
||||
batchSizeBytes += f.ProtoSize()
|
||||
batch.append(f)
|
||||
return true
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return prevSequence, err
|
||||
}
|
||||
|
||||
if initial {
|
||||
err = conn.Index(folder, batch)
|
||||
if err == nil {
|
||||
l.Debugln(debugMsg("small initial index"))
|
||||
}
|
||||
} else if len(batch) > 0 {
|
||||
err = conn.IndexUpdate(folder, batch)
|
||||
if err == nil {
|
||||
l.Debugln(debugMsg("last batch"))
|
||||
}
|
||||
}
|
||||
err = batch.flush()
|
||||
|
||||
// True if there was nothing to be sent
|
||||
if f.Sequence == 0 {
|
||||
@@ -2046,12 +2024,18 @@ func (m *Model) internalScanFolderSubdirs(ctx context.Context, folder string, su
|
||||
return err
|
||||
}
|
||||
|
||||
batch := make([]protocol.FileInfo, 0, maxBatchSizeFiles)
|
||||
batchSizeBytes := 0
|
||||
changes := 0
|
||||
batch := newFileInfoBatch(func(fs []protocol.FileInfo) error {
|
||||
if err := runner.CheckHealth(); err != nil {
|
||||
l.Debugln("Stopping scan of folder %s due to: %s", folderCfg.Description(), err)
|
||||
return err
|
||||
}
|
||||
m.updateLocalsFromScanning(folder, fs)
|
||||
return nil
|
||||
})
|
||||
|
||||
// Schedule a pull after scanning, but only if we actually detected any
|
||||
// changes.
|
||||
changes := 0
|
||||
defer func() {
|
||||
if changes > 0 {
|
||||
runner.SchedulePull()
|
||||
@@ -2059,26 +2043,16 @@ func (m *Model) internalScanFolderSubdirs(ctx context.Context, folder string, su
|
||||
}()
|
||||
|
||||
for f := range fchan {
|
||||
if len(batch) == maxBatchSizeFiles || batchSizeBytes > maxBatchSizeBytes {
|
||||
if err := runner.CheckHealth(); err != nil {
|
||||
l.Debugln("Stopping scan of folder %s due to: %s", folderCfg.Description(), err)
|
||||
return err
|
||||
}
|
||||
m.updateLocalsFromScanning(folder, batch)
|
||||
batch = batch[:0]
|
||||
batchSizeBytes = 0
|
||||
if err := batch.flushIfFull(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
batch = append(batch, f)
|
||||
batchSizeBytes += f.ProtoSize()
|
||||
batch.append(f)
|
||||
changes++
|
||||
}
|
||||
|
||||
if err := runner.CheckHealth(); err != nil {
|
||||
l.Debugln("Stopping scan of folder %s due to: %s", folderCfg.Description(), err)
|
||||
if err := batch.flush(); err != nil {
|
||||
return err
|
||||
} else if len(batch) > 0 {
|
||||
m.updateLocalsFromScanning(folder, batch)
|
||||
}
|
||||
|
||||
if len(subDirs) == 0 {
|
||||
@@ -2089,42 +2063,70 @@ func (m *Model) internalScanFolderSubdirs(ctx context.Context, folder string, su
|
||||
|
||||
// Do a scan of the database for each prefix, to check for deleted and
|
||||
// ignored files.
|
||||
batch = batch[:0]
|
||||
batchSizeBytes = 0
|
||||
batch.reset()
|
||||
var toIgnore []db.FileInfoTruncated
|
||||
ignoredParent := ""
|
||||
pathSep := string(fs.PathSeparator)
|
||||
for _, sub := range subDirs {
|
||||
var iterError error
|
||||
|
||||
fset.WithPrefixedHaveTruncated(protocol.LocalDeviceID, sub, func(fi db.FileIntf) bool {
|
||||
f := fi.(db.FileInfoTruncated)
|
||||
if len(batch) == maxBatchSizeFiles || batchSizeBytes > maxBatchSizeBytes {
|
||||
if err := runner.CheckHealth(); err != nil {
|
||||
iterError = err
|
||||
return false
|
||||
}
|
||||
m.updateLocalsFromScanning(folder, batch)
|
||||
batch = batch[:0]
|
||||
batchSizeBytes = 0
|
||||
|
||||
if err := batch.flushIfFull(); err != nil {
|
||||
iterError = err
|
||||
return false
|
||||
}
|
||||
|
||||
switch {
|
||||
case !f.IsIgnored() && ignores.Match(f.Name).IsIgnored():
|
||||
if ignoredParent != "" && !strings.HasPrefix(f.Name, ignoredParent+pathSep) {
|
||||
for _, f := range toIgnore {
|
||||
l.Debugln("marking file as ignored", f)
|
||||
nf := f.ConvertToIgnoredFileInfo(m.id.Short())
|
||||
batch.append(nf)
|
||||
changes++
|
||||
if err := batch.flushIfFull(); err != nil {
|
||||
iterError = err
|
||||
return false
|
||||
}
|
||||
}
|
||||
toIgnore = toIgnore[:0]
|
||||
ignoredParent = ""
|
||||
}
|
||||
|
||||
switch ignored := ignores.Match(f.Name).IsIgnored(); {
|
||||
case !f.IsIgnored() && ignored:
|
||||
// File was not ignored at last pass but has been ignored.
|
||||
if f.IsDirectory() {
|
||||
// Delay ignoring as a child might be unignored.
|
||||
toIgnore = append(toIgnore, f)
|
||||
if ignoredParent == "" {
|
||||
// If the parent wasn't ignored already, set
|
||||
// this path as the "highest" ignored parent
|
||||
ignoredParent = f.Name
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
l.Debugln("marking file as ignored", f)
|
||||
nf := f.ConvertToIgnoredFileInfo(m.id.Short())
|
||||
batch = append(batch, nf)
|
||||
batchSizeBytes += nf.ProtoSize()
|
||||
batch.append(nf)
|
||||
changes++
|
||||
|
||||
case f.IsIgnored() && !ignores.Match(f.Name).IsIgnored():
|
||||
case f.IsIgnored() && !ignored:
|
||||
// Successfully scanned items are already un-ignored during
|
||||
// the scan, so check whether it is deleted.
|
||||
fallthrough
|
||||
case !f.IsIgnored() && !f.IsDeleted():
|
||||
// The file is not ignored and not deleted. Lets check if
|
||||
case !f.IsIgnored() && !f.IsDeleted() && !f.IsUnsupported():
|
||||
// The file is not ignored, deleted or unsupported. Lets check if
|
||||
// it's still here. Simply stat:ing it wont do as there are
|
||||
// tons of corner cases (e.g. parent dir->symlink, missing
|
||||
// permissions)
|
||||
if !osutil.IsDeleted(mtimefs, f.Name) {
|
||||
if ignoredParent != "" {
|
||||
// Don't ignore parents of this not ignored item
|
||||
toIgnore = toIgnore[:0]
|
||||
ignoredParent = ""
|
||||
}
|
||||
return true
|
||||
}
|
||||
nf := protocol.FileInfo{
|
||||
@@ -2143,28 +2145,36 @@ func (m *Model) internalScanFolderSubdirs(ctx context.Context, folder string, su
|
||||
// counter makes sure we are in conflict with any
|
||||
// other existing versions, which will be resolved
|
||||
// by the normal pulling mechanisms.
|
||||
if f.IsIgnored() {
|
||||
if f.ShouldConflict() {
|
||||
nf.Version = nf.Version.DropOthers(m.shortID)
|
||||
}
|
||||
|
||||
batch = append(batch, nf)
|
||||
batchSizeBytes += nf.ProtoSize()
|
||||
batch.append(nf)
|
||||
changes++
|
||||
}
|
||||
return true
|
||||
})
|
||||
|
||||
if iterError == nil && len(toIgnore) > 0 {
|
||||
for _, f := range toIgnore {
|
||||
l.Debugln("marking file as ignored", f)
|
||||
nf := f.ConvertToIgnoredFileInfo(m.id.Short())
|
||||
batch.append(nf)
|
||||
changes++
|
||||
if iterError = batch.flushIfFull(); iterError != nil {
|
||||
break
|
||||
}
|
||||
}
|
||||
toIgnore = toIgnore[:0]
|
||||
}
|
||||
|
||||
if iterError != nil {
|
||||
l.Debugln("Stopping scan of folder %s due to: %s", folderCfg.Description(), iterError)
|
||||
return iterError
|
||||
}
|
||||
}
|
||||
|
||||
if err := runner.CheckHealth(); err != nil {
|
||||
l.Debugln("Stopping scan of folder %s due to: %s", folderCfg.Description(), err)
|
||||
if err := batch.flush(); err != nil {
|
||||
return err
|
||||
} else if len(batch) > 0 {
|
||||
m.updateLocalsFromScanning(folder, batch)
|
||||
}
|
||||
|
||||
m.folderStatRef(folder).ScanCompleted()
|
||||
@@ -2903,3 +2913,44 @@ func (s folderDeviceSet) hasDevice(dev protocol.DeviceID) bool {
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
type fileInfoBatch struct {
|
||||
infos []protocol.FileInfo
|
||||
size int
|
||||
flushFn func([]protocol.FileInfo) error
|
||||
}
|
||||
|
||||
func newFileInfoBatch(fn func([]protocol.FileInfo) error) *fileInfoBatch {
|
||||
return &fileInfoBatch{
|
||||
infos: make([]protocol.FileInfo, 0, maxBatchSizeFiles),
|
||||
flushFn: fn,
|
||||
}
|
||||
}
|
||||
|
||||
func (b *fileInfoBatch) append(f protocol.FileInfo) {
|
||||
b.infos = append(b.infos, f)
|
||||
b.size += f.ProtoSize()
|
||||
}
|
||||
|
||||
func (b *fileInfoBatch) flushIfFull() error {
|
||||
if len(b.infos) == maxBatchSizeFiles || b.size > maxBatchSizeBytes {
|
||||
return b.flush()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (b *fileInfoBatch) flush() error {
|
||||
if len(b.infos) == 0 {
|
||||
return nil
|
||||
}
|
||||
if err := b.flushFn(b.infos); err != nil {
|
||||
return err
|
||||
}
|
||||
b.reset()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (b *fileInfoBatch) reset() {
|
||||
b.infos = b.infos[:0]
|
||||
b.size = 0
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user