lib/model, lib/protocol: Handle request concurrency in model (#5216)

This commit is contained in:
Simon Frei
2018-11-13 08:53:55 +01:00
committed by Jakob Borg
parent 9212303906
commit 4f27bdfc27
13 changed files with 358 additions and 226 deletions

View File

@@ -171,12 +171,13 @@ func (m *fakeModel) Index(deviceID DeviceID, folder string, files []FileInfo) {
func (m *fakeModel) IndexUpdate(deviceID DeviceID, folder string, files []FileInfo) {
}
func (m *fakeModel) Request(deviceID DeviceID, folder string, name string, offset int64, hash []byte, weakHAsh uint32, fromTemporary bool, buf []byte) error {
func (m *fakeModel) Request(deviceID DeviceID, folder, name string, size int32, offset int64, hash []byte, weakHash uint32, fromTemporary bool) (RequestResponse, error) {
// We write the offset to the end of the buffer, so the receiver
// can verify that it did in fact get some data back over the
// connection.
buf := make([]byte, size)
binary.BigEndian.PutUint64(buf[len(buf)-8:], uint64(offset))
return nil
return &fakeRequestResponse{buf}, nil
}
func (m *fakeModel) ClusterConfig(deviceID DeviceID, config ClusterConfig) {

View File

@@ -4,32 +4,59 @@ package protocol
import "sync"
// Global pool to get buffers from. Requires Blocksizes to be initialised,
// therefore it is initialized in the same init() as BlockSizes
var BufferPool bufferPool
type bufferPool struct {
minSize int
pool sync.Pool
pools []sync.Pool
}
// get returns a new buffer of the requested size
func (p *bufferPool) get(size int) []byte {
intf := p.pool.Get()
if intf == nil {
// Pool is empty, must allocate.
return p.new(size)
}
bs := *intf.(*[]byte)
if cap(bs) < size {
// Buffer was too small, leave it for someone else and allocate.
p.pool.Put(intf)
return p.new(size)
}
return bs[:size]
func newBufferPool() bufferPool {
return bufferPool{make([]sync.Pool, len(BlockSizes))}
}
// upgrade grows the buffer to the requested size, while attempting to reuse
func (p *bufferPool) Get(size int) []byte {
// Too big, isn't pooled
if size > MaxBlockSize {
return make([]byte, size)
}
var i int
for i = range BlockSizes {
if size <= BlockSizes[i] {
break
}
}
var bs []byte
// Try the fitting and all bigger pools
for j := i; j < len(BlockSizes); j++ {
if intf := p.pools[j].Get(); intf != nil {
bs = *intf.(*[]byte)
return bs[:size]
}
}
// All pools are empty, must allocate.
return make([]byte, BlockSizes[i])[:size]
}
// Put makes the given byte slice availabe again in the global pool
func (p *bufferPool) Put(bs []byte) {
c := cap(bs)
// Don't buffer huge byte slices
if c > 2*MaxBlockSize {
return
}
for i := range BlockSizes {
if c >= BlockSizes[i] {
p.pools[i].Put(&bs)
return
}
}
}
// Upgrade grows the buffer to the requested size, while attempting to reuse
// it if possible.
func (p *bufferPool) upgrade(bs []byte, size int) []byte {
func (p *bufferPool) Upgrade(bs []byte, size int) []byte {
if cap(bs) >= size {
// Reslicing is enough, lets go!
return bs[:size]
@@ -37,23 +64,6 @@ func (p *bufferPool) upgrade(bs []byte, size int) []byte {
// It was too small. But it pack into the pool and try to get another
// buffer.
p.put(bs)
return p.get(size)
}
// put returns the buffer to the pool
func (p *bufferPool) put(bs []byte) {
p.pool.Put(&bs)
}
// new creates a new buffer of the requested size, taking the minimum
// allocation count into account. For internal use only.
func (p *bufferPool) new(size int) []byte {
allocSize := size
if allocSize < p.minSize {
// Avoid allocating tiny buffers that we won't be able to reuse for
// anything useful.
allocSize = p.minSize
}
return make([]byte, allocSize)[:size]
p.Put(bs)
return p.Get(size)
}

View File

@@ -9,7 +9,7 @@ type TestModel struct {
folder string
name string
offset int64
size int
size int32
hash []byte
weakHash uint32
fromTemporary bool
@@ -29,16 +29,17 @@ func (t *TestModel) Index(deviceID DeviceID, folder string, files []FileInfo) {
func (t *TestModel) IndexUpdate(deviceID DeviceID, folder string, files []FileInfo) {
}
func (t *TestModel) Request(deviceID DeviceID, folder, name string, offset int64, hash []byte, weakHash uint32, fromTemporary bool, buf []byte) error {
func (t *TestModel) Request(deviceID DeviceID, folder, name string, size int32, offset int64, hash []byte, weakHash uint32, fromTemporary bool) (RequestResponse, error) {
t.folder = folder
t.name = name
t.offset = offset
t.size = len(buf)
t.size = size
t.hash = hash
t.weakHash = weakHash
t.fromTemporary = fromTemporary
buf := make([]byte, len(t.data))
copy(buf, t.data)
return nil
return &fakeRequestResponse{buf}, nil
}
func (t *TestModel) Closed(conn Connection, err error) {
@@ -60,3 +61,15 @@ func (t *TestModel) closedError() error {
return nil // Timeout
}
}
type fakeRequestResponse struct {
data []byte
}
func (r *fakeRequestResponse) Data() []byte {
return r.data
}
func (r *fakeRequestResponse) Close() {}
func (r *fakeRequestResponse) Wait() {}

View File

@@ -26,7 +26,7 @@ func (m nativeModel) IndexUpdate(deviceID DeviceID, folder string, files []FileI
m.Model.IndexUpdate(deviceID, folder, files)
}
func (m nativeModel) Request(deviceID DeviceID, folder string, name string, offset int64, hash []byte, weakHash uint32, fromTemporary bool, buf []byte) error {
func (m nativeModel) Request(deviceID DeviceID, folder, name string, size int32, offset int64, hash []byte, weakHash uint32, fromTemporary bool) (RequestResponse, error) {
name = norm.NFD.String(name)
return m.Model.Request(deviceID, folder, name, offset, hash, weakHash, fromTemporary, buf)
return m.Model.Request(deviceID, folder, name, size, offset, hash, weakHash, fromTemporary)
}

View File

@@ -25,14 +25,14 @@ func (m nativeModel) IndexUpdate(deviceID DeviceID, folder string, files []FileI
m.Model.IndexUpdate(deviceID, folder, files)
}
func (m nativeModel) Request(deviceID DeviceID, folder string, name string, offset int64, hash []byte, weakHash uint32, fromTemporary bool, buf []byte) error {
func (m nativeModel) Request(deviceID DeviceID, folder, name string, size int32, offset int64, hash []byte, weakHash uint32, fromTemporary bool) (RequestResponse, error) {
if strings.Contains(name, `\`) {
l.Warnf("Dropping request for %s, contains invalid path separator", name)
return ErrNoSuchFile
return nil, ErrNoSuchFile
}
name = filepath.FromSlash(name)
return m.Model.Request(deviceID, folder, name, offset, hash, weakHash, fromTemporary, buf)
return m.Model.Request(deviceID, folder, name, size, offset, hash, weakHash, fromTemporary)
}
func fixupFiles(files []FileInfo) []FileInfo {

View File

@@ -2,8 +2,10 @@
package protocol
import "testing"
import "reflect"
import (
"reflect"
"testing"
)
func TestFixupFiles(t *testing.T) {
files := []FileInfo{

View File

@@ -48,6 +48,7 @@ func init() {
BlockSizes = append(BlockSizes, blockSize)
sha256OfEmptyBlock[blockSize] = sha256.Sum256(make([]byte, blockSize))
}
BufferPool = newBufferPool()
}
// BlockSize returns the block size to use for the given file size
@@ -125,7 +126,7 @@ type Model interface {
// An index update was received from the peer device
IndexUpdate(deviceID DeviceID, folder string, files []FileInfo)
// A request was made by the peer device
Request(deviceID DeviceID, folder string, name string, offset int64, hash []byte, weakHash uint32, fromTemporary bool, buf []byte) error
Request(deviceID DeviceID, folder, name string, size int32, offset int64, hash []byte, weakHash uint32, fromTemporary bool) (RequestResponse, error)
// A cluster configuration message was received
ClusterConfig(deviceID DeviceID, config ClusterConfig)
// The peer device closed the connection
@@ -134,6 +135,12 @@ type Model interface {
DownloadProgress(deviceID DeviceID, folder string, updates []FileDownloadProgressUpdate)
}
type RequestResponse interface {
Data() []byte
Close() // Must always be called once the byte slice is no longer in use
Wait() // Blocks until Close is called
}
type Connection interface {
Start()
ID() DeviceID
@@ -166,7 +173,6 @@ type rawConnection struct {
outbox chan asyncMessage
closed chan struct{}
once sync.Once
pool bufferPool
compression Compression
}
@@ -184,7 +190,7 @@ type message interface {
type asyncMessage struct {
msg message
done chan struct{} // done closes when we're done marshalling the message and its contents can be reused
done chan struct{} // done closes when we're done sending the message
}
const (
@@ -196,12 +202,6 @@ const (
ReceiveTimeout = 300 * time.Second
)
// A buffer pool for global use. We don't allocate smaller buffers than 64k,
// in the hope of being able to reuse them later.
var buffers = bufferPool{
minSize: 64 << 10,
}
func NewConnection(deviceID DeviceID, reader io.Reader, writer io.Writer, receiver Model, name string, compress Compression) Connection {
cr := &countingReader{Reader: reader}
cw := &countingWriter{Writer: writer}
@@ -215,7 +215,6 @@ func NewConnection(deviceID DeviceID, reader io.Reader, writer io.Writer, receiv
awaiting: make(map[int32]chan asyncResult),
outbox: make(chan asyncMessage),
closed: make(chan struct{}),
pool: bufferPool{minSize: MinBlockSize},
compression: compress,
}
@@ -338,6 +337,7 @@ func (c *rawConnection) readerLoop() (err error) {
c.close(err)
}()
fourByteBuf := make([]byte, 4)
state := stateInitial
for {
select {
@@ -346,7 +346,7 @@ func (c *rawConnection) readerLoop() (err error) {
default:
}
msg, err := c.readMessage()
msg, err := c.readMessage(fourByteBuf)
if err == errUnknownMessage {
// Unknown message types are skipped, for future extensibility.
continue
@@ -394,7 +394,6 @@ func (c *rawConnection) readerLoop() (err error) {
if err := checkFilename(msg.Name); err != nil {
return fmt.Errorf("protocol error: request: %q: %v", msg.Name, err)
}
// Requests are handled asynchronously
go c.handleRequest(*msg)
case *Response:
@@ -429,30 +428,29 @@ func (c *rawConnection) readerLoop() (err error) {
}
}
func (c *rawConnection) readMessage() (message, error) {
hdr, err := c.readHeader()
func (c *rawConnection) readMessage(fourByteBuf []byte) (message, error) {
hdr, err := c.readHeader(fourByteBuf)
if err != nil {
return nil, err
}
return c.readMessageAfterHeader(hdr)
return c.readMessageAfterHeader(hdr, fourByteBuf)
}
func (c *rawConnection) readMessageAfterHeader(hdr Header) (message, error) {
func (c *rawConnection) readMessageAfterHeader(hdr Header, fourByteBuf []byte) (message, error) {
// First comes a 4 byte message length
buf := buffers.get(4)
if _, err := io.ReadFull(c.cr, buf); err != nil {
if _, err := io.ReadFull(c.cr, fourByteBuf[:4]); err != nil {
return nil, fmt.Errorf("reading message length: %v", err)
}
msgLen := int32(binary.BigEndian.Uint32(buf))
msgLen := int32(binary.BigEndian.Uint32(fourByteBuf))
if msgLen < 0 {
return nil, fmt.Errorf("negative message length %d", msgLen)
}
// Then comes the message
buf = buffers.upgrade(buf, int(msgLen))
buf := BufferPool.Get(int(msgLen))
if _, err := io.ReadFull(c.cr, buf); err != nil {
return nil, fmt.Errorf("reading message: %v", err)
}
@@ -465,7 +463,7 @@ func (c *rawConnection) readMessageAfterHeader(hdr Header) (message, error) {
case MessageCompressionLZ4:
decomp, err := c.lz4Decompress(buf)
buffers.put(buf)
BufferPool.Put(buf)
if err != nil {
return nil, fmt.Errorf("decompressing message: %v", err)
}
@@ -484,26 +482,25 @@ func (c *rawConnection) readMessageAfterHeader(hdr Header) (message, error) {
if err := msg.Unmarshal(buf); err != nil {
return nil, fmt.Errorf("unmarshalling message: %v", err)
}
buffers.put(buf)
BufferPool.Put(buf)
return msg, nil
}
func (c *rawConnection) readHeader() (Header, error) {
func (c *rawConnection) readHeader(fourByteBuf []byte) (Header, error) {
// First comes a 2 byte header length
buf := buffers.get(2)
if _, err := io.ReadFull(c.cr, buf); err != nil {
if _, err := io.ReadFull(c.cr, fourByteBuf[:2]); err != nil {
return Header{}, fmt.Errorf("reading length: %v", err)
}
hdrLen := int16(binary.BigEndian.Uint16(buf))
hdrLen := int16(binary.BigEndian.Uint16(fourByteBuf))
if hdrLen < 0 {
return Header{}, fmt.Errorf("negative header length %d", hdrLen)
}
// Then comes the header
buf = buffers.upgrade(buf, int(hdrLen))
buf := BufferPool.Get(int(hdrLen))
if _, err := io.ReadFull(c.cr, buf); err != nil {
return Header{}, fmt.Errorf("reading header: %v", err)
}
@@ -513,7 +510,7 @@ func (c *rawConnection) readHeader() (Header, error) {
return Header{}, fmt.Errorf("unmarshalling header: %v", err)
}
buffers.put(buf)
BufferPool.Put(buf)
return hdr, nil
}
@@ -590,38 +587,22 @@ func checkFilename(name string) error {
}
func (c *rawConnection) handleRequest(req Request) {
size := int(req.Size)
usePool := size <= MaxBlockSize
var buf []byte
var done chan struct{}
if usePool {
buf = c.pool.get(size)
done = make(chan struct{})
} else {
buf = make([]byte, size)
}
err := c.receiver.Request(c.id, req.Folder, req.Name, req.Offset, req.Hash, req.WeakHash, req.FromTemporary, buf)
res, err := c.receiver.Request(c.id, req.Folder, req.Name, req.Size, req.Offset, req.Hash, req.WeakHash, req.FromTemporary)
if err != nil {
c.send(&Response{
ID: req.ID,
Data: nil,
Code: errorToCode(err),
}, done)
} else {
c.send(&Response{
ID: req.ID,
Data: buf,
Code: errorToCode(err),
}, done)
}
if usePool {
<-done
c.pool.put(buf)
}, nil)
return
}
done := make(chan struct{})
c.send(&Response{
ID: req.ID,
Data: res.Data(),
Code: errorToCode(nil),
}, done)
<-done
res.Close()
}
func (c *rawConnection) handleResponse(resp Response) {
@@ -639,6 +620,9 @@ func (c *rawConnection) send(msg message, done chan struct{}) bool {
case c.outbox <- asyncMessage{msg, done}:
return true
case <-c.closed:
if done != nil {
close(done)
}
return false
}
}
@@ -647,7 +631,11 @@ func (c *rawConnection) writerLoop() {
for {
select {
case hm := <-c.outbox:
if err := c.writeMessage(hm); err != nil {
err := c.writeMessage(hm)
if hm.done != nil {
close(hm.done)
}
if err != nil {
c.close(err)
return
}
@@ -667,13 +655,10 @@ func (c *rawConnection) writeMessage(hm asyncMessage) error {
func (c *rawConnection) writeCompressedMessage(hm asyncMessage) error {
size := hm.msg.ProtoSize()
buf := buffers.get(size)
buf := BufferPool.Get(size)
if _, err := hm.msg.MarshalTo(buf); err != nil {
return fmt.Errorf("marshalling message: %v", err)
}
if hm.done != nil {
close(hm.done)
}
compressed, err := c.lz4Compress(buf)
if err != nil {
@@ -690,7 +675,7 @@ func (c *rawConnection) writeCompressedMessage(hm asyncMessage) error {
}
totSize := 2 + hdrSize + 4 + len(compressed)
buf = buffers.upgrade(buf, totSize)
buf = BufferPool.Upgrade(buf, totSize)
// Header length
binary.BigEndian.PutUint16(buf, uint16(hdrSize))
@@ -702,10 +687,10 @@ func (c *rawConnection) writeCompressedMessage(hm asyncMessage) error {
binary.BigEndian.PutUint32(buf[2+hdrSize:], uint32(len(compressed)))
// Message
copy(buf[2+hdrSize+4:], compressed)
buffers.put(compressed)
BufferPool.Put(compressed)
n, err := c.cw.Write(buf)
buffers.put(buf)
BufferPool.Put(buf)
l.Debugf("wrote %d bytes on the wire (2 bytes length, %d bytes header, 4 bytes message length, %d bytes message (%d uncompressed)), err=%v", n, hdrSize, len(compressed), size, err)
if err != nil {
@@ -726,7 +711,7 @@ func (c *rawConnection) writeUncompressedMessage(hm asyncMessage) error {
}
totSize := 2 + hdrSize + 4 + size
buf := buffers.get(totSize)
buf := BufferPool.Get(totSize)
// Header length
binary.BigEndian.PutUint16(buf, uint16(hdrSize))
@@ -740,12 +725,9 @@ func (c *rawConnection) writeUncompressedMessage(hm asyncMessage) error {
if _, err := hm.msg.MarshalTo(buf[2+hdrSize+4:]); err != nil {
return fmt.Errorf("marshalling message: %v", err)
}
if hm.done != nil {
close(hm.done)
}
n, err := c.cw.Write(buf[:totSize])
buffers.put(buf)
BufferPool.Put(buf)
l.Debugf("wrote %d bytes on the wire (2 bytes length, %d bytes header, 4 bytes message length, %d bytes message), err=%v", n, hdrSize, size, err)
if err != nil {
@@ -904,7 +886,7 @@ func (c *rawConnection) Statistics() Statistics {
func (c *rawConnection) lz4Compress(src []byte) ([]byte, error) {
var err error
buf := buffers.get(len(src))
buf := BufferPool.Get(len(src))
buf, err = lz4.Encode(buf, src)
if err != nil {
return nil, err
@@ -918,7 +900,7 @@ func (c *rawConnection) lz4Decompress(src []byte) ([]byte, error) {
size := binary.BigEndian.Uint32(src)
binary.LittleEndian.PutUint32(src, size)
var err error
buf := buffers.get(int(size))
buf := BufferPool.Get(int(size))
buf, err = lz4.Decode(buf, src)
if err != nil {
return nil, err