From 1c757db153960695fe4fadb8979bc7268ea04598 Mon Sep 17 00:00:00 2001 From: Jakob Borg Date: Mon, 30 Dec 2013 09:22:34 -0500 Subject: [PATCH] Avoid deadlock in index exchange by more fine grained locking --- protocol/messages_test.go | 16 ++++++++-------- protocol/protocol.go | 13 ++++++++----- 2 files changed, 16 insertions(+), 13 deletions(-) diff --git a/protocol/messages_test.go b/protocol/messages_test.go index ebd291f4..b4d74fc4 100644 --- a/protocol/messages_test.go +++ b/protocol/messages_test.go @@ -32,10 +32,10 @@ func TestIndex(t *testing.T) { } var buf = new(bytes.Buffer) - var wr = marshalWriter{buf, 0, nil} + var wr = marshalWriter{w: buf} wr.writeIndex(idx) - var rd = marshalReader{buf, 0, nil} + var rd = marshalReader{r: buf} var idx2 = rd.readIndex() if !reflect.DeepEqual(idx, idx2) { @@ -47,9 +47,9 @@ func TestRequest(t *testing.T) { f := func(name string, offset uint64, size uint32, hash []byte) bool { var buf = new(bytes.Buffer) var req = request{name, offset, size, hash} - var wr = marshalWriter{buf, 0, nil} + var wr = marshalWriter{w: buf} wr.writeRequest(req) - var rd = marshalReader{buf, 0, nil} + var rd = marshalReader{r: buf} var req2 = rd.readRequest() return req.name == req2.name && req.offset == req2.offset && @@ -64,9 +64,9 @@ func TestRequest(t *testing.T) { func TestResponse(t *testing.T) { f := func(data []byte) bool { var buf = new(bytes.Buffer) - var wr = marshalWriter{buf, 0, nil} + var wr = marshalWriter{w: buf} wr.writeResponse(data) - var rd = marshalReader{buf, 0, nil} + var rd = marshalReader{r: buf} var read = rd.readResponse() return bytes.Compare(read, data) == 0 } @@ -98,7 +98,7 @@ func BenchmarkWriteIndex(b *testing.B) { }, } - var wr = marshalWriter{ioutil.Discard, 0, nil} + var wr = marshalWriter{w: ioutil.Discard} for i := 0; i < b.N; i++ { wr.writeIndex(idx) @@ -107,7 +107,7 @@ func BenchmarkWriteIndex(b *testing.B) { func BenchmarkWriteRequest(b *testing.B) { var req = request{"blah blah", 1231323, 13123123, []byte("hash hash hash")} - var wr = marshalWriter{ioutil.Discard, 0, nil} + var wr = marshalWriter{w: ioutil.Discard} for i := 0; i < b.N; i++ { wr.writeRequest(req) diff --git a/protocol/protocol.go b/protocol/protocol.go index b272b6da..d659cb84 100644 --- a/protocol/protocol.go +++ b/protocol/protocol.go @@ -45,6 +45,7 @@ type Model interface { type Connection struct { sync.RWMutex + ID string receiver Model reader io.Reader @@ -54,10 +55,12 @@ type Connection struct { closed bool awaiting map[int]chan asyncResult nextId int - lastReceive time.Time peerLatency time.Duration lastStatistics Statistics indexSent map[string]int64 + + lastReceive time.Time + lastReceiveLock sync.RWMutex } var ErrClosed = errors.New("Connection closed") @@ -234,9 +237,9 @@ func (c *Connection) readerLoop() { break } - c.Lock() + c.lastReceiveLock.Lock() c.lastReceive = time.Now() - c.Unlock() + c.lastReceiveLock.Unlock() switch hdr.msgType { case messageTypeIndex: @@ -334,9 +337,9 @@ func (c *Connection) processRequest(msgID int) { func (c *Connection) pingerLoop() { var rc = make(chan time.Duration, 1) for !c.isClosed() { - c.RLock() + c.lastReceiveLock.RLock() lr := c.lastReceive - c.RUnlock() + c.lastReceiveLock.RUnlock() if time.Since(lr) > pingIdleTime { go func() {