Deadlock fix and cleanups

This commit is contained in:
Jakob Borg
2014-01-13 10:29:23 -07:00
parent f0b18685a5
commit ba0e4ded65
5 changed files with 67 additions and 33 deletions

View File

@@ -7,20 +7,16 @@ import (
"time"
)
type Resolver interface {
WhoHas(string) []string
}
type Monitor interface {
FileBegins(<-chan content) error
FileDone() error
}
type FileQueue struct {
resolver Resolver
files queuedFileList
lock sync.Mutex
sorted bool
files queuedFileList
lock sync.Mutex
sorted bool
availability map[string][]string
}
type queuedFile struct {
@@ -92,32 +88,28 @@ func (q *FileQueue) Get(nodeID string) (queuedBlock, bool) {
}
for i := range q.files {
if time.Since(q.files[i].nodesChecked) > 5*time.Second {
// Refresh node list every now and then
q.files[i].nodes = q.resolver.WhoHas(q.files[i].name)
}
qf := &q.files[i]
if len(q.files[i].nodes) == 0 {
if len(q.availability[qf.name]) == 0 {
// Noone has the file we want; abort.
if q.files[i].remaining != len(q.files[i].blocks) {
if qf.remaining != len(qf.blocks) {
// We have already started on this file; close it down
close(q.files[i].channel)
if mon := q.files[i].monitor; mon != nil {
close(qf.channel)
if mon := qf.monitor; mon != nil {
mon.FileDone()
}
}
q.deleteIndex(i)
q.deleteAt(i)
return queuedBlock{}, false
}
qf := q.files[i]
for _, ni := range qf.nodes {
for _, ni := range q.availability[qf.name] {
// Find and return the next block in the queue
if ni == nodeID {
for j, b := range qf.blocks {
if !qf.activeBlocks[j] {
q.files[i].activeBlocks[j] = true
q.files[i].given++
qf.activeBlocks[j] = true
qf.given++
return queuedBlock{
name: qf.name,
block: b,
@@ -142,29 +134,31 @@ func (q *FileQueue) Done(file string, offset int64, data []byte) {
offset: offset,
data: data,
}
for i, qf := range q.files {
for i := range q.files {
qf := &q.files[i]
if qf.name == file {
if qf.monitor != nil && qf.remaining == len(qf.blocks) {
err := qf.monitor.FileBegins(qf.channel)
if err != nil {
log.Printf("WARNING: %s: %v (not synced)", qf.name, err)
q.deleteIndex(i)
q.deleteAt(i)
return
}
}
qf.channel <- c
q.files[i].remaining--
qf.remaining--
if q.files[i].remaining == 0 {
if qf.remaining == 0 {
close(qf.channel)
q.deleteIndex(i)
if qf.monitor != nil {
err := qf.monitor.FileDone()
if err != nil {
log.Printf("WARNING: %s: %v", qf.name, err)
}
}
q.deleteAt(i)
}
return
}
@@ -194,6 +188,24 @@ func (q *FileQueue) QueuedFiles() (files []string) {
return
}
func (q *FileQueue) deleteIndex(i int) {
func (q *FileQueue) deleteAt(i int) {
q.files = q.files[:i+copy(q.files[i:], q.files[i+1:])]
}
func (q *FileQueue) SetAvailable(file, node string) {
q.lock.Lock()
defer q.lock.Unlock()
if q.availability == nil {
q.availability = make(map[string][]string)
}
q.availability[file] = []string{node}
}
func (q *FileQueue) AddAvailable(file, node string) {
q.lock.Lock()
defer q.lock.Unlock()
if q.availability == nil {
q.availability = make(map[string][]string)
}
q.availability[file] = append(q.availability[file], node)
}