diff --git a/internal/model/model.go b/internal/model/model.go index 52f553e6..1f4daf10 100644 --- a/internal/model/model.go +++ b/internal/model/model.go @@ -38,10 +38,12 @@ import ( // How many files to send in each Index/IndexUpdate message. const ( - indexTargetSize = 250 * 1024 // Aim for making index messages no larger than 250 KiB (uncompressed) - indexPerFileSize = 250 // Each FileInfo is approximately this big, in bytes, excluding BlockInfos - IndexPerBlockSize = 40 // Each BlockInfo is approximately this big - indexBatchSize = 1000 // Either way, don't include more files than this + indexTargetSize = 250 * 1024 // Aim for making index messages no larger than 250 KiB (uncompressed) + indexPerFileSize = 250 // Each FileInfo is approximately this big, in bytes, excluding BlockInfos + indexPerBlockSize = 40 // Each BlockInfo is approximately this big + indexBatchSize = 1000 // Either way, don't include more files than this + reqValidationTime = time.Hour // How long to cache validation entries for Request messages + reqValidationCacheSize = 1000 // How many entries to aim for in the validation cache ) type service interface { @@ -86,6 +88,9 @@ type Model struct { addedFolder bool started bool + + reqValidationCache map[string]time.Time // folder / file name => time when confirmed to exist + rvmut sync.RWMutex // protects reqValidationCache } var ( @@ -97,29 +102,31 @@ var ( // for file data without altering the local folder in any way. 
func NewModel(cfg *config.Wrapper, id protocol.DeviceID, deviceName, clientName, clientVersion string, ldb *leveldb.DB) *Model { m := &Model{ - cfg: cfg, - db: ldb, - finder: db.NewBlockFinder(ldb, cfg), - progressEmitter: NewProgressEmitter(cfg), - id: id, - shortID: id.Short(), - deviceName: deviceName, - clientName: clientName, - clientVersion: clientVersion, - folderCfgs: make(map[string]config.FolderConfiguration), - folderFiles: make(map[string]*db.FileSet), - folderDevices: make(map[string][]protocol.DeviceID), - deviceFolders: make(map[protocol.DeviceID][]string), - deviceStatRefs: make(map[protocol.DeviceID]*stats.DeviceStatisticsReference), - folderIgnores: make(map[string]*ignore.Matcher), - folderRunners: make(map[string]service), - folderStatRefs: make(map[string]*stats.FolderStatisticsReference), - protoConn: make(map[protocol.DeviceID]protocol.Connection), - rawConn: make(map[protocol.DeviceID]io.Closer), - deviceVer: make(map[protocol.DeviceID]string), + cfg: cfg, + db: ldb, + finder: db.NewBlockFinder(ldb, cfg), + progressEmitter: NewProgressEmitter(cfg), + id: id, + shortID: id.Short(), + deviceName: deviceName, + clientName: clientName, + clientVersion: clientVersion, + folderCfgs: make(map[string]config.FolderConfiguration), + folderFiles: make(map[string]*db.FileSet), + folderDevices: make(map[string][]protocol.DeviceID), + deviceFolders: make(map[protocol.DeviceID][]string), + deviceStatRefs: make(map[protocol.DeviceID]*stats.DeviceStatisticsReference), + folderIgnores: make(map[string]*ignore.Matcher), + folderRunners: make(map[string]service), + folderStatRefs: make(map[string]*stats.FolderStatisticsReference), + protoConn: make(map[protocol.DeviceID]protocol.Connection), + rawConn: make(map[protocol.DeviceID]io.Closer), + deviceVer: make(map[protocol.DeviceID]string), + reqValidationCache: make(map[string]time.Time), - fmut: sync.NewRWMutex(), - pmut: sync.NewRWMutex(), + fmut: sync.NewRWMutex(), + pmut: sync.NewRWMutex(), + rvmut: 
sync.NewRWMutex(), } if cfg.Options().ProgressUpdateIntervalS > -1 { go m.progressEmitter.Serve() @@ -729,33 +736,61 @@ func (m *Model) Request(deviceID protocol.DeviceID, folder, name string, offset return nil, fmt.Errorf("protocol error: unknown flags 0x%x in Request message", flags) } - // Verify that the requested file exists in the local model. - m.fmut.RLock() - folderFiles, ok := m.folderFiles[folder] - m.fmut.RUnlock() + // Verify that the requested file exists in the local model. We only need + // to validate this file if we haven't done so recently, so we keep a + // cache of successful results. "Recently" can be quite a long time, as + // we remove validation cache entries when we detect local changes. If + // we're out of sync here and the file actually doesn't exist any more, or + // has shrunk or something, then we'll anyway get a read error that we + // pass on to the other side. - if !ok { - l.Warnf("Request from %s for file %s in nonexistent folder %q", deviceID, name, folder) - return nil, protocol.ErrNoSuchFile - } + m.rvmut.RLock() + validated := m.reqValidationCache[folder+"/"+name] + m.rvmut.RUnlock() - lf, ok := folderFiles.Get(protocol.LocalDeviceID, name) - if !ok { - return nil, protocol.ErrNoSuchFile - } + if time.Since(validated) > reqValidationTime { + m.fmut.RLock() + folderFiles, ok := m.folderFiles[folder] + m.fmut.RUnlock() - if lf.IsInvalid() || lf.IsDeleted() { - if debug { - l.Debugf("%v REQ(in): %s: %q / %q o=%d s=%d; invalid: %v", m, deviceID, folder, name, offset, size, lf) + if !ok { + l.Warnf("Request from %s for file %s in nonexistent folder %q", deviceID, name, folder) + return nil, protocol.ErrNoSuchFile } - return nil, protocol.ErrInvalid - } - if offset > lf.Size() { - if debug { - l.Debugf("%v REQ(in; nonexistent): %s: %q o=%d s=%d", m, deviceID, name, offset, size) + // This call is really expensive for large files, as we load the full + // block list which may be megabytes and megabytes of data to allocate + // 
space for, read, and deserialize. + lf, ok := folderFiles.Get(protocol.LocalDeviceID, name) + if !ok { + return nil, protocol.ErrNoSuchFile } - return nil, protocol.ErrNoSuchFile + + if lf.IsInvalid() || lf.IsDeleted() { + if debug { + l.Debugf("%v REQ(in): %s: %q / %q o=%d s=%d; invalid: %v", m, deviceID, folder, name, offset, size, lf) + } + return nil, protocol.ErrInvalid + } + + if offset > lf.Size() { + if debug { + l.Debugf("%v REQ(in; nonexistent): %s: %q o=%d s=%d", m, deviceID, name, offset, size) + } + return nil, protocol.ErrNoSuchFile + } + + m.rvmut.Lock() + m.reqValidationCache[folder+"/"+name] = time.Now() + if len(m.reqValidationCache) > reqValidationCacheSize { + // Don't let the cache grow infinitely + for name, validated := range m.reqValidationCache { + if time.Since(validated) > time.Minute { + delete(m.reqValidationCache, name) + } + } + } + m.rvmut.Unlock() } if debug && deviceID != protocol.LocalDeviceID { @@ -767,7 +802,7 @@ func (m *Model) Request(deviceID protocol.DeviceID, folder, name string, offset var reader io.ReaderAt var err error - if lf.IsSymlink() { + if info, err := os.Lstat(fn); err == nil && info.Mode()&os.ModeSymlink != 0 { target, _, err := symlinks.Read(fn) if err != nil { return nil, err @@ -1048,7 +1083,7 @@ func sendIndexTo(initial bool, minLocalVer int64, conn protocol.Connection, fold } batch = append(batch, f) - currentBatchSize += indexPerFileSize + len(f.Blocks)*IndexPerBlockSize + currentBatchSize += indexPerFileSize + len(f.Blocks)*indexPerBlockSize return true }) @@ -1071,6 +1106,11 @@ func (m *Model) updateLocals(folder string, fs []protocol.FileInfo) { m.fmut.RLock() m.folderFiles[folder].Update(protocol.LocalDeviceID, fs) m.fmut.RUnlock() + m.rvmut.Lock() + for _, f := range fs { + delete(m.reqValidationCache, folder+"/"+f.Name) + } + m.rvmut.Unlock() events.Default.Log(events.LocalIndexUpdated, map[string]interface{}{ "folder": folder, diff --git a/test/transfer-bench_test.go b/test/transfer-bench_test.go 
index db27447a..9f40f3fe 100644 --- a/test/transfer-bench_test.go +++ b/test/transfer-bench_test.go @@ -10,6 +10,8 @@ package integration import ( "log" + "os" + "runtime" "syscall" "testing" "time" @@ -19,8 +21,23 @@ func TestBenchmarkTransferManyFiles(t *testing.T) { benchmarkTransfer(t, 50000, 15) } -func TestBenchmarkTransferLargeFiles(t *testing.T) { - benchmarkTransfer(t, 200, 28) +func TestBenchmarkTransferLargeFile1G(t *testing.T) { + benchmarkTransfer(t, 1, 30) +} +func TestBenchmarkTransferLargeFile2G(t *testing.T) { + benchmarkTransfer(t, 1, 31) +} +func TestBenchmarkTransferLargeFile4G(t *testing.T) { + benchmarkTransfer(t, 1, 32) +} +func TestBenchmarkTransferLargeFile8G(t *testing.T) { + benchmarkTransfer(t, 1, 33) +} +func TestBenchmarkTransferLargeFile16G(t *testing.T) { + benchmarkTransfer(t, 1, 34) +} +func TestBenchmarkTransferLargeFile32G(t *testing.T) { + benchmarkTransfer(t, 1, 35) } func benchmarkTransfer(t *testing.T, files, sizeExp int) { @@ -31,7 +48,20 @@ func benchmarkTransfer(t *testing.T, files, sizeExp int) { } log.Println("Generating files...") - err = generateFiles("s1", files, sizeExp, "../LICENSE") + if files == 1 { + // Special case. Generate one file with the specified size exactly. 
+ fd, err := os.Open("../LICENSE") + if err != nil { + t.Fatal(err) + } + err = os.MkdirAll("s1", 0755) + if err != nil { + t.Fatal(err) + } + err = generateOneFile(fd, "s1/onefile", 1< s { a = s } - s += rand.Intn(a) + s += rand.Int63n(a) - src := io.LimitReader(&inifiteReader{fd}, int64(s)) - - p1 := filepath.Join(p0, n) - dst, err := os.Create(p1) - if err != nil { + if err := generateOneFile(fd, p1, s); err != nil { return err } + } - _, err = io.Copy(dst, src) - if err != nil { - return err - } + return nil +} - err = dst.Close() - if err != nil { - return err - } +func generateOneFile(fd io.ReadSeeker, p1 string, s int64) error { + src := io.LimitReader(&inifiteReader{fd}, int64(s)) + dst, err := os.Create(p1) + if err != nil { + return err + } - _ = os.Chmod(p1, os.FileMode(rand.Intn(0777)|0400)) + _, err = io.Copy(dst, src) + if err != nil { + return err + } - t := time.Now().Add(-time.Duration(rand.Intn(30*86400)) * time.Second) - err = os.Chtimes(p1, t, t) - if err != nil { - return err - } + err = dst.Close() + if err != nil { + return err + } + + _ = os.Chmod(p1, os.FileMode(rand.Intn(0777)|0400)) + + t := time.Now().Add(-time.Duration(rand.Intn(30*86400)) * time.Second) + err = os.Chtimes(p1, t, t) + if err != nil { + return err } return nil @@ -367,6 +375,7 @@ type fileInfo struct { mode os.FileMode mod int64 hash [16]byte + size int64 } func (f fileInfo) String() string { @@ -428,6 +437,7 @@ func startWalker(dir string, res chan<- fileInfo, abort <-chan struct{}) chan er name: rn, mode: info.Mode(), mod: info.ModTime().Unix(), + size: info.Size(), } sum, err := md5file(path) if err != nil {