From d4199c2d08b6486b5e2007c312ff2bd502486de3 Mon Sep 17 00:00:00 2001 From: Audrius Butkevicius Date: Fri, 24 Oct 2014 23:20:08 +0100 Subject: [PATCH] Recover from corrupt block maps --- internal/files/blockmap.go | 12 ++++++ internal/files/blockmap_test.go | 31 +++++++++++++++ internal/model/puller.go | 39 +++++++++++++++---- internal/model/puller_test.go | 69 ++++++++++++++++++++++++++++++++- internal/scanner/blocks.go | 4 +- 5 files changed, 145 insertions(+), 10 deletions(-) diff --git a/internal/files/blockmap.go b/internal/files/blockmap.go index 1dd246b0..d678eda1 100644 --- a/internal/files/blockmap.go +++ b/internal/files/blockmap.go @@ -179,6 +179,18 @@ func (f *BlockFinder) Iterate(hash []byte, iterFn func(string, string, uint32) b return false } +// Fix repairs an incorrect blockmap entry, removing the old entry +// and replacing it with a new entry for the given block +func (f *BlockFinder) Fix(folder, file string, index uint32, oldHash, newHash []byte) error { + buf := make([]byte, 4) + binary.BigEndian.PutUint32(buf, uint32(index)) + + batch := new(leveldb.Batch) + batch.Delete(toBlockKey(oldHash, folder, file)) + batch.Put(toBlockKey(newHash, folder, file), buf) + return f.db.Write(batch, nil) +} + // m.blockKey returns a byte slice encoding the following information: // keyTypeBlock (1 byte) // folder (64 bytes) diff --git a/internal/files/blockmap_test.go b/internal/files/blockmap_test.go index feb30c9d..57963986 100644 --- a/internal/files/blockmap_test.go +++ b/internal/files/blockmap_test.go @@ -235,3 +235,34 @@ func TestBlockFinderLookup(t *testing.T) { f1.Flags = 0 } + +func TestBlockFinderFix(t *testing.T) { + db, f := setup() + + iterFn := func(folder, file string, index uint32) bool { + return true + } + + m := NewBlockMap(db, "folder1") + err := m.Add([]protocol.FileInfo{f1}) + if err != nil { + t.Fatal(err) + } + + if !f.Iterate(f1.Blocks[0].Hash, iterFn) { + t.Fatal("Block not found") + } + + err = f.Fix("folder1", f1.Name, 0, 
f1.Blocks[0].Hash, f2.Blocks[0].Hash) + if err != nil { + t.Fatal(err) + } + + if f.Iterate(f1.Blocks[0].Hash, iterFn) { + t.Fatal("Unexpected block") + } + + if !f.Iterate(f2.Blocks[0].Hash, iterFn) { + t.Fatal("Block not found") + } +} diff --git a/internal/model/puller.go b/internal/model/puller.go index 9f01644e..8b4affe1 100644 --- a/internal/model/puller.go +++ b/internal/model/puller.go @@ -17,6 +17,7 @@ package model import ( "bytes" + "crypto/sha256" "errors" "fmt" "os" @@ -141,9 +142,16 @@ loop: } p.model.setState(p.folder, FolderSyncing) tries := 0 + checksum := false for { tries++ - changed := p.pullerIteration(copiersPerFolder, pullersPerFolder, finishersPerFolder) + // Last resort mode, to get around corrupt/invalid block maps. + if tries == 10 { + l.Infoln("Desperation mode ON") + checksum = true + } + + changed := p.pullerIteration(copiersPerFolder, pullersPerFolder, finishersPerFolder, checksum) if debug { l.Debugln(p, "changed", changed) } @@ -234,7 +242,7 @@ func (p *Puller) String() string { // finisher routines are used. It's seldom efficient to use more than one // copier routine, while multiple pullers are essential and multiple finishers // may be useful (they are primarily CPU bound due to hashing). 
-func (p *Puller) pullerIteration(ncopiers, npullers, nfinishers int) int { +func (p *Puller) pullerIteration(ncopiers, npullers, nfinishers int, checksum bool) int { pullChan := make(chan pullBlockState) copyChan := make(chan copyBlocksState) finisherChan := make(chan *sharedPullerState) @@ -247,7 +255,7 @@ func (p *Puller) pullerIteration(ncopiers, npullers, nfinishers int) int { copyWg.Add(1) go func() { // copierRoutine finishes when copyChan is closed - p.copierRoutine(copyChan, pullChan, finisherChan) + p.copierRoutine(copyChan, pullChan, finisherChan, checksum) copyWg.Done() }() } @@ -549,7 +557,7 @@ func (p *Puller) shortcutFile(file protocol.FileInfo) { // copierRoutine reads copierStates until the in channel closes and performs // the relevant copies when possible, or passes it to the puller routine. -func (p *Puller) copierRoutine(in <-chan copyBlocksState, pullChan chan<- pullBlockState, out chan<- *sharedPullerState) { +func (p *Puller) copierRoutine(in <-chan copyBlocksState, pullChan chan<- pullBlockState, out chan<- *sharedPullerState, checksum bool) { buf := make([]byte, protocol.BlockSize) nextFile: @@ -574,10 +582,10 @@ nextFile: } }() + hasher := sha256.New() for _, block := range state.blocks { buf = buf[:int(block.Size)] - - success := p.model.finder.Iterate(block.Hash, func(folder, file string, index uint32) bool { + found := p.model.finder.Iterate(block.Hash, func(folder, file string, index uint32) bool { path := filepath.Join(p.model.folderCfgs[folder].Path, file) var fd *os.File @@ -598,6 +606,23 @@ nextFile: return false } + // Only done from the tenth puller attempt onwards, as a last resort + if checksum { + hasher.Write(buf) + hash := hasher.Sum(nil) + hasher.Reset() + if !bytes.Equal(hash, block.Hash) { + if debug { + l.Debugf("Finder block mismatch in %s:%s:%d expected %q got %q", folder, file, index, block.Hash, hash) + } + err = p.model.finder.Fix(folder, file, index, block.Hash, hash) + if err != nil { + l.Warnln("finder fix:", err) + } + return false + 
} + } + _, err = dstFd.WriteAt(buf, block.Offset) if err != nil { state.earlyClose("dst write", err) @@ -612,7 +637,7 @@ nextFile: break } - if !success { + if !found { state.pullStarted() ps := pullBlockState{ sharedPullerState: state.sharedPullerState, diff --git a/internal/model/puller_test.go b/internal/model/puller_test.go index 451b694f..77b76127 100644 --- a/internal/model/puller_test.go +++ b/internal/model/puller_test.go @@ -210,7 +210,7 @@ func TestCopierFinder(t *testing.T) { finisherChan := make(chan *sharedPullerState, 1) // Run a single fetcher routine - go p.copierRoutine(copyChan, pullChan, finisherChan) + go p.copierRoutine(copyChan, pullChan, finisherChan, false) p.handleFile(requiredFile, copyChan, finisherChan) @@ -305,3 +305,70 @@ func TestCopierCleanup(t *testing.T) { t.Error("Expected block not found") } } + +// On the 10th iteration, we start hashing the content which we receive by +// following blockfinder's instructions. Make sure that the copier routine +// hashes the content when asked, and pulls if it fails to find the block. 
+func TestLastResortPulling(t *testing.T) { + fcfg := config.FolderConfiguration{ID: "default", Path: "testdata"} + cfg := config.Configuration{Folders: []config.FolderConfiguration{fcfg}} + + db, _ := leveldb.Open(storage.NewMemStorage(), nil) + m := NewModel(config.Wrap("/tmp/test", cfg), "device", "syncthing", "dev", db) + m.AddFolder(fcfg) + + // Add a file to index (with the incorrect block representation, as content + // doesn't actually match the block list) + file := protocol.FileInfo{ + Name: "empty", + Flags: 0, + Modified: 0, + Blocks: []protocol.BlockInfo{blocks[0]}, + } + m.updateLocal("default", file) + + // Pretend that we are handling a new file of the same content but + // with a different name (causing that particular block to be copied) + file.Name = "newfile" + + iterFn := func(folder, file string, index uint32) bool { + return true + } + + // Check that this particular block is there + if !m.finder.Iterate(blocks[0].Hash, iterFn) { + t.Error("Expected block not found") + } + + p := Puller{ + folder: "default", + dir: "testdata", + model: m, + } + + copyChan := make(chan copyBlocksState) + pullChan := make(chan pullBlockState, 1) + finisherChan := make(chan *sharedPullerState, 1) + + // Run a single copier routine with checksumming enabled + go p.copierRoutine(copyChan, pullChan, finisherChan, true) + + p.handleFile(file, copyChan, finisherChan) + + // Copier should hash the empty file, realise that the region it has read + // doesn't match the hash which was advertised by the block map, fix it + // and ask to pull the block. + <-pullChan + + // Verify that it did fix the incorrect hash. 
+ if m.finder.Iterate(blocks[0].Hash, iterFn) { + t.Error("Found unexpected block") + } + + if !m.finder.Iterate(scanner.SHA256OfNothing, iterFn) { + t.Error("Expected block not found") + } + + (<-finisherChan).fd.Close() + os.Remove(filepath.Join("testdata", defTempNamer.TempName("newfile"))) +} diff --git a/internal/scanner/blocks.go b/internal/scanner/blocks.go index 9f441b8e..8360fcf2 100644 --- a/internal/scanner/blocks.go +++ b/internal/scanner/blocks.go @@ -24,7 +24,7 @@ import ( "github.com/syncthing/syncthing/internal/protocol" ) -var sha256OfNothing = []uint8{0xe3, 0xb0, 0xc4, 0x42, 0x98, 0xfc, 0x1c, 0x14, 0x9a, 0xfb, 0xf4, 0xc8, 0x99, 0x6f, 0xb9, 0x24, 0x27, 0xae, 0x41, 0xe4, 0x64, 0x9b, 0x93, 0x4c, 0xa4, 0x95, 0x99, 0x1b, 0x78, 0x52, 0xb8, 0x55} +var SHA256OfNothing = []uint8{0xe3, 0xb0, 0xc4, 0x42, 0x98, 0xfc, 0x1c, 0x14, 0x9a, 0xfb, 0xf4, 0xc8, 0x99, 0x6f, 0xb9, 0x24, 0x27, 0xae, 0x41, 0xe4, 0x64, 0x9b, 0x93, 0x4c, 0xa4, 0x95, 0x99, 0x1b, 0x78, 0x52, 0xb8, 0x55} // Blocks returns the blockwise hash of the reader. func Blocks(r io.Reader, blocksize int, sizehint int64) ([]protocol.BlockInfo, error) { @@ -61,7 +61,7 @@ func Blocks(r io.Reader, blocksize int, sizehint int64) ([]protocol.BlockInfo, e blocks = append(blocks, protocol.BlockInfo{ Offset: 0, Size: 0, - Hash: sha256OfNothing, + Hash: SHA256OfNothing, }) }