diff --git a/common_test.go b/common_test.go index f46b6a8d..706a3b87 100644 --- a/common_test.go +++ b/common_test.go @@ -31,7 +31,7 @@ func (t *TestModel) Index(deviceID DeviceID, folder string, files []FileInfo, fl func (t *TestModel) IndexUpdate(deviceID DeviceID, folder string, files []FileInfo, flags uint32, options []Option) { } -func (t *TestModel) Request(deviceID DeviceID, folder, name string, offset int64, size int, hash []byte, flags uint32, options []Option) ([]byte, error) { +func (t *TestModel) Request(deviceID DeviceID, folder, name string, offset int64, size int, hash []byte, flags uint32, options []Option, buf []byte) error { t.folder = folder t.name = name t.offset = offset @@ -39,7 +39,8 @@ func (t *TestModel) Request(deviceID DeviceID, folder, name string, offset int64 t.hash = hash t.flags = flags t.options = options - return t.data, nil + copy(buf, t.data) + return nil } func (t *TestModel) Close(deviceID DeviceID, err error) { diff --git a/nativemodel_darwin.go b/nativemodel_darwin.go index 502a71f2..eb755a6e 100644 --- a/nativemodel_darwin.go +++ b/nativemodel_darwin.go @@ -26,9 +26,9 @@ func (m nativeModel) IndexUpdate(deviceID DeviceID, folder string, files []FileI m.next.IndexUpdate(deviceID, folder, files, flags, options) } -func (m nativeModel) Request(deviceID DeviceID, folder string, name string, offset int64, size int, hash []byte, flags uint32, options []Option) ([]byte, error) { +func (m nativeModel) Request(deviceID DeviceID, folder string, name string, offset int64, hash []byte, flags uint32, options []Option, buf []byte) error { name = norm.NFD.String(name) - return m.next.Request(deviceID, folder, name, offset, size, hash, flags, options) + return m.next.Request(deviceID, folder, name, offset, hash, flags, options, buf) } func (m nativeModel) ClusterConfig(deviceID DeviceID, config ClusterConfigMessage) { diff --git a/nativemodel_unix.go b/nativemodel_unix.go index 21585e30..0611865e 100644 --- a/nativemodel_unix.go +++ b/nativemodel_unix.go @@ -18,8 +18,8 @@ func (m nativeModel) IndexUpdate(deviceID DeviceID, folder string, files []FileI m.next.IndexUpdate(deviceID, folder, files, flags, options) } -func (m nativeModel) Request(deviceID DeviceID, folder string, name string, offset int64, size int, hash []byte, flags uint32, options []Option) ([]byte, error) { - return m.next.Request(deviceID, folder, name, offset, size, hash, flags, options) +func (m nativeModel) Request(deviceID DeviceID, folder string, name string, offset int64, hash []byte, flags uint32, options []Option, buf []byte) error { + return m.next.Request(deviceID, folder, name, offset, hash, flags, options, buf) } func (m nativeModel) ClusterConfig(deviceID DeviceID, config ClusterConfigMessage) { diff --git a/nativemodel_windows.go b/nativemodel_windows.go index f1a24898..36a1d274 100644 --- a/nativemodel_windows.go +++ b/nativemodel_windows.go @@ -34,9 +34,9 @@ func (m nativeModel) IndexUpdate(deviceID DeviceID, folder string, files []FileI m.next.IndexUpdate(deviceID, folder, files, flags, options) } -func (m nativeModel) Request(deviceID DeviceID, folder string, name string, offset int64, size int, hash []byte, flags uint32, options []Option) ([]byte, error) { +func (m nativeModel) Request(deviceID DeviceID, folder string, name string, offset int64, hash []byte, flags uint32, options []Option, buf []byte) error { name = filepath.FromSlash(name) - return m.next.Request(deviceID, folder, name, offset, size, hash, flags, options) + return m.next.Request(deviceID, folder, name, offset, hash, flags, options, buf) } func (m nativeModel) ClusterConfig(deviceID DeviceID, config ClusterConfigMessage) { diff --git a/protocol.go b/protocol.go index d0e23055..8b41c013 100644 --- a/protocol.go +++ b/protocol.go @@ -81,7 +81,7 @@ type Model interface { // An index update was received from the peer device IndexUpdate(deviceID DeviceID, folder string, files []FileInfo, flags uint32, options []Option) // A request was made by the peer device - Request(deviceID DeviceID, folder string, name string, offset int64, size int, hash []byte, flags uint32, options []Option) ([]byte, error) + Request(deviceID DeviceID, folder string, name string, offset int64, hash []byte, flags uint32, options []Option, buf []byte) error // A cluster configuration message was received ClusterConfig(deviceID DeviceID, config ClusterConfigMessage) // The peer device closed the connection @@ -112,11 +112,11 @@ type rawConnection struct { idxMut sync.Mutex // ensures serialization of Index calls - nextID chan int - outbox chan hdrMsg - closed chan struct{} - once sync.Once - + nextID chan int + outbox chan hdrMsg + closed chan struct{} + once sync.Once + pool sync.Pool compression Compression rdbuf0 []byte // used & reused by readMessage @@ -129,8 +129,9 @@ type asyncResult struct { } type hdrMsg struct { - hdr header - msg encodable + hdr header + msg encodable + done chan struct{} } type encodable interface { @@ -151,14 +152,19 @@ func NewConnection(deviceID DeviceID, reader io.Reader, writer io.Writer, receiv cw := &countingWriter{Writer: writer} c := rawConnection{ - id: deviceID, - name: name, - receiver: nativeModel{receiver}, - cr: cr, - cw: cw, - outbox: make(chan hdrMsg), - nextID: make(chan int), - closed: make(chan struct{}), + id: deviceID, + name: name, + receiver: nativeModel{receiver}, + cr: cr, + cw: cw, + outbox: make(chan hdrMsg), + nextID: make(chan int), + closed: make(chan struct{}), + pool: sync.Pool{ + New: func() interface{} { + return make([]byte, BlockSize) + }, + }, compression: compress, } @@ -195,7 +201,7 @@ func (c *rawConnection) Index(folder string, idx []FileInfo, flags uint32, optio Files: idx, Flags: flags, Options: options, - }) + }, nil) c.idxMut.Unlock() return nil } @@ -213,7 +219,7 @@ func (c *rawConnection) IndexUpdate(folder string, idx []FileInfo, flags uint32, Files: idx, Flags: flags, Options: options, - }) + }, nil) c.idxMut.Unlock() return nil } @@ -243,7 +249,7 @@ func (c *rawConnection) Request(folder string, name string, offset int64, size i Hash: hash, Flags: flags, Options: options, - }) + }, nil) if !ok { return nil, ErrClosed } @@ -257,7 +263,7 @@ func (c *rawConnection) Request(folder string, name string, offset int64, size i // ClusterConfig send the cluster configuration message to the peer and returns any error func (c *rawConnection) ClusterConfig(config ClusterConfigMessage) { - c.send(-1, messageTypeClusterConfig, config) + c.send(-1, messageTypeClusterConfig, config, nil) } func (c *rawConnection) ping() bool { @@ -273,7 +279,7 @@ func (c *rawConnection) ping() bool { c.awaiting[id] = rc c.awaitingMut.Unlock() - ok := c.send(id, messageTypePing, nil) + ok := c.send(id, messageTypePing, nil, nil) if !ok { return false } @@ -342,7 +348,7 @@ func (c *rawConnection) readerLoop() (err error) { if state != stateReady { return fmt.Errorf("protocol error: ping message in state %d", state) } - c.send(hdr.msgID, messageTypePong, pongMessage{}) + c.send(hdr.msgID, messageTypePong, pongMessage{}, nil) case pongMessage: if state != stateReady { @@ -519,12 +525,36 @@ func filterIndexMessageFiles(fs []FileInfo) []FileInfo { } func (c *rawConnection) handleRequest(msgID int, req RequestMessage) { - data, err := c.receiver.Request(c.id, req.Folder, req.Name, int64(req.Offset), int(req.Size), req.Hash, req.Flags, req.Options) + size := int(req.Size) + usePool := size <= BlockSize - c.send(msgID, messageTypeResponse, ResponseMessage{ - Data: data, - Code: errorToCode(err), - }) + var buf []byte + var done chan struct{} + + if usePool { + buf = c.pool.Get().([]byte)[:size] + done = make(chan struct{}) + } else { + buf = make([]byte, size) + } + + err := c.receiver.Request(c.id, req.Folder, req.Name, int64(req.Offset), req.Hash, req.Flags, req.Options, buf) + if err != nil { + c.send(msgID, messageTypeResponse, ResponseMessage{ + Data: nil, + Code: errorToCode(err), + }, done) + } else { + c.send(msgID, messageTypeResponse, ResponseMessage{ + Data: buf, + Code: errorToCode(err), + }, done) + } + + if usePool { + <-done + c.pool.Put(buf) + } } func (c *rawConnection) handleResponse(msgID int, resp ResponseMessage) { @@ -547,7 +577,7 @@ func (c *rawConnection) handlePong(msgID int) { c.awaitingMut.Unlock() } -func (c *rawConnection) send(msgID int, msgType int, msg encodable) bool { +func (c *rawConnection) send(msgID int, msgType int, msg encodable, done chan struct{}) bool { if msgID < 0 { select { case id := <-c.nextID: @@ -564,7 +594,7 @@ func (c *rawConnection) send(msgID int, msgType int, msg encodable) bool { } select { - case c.outbox <- hdrMsg{hdr, msg}: + case c.outbox <- hdrMsg{hdr, msg, done}: return true case <-c.closed: return false @@ -583,6 +613,9 @@ func (c *rawConnection) writerLoop() { if hm.msg != nil { // Uncompressed message in uncBuf uncBuf, err = hm.msg.AppendXDR(uncBuf[:0]) + if hm.done != nil { + close(hm.done) + } if err != nil { c.close(err) return