Implement IgnorePerms
This commit is contained in:
112
model/puller.go
112
model/puller.go
@@ -63,8 +63,7 @@ var errNoNode = errors.New("no available source node")
|
||||
|
||||
type puller struct {
|
||||
cfg *config.Configuration
|
||||
repo string
|
||||
dir string
|
||||
repoCfg config.RepositoryConfiguration
|
||||
bq *blockQueue
|
||||
model *Model
|
||||
oustandingPerNode activityMap
|
||||
@@ -74,11 +73,10 @@ type puller struct {
|
||||
requestResults chan requestResult
|
||||
}
|
||||
|
||||
func newPuller(repo, dir string, model *Model, slots int, cfg *config.Configuration) *puller {
|
||||
func newPuller(repoCfg config.RepositoryConfiguration, model *Model, slots int, cfg *config.Configuration) *puller {
|
||||
p := &puller{
|
||||
repoCfg: repoCfg,
|
||||
cfg: cfg,
|
||||
repo: repo,
|
||||
dir: dir,
|
||||
bq: newBlockQueue(),
|
||||
model: model,
|
||||
oustandingPerNode: make(activityMap),
|
||||
@@ -94,13 +92,13 @@ func newPuller(repo, dir string, model *Model, slots int, cfg *config.Configurat
|
||||
p.requestSlots <- true
|
||||
}
|
||||
if debug {
|
||||
l.Debugf("starting puller; repo %q dir %q slots %d", repo, dir, slots)
|
||||
l.Debugf("starting puller; repo %q dir %q slots %d", repoCfg.ID, repoCfg.Directory, slots)
|
||||
}
|
||||
go p.run()
|
||||
} else {
|
||||
// Read only
|
||||
if debug {
|
||||
l.Debugf("starting puller; repo %q dir %q (read only)", repo, dir)
|
||||
l.Debugf("starting puller; repo %q dir %q (read only)", repoCfg.ID, repoCfg.Directory)
|
||||
}
|
||||
go p.runRO()
|
||||
}
|
||||
@@ -114,7 +112,7 @@ func (p *puller) run() {
|
||||
<-p.requestSlots
|
||||
b := p.bq.get()
|
||||
if debug {
|
||||
l.Debugf("filler: queueing %q / %q offset %d copy %d", p.repo, b.file.Name, b.block.Offset, len(b.copy))
|
||||
l.Debugf("filler: queueing %q / %q offset %d copy %d", p.repoCfg.ID, b.file.Name, b.block.Offset, len(b.copy))
|
||||
}
|
||||
p.blocks <- b
|
||||
}
|
||||
@@ -130,13 +128,13 @@ func (p *puller) run() {
|
||||
for {
|
||||
select {
|
||||
case res := <-p.requestResults:
|
||||
p.model.setState(p.repo, RepoSyncing)
|
||||
p.model.setState(p.repoCfg.ID, RepoSyncing)
|
||||
changed = true
|
||||
p.requestSlots <- true
|
||||
p.handleRequestResult(res)
|
||||
|
||||
case b := <-p.blocks:
|
||||
p.model.setState(p.repo, RepoSyncing)
|
||||
p.model.setState(p.repoCfg.ID, RepoSyncing)
|
||||
changed = true
|
||||
if p.handleBlock(b) {
|
||||
// Block was fully handled, free up the slot
|
||||
@@ -149,7 +147,7 @@ func (p *puller) run() {
|
||||
break pull
|
||||
}
|
||||
if debug {
|
||||
l.Debugf("%q: idle but have %d open files", p.repo, len(p.openFiles))
|
||||
l.Debugf("%q: idle but have %d open files", p.repoCfg.ID, len(p.openFiles))
|
||||
i := 5
|
||||
for _, f := range p.openFiles {
|
||||
l.Debugf(" %v", f)
|
||||
@@ -163,22 +161,22 @@ func (p *puller) run() {
|
||||
}
|
||||
|
||||
if changed {
|
||||
p.model.setState(p.repo, RepoCleaning)
|
||||
p.model.setState(p.repoCfg.ID, RepoCleaning)
|
||||
p.fixupDirectories()
|
||||
changed = false
|
||||
}
|
||||
|
||||
p.model.setState(p.repo, RepoIdle)
|
||||
p.model.setState(p.repoCfg.ID, RepoIdle)
|
||||
|
||||
// Do a rescan if it's time for it
|
||||
select {
|
||||
case <-walkTicker:
|
||||
if debug {
|
||||
l.Debugf("%q: time for rescan", p.repo)
|
||||
l.Debugf("%q: time for rescan", p.repoCfg.ID)
|
||||
}
|
||||
err := p.model.ScanRepo(p.repo)
|
||||
err := p.model.ScanRepo(p.repoCfg.ID)
|
||||
if err != nil {
|
||||
invalidateRepo(p.cfg, p.repo, err)
|
||||
invalidateRepo(p.cfg, p.repoCfg.ID, err)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -195,11 +193,11 @@ func (p *puller) runRO() {
|
||||
|
||||
for _ = range walkTicker {
|
||||
if debug {
|
||||
l.Debugf("%q: time for rescan", p.repo)
|
||||
l.Debugf("%q: time for rescan", p.repoCfg.ID)
|
||||
}
|
||||
err := p.model.ScanRepo(p.repo)
|
||||
err := p.model.ScanRepo(p.repoCfg.ID)
|
||||
if err != nil {
|
||||
invalidateRepo(p.cfg, p.repo, err)
|
||||
invalidateRepo(p.cfg, p.repoCfg.ID, err)
|
||||
return
|
||||
}
|
||||
}
|
||||
@@ -214,7 +212,7 @@ func (p *puller) fixupDirectories() {
|
||||
return nil
|
||||
}
|
||||
|
||||
rn, err := filepath.Rel(p.dir, path)
|
||||
rn, err := filepath.Rel(p.repoCfg.Directory, path)
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
@@ -223,7 +221,7 @@ func (p *puller) fixupDirectories() {
|
||||
return nil
|
||||
}
|
||||
|
||||
cur := p.model.CurrentRepoFile(p.repo, rn)
|
||||
cur := p.model.CurrentRepoFile(p.repoCfg.ID, rn)
|
||||
if cur.Name != rn {
|
||||
// No matching dir in current list; weird
|
||||
if debug {
|
||||
@@ -276,7 +274,7 @@ func (p *puller) fixupDirectories() {
|
||||
for {
|
||||
deleteDirs = nil
|
||||
changed = 0
|
||||
filepath.Walk(p.dir, walkFn)
|
||||
filepath.Walk(p.repoCfg.Directory, walkFn)
|
||||
|
||||
var deleted = 0
|
||||
// Delete any queued directories
|
||||
@@ -320,7 +318,7 @@ func (p *puller) handleRequestResult(res requestResult) {
|
||||
p.openFiles[f.Name] = of
|
||||
|
||||
if debug {
|
||||
l.Debugf("pull: wrote %q / %q offset %d outstanding %d done %v", p.repo, f.Name, res.offset, of.outstanding, of.done)
|
||||
l.Debugf("pull: wrote %q / %q offset %d outstanding %d done %v", p.repoCfg.ID, f.Name, res.offset, of.outstanding, of.done)
|
||||
}
|
||||
|
||||
if of.done && of.outstanding == 0 {
|
||||
@@ -338,7 +336,7 @@ func (p *puller) handleBlock(b bqBlock) bool {
|
||||
// Deleted directories we mark as handled and delete later.
|
||||
if protocol.IsDirectory(f.Flags) {
|
||||
if !protocol.IsDeleted(f.Flags) {
|
||||
path := filepath.Join(p.dir, f.Name)
|
||||
path := filepath.Join(p.repoCfg.Directory, f.Name)
|
||||
_, err := os.Stat(path)
|
||||
if err != nil && os.IsNotExist(err) {
|
||||
if debug {
|
||||
@@ -352,7 +350,7 @@ func (p *puller) handleBlock(b bqBlock) bool {
|
||||
} else if debug {
|
||||
l.Debugf("ignore delete dir: %v", f)
|
||||
}
|
||||
p.model.updateLocal(p.repo, f)
|
||||
p.model.updateLocal(p.repoCfg.ID, f)
|
||||
return true
|
||||
}
|
||||
|
||||
@@ -361,12 +359,12 @@ func (p *puller) handleBlock(b bqBlock) bool {
|
||||
|
||||
if !ok {
|
||||
if debug {
|
||||
l.Debugf("pull: %q: opening file %q", p.repo, f.Name)
|
||||
l.Debugf("pull: %q: opening file %q", p.repoCfg.ID, f.Name)
|
||||
}
|
||||
|
||||
of.availability = uint64(p.model.repoFiles[p.repo].Availability(f.Name))
|
||||
of.filepath = filepath.Join(p.dir, f.Name)
|
||||
of.temp = filepath.Join(p.dir, defTempNamer.TempName(f.Name))
|
||||
of.availability = uint64(p.model.repoFiles[p.repoCfg.ID].Availability(f.Name))
|
||||
of.filepath = filepath.Join(p.repoCfg.Directory, f.Name)
|
||||
of.temp = filepath.Join(p.repoCfg.Directory, defTempNamer.TempName(f.Name))
|
||||
|
||||
dirName := filepath.Dir(of.filepath)
|
||||
_, err := os.Stat(dirName)
|
||||
@@ -374,13 +372,13 @@ func (p *puller) handleBlock(b bqBlock) bool {
|
||||
err = os.MkdirAll(dirName, 0777)
|
||||
}
|
||||
if err != nil {
|
||||
l.Debugf("pull: error: %q / %q: %v", p.repo, f.Name, err)
|
||||
l.Debugf("pull: error: %q / %q: %v", p.repoCfg.ID, f.Name, err)
|
||||
}
|
||||
|
||||
of.file, of.err = os.Create(of.temp)
|
||||
if of.err != nil {
|
||||
if debug {
|
||||
l.Debugf("pull: error: %q / %q: %v", p.repo, f.Name, of.err)
|
||||
l.Debugf("pull: error: %q / %q: %v", p.repoCfg.ID, f.Name, of.err)
|
||||
}
|
||||
if !b.last {
|
||||
p.openFiles[f.Name] = of
|
||||
@@ -393,7 +391,7 @@ func (p *puller) handleBlock(b bqBlock) bool {
|
||||
if of.err != nil {
|
||||
// We have already failed this file.
|
||||
if debug {
|
||||
l.Debugf("pull: error: %q / %q has already failed: %v", p.repo, f.Name, of.err)
|
||||
l.Debugf("pull: error: %q / %q has already failed: %v", p.repoCfg.ID, f.Name, of.err)
|
||||
}
|
||||
if b.last {
|
||||
delete(p.openFiles, f.Name)
|
||||
@@ -424,14 +422,14 @@ func (p *puller) handleCopyBlock(b bqBlock) {
|
||||
of := p.openFiles[f.Name]
|
||||
|
||||
if debug {
|
||||
l.Debugf("pull: copying %d blocks for %q / %q", len(b.copy), p.repo, f.Name)
|
||||
l.Debugf("pull: copying %d blocks for %q / %q", len(b.copy), p.repoCfg.ID, f.Name)
|
||||
}
|
||||
|
||||
var exfd *os.File
|
||||
exfd, of.err = os.Open(of.filepath)
|
||||
if of.err != nil {
|
||||
if debug {
|
||||
l.Debugf("pull: error: %q / %q: %v", p.repo, f.Name, of.err)
|
||||
l.Debugf("pull: error: %q / %q: %v", p.repoCfg.ID, f.Name, of.err)
|
||||
}
|
||||
of.file.Close()
|
||||
of.file = nil
|
||||
@@ -450,7 +448,7 @@ func (p *puller) handleCopyBlock(b bqBlock) {
|
||||
buffers.Put(bs)
|
||||
if of.err != nil {
|
||||
if debug {
|
||||
l.Debugf("pull: error: %q / %q: %v", p.repo, f.Name, of.err)
|
||||
l.Debugf("pull: error: %q / %q: %v", p.repoCfg.ID, f.Name, of.err)
|
||||
}
|
||||
exfd.Close()
|
||||
of.file.Close()
|
||||
@@ -493,10 +491,10 @@ func (p *puller) handleRequestBlock(b bqBlock) bool {
|
||||
|
||||
go func(node string, b bqBlock) {
|
||||
if debug {
|
||||
l.Debugf("pull: requesting %q / %q offset %d size %d from %q outstanding %d", p.repo, f.Name, b.block.Offset, b.block.Size, node, of.outstanding)
|
||||
l.Debugf("pull: requesting %q / %q offset %d size %d from %q outstanding %d", p.repoCfg.ID, f.Name, b.block.Offset, b.block.Size, node, of.outstanding)
|
||||
}
|
||||
|
||||
bs, err := p.model.requestGlobal(node, p.repo, f.Name, b.block.Offset, int(b.block.Size), nil)
|
||||
bs, err := p.model.requestGlobal(node, p.repoCfg.ID, f.Name, b.block.Offset, int(b.block.Size), nil)
|
||||
p.requestResults <- requestResult{
|
||||
node: node,
|
||||
file: f,
|
||||
@@ -527,24 +525,24 @@ func (p *puller) handleEmptyBlock(b bqBlock) {
|
||||
os.Remove(of.temp)
|
||||
os.Chmod(of.filepath, 0666)
|
||||
if err := os.Remove(of.filepath); err == nil || os.IsNotExist(err) {
|
||||
p.model.updateLocal(p.repo, f)
|
||||
p.model.updateLocal(p.repoCfg.ID, f)
|
||||
}
|
||||
} else {
|
||||
if debug {
|
||||
l.Debugf("pull: no blocks to fetch and nothing to copy for %q / %q", p.repo, f.Name)
|
||||
l.Debugf("pull: no blocks to fetch and nothing to copy for %q / %q", p.repoCfg.ID, f.Name)
|
||||
}
|
||||
t := time.Unix(f.Modified, 0)
|
||||
if os.Chtimes(of.temp, t, t) != nil {
|
||||
delete(p.openFiles, f.Name)
|
||||
return
|
||||
}
|
||||
if protocol.HasPermissionBits(f.Flags) && os.Chmod(of.temp, os.FileMode(f.Flags&0777)) != nil {
|
||||
if !p.repoCfg.IgnorePerms && protocol.HasPermissionBits(f.Flags) && os.Chmod(of.temp, os.FileMode(f.Flags&0777)) != nil {
|
||||
delete(p.openFiles, f.Name)
|
||||
return
|
||||
}
|
||||
defTempNamer.Show(of.temp)
|
||||
if Rename(of.temp, of.filepath) == nil {
|
||||
p.model.updateLocal(p.repo, f)
|
||||
p.model.updateLocal(p.repoCfg.ID, f)
|
||||
}
|
||||
}
|
||||
delete(p.openFiles, f.Name)
|
||||
@@ -552,8 +550,8 @@ func (p *puller) handleEmptyBlock(b bqBlock) {
|
||||
|
||||
func (p *puller) queueNeededBlocks() {
|
||||
queued := 0
|
||||
for _, f := range p.model.NeedFilesRepo(p.repo) {
|
||||
lf := p.model.CurrentRepoFile(p.repo, f.Name)
|
||||
for _, f := range p.model.NeedFilesRepo(p.repoCfg.ID) {
|
||||
lf := p.model.CurrentRepoFile(p.repoCfg.ID, f.Name)
|
||||
have, need := scanner.BlockDiff(lf.Blocks, f.Blocks)
|
||||
if debug {
|
||||
l.Debugf("need:\n local: %v\n global: %v\n haveBlocks: %v\n needBlocks: %v", lf, f, have, need)
|
||||
@@ -566,13 +564,13 @@ func (p *puller) queueNeededBlocks() {
|
||||
})
|
||||
}
|
||||
if debug && queued > 0 {
|
||||
l.Debugf("%q: queued %d blocks", p.repo, queued)
|
||||
l.Debugf("%q: queued %d blocks", p.repoCfg.ID, queued)
|
||||
}
|
||||
}
|
||||
|
||||
func (p *puller) closeFile(f scanner.File) {
|
||||
if debug {
|
||||
l.Debugf("pull: closing %q / %q", p.repo, f.Name)
|
||||
l.Debugf("pull: closing %q / %q", p.repoCfg.ID, f.Name)
|
||||
}
|
||||
|
||||
of := p.openFiles[f.Name]
|
||||
@@ -584,7 +582,7 @@ func (p *puller) closeFile(f scanner.File) {
|
||||
fd, err := os.Open(of.temp)
|
||||
if err != nil {
|
||||
if debug {
|
||||
l.Debugf("pull: error: %q / %q: %v", p.repo, f.Name, err)
|
||||
l.Debugf("pull: error: %q / %q: %v", p.repoCfg.ID, f.Name, err)
|
||||
}
|
||||
return
|
||||
}
|
||||
@@ -593,31 +591,37 @@ func (p *puller) closeFile(f scanner.File) {
|
||||
|
||||
if l0, l1 := len(hb), len(f.Blocks); l0 != l1 {
|
||||
if debug {
|
||||
l.Debugf("pull: %q / %q: nblocks %d != %d", p.repo, f.Name, l0, l1)
|
||||
l.Debugf("pull: %q / %q: nblocks %d != %d", p.repoCfg.ID, f.Name, l0, l1)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
for i := range hb {
|
||||
if bytes.Compare(hb[i].Hash, f.Blocks[i].Hash) != 0 {
|
||||
l.Debugf("pull: %q / %q: block %d hash mismatch", p.repo, f.Name, i)
|
||||
l.Debugf("pull: %q / %q: block %d hash mismatch", p.repoCfg.ID, f.Name, i)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
t := time.Unix(f.Modified, 0)
|
||||
os.Chtimes(of.temp, t, t)
|
||||
if protocol.HasPermissionBits(f.Flags) {
|
||||
os.Chmod(of.temp, os.FileMode(f.Flags&0777))
|
||||
err = os.Chtimes(of.temp, t, t)
|
||||
if debug && err != nil {
|
||||
l.Debugf("pull: error: %q / %q: %v", p.repoCfg.ID, f.Name, err)
|
||||
}
|
||||
if !p.repoCfg.IgnorePerms && protocol.HasPermissionBits(f.Flags) {
|
||||
err = os.Chmod(of.temp, os.FileMode(f.Flags&0777))
|
||||
if debug && err != nil {
|
||||
l.Debugf("pull: error: %q / %q: %v", p.repoCfg.ID, f.Name, err)
|
||||
}
|
||||
}
|
||||
defTempNamer.Show(of.temp)
|
||||
if debug {
|
||||
l.Debugf("pull: rename %q / %q: %q", p.repo, f.Name, of.filepath)
|
||||
l.Debugf("pull: rename %q / %q: %q", p.repoCfg.ID, f.Name, of.filepath)
|
||||
}
|
||||
if err := Rename(of.temp, of.filepath); err == nil {
|
||||
p.model.updateLocal(p.repo, f)
|
||||
p.model.updateLocal(p.repoCfg.ID, f)
|
||||
} else {
|
||||
l.Debugf("pull: error: %q / %q: %v", p.repo, f.Name, err)
|
||||
l.Debugf("pull: error: %q / %q: %v", p.repoCfg.ID, f.Name, err)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user