Mostly lock free receive loop

This commit is contained in:
Jakob Borg
2013-12-30 22:10:54 -05:00
parent fd56123acf
commit 632bcae856

View File

@@ -59,9 +59,6 @@ type Connection struct {
lastStatistics Statistics lastStatistics Statistics
statisticsLock sync.Mutex statisticsLock sync.Mutex
lastReceive time.Time
lastReceiveLock sync.RWMutex
} }
var ErrClosed = errors.New("Connection closed") var ErrClosed = errors.New("Connection closed")
@@ -71,8 +68,10 @@ type asyncResult struct {
err error err error
} }
const pingTimeout = 2 * time.Minute const (
const pingIdleTime = 5 * time.Minute pingTimeout = 2 * time.Minute
pingIdleTime = 5 * time.Minute
)
func NewConnection(nodeID string, reader io.Reader, writer io.Writer, receiver Model) *Connection { func NewConnection(nodeID string, reader io.Reader, writer io.Writer, receiver Model) *Connection {
flrd := flate.NewReader(reader) flrd := flate.NewReader(reader)
@@ -88,7 +87,6 @@ func NewConnection(nodeID string, reader io.Reader, writer io.Writer, receiver M
writer: flwr, writer: flwr,
mwriter: &marshalWriter{w: flwr}, mwriter: &marshalWriter{w: flwr},
awaiting: make(map[int]chan asyncResult), awaiting: make(map[int]chan asyncResult),
lastReceive: time.Now(),
ID: nodeID, ID: nodeID,
lastStatistics: Statistics{At: time.Now()}, lastStatistics: Statistics{At: time.Now()},
} }
@@ -102,7 +100,6 @@ func NewConnection(nodeID string, reader io.Reader, writer io.Writer, receiver M
// Index writes the list of file information to the connected peer node // Index writes the list of file information to the connected peer node
func (c *Connection) Index(idx []FileInfo) { func (c *Connection) Index(idx []FileInfo) {
c.Lock() c.Lock()
var msgType int var msgType int
if c.indexSent == nil { if c.indexSent == nil {
// This is the first time we send an index. // This is the first time we send an index.
@@ -234,26 +231,24 @@ func (c *Connection) isClosed() bool {
} }
func (c *Connection) readerLoop() { func (c *Connection) readerLoop() {
for !c.isClosed() { loop:
for {
hdr := c.mreader.readHeader() hdr := c.mreader.readHeader()
if c.mreader.err != nil { if c.mreader.err != nil {
c.Close(c.mreader.err) c.Close(c.mreader.err)
break break loop
} }
if hdr.version != 0 { if hdr.version != 0 {
c.Close(fmt.Errorf("Protocol error: %s: unknown message version %#x", c.ID, hdr.version)) c.Close(fmt.Errorf("Protocol error: %s: unknown message version %#x", c.ID, hdr.version))
break break loop
} }
c.lastReceiveLock.Lock()
c.lastReceive = time.Now()
c.lastReceiveLock.Unlock()
switch hdr.msgType { switch hdr.msgType {
case messageTypeIndex: case messageTypeIndex:
files := c.mreader.readIndex() files := c.mreader.readIndex()
if c.mreader.err != nil { if c.mreader.err != nil {
c.Close(c.mreader.err) c.Close(c.mreader.err)
break loop
} else { } else {
c.receiver.Index(c.ID, files) c.receiver.Index(c.ID, files)
} }
@@ -262,18 +257,25 @@ func (c *Connection) readerLoop() {
files := c.mreader.readIndex() files := c.mreader.readIndex()
if c.mreader.err != nil { if c.mreader.err != nil {
c.Close(c.mreader.err) c.Close(c.mreader.err)
break loop
} else { } else {
c.receiver.IndexUpdate(c.ID, files) c.receiver.IndexUpdate(c.ID, files)
} }
case messageTypeRequest: case messageTypeRequest:
c.processRequest(hdr.msgID) req := c.mreader.readRequest()
if c.mreader.err != nil {
c.Close(c.mreader.err)
break loop
}
go c.processRequest(hdr.msgID, req)
case messageTypeResponse: case messageTypeResponse:
data := c.mreader.readResponse() data := c.mreader.readResponse()
if c.mreader.err != nil { if c.mreader.err != nil {
c.Close(c.mreader.err) c.Close(c.mreader.err)
break loop
} else { } else {
c.Lock() c.Lock()
rc, ok := c.awaiting[hdr.msgID] rc, ok := c.awaiting[hdr.msgID]
@@ -293,8 +295,10 @@ func (c *Connection) readerLoop() {
c.Unlock() c.Unlock()
if err != nil { if err != nil {
c.Close(err) c.Close(err)
break loop
} else if c.mwriter.err != nil { } else if c.mwriter.err != nil {
c.Close(c.mwriter.err) c.Close(c.mwriter.err)
break loop
} }
case messageTypePong: case messageTypePong:
@@ -313,35 +317,31 @@ func (c *Connection) readerLoop() {
default: default:
c.Close(fmt.Errorf("Protocol error: %s: unknown message type %#x", c.ID, hdr.msgType)) c.Close(fmt.Errorf("Protocol error: %s: unknown message type %#x", c.ID, hdr.msgType))
break loop
} }
} }
} }
func (c *Connection) processRequest(msgID int) { func (c *Connection) processRequest(msgID int, req request) {
req := c.mreader.readRequest()
if c.mreader.err != nil {
c.Close(c.mreader.err)
} else {
go func() {
data, _ := c.receiver.Request(c.ID, req.name, req.offset, req.size, req.hash) data, _ := c.receiver.Request(c.ID, req.name, req.offset, req.size, req.hash)
c.Lock() c.Lock()
c.mwriter.writeUint32(encodeHeader(header{0, msgID, messageTypeResponse})) c.mwriter.writeUint32(encodeHeader(header{0, msgID, messageTypeResponse}))
c.mwriter.writeResponse(data) c.mwriter.writeResponse(data)
err := c.flush() err := c.flush()
c.Unlock() c.Unlock()
buffers.Put(data) buffers.Put(data)
if err != nil { if err != nil {
c.Close(err) c.Close(err)
} else if c.mwriter.err != nil { } else if c.mwriter.err != nil {
c.Close(c.mwriter.err) c.Close(c.mwriter.err)
} }
}()
}
} }
func (c *Connection) pingerLoop() { func (c *Connection) pingerLoop() {
var rc = make(chan bool, 1) var rc = make(chan bool, 1)
for !c.isClosed() { for {
time.Sleep(pingIdleTime / 2) time.Sleep(pingIdleTime / 2)
go func() { go func() {
rc <- c.Ping() rc <- c.Ping()