Tweak locking and integration test.

Jakob Borg
2014-01-20 22:22:27 +01:00
parent b67443eb40
commit d9ffd359e2
7 changed files with 122 additions and 110 deletions


@@ -1,16 +1,5 @@
 package model
 
-/*
-
-Locking
-=======
-
-The model has read and write locks. These must be acquired as appropriate by
-public methods. To prevent deadlock situations, private methods should never
-acquire locks, but document what locks they require.
-
-*/
-
 import (
 	"crypto/sha1"
 	"errors"
@@ -40,7 +29,9 @@ type Model struct {
 	rawConn   map[string]io.Closer
 	pmut      sync.RWMutex // protects protoConn and rawConn
 
-	fq FileQueue // queue for files to fetch
+	// Queue for files to fetch. fq can call back into the model, so we must ensure
+	// to hold no locks when calling methods on fq.
+	fq *FileQueue
 
 	dq chan File // queue for files to delete
 	updatedLocal int64 // timestamp of last update to local
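
The new comment on fq states the invariant that drives most of this diff: FileQueue methods may call back into the model, so no model lock may be held while calling them. If that rule is broken, the two sides can take their mutexes in opposite orders, which is the classic lock-ordering deadlock. A hypothetical two-type sketch of the hazard (none of these names are from the commit):

package deadlockdemo

import "sync"

type queue struct {
	mut   sync.Mutex
	model *model
}

type model struct {
	mut sync.Mutex
	q   *queue
}

// Goroutine A: model lock first, then queue lock.
func (m *model) update() {
	m.mut.Lock()
	defer m.mut.Unlock()
	m.q.add() // acquires q.mut while m.mut is held
}

func (q *queue) add() {
	q.mut.Lock()
	defer q.mut.Unlock()
}

// Goroutine B: queue lock first, then a callback into the model.
func (q *queue) drain() {
	q.mut.Lock()
	defer q.mut.Unlock()
	q.model.size() // acquires m.mut while q.mut is held
}

func (m *model) size() int {
	m.mut.Lock()
	defer m.mut.Unlock()
	return 0
}

If A holds m.mut while B holds q.mut, each blocks forever waiting for the other. The commit's fix is to make all fq calls happen while no model lock is held.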
@@ -100,6 +91,7 @@ func NewModel(dir string) *Model {
 		trace:             make(map[string]bool),
 		fileLastChanged:   make(map[string]time.Time),
 		fileWasSuppressed: make(map[string]int),
+		fq:                NewFileQueue(),
 		dq:                make(chan File),
 	}
@@ -157,6 +149,13 @@ func (m *Model) Generation() int64 {
 	return m.updatedLocal + m.updateGlobal
 }
 
+func (m *Model) LocalAge() float64 {
+	m.umut.RLock()
+	defer m.umut.RUnlock()
+
+	return time.Since(time.Unix(m.updatedLocal, 0)).Seconds()
+}
+
 type ConnectionInfo struct {
 	protocol.Statistics
 	Address string
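
The new LocalAge getter reads updatedLocal under the umut read lock and converts the stored Unix-seconds timestamp into an age. A standalone illustration of the arithmetic, not model code:

package main

import (
	"fmt"
	"time"
)

func main() {
	// updatedLocal is kept as Unix seconds; LocalAge turns it back into a
	// time.Time and measures the distance to now, in seconds.
	updatedLocal := time.Now().Add(-90 * time.Second).Unix()
	age := time.Since(time.Unix(updatedLocal, 0)).Seconds()
	fmt.Printf("local index is about %.0f seconds old\n", age) // ~90
}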
@@ -241,9 +240,11 @@ func (m *Model) InSyncSize() (files, bytes int) {
 
 // NeedFiles returns the list of currently needed files and the total size.
 func (m *Model) NeedFiles() (files []File, bytes int) {
+	qf := m.fq.QueuedFiles()
+
 	m.gmut.RLock()
 
-	for _, n := range m.fq.QueuedFiles() {
+	for _, n := range qf {
 		f := m.global[n]
 		files = append(files, f)
 		bytes += f.Size()
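
NeedFiles is the template for the rest of the commit: snapshot the queue first, then take the model lock only to resolve the snapshot, so fq is never entered with gmut held. The same shape reduced to a runnable sketch with stand-in types (nothing below is syncthing code):

package main

import (
	"fmt"
	"sync"
)

type file struct {
	name string
	size int
}

type fileQueue struct{ names []string }

// QueuedFiles stands in for fq.QueuedFiles; because the real queue may call
// back into the model, the caller must hold no model locks here.
func (q *fileQueue) QueuedFiles() []string {
	return append([]string(nil), q.names...)
}

var (
	gmut   sync.RWMutex
	global = map[string]file{"a": {"a", 10}, "b": {"b", 20}}
)

func needFiles(q *fileQueue) (files []file, bytes int) {
	qf := q.QueuedFiles() // step 1: snapshot the queue, lock-free

	gmut.RLock() // step 2: only then lock the global table
	defer gmut.RUnlock()
	for _, n := range qf {
		f := global[n]
		files = append(files, f)
		bytes += f.size
	}
	return
}

func main() {
	fs, n := needFiles(&fileQueue{names: []string{"a", "b"}})
	fmt.Println(len(fs), n) // 2 30
}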
@@ -320,9 +321,11 @@ func (m *Model) indexUpdate(repo map[string]File, f protocol.FileInfo) {
 	repo[f.Name] = fileFromFileInfo(f)
 }
 
-// Close removes the peer from the model and closes the underlyign connection if possible.
+// Close removes the peer from the model and closes the underlying connection if possible.
 // Implements the protocol.Model interface.
 func (m *Model) Close(node string, err error) {
+	m.fq.RemoveAvailable(node)
+
 	m.pmut.Lock()
 	m.rmut.Lock()
 
@@ -334,7 +337,6 @@ func (m *Model) Close(node string, err error) {
 	delete(m.remote, node)
 	delete(m.protoConn, node)
 	delete(m.rawConn, node)
-	m.fq.RemoveAvailable(node)
 
 	m.rmut.Unlock()
 	m.pmut.Unlock()
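
Besides moving the fq call out in front of the critical section, Close keeps the model's fixed acquisition order: pmut before rmut, released in reverse. A consistent global order is the other half of the deadlock story; a stand-in sketch with a hypothetical type:

package lockorder

import "sync"

// locks is a stand-in struct; only the ordering matters.
type locks struct {
	pmut sync.Mutex
	rmut sync.Mutex
}

func (l *locks) closePeer() {
	// Every method that needs both locks takes pmut before rmut and
	// releases in reverse; with a single global order the two mutexes
	// cannot deadlock against each other.
	l.pmut.Lock()
	l.rmut.Lock()
	// ... mutate peer tables ...
	l.rmut.Unlock()
	l.pmut.Unlock()
}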
@@ -471,37 +473,42 @@ func (m *Model) AddConnection(rawConn io.Closer, protoConn Connection) {
 		protoConn.Index(idx)
 	}()
 
-	if m.rwRunning {
-		for i := 0; i < m.parallellRequests; i++ {
-			i := i
-			go func() {
-				if m.trace["pull"] {
-					log.Println("PULL: Starting", nodeID, i)
-				}
-				for {
-					m.pmut.RLock()
-					if _, ok := m.protoConn[nodeID]; !ok {
-						if m.trace["pull"] {
-							log.Println("PULL: Exiting", nodeID, i)
-						}
-						m.pmut.RUnlock()
-						return
+	m.initmut.Lock()
+	rw := m.rwRunning
+	m.initmut.Unlock()
+	if !rw {
+		return
+	}
+
+	for i := 0; i < m.parallellRequests; i++ {
+		i := i
+		go func() {
+			if m.trace["pull"] {
+				log.Println("PULL: Starting", nodeID, i)
+			}
+			for {
+				m.pmut.RLock()
+				if _, ok := m.protoConn[nodeID]; !ok {
+					if m.trace["pull"] {
+						log.Println("PULL: Exiting", nodeID, i)
 					}
 					m.pmut.RUnlock()
-
-					qb, ok := m.fq.Get(nodeID)
-					if ok {
-						if m.trace["pull"] {
-							log.Println("PULL: Request", nodeID, i, qb.name, qb.block.Offset)
-						}
-						data, _ := protoConn.Request(qb.name, qb.block.Offset, qb.block.Size, qb.block.Hash)
-						m.fq.Done(qb.name, qb.block.Offset, data)
-					} else {
-						time.Sleep(1 * time.Second)
-					}
+					return
 				}
-			}()
-		}
+				m.pmut.RUnlock()
+
+				qb, ok := m.fq.Get(nodeID)
+				if ok {
+					if m.trace["pull"] {
+						log.Println("PULL: Request", nodeID, i, qb.name, qb.block.Offset)
+					}
+					data, _ := protoConn.Request(qb.name, qb.block.Offset, qb.block.Size, qb.block.Hash)
+					m.fq.Done(qb.name, qb.block.Offset, data)
+				} else {
+					time.Sleep(1 * time.Second)
+				}
+			}
+		}()
 	}
 }
 
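
The rewritten preamble is two changes in one: the read of rwRunning is now guarded by initmut, and the value is copied to a local so the lock is dropped before any puller goroutines start, with an early return flattening the old if block. The pattern in isolation, with stand-in names:

package main

import (
	"fmt"
	"sync"
)

var (
	initmut   sync.Mutex
	rwRunning bool
)

func startPullers() {
	// Copy the shared flag under the lock, then work with the local copy;
	// the critical section stays three lines long and the early return
	// replaces one level of nesting around the whole goroutine setup.
	initmut.Lock()
	rw := rwRunning
	initmut.Unlock()

	if !rw {
		return
	}
	fmt.Println("starting pullers")
}

func main() {
	startPullers() // prints nothing, rwRunning is still false
	initmut.Lock()
	rwRunning = true
	initmut.Unlock()
	startPullers() // prints
}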
@@ -606,7 +613,6 @@ func (m *Model) broadcastIndexLoop() {
 }
 
 // markDeletedLocals sets the deleted flag on files that have gone missing locally.
-// Must be called with the write lock held.
 func (m *Model) markDeletedLocals(newLocal map[string]File) bool {
 	// For every file in the existing local table, check if they are also
 	// present in the new local table. If they are not, check that we already
@@ -661,7 +667,6 @@ func (m *Model) updateLocal(f File) {
 	}
 }
 
-// Must be called with the write lock held.
 func (m *Model) recomputeGlobal() {
 	var newGlobal = make(map[string]File)
 
@@ -671,23 +676,29 @@ func (m *Model) recomputeGlobal() {
 	}
 	m.lmut.RUnlock()
 
+	var available = make(map[string][]string)
+
 	m.rmut.RLock()
 	var highestMod int64
 	for nodeID, fs := range m.remote {
 		for n, nf := range fs {
 			if lf, ok := newGlobal[n]; !ok || nf.NewerThan(lf) {
 				newGlobal[n] = nf
-				m.fq.SetAvailable(n, nodeID)
+				available[n] = []string{nodeID}
 				if nf.Modified > highestMod {
 					highestMod = nf.Modified
 				}
 			} else if lf.Equals(nf) {
-				m.fq.AddAvailable(n, nodeID)
+				available[n] = append(available[n], nodeID)
 			}
 		}
 	}
 	m.rmut.RUnlock()
 
+	for f, ns := range available {
+		m.fq.SetAvailable(f, ns)
+	}
+
 	// Figure out if anything actually changed
 	m.gmut.RLock()
 
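
recomputeGlobal applies the no-locks-around-fq rule in two phases: availability is accumulated into the local available map while rmut is held, and only after the unlock does the model hand the batch to fq.SetAvailable. A reduced, runnable sketch of that shape, with stand-in data rather than the real types:

package main

import (
	"fmt"
	"sync"
)

var (
	rmut   sync.RWMutex
	remote = map[string]map[string]bool{ // nodeID -> file names it has
		"node1": {"a": true},
		"node2": {"a": true, "b": true},
	}
)

// setAvailable stands in for fq.SetAvailable and must not run under rmut.
func setAvailable(file string, nodes []string) {
	fmt.Println(file, "available on", nodes)
}

func recompute() {
	// Phase 1: gather results into a plain local map while holding rmut.
	available := make(map[string][]string)
	rmut.RLock()
	for nodeID, files := range remote {
		for name := range files {
			available[name] = append(available[name], nodeID)
		}
	}
	rmut.RUnlock()

	// Phase 2: with no locks held, hand the batch to the queue.
	for f, ns := range available {
		setAvailable(f, ns)
	}
}

func main() { recompute() }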
@@ -727,10 +738,6 @@ func (m *Model) recomputeNeed() {
 	m.gmut.RLock()
 	for n, gf := range m.global {
-		if m.fq.Queued(n) {
-			continue
-		}
-
 		m.lmut.RLock()
 		lf, ok := m.local[n]
 		m.lmut.RUnlock()
 
@@ -771,7 +778,9 @@ func (m *Model) recomputeNeed() {
 	m.gmut.RUnlock()
 	for _, ao := range toAdd {
-		m.fq.Add(ao.n, ao.remote, ao.fm)
+		if !m.fq.Queued(ao.n) {
+			m.fq.Add(ao.n, ao.remote, ao.fm)
+		}
 	}
 
 	for _, gf := range toDelete {
 		m.dq <- gf
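
The Queued test has not been dropped, it has moved: the hunk above removes it from the loop that runs under gmut.RLock, and this hunk reinstates it in the toAdd loop, which runs after gmut.RUnlock. Deduplication is preserved while fq is again only consulted with no model lock held. A toy version of the check, with a stand-in queue type:

package main

import "fmt"

type fileQueue struct{ queued map[string]bool }

func (q *fileQueue) Queued(name string) bool { return q.queued[name] }

func (q *fileQueue) Add(name string) {
	q.queued[name] = true
	fmt.Println("queued", name)
}

func main() {
	fq := &fileQueue{queued: map[string]bool{"a": true}}
	for _, name := range []string{"a", "b"} {
		if !fq.Queued(name) { // files already in flight are skipped
			fq.Add(name) // only "b" is added
		}
	}
}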