all: Implement variable sized blocks (fixes #4807)

This commit is contained in:
Jakob Borg
2018-04-16 20:08:50 +02:00
committed by Audrius Butkevicius
parent 01aef75c96
commit 19c7cd99f5
27 changed files with 536 additions and 293 deletions

View File

@@ -626,7 +626,7 @@ func (m *Model) Completion(device protocol.DeviceID, folder string) FolderComple
}
// This might might be more than it really is, because some blocks can be of a smaller size.
downloaded = int64(counts[ft.Name] * protocol.BlockSize)
downloaded = int64(counts[ft.Name] * int(ft.BlockSize()))
fileNeed = ft.FileSize() - downloaded
if fileNeed < 0 {
@@ -1968,7 +1968,6 @@ func (m *Model) internalScanFolderSubdirs(ctx context.Context, folder string, su
Folder: folderCfg.ID,
Subs: subDirs,
Matcher: ignores,
BlockSize: protocol.BlockSize,
TempLifetime: time.Duration(m.cfg.Options().KeepTemporariesH) * time.Hour,
CurrentFiler: cFiler{m, folder},
Filesystem: mtimefs,
@@ -1978,6 +1977,7 @@ func (m *Model) internalScanFolderSubdirs(ctx context.Context, folder string, su
ShortID: m.shortID,
ProgressTickIntervalS: folderCfg.ScanProgressIntervalS,
UseWeakHashes: weakhash.Enabled,
UseLargeBlocks: folderCfg.UseLargeBlocks,
})
if err := runner.CheckHealth(); err != nil {
@@ -2513,7 +2513,7 @@ func (m *Model) RestoreFolderVersions(folder string, versions map[string]time.Ti
return errors, nil
}
func (m *Model) Availability(folder, file string, version protocol.Vector, block protocol.BlockInfo) []Availability {
func (m *Model) Availability(folder string, file protocol.FileInfo, block protocol.BlockInfo) []Availability {
// The slightly unusual locking sequence here is because we need to hold
// pmut for the duration (as the value returned from foldersFiles can
// get heavily modified on Close()), but also must acquire fmut before
@@ -2532,7 +2532,7 @@ func (m *Model) Availability(folder, file string, version protocol.Vector, block
var availabilities []Availability
next:
for _, device := range fs.Availability(file) {
for _, device := range fs.Availability(file.Name) {
for _, pausedFolder := range m.remotePausedFolders[device] {
if pausedFolder == folder {
continue next
@@ -2545,7 +2545,7 @@ next:
}
for device := range devices {
if m.deviceDownloads[device].Has(folder, file, version, int32(block.Offset/protocol.BlockSize)) {
if m.deviceDownloads[device].Has(folder, file.Name, file.Version, int32(block.Offset/int64(file.BlockSize()))) {
availabilities = append(availabilities, Availability{ID: device, FromTemporary: true})
}
}

View File

@@ -177,7 +177,7 @@ func TestRequest(t *testing.T) {
defer m.Stop()
m.ScanFolder("default")
bs := make([]byte, protocol.BlockSize)
bs := make([]byte, protocol.MinBlockSize)
// Existing, shared file
bs = bs[:6]
@@ -409,20 +409,22 @@ func (f *fakeConnection) addFile(name string, flags uint32, ftype protocol.FileI
f.mut.Lock()
defer f.mut.Unlock()
blocks, _ := scanner.Blocks(context.TODO(), bytes.NewReader(data), protocol.BlockSize, int64(len(data)), nil, true)
blockSize := protocol.BlockSize(int64(len(data)))
blocks, _ := scanner.Blocks(context.TODO(), bytes.NewReader(data), blockSize, int64(len(data)), nil, true)
var version protocol.Vector
version = version.Update(f.id.Short())
if ftype == protocol.FileInfoTypeFile || ftype == protocol.FileInfoTypeDirectory {
f.files = append(f.files, protocol.FileInfo{
Name: name,
Type: ftype,
Size: int64(len(data)),
ModifiedS: time.Now().Unix(),
Permissions: flags,
Version: version,
Sequence: time.Now().UnixNano(),
Blocks: blocks,
Name: name,
Type: ftype,
Size: int64(len(data)),
ModifiedS: time.Now().Unix(),
Permissions: flags,
Version: version,
Sequence: time.Now().UnixNano(),
RawBlockSize: int32(blockSize),
Blocks: blocks,
})
} else {
// Symlink
@@ -2803,7 +2805,7 @@ func TestNoRequestsFromPausedDevices(t *testing.T) {
files.Update(device1, []protocol.FileInfo{file})
files.Update(device2, []protocol.FileInfo{file})
avail := m.Availability("default", file.Name, file.Version, file.Blocks[0])
avail := m.Availability("default", file, file.Blocks[0])
if len(avail) != 0 {
t.Errorf("should not be available, no connections")
}
@@ -2813,7 +2815,7 @@ func TestNoRequestsFromPausedDevices(t *testing.T) {
// !!! This is not what I'd expect to happen, as we don't even know if the peer has the original index !!!
avail = m.Availability("default", file.Name, file.Version, file.Blocks[0])
avail = m.Availability("default", file, file.Blocks[0])
if len(avail) != 2 {
t.Errorf("should have two available")
}
@@ -2833,7 +2835,7 @@ func TestNoRequestsFromPausedDevices(t *testing.T) {
m.ClusterConfig(device1, cc)
m.ClusterConfig(device2, cc)
avail = m.Availability("default", file.Name, file.Version, file.Blocks[0])
avail = m.Availability("default", file, file.Blocks[0])
if len(avail) != 2 {
t.Errorf("should have two available")
}
@@ -2841,7 +2843,7 @@ func TestNoRequestsFromPausedDevices(t *testing.T) {
m.Closed(&fakeConnection{id: device1}, errDeviceUnknown)
m.Closed(&fakeConnection{id: device2}, errDeviceUnknown)
avail = m.Availability("default", file.Name, file.Version, file.Blocks[0])
avail = m.Availability("default", file, file.Blocks[0])
if len(avail) != 0 {
t.Errorf("should have no available")
}
@@ -2856,7 +2858,7 @@ func TestNoRequestsFromPausedDevices(t *testing.T) {
ccp.Folders[0].Paused = true
m.ClusterConfig(device1, ccp)
avail = m.Availability("default", file.Name, file.Version, file.Blocks[0])
avail = m.Availability("default", file, file.Blocks[0])
if len(avail) != 1 {
t.Errorf("should have one available")
}

View File

@@ -126,7 +126,7 @@ func newSendReceiveFolder(model *Model, cfg config.FolderConfiguration, ver vers
if f.PullerMaxPendingKiB == 0 {
f.PullerMaxPendingKiB = defaultPullerPendingKiB
}
if blockSizeKiB := protocol.BlockSize / 1024; f.PullerMaxPendingKiB < blockSizeKiB {
if blockSizeKiB := protocol.MaxBlockSize / 1024; f.PullerMaxPendingKiB < blockSizeKiB {
f.PullerMaxPendingKiB = blockSizeKiB
}
@@ -1010,7 +1010,7 @@ func (f *sendReceiveFolder) handleFile(file protocol.FileInfo, copyChan chan<- c
// Check for an old temporary file which might have some blocks we could
// reuse.
tempBlocks, err := scanner.HashFile(f.ctx, f.fs, tempName, protocol.BlockSize, nil, false)
tempBlocks, err := scanner.HashFile(f.ctx, f.fs, tempName, file.BlockSize(), nil, false)
if err == nil {
// Check for any reusable blocks in the temp file
tempCopyBlocks, _ := blockDiff(tempBlocks, file.Blocks)
@@ -1160,7 +1160,7 @@ func (f *sendReceiveFolder) shortcutFile(file protocol.FileInfo) error {
// copierRoutine reads copierStates until the in channel closes and performs
// the relevant copies when possible, or passes it to the puller routine.
func (f *sendReceiveFolder) copierRoutine(in <-chan copyBlocksState, pullChan chan<- pullBlockState, out chan<- *sharedPullerState) {
buf := make([]byte, protocol.BlockSize)
buf := make([]byte, protocol.MinBlockSize)
for state := range in {
dstFd, err := state.tempFile()
@@ -1203,7 +1203,7 @@ func (f *sendReceiveFolder) copierRoutine(in <-chan copyBlocksState, pullChan ch
if len(hashesToFind) > 0 {
file, err = f.fs.Open(state.file.Name)
if err == nil {
weakHashFinder, err = weakhash.NewFinder(file, protocol.BlockSize, hashesToFind)
weakHashFinder, err = weakhash.NewFinder(file, int(state.file.BlockSize()), hashesToFind)
if err != nil {
l.Debugln("weak hasher", err)
}
@@ -1268,7 +1268,7 @@ func (f *sendReceiveFolder) copierRoutine(in <-chan copyBlocksState, pullChan ch
return false
}
_, err = fd.ReadAt(buf, protocol.BlockSize*int64(index))
_, err = fd.ReadAt(buf, int64(state.file.BlockSize())*int64(index))
fd.Close()
if err != nil {
return false
@@ -1391,7 +1391,7 @@ func (f *sendReceiveFolder) pullBlock(state pullBlockState, out chan<- *sharedPu
}
var lastError error
candidates := f.model.Availability(f.folderID, state.file.Name, state.file.Version, state.block)
candidates := f.model.Availability(f.folderID, state.file, state.block)
for {
// Select the least busy device to pull the block from. If we found no
// feasible device at all, fail the block (and in the long run, the

View File

@@ -256,7 +256,7 @@ func TestCopierFinder(t *testing.T) {
}
// Verify that the fetched blocks have actually been written to the temp file
blks, err := scanner.HashFile(context.TODO(), fs.NewFilesystem(fs.FilesystemTypeBasic, "."), tempFile, protocol.BlockSize, nil, false)
blks, err := scanner.HashFile(context.TODO(), fs.NewFilesystem(fs.FilesystemTypeBasic, "."), tempFile, protocol.MinBlockSize, nil, false)
if err != nil {
t.Log(err)
}
@@ -273,8 +273,8 @@ func TestWeakHash(t *testing.T) {
tempFile := filepath.Join("testdata", fs.TempName("weakhash"))
var shift int64 = 10
var size int64 = 1 << 20
expectBlocks := int(size / protocol.BlockSize)
expectPulls := int(shift / protocol.BlockSize)
expectBlocks := int(size / protocol.MinBlockSize)
expectPulls := int(shift / protocol.MinBlockSize)
if shift > 0 {
expectPulls++
}
@@ -307,7 +307,7 @@ func TestWeakHash(t *testing.T) {
// File 1: abcdefgh
// File 2: xyabcdef
f.Seek(0, os.SEEK_SET)
existing, err := scanner.Blocks(context.TODO(), f, protocol.BlockSize, size, nil, true)
existing, err := scanner.Blocks(context.TODO(), f, protocol.MinBlockSize, size, nil, true)
if err != nil {
t.Error(err)
}
@@ -316,7 +316,7 @@ func TestWeakHash(t *testing.T) {
remainder := io.LimitReader(f, size-shift)
prefix := io.LimitReader(rand.Reader, shift)
nf := io.MultiReader(prefix, remainder)
desired, err := scanner.Blocks(context.TODO(), nf, protocol.BlockSize, size, nil, true)
desired, err := scanner.Blocks(context.TODO(), nf, protocol.MinBlockSize, size, nil, true)
if err != nil {
t.Error(err)
}

View File

@@ -217,7 +217,7 @@ func (s *sharedPullerState) copyDone(block protocol.BlockInfo) {
s.mut.Lock()
s.copyNeeded--
s.updated = time.Now()
s.available = append(s.available, int32(block.Offset/protocol.BlockSize))
s.available = append(s.available, int32(block.Offset/int64(s.file.BlockSize())))
s.availableUpdated = time.Now()
l.Debugln("sharedPullerState", s.folder, s.file.Name, "copyNeeded ->", s.copyNeeded)
s.mut.Unlock()
@@ -253,7 +253,7 @@ func (s *sharedPullerState) pullDone(block protocol.BlockInfo) {
s.mut.Lock()
s.pullNeeded--
s.updated = time.Now()
s.available = append(s.available, int32(block.Offset/protocol.BlockSize))
s.available = append(s.available, int32(block.Offset/int64(s.file.BlockSize())))
s.availableUpdated = time.Now()
l.Debugln("sharedPullerState", s.folder, s.file.Name, "pullNeeded done ->", s.pullNeeded)
s.mut.Unlock()
@@ -314,8 +314,8 @@ func (s *sharedPullerState) Progress() *pullerProgress {
CopiedFromElsewhere: s.copyTotal - s.copyNeeded - s.copyOrigin,
Pulled: s.pullTotal - s.pullNeeded,
Pulling: s.pullNeeded,
BytesTotal: blocksToSize(total),
BytesDone: blocksToSize(done),
BytesTotal: blocksToSize(s.file.BlockSize(), total),
BytesDone: blocksToSize(s.file.BlockSize(), done),
}
}
@@ -343,9 +343,9 @@ func (s *sharedPullerState) Available() []int32 {
return blocks
}
func blocksToSize(num int) int64 {
func blocksToSize(size int, num int) int64 {
if num < 2 {
return protocol.BlockSize / 2
return int64(size / 2)
}
return int64(num-1)*protocol.BlockSize + protocol.BlockSize/2
return int64(num-1)*int64(size) + int64(size/2)
}