Rework XDR encoding
This commit is contained in:
@@ -13,6 +13,8 @@ import (
|
||||
"github.com/calmh/syncthing/xdr"
|
||||
)
|
||||
|
||||
const BlockSize = 128 * 1024
|
||||
|
||||
const (
|
||||
messageTypeIndex = 1
|
||||
messageTypeRequest = 2
|
||||
@@ -32,26 +34,13 @@ var (
|
||||
ErrClusterHash = fmt.Errorf("Configuration error: mismatched cluster hash")
|
||||
)
|
||||
|
||||
type FileInfo struct {
|
||||
Name string
|
||||
Flags uint32
|
||||
Modified int64
|
||||
Version uint32
|
||||
Blocks []BlockInfo
|
||||
}
|
||||
|
||||
type BlockInfo struct {
|
||||
Size uint32
|
||||
Hash []byte
|
||||
}
|
||||
|
||||
type Model interface {
|
||||
// An index was received from the peer node
|
||||
Index(nodeID string, files []FileInfo)
|
||||
// An index update was received from the peer node
|
||||
IndexUpdate(nodeID string, files []FileInfo)
|
||||
// A request was made by the peer node
|
||||
Request(nodeID, repo string, name string, offset int64, size uint32, hash []byte) ([]byte, error)
|
||||
Request(nodeID, repo string, name string, offset int64, size int) ([]byte, error)
|
||||
// The peer node closed the connection
|
||||
Close(nodeID string, err error)
|
||||
}
|
||||
@@ -62,9 +51,9 @@ type Connection struct {
|
||||
id string
|
||||
receiver Model
|
||||
reader io.Reader
|
||||
mreader marshalReader
|
||||
xr *xdr.Reader
|
||||
writer io.Writer
|
||||
mwriter marshalWriter
|
||||
xw *xdr.Writer
|
||||
closed bool
|
||||
awaiting map[int]chan asyncResult
|
||||
nextId int
|
||||
@@ -102,9 +91,9 @@ func NewConnection(nodeID string, reader io.Reader, writer io.Writer, receiver M
|
||||
id: nodeID,
|
||||
receiver: receiver,
|
||||
reader: flrd,
|
||||
mreader: marshalReader{Reader: xdr.NewReader(flrd)},
|
||||
xr: xdr.NewReader(flrd),
|
||||
writer: flwr,
|
||||
mwriter: marshalWriter{Writer: xdr.NewWriter(flwr)},
|
||||
xw: xdr.NewWriter(flwr),
|
||||
awaiting: make(map[int]chan asyncResult),
|
||||
indexSent: make(map[string]map[string][2]int64),
|
||||
}
|
||||
@@ -116,9 +105,16 @@ func NewConnection(nodeID string, reader io.Reader, writer io.Writer, receiver M
|
||||
c.myOptions = options
|
||||
go func() {
|
||||
c.Lock()
|
||||
c.mwriter.writeHeader(header{0, c.nextId, messageTypeOptions})
|
||||
c.mwriter.writeOptions(options)
|
||||
err := c.flush()
|
||||
header{0, c.nextId, messageTypeOptions}.encodeXDR(c.xw)
|
||||
var om OptionsMessage
|
||||
for k, v := range options {
|
||||
om.Options = append(om.Options, Option{k, v})
|
||||
}
|
||||
om.encodeXDR(c.xw)
|
||||
err := c.xw.Error()
|
||||
if err == nil {
|
||||
err = c.flush()
|
||||
}
|
||||
if err != nil {
|
||||
log.Println("Warning: Write error during initial handshake:", err)
|
||||
}
|
||||
@@ -159,9 +155,11 @@ func (c *Connection) Index(repo string, idx []FileInfo) {
|
||||
idx = diff
|
||||
}
|
||||
|
||||
c.mwriter.writeHeader(header{0, c.nextId, msgType})
|
||||
c.mwriter.writeIndex(repo, idx)
|
||||
err := c.flush()
|
||||
header{0, c.nextId, msgType}.encodeXDR(c.xw)
|
||||
_, err := IndexMessage{repo, idx}.encodeXDR(c.xw)
|
||||
if err == nil {
|
||||
err = c.flush()
|
||||
}
|
||||
c.nextId = (c.nextId + 1) & 0xfff
|
||||
c.hasSentIndex = true
|
||||
c.Unlock()
|
||||
@@ -169,14 +167,11 @@ func (c *Connection) Index(repo string, idx []FileInfo) {
|
||||
if err != nil {
|
||||
c.close(err)
|
||||
return
|
||||
} else if c.mwriter.Err() != nil {
|
||||
c.close(c.mwriter.Err())
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// Request returns the bytes for the specified block after fetching them from the connected peer.
|
||||
func (c *Connection) Request(repo string, name string, offset int64, size uint32, hash []byte) ([]byte, error) {
|
||||
func (c *Connection) Request(repo string, name string, offset int64, size int) ([]byte, error) {
|
||||
c.Lock()
|
||||
if c.closed {
|
||||
c.Unlock()
|
||||
@@ -184,14 +179,11 @@ func (c *Connection) Request(repo string, name string, offset int64, size uint32
|
||||
}
|
||||
rc := make(chan asyncResult)
|
||||
c.awaiting[c.nextId] = rc
|
||||
c.mwriter.writeHeader(header{0, c.nextId, messageTypeRequest})
|
||||
c.mwriter.writeRequest(request{repo, name, offset, size, hash})
|
||||
if c.mwriter.Err() != nil {
|
||||
c.Unlock()
|
||||
c.close(c.mwriter.Err())
|
||||
return nil, c.mwriter.Err()
|
||||
header{0, c.nextId, messageTypeRequest}.encodeXDR(c.xw)
|
||||
_, err := RequestMessage{repo, name, uint64(offset), uint32(size)}.encodeXDR(c.xw)
|
||||
if err == nil {
|
||||
err = c.flush()
|
||||
}
|
||||
err := c.flush()
|
||||
if err != nil {
|
||||
c.Unlock()
|
||||
c.close(err)
|
||||
@@ -215,15 +207,15 @@ func (c *Connection) ping() bool {
|
||||
}
|
||||
rc := make(chan asyncResult, 1)
|
||||
c.awaiting[c.nextId] = rc
|
||||
c.mwriter.writeHeader(header{0, c.nextId, messageTypePing})
|
||||
header{0, c.nextId, messageTypePing}.encodeXDR(c.xw)
|
||||
err := c.flush()
|
||||
if err != nil {
|
||||
c.Unlock()
|
||||
c.close(err)
|
||||
return false
|
||||
} else if c.mwriter.Err() != nil {
|
||||
} else if c.xw.Error() != nil {
|
||||
c.Unlock()
|
||||
c.close(c.mwriter.Err())
|
||||
c.close(c.xw.Error())
|
||||
return false
|
||||
}
|
||||
c.nextId = (c.nextId + 1) & 0xfff
|
||||
@@ -269,9 +261,10 @@ func (c *Connection) isClosed() bool {
|
||||
func (c *Connection) readerLoop() {
|
||||
loop:
|
||||
for {
|
||||
hdr := c.mreader.readHeader()
|
||||
if c.mreader.Err() != nil {
|
||||
c.close(c.mreader.Err())
|
||||
var hdr header
|
||||
hdr.decodeXDR(c.xr)
|
||||
if c.xr.Error() != nil {
|
||||
c.close(c.xr.Error())
|
||||
break loop
|
||||
}
|
||||
if hdr.version != 0 {
|
||||
@@ -281,64 +274,65 @@ loop:
|
||||
|
||||
switch hdr.msgType {
|
||||
case messageTypeIndex:
|
||||
repo, files := c.mreader.readIndex()
|
||||
_ = repo
|
||||
if c.mreader.Err() != nil {
|
||||
c.close(c.mreader.Err())
|
||||
var im IndexMessage
|
||||
im.decodeXDR(c.xr)
|
||||
if c.xr.Error() != nil {
|
||||
c.close(c.xr.Error())
|
||||
break loop
|
||||
} else {
|
||||
c.receiver.Index(c.id, files)
|
||||
c.receiver.Index(c.id, im.Files)
|
||||
}
|
||||
c.Lock()
|
||||
c.hasRecvdIndex = true
|
||||
c.Unlock()
|
||||
|
||||
case messageTypeIndexUpdate:
|
||||
repo, files := c.mreader.readIndex()
|
||||
_ = repo
|
||||
if c.mreader.Err() != nil {
|
||||
c.close(c.mreader.Err())
|
||||
var im IndexMessage
|
||||
im.decodeXDR(c.xr)
|
||||
if c.xr.Error() != nil {
|
||||
c.close(c.xr.Error())
|
||||
break loop
|
||||
} else {
|
||||
c.receiver.IndexUpdate(c.id, files)
|
||||
c.receiver.IndexUpdate(c.id, im.Files)
|
||||
}
|
||||
|
||||
case messageTypeRequest:
|
||||
req := c.mreader.readRequest()
|
||||
if c.mreader.Err() != nil {
|
||||
c.close(c.mreader.Err())
|
||||
var req RequestMessage
|
||||
req.decodeXDR(c.xr)
|
||||
if c.xr.Error() != nil {
|
||||
c.close(c.xr.Error())
|
||||
break loop
|
||||
}
|
||||
go c.processRequest(hdr.msgID, req)
|
||||
|
||||
case messageTypeResponse:
|
||||
data := c.mreader.readResponse()
|
||||
data := c.xr.ReadBytes()
|
||||
|
||||
if c.mreader.Err() != nil {
|
||||
c.close(c.mreader.Err())
|
||||
if c.xr.Error() != nil {
|
||||
c.close(c.xr.Error())
|
||||
break loop
|
||||
} else {
|
||||
c.Lock()
|
||||
rc, ok := c.awaiting[hdr.msgID]
|
||||
delete(c.awaiting, hdr.msgID)
|
||||
c.Unlock()
|
||||
}
|
||||
|
||||
if ok {
|
||||
rc <- asyncResult{data, c.mreader.Err()}
|
||||
close(rc)
|
||||
}
|
||||
c.Lock()
|
||||
rc, ok := c.awaiting[hdr.msgID]
|
||||
delete(c.awaiting, hdr.msgID)
|
||||
c.Unlock()
|
||||
|
||||
if ok {
|
||||
rc <- asyncResult{data, c.xr.Error()}
|
||||
close(rc)
|
||||
}
|
||||
|
||||
case messageTypePing:
|
||||
c.Lock()
|
||||
c.mwriter.WriteUint32(encodeHeader(header{0, hdr.msgID, messageTypePong}))
|
||||
header{0, hdr.msgID, messageTypePong}.encodeXDR(c.xw)
|
||||
err := c.flush()
|
||||
c.Unlock()
|
||||
if err != nil {
|
||||
c.close(err)
|
||||
break loop
|
||||
} else if c.mwriter.Err() != nil {
|
||||
c.close(c.mwriter.Err())
|
||||
} else if c.xw.Error() != nil {
|
||||
c.close(c.xw.Error())
|
||||
break loop
|
||||
}
|
||||
|
||||
@@ -357,8 +351,18 @@ loop:
|
||||
}
|
||||
|
||||
case messageTypeOptions:
|
||||
var om OptionsMessage
|
||||
om.decodeXDR(c.xr)
|
||||
if c.xr.Error() != nil {
|
||||
c.close(c.xr.Error())
|
||||
break loop
|
||||
}
|
||||
|
||||
c.optionsLock.Lock()
|
||||
c.peerOptions = c.mreader.readOptions()
|
||||
c.peerOptions = make(map[string]string, len(om.Options))
|
||||
for _, opt := range om.Options {
|
||||
c.peerOptions[opt.Key] = opt.Value
|
||||
}
|
||||
c.optionsLock.Unlock()
|
||||
|
||||
if mh, rh := c.myOptions["clusterHash"], c.peerOptions["clusterHash"]; len(mh) > 0 && len(rh) > 0 && mh != rh {
|
||||
@@ -373,13 +377,12 @@ loop:
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Connection) processRequest(msgID int, req request) {
|
||||
data, _ := c.receiver.Request(c.id, req.repo, req.name, req.offset, req.size, req.hash)
|
||||
func (c *Connection) processRequest(msgID int, req RequestMessage) {
|
||||
data, _ := c.receiver.Request(c.id, req.Repository, req.Name, int64(req.Offset), int(req.Size))
|
||||
|
||||
c.Lock()
|
||||
c.mwriter.WriteUint32(encodeHeader(header{0, msgID, messageTypeResponse}))
|
||||
c.mwriter.writeResponse(data)
|
||||
err := c.mwriter.Err()
|
||||
header{0, msgID, messageTypeResponse}.encodeXDR(c.xw)
|
||||
_, err := c.xw.WriteBytes(data)
|
||||
if err == nil {
|
||||
err = c.flush()
|
||||
}
|
||||
@@ -428,8 +431,8 @@ func (c *Connection) Statistics() Statistics {
|
||||
|
||||
stats := Statistics{
|
||||
At: time.Now(),
|
||||
InBytesTotal: int(c.mreader.Tot()),
|
||||
OutBytesTotal: int(c.mwriter.Tot()),
|
||||
InBytesTotal: int(c.xr.Tot()),
|
||||
OutBytesTotal: int(c.xw.Tot()),
|
||||
}
|
||||
|
||||
return stats
|
||||
|
||||
Reference in New Issue
Block a user