Factor out XDR en/decoding

This commit is contained in:
Jakob Borg
2014-02-15 12:08:55 +01:00
parent 21a7f3960a
commit f89fa6caed
8 changed files with 342 additions and 253 deletions

View File

@@ -10,6 +10,7 @@ import (
"time"
"github.com/calmh/syncthing/buffers"
"github.com/calmh/syncthing/xdr"
)
const (
@@ -61,9 +62,9 @@ type Connection struct {
id string
receiver Model
reader io.Reader
mreader *marshalReader
mreader marshalReader
writer io.Writer
mwriter *marshalWriter
mwriter marshalWriter
closed bool
awaiting map[int]chan asyncResult
nextId int
@@ -101,9 +102,9 @@ func NewConnection(nodeID string, reader io.Reader, writer io.Writer, receiver M
id: nodeID,
receiver: receiver,
reader: flrd,
mreader: &marshalReader{r: flrd},
mreader: marshalReader{Reader: xdr.NewReader(flrd)},
writer: flwr,
mwriter: &marshalWriter{w: flwr},
mwriter: marshalWriter{Writer: xdr.NewWriter(flwr)},
awaiting: make(map[int]chan asyncResult),
indexSent: make(map[string]map[string][2]int64),
}
@@ -168,8 +169,8 @@ func (c *Connection) Index(repo string, idx []FileInfo) {
if err != nil {
c.close(err)
return
} else if c.mwriter.err != nil {
c.close(c.mwriter.err)
} else if c.mwriter.Err() != nil {
c.close(c.mwriter.Err())
return
}
}
@@ -185,10 +186,10 @@ func (c *Connection) Request(repo string, name string, offset int64, size uint32
c.awaiting[c.nextId] = rc
c.mwriter.writeHeader(header{0, c.nextId, messageTypeRequest})
c.mwriter.writeRequest(request{repo, name, offset, size, hash})
if c.mwriter.err != nil {
if c.mwriter.Err() != nil {
c.Unlock()
c.close(c.mwriter.err)
return nil, c.mwriter.err
c.close(c.mwriter.Err())
return nil, c.mwriter.Err()
}
err := c.flush()
if err != nil {
@@ -220,9 +221,9 @@ func (c *Connection) ping() bool {
c.Unlock()
c.close(err)
return false
} else if c.mwriter.err != nil {
} else if c.mwriter.Err() != nil {
c.Unlock()
c.close(c.mwriter.err)
c.close(c.mwriter.Err())
return false
}
c.nextId = (c.nextId + 1) & 0xfff
@@ -269,8 +270,8 @@ func (c *Connection) readerLoop() {
loop:
for {
hdr := c.mreader.readHeader()
if c.mreader.err != nil {
c.close(c.mreader.err)
if c.mreader.Err() != nil {
c.close(c.mreader.Err())
break loop
}
if hdr.version != 0 {
@@ -282,8 +283,8 @@ loop:
case messageTypeIndex:
repo, files := c.mreader.readIndex()
_ = repo
if c.mreader.err != nil {
c.close(c.mreader.err)
if c.mreader.Err() != nil {
c.close(c.mreader.Err())
break loop
} else {
c.receiver.Index(c.id, files)
@@ -295,8 +296,8 @@ loop:
case messageTypeIndexUpdate:
repo, files := c.mreader.readIndex()
_ = repo
if c.mreader.err != nil {
c.close(c.mreader.err)
if c.mreader.Err() != nil {
c.close(c.mreader.Err())
break loop
} else {
c.receiver.IndexUpdate(c.id, files)
@@ -304,8 +305,8 @@ loop:
case messageTypeRequest:
req := c.mreader.readRequest()
if c.mreader.err != nil {
c.close(c.mreader.err)
if c.mreader.Err() != nil {
c.close(c.mreader.Err())
break loop
}
go c.processRequest(hdr.msgID, req)
@@ -313,8 +314,8 @@ loop:
case messageTypeResponse:
data := c.mreader.readResponse()
if c.mreader.err != nil {
c.close(c.mreader.err)
if c.mreader.Err() != nil {
c.close(c.mreader.Err())
break loop
} else {
c.Lock()
@@ -323,21 +324,21 @@ loop:
c.Unlock()
if ok {
rc <- asyncResult{data, c.mreader.err}
rc <- asyncResult{data, c.mreader.Err()}
close(rc)
}
}
case messageTypePing:
c.Lock()
c.mwriter.writeUint32(encodeHeader(header{0, hdr.msgID, messageTypePong}))
c.mwriter.WriteUint32(encodeHeader(header{0, hdr.msgID, messageTypePong}))
err := c.flush()
c.Unlock()
if err != nil {
c.close(err)
break loop
} else if c.mwriter.err != nil {
c.close(c.mwriter.err)
} else if c.mwriter.Err() != nil {
c.close(c.mwriter.Err())
break loop
}
@@ -376,9 +377,9 @@ func (c *Connection) processRequest(msgID int, req request) {
data, _ := c.receiver.Request(c.id, req.repo, req.name, req.offset, req.size, req.hash)
c.Lock()
c.mwriter.writeUint32(encodeHeader(header{0, msgID, messageTypeResponse}))
c.mwriter.WriteUint32(encodeHeader(header{0, msgID, messageTypeResponse}))
c.mwriter.writeResponse(data)
err := c.mwriter.err
err := c.mwriter.Err()
if err == nil {
err = c.flush()
}
@@ -427,8 +428,8 @@ func (c *Connection) Statistics() Statistics {
stats := Statistics{
At: time.Now(),
InBytesTotal: int(c.mreader.getTot()),
OutBytesTotal: int(c.mwriter.getTot()),
InBytesTotal: int(c.mreader.Tot()),
OutBytesTotal: int(c.mwriter.Tot()),
}
return stats