Add LZ4 compression

This commit is contained in:
Jakob Borg
2014-07-25 15:16:23 +02:00
parent e8c8cc550b
commit fbf8f3dc68
18 changed files with 14005 additions and 26 deletions

View File

@@ -25,11 +25,13 @@ Transport and Authentication
----------------------------
BEP is deployed as the highest level in a protocol stack, with the lower
level protocols providing encryption and authentication.
level protocols providing compression, encryption and authentication.
+-----------------------------|
| Block Exchange Protocol |
|-----------------------------|
| Compression (LZ4) |
|-----------------------------|
| Encryption & Auth (TLS 1.2) |
|-----------------------------|
| TCP |
@@ -60,6 +62,37 @@ requests are received.
The underlying transport protocol MUST be TCP.
Compression
-----------
All data is sent within compressed blocks. Blocks are compressed using
the LZ4 format and algorithm described in
https://code.google.com/p/lz4/. Each compressed block is preceded by a
header consisting of three 32 bit words, in network order (big endian):
0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| Magic (0x0x5e63b278) |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| Data Length |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| Uncompressed Block Length |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
/ /
\ Compressed Data \
/ /
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
The Data Length indicates the length of data following the Data Length
field until the next header, i.e. the length of the Compressed Data
section plus four bytes for the Uncompressed Block Length field. The
Uncompressed Block Length indicates the amount of data that will result
when decompressing the Compressed Data section.
A single BEP message SHOULD be sent as a single compressed block. A
single compressed block MAY NOT contain more than one BEP message.
Messages
--------

119
protocol/lz4stream.go Normal file
View File

@@ -0,0 +1,119 @@
package protocol
import (
"bytes"
"encoding/binary"
"errors"
"io"
"sync"
lz4 "github.com/bkaradzic/go-lz4"
)
const lz4Magic = 0x5e63b278
type lz4Writer struct {
wr io.Writer
mut sync.Mutex
buf []byte
}
func newLZ4Writer(w io.Writer) *lz4Writer {
return &lz4Writer{wr: w}
}
func (w *lz4Writer) Write(bs []byte) (int, error) {
w.mut.Lock()
defer w.mut.Unlock()
var err error
w.buf, err = lz4.Encode(w.buf[:cap(w.buf)], bs)
if err != nil {
return 0, err
}
var hdr [8]byte
binary.BigEndian.PutUint32(hdr[0:], lz4Magic)
binary.BigEndian.PutUint32(hdr[4:], uint32(len(w.buf)))
_, err = w.wr.Write(hdr[:])
if err != nil {
return 0, err
}
_, err = w.wr.Write(w.buf)
if err != nil {
return 0, err
}
if debug {
l.Debugf("lz4 write; %d / %d bytes", len(bs), 8+len(w.buf))
}
return len(bs), nil
}
type lz4Reader struct {
rd io.Reader
mut sync.Mutex
buf []byte
ebuf []byte
obuf *bytes.Buffer
ibytes uint64
obytes uint64
}
func newLZ4Reader(r io.Reader) *lz4Reader {
return &lz4Reader{rd: r}
}
func (r *lz4Reader) Read(bs []byte) (int, error) {
r.mut.Lock()
defer r.mut.Unlock()
if r.obuf == nil {
r.obuf = bytes.NewBuffer(nil)
}
if r.obuf.Len() == 0 {
if err := r.moreBits(); err != nil {
return 0, err
}
}
n, err := r.obuf.Read(bs)
if debug {
l.Debugf("lz4 read; %d bytes", n)
}
return n, err
}
func (r *lz4Reader) moreBits() error {
var hdr [8]byte
_, err := io.ReadFull(r.rd, hdr[:])
if binary.BigEndian.Uint32(hdr[0:]) != lz4Magic {
return errors.New("bad magic")
}
ln := int(binary.BigEndian.Uint32(hdr[4:]))
if len(r.buf) < ln {
r.buf = make([]byte, int(ln))
} else {
r.buf = r.buf[:ln]
}
_, err = io.ReadFull(r.rd, r.buf)
if err != nil {
return err
}
r.ebuf, err = lz4.Decode(r.ebuf[:cap(r.ebuf)], r.buf)
if err != nil {
return err
}
if debug {
l.Debugf("lz4 moreBits: %d / %d bytes", ln+8, len(r.ebuf))
}
_, err = r.obuf.Write(r.ebuf)
return err
}

View File

@@ -0,0 +1,60 @@
package protocol
import (
"bytes"
"crypto/rand"
"io"
"testing"
)
var toWrite = [][]byte{
[]byte("this is a short byte string that should pass through somewhat compressed this is a short byte string that should pass through somewhat compressed this is a short byte string that should pass through somewhat compressed this is a short byte string that should pass through somewhat compressed this is a short byte string that should pass through somewhat compressed this is a short byte string that should pass through somewhat compressed"),
[]byte("this is another short byte string that should pass through uncompressed"),
[]byte{0, 1, 2, 3, 4, 5},
}
func TestLZ4Stream(t *testing.T) {
tb := make([]byte, 128*1024)
rand.Reader.Read(tb)
toWrite = append(toWrite, tb)
tb = make([]byte, 512*1024)
rand.Reader.Read(tb)
toWrite = append(toWrite, tb)
toWrite = append(toWrite, toWrite[0])
toWrite = append(toWrite, toWrite[1])
rd, wr := io.Pipe()
lz4r := newLZ4Reader(rd)
lz4w := newLZ4Writer(wr)
go func() {
for i := 0; i < 5; i++ {
for _, bs := range toWrite {
n, err := lz4w.Write(bs)
if err != nil {
t.Error(err)
}
if n != len(bs) {
t.Errorf("weird write length; %d != %d", n, len(bs))
}
}
}
}()
buf := make([]byte, 512*1024)
for i := 0; i < 5; i++ {
for _, bs := range toWrite {
n, err := lz4r.Read(buf)
if err != nil {
t.Fatal(err)
}
if n != len(bs) {
t.Errorf("Unexpected len %d != %d", n, len(bs))
}
if bytes.Compare(bs, buf[:n]) != 0 {
t.Error("Unexpected data")
}
}
}
}

View File

@@ -109,11 +109,17 @@ const (
)
func NewConnection(nodeID NodeID, reader io.Reader, writer io.Writer, receiver Model, name string) Connection {
// Byte counters are at the lowest level, counting compressed bytes
cr := &countingReader{Reader: reader}
cw := &countingWriter{Writer: writer}
rb := bufio.NewReader(cr)
wb := bufio.NewWriterSize(cw, 65536)
// Compression is just above counting
zr := newLZ4Reader(cr)
zw := newLZ4Writer(cw)
// We buffer writes on top of compression.
// The LZ4 reader is already internally buffered
wb := bufio.NewWriterSize(zw, 65536)
c := rawConnection{
id: nodeID,
@@ -121,7 +127,7 @@ func NewConnection(nodeID NodeID, reader io.Reader, writer io.Writer, receiver M
receiver: nativeModel{receiver},
state: stateInitial,
cr: cr,
xr: xdr.NewReader(rb),
xr: xdr.NewReader(zr),
cw: cw,
wb: wb,
xw: xdr.NewWriter(wb),
@@ -428,10 +434,6 @@ func (c *rawConnection) writerLoop() {
}
}
type flusher interface {
Flush() error
}
func (c *rawConnection) flush() error {
if err := c.xw.Error(); err != nil {
return err

View File

@@ -73,8 +73,8 @@ func TestPing(t *testing.T) {
func TestPingErr(t *testing.T) {
e := errors.New("something broke")
for i := 0; i < 12; i++ {
for j := 0; j < 12; j++ {
for i := 0; i < 16; i++ {
for j := 0; j < 16; j++ {
m0 := newTestModel()
m1 := newTestModel()
@@ -87,9 +87,9 @@ func TestPingErr(t *testing.T) {
NewConnection(c1ID, br, eaw, m1, "name")
res := c0.ping()
if (i < 4 || j < 4) && res {
if (i < 8 || j < 8) && res {
t.Errorf("Unexpected ping success; i=%d, j=%d", i, j)
} else if (i >= 8 && j >= 8) && !res {
} else if (i >= 12 && j >= 12) && !res {
t.Errorf("Unexpected ping fail; i=%d, j=%d", i, j)
}
}