Fine grained locking

This commit is contained in:
Jakob Borg
2014-01-17 20:06:44 -07:00
parent fff50b5472
commit 4ac204b604
5 changed files with 213 additions and 117 deletions

View File

@@ -14,9 +14,10 @@ type Monitor interface {
type FileQueue struct {
files queuedFileList
lock sync.Mutex
sorted bool
fmut sync.Mutex // protects files and sorted
availability map[string][]string
amut sync.Mutex // protects availability
}
type queuedFile struct {
@@ -57,8 +58,14 @@ type queuedBlock struct {
}
func (q *FileQueue) Add(name string, blocks []Block, monitor Monitor) {
q.lock.Lock()
defer q.lock.Unlock()
q.fmut.Lock()
defer q.fmut.Unlock()
for _, f := range q.files {
if f.name == name {
panic("re-adding added file " + f.name)
}
}
q.files = append(q.files, queuedFile{
name: name,
@@ -72,15 +79,15 @@ func (q *FileQueue) Add(name string, blocks []Block, monitor Monitor) {
}
func (q *FileQueue) Len() int {
q.lock.Lock()
defer q.lock.Unlock()
q.fmut.Lock()
defer q.fmut.Unlock()
return len(q.files)
}
func (q *FileQueue) Get(nodeID string) (queuedBlock, bool) {
q.lock.Lock()
defer q.lock.Unlock()
q.fmut.Lock()
defer q.fmut.Unlock()
if !q.sorted {
sort.Sort(q.files)
@@ -90,7 +97,11 @@ func (q *FileQueue) Get(nodeID string) (queuedBlock, bool) {
for i := range q.files {
qf := &q.files[i]
if len(q.availability[qf.name]) == 0 {
q.amut.Lock()
av := q.availability[qf.name]
q.amut.Unlock()
if len(av) == 0 {
// Noone has the file we want; abort.
if qf.remaining != len(qf.blocks) {
// We have already started on this file; close it down
@@ -103,7 +114,7 @@ func (q *FileQueue) Get(nodeID string) (queuedBlock, bool) {
return queuedBlock{}, false
}
for _, ni := range q.availability[qf.name] {
for _, ni := range av {
// Find and return the next block in the queue
if ni == nodeID {
for j, b := range qf.blocks {
@@ -127,8 +138,8 @@ func (q *FileQueue) Get(nodeID string) (queuedBlock, bool) {
}
func (q *FileQueue) Done(file string, offset int64, data []byte) {
q.lock.Lock()
defer q.lock.Unlock()
q.fmut.Lock()
defer q.fmut.Unlock()
c := content{
offset: offset,
@@ -167,8 +178,8 @@ func (q *FileQueue) Done(file string, offset int64, data []byte) {
}
func (q *FileQueue) Queued(file string) bool {
q.lock.Lock()
defer q.lock.Unlock()
q.fmut.Lock()
defer q.fmut.Unlock()
for _, qf := range q.files {
if qf.name == file {
@@ -179,8 +190,8 @@ func (q *FileQueue) Queued(file string) bool {
}
func (q *FileQueue) QueuedFiles() (files []string) {
q.lock.Lock()
defer q.lock.Unlock()
q.fmut.Lock()
defer q.fmut.Unlock()
for _, qf := range q.files {
files = append(files, qf.name)
@@ -202,8 +213,9 @@ func (q *FileQueue) deleteFile(n string) {
}
func (q *FileQueue) SetAvailable(file, node string) {
q.lock.Lock()
defer q.lock.Unlock()
q.amut.Lock()
defer q.amut.Unlock()
if q.availability == nil {
q.availability = make(map[string][]string)
}
@@ -211,8 +223,9 @@ func (q *FileQueue) SetAvailable(file, node string) {
}
func (q *FileQueue) AddAvailable(file, node string) {
q.lock.Lock()
defer q.lock.Unlock()
q.amut.Lock()
defer q.amut.Unlock()
if q.availability == nil {
q.availability = make(map[string][]string)
}
@@ -220,8 +233,9 @@ func (q *FileQueue) AddAvailable(file, node string) {
}
func (q *FileQueue) RemoveAvailable(toRemove string) {
q.lock.Lock()
defer q.lock.Unlock()
q.amut.Lock()
defer q.amut.Unlock()
for file, nodes := range q.availability {
for i, node := range nodes {
if node == toRemove {