lib/protocol, lib/discover, lib/db: Use protocol buffer serialization (fixes #3080)

This changes the BEP protocol to use protocol buffer serialization
instead of XDR, and therefore also the database format. The local
discovery protocol is also updated to be protocol buffer format.

GitHub-Pull-Request: https://github.com/syncthing/syncthing/pull/3276
LGTM: AudriusButkevicius
This commit is contained in:
Jakob Borg
2016-07-04 10:40:29 +00:00
committed by Audrius Butkevicius
parent 21f5b16e47
commit fa0101bd60
269 changed files with 477296 additions and 4175 deletions

View File

@@ -3,21 +3,17 @@
package protocol
import (
"encoding/binary"
"encoding/hex"
"bytes"
"encoding/json"
"errors"
"fmt"
"io"
"io/ioutil"
"os"
"reflect"
"strings"
"testing"
"testing/quick"
"time"
"github.com/calmh/xdr"
"github.com/syncthing/syncthing/lib/rand"
)
var (
@@ -26,64 +22,6 @@ var (
quickCfg = &quick.Config{}
)
func TestHeaderEncodeDecode(t *testing.T) {
f := func(ver, id, typ int) bool {
ver = int(uint(ver) % 16)
id = int(uint(id) % 4096)
typ = int(uint(typ) % 256)
h0 := header{version: ver, msgID: id, msgType: typ}
h1 := decodeHeader(encodeHeader(h0))
return h0 == h1
}
if err := quick.Check(f, nil); err != nil {
t.Error(err)
}
}
func TestHeaderMarshalUnmarshal(t *testing.T) {
f := func(ver, id, typ int) bool {
ver = int(uint(ver) % 16)
id = int(uint(id) % 4096)
typ = int(uint(typ) % 256)
buf := make([]byte, 4)
h0 := header{version: ver, msgID: id, msgType: typ}
h0.MarshalXDRInto(&xdr.Marshaller{Data: buf})
var h1 header
h1.UnmarshalXDRFrom(&xdr.Unmarshaller{Data: buf})
return h0 == h1
}
if err := quick.Check(f, nil); err != nil {
t.Error(err)
}
}
func TestHeaderLayout(t *testing.T) {
var e, a uint32
// Version are the first four bits
e = 0xf0000000
a = encodeHeader(header{version: 0xf})
if a != e {
t.Errorf("Header layout incorrect; %08x != %08x", a, e)
}
// Message ID are the following 12 bits
e = 0x0fff0000
a = encodeHeader(header{msgID: 0xfff})
if a != e {
t.Errorf("Header layout incorrect; %08x != %08x", a, e)
}
// Type are the last 8 bits before reserved
e = 0x0000ff00
a = encodeHeader(header{msgType: 0xff})
if a != e {
t.Errorf("Header layout incorrect; %08x != %08x", a, e)
}
}
func TestPing(t *testing.T) {
ar, aw := io.Pipe()
br, bw := io.Pipe()
@@ -92,8 +30,8 @@ func TestPing(t *testing.T) {
c0.Start()
c1 := NewConnection(c1ID, br, aw, newTestModel(), "name", CompressAlways).(wireFormatConnection).Connection.(*rawConnection)
c1.Start()
c0.ClusterConfig(ClusterConfigMessage{})
c1.ClusterConfig(ClusterConfigMessage{})
c0.ClusterConfig(ClusterConfig{})
c1.ClusterConfig(ClusterConfig{})
if ok := c0.ping(); !ok {
t.Error("c0 ping failed")
@@ -103,56 +41,6 @@ func TestPing(t *testing.T) {
}
}
func TestVersionErr(t *testing.T) {
m0 := newTestModel()
m1 := newTestModel()
ar, aw := io.Pipe()
br, bw := io.Pipe()
c0 := NewConnection(c0ID, ar, bw, m0, "name", CompressAlways).(wireFormatConnection).Connection.(*rawConnection)
c0.Start()
c1 := NewConnection(c1ID, br, aw, m1, "name", CompressAlways)
c1.Start()
c0.ClusterConfig(ClusterConfigMessage{})
c1.ClusterConfig(ClusterConfigMessage{})
timeoutWriteHeader(c0.cw, header{
version: 2, // higher than supported
msgID: 0,
msgType: messageTypeIndex,
})
if err := m1.closedError(); err == nil || !strings.Contains(err.Error(), "unknown protocol version") {
t.Error("Connection should close due to unknown version, not", err)
}
}
func TestTypeErr(t *testing.T) {
m0 := newTestModel()
m1 := newTestModel()
ar, aw := io.Pipe()
br, bw := io.Pipe()
c0 := NewConnection(c0ID, ar, bw, m0, "name", CompressAlways).(wireFormatConnection).Connection.(*rawConnection)
c0.Start()
c1 := NewConnection(c1ID, br, aw, m1, "name", CompressAlways)
c1.Start()
c0.ClusterConfig(ClusterConfigMessage{})
c1.ClusterConfig(ClusterConfigMessage{})
timeoutWriteHeader(c0.cw, header{
version: 0,
msgID: 0,
msgType: 42, // unknown type
})
if err := m1.closedError(); err == nil || !strings.Contains(err.Error(), "unknown message type") {
t.Error("Connection should close due to unknown message type, not", err)
}
}
func TestClose(t *testing.T) {
m0 := newTestModel()
m1 := newTestModel()
@@ -164,8 +52,8 @@ func TestClose(t *testing.T) {
c0.Start()
c1 := NewConnection(c1ID, br, aw, m1, "name", CompressAlways)
c1.Start()
c0.ClusterConfig(ClusterConfigMessage{})
c1.ClusterConfig(ClusterConfigMessage{})
c0.ClusterConfig(ClusterConfig{})
c1.ClusterConfig(ClusterConfig{})
c0.close(errors.New("manual close"))
@@ -180,8 +68,8 @@ func TestClose(t *testing.T) {
t.Error("Ping should not return true")
}
c0.Index("default", nil, 0, nil)
c0.Index("default", nil, 0, nil)
c0.Index("default", nil)
c0.Index("default", nil)
if _, err := c0.Request("default", "foo", 0, 0, nil, false); err == nil {
t.Error("Request should return an error")
@@ -193,15 +81,11 @@ func TestMarshalIndexMessage(t *testing.T) {
quickCfg.MaxCount = 10
}
f := func(m1 IndexMessage) bool {
if len(m1.Options) == 0 {
m1.Options = nil
}
f := func(m1 Index) bool {
if len(m1.Files) == 0 {
m1.Files = nil
}
for i, f := range m1.Files {
m1.Files[i].CachedSize = 0
if len(f.Blocks) == 0 {
m1.Files[i].Blocks = nil
} else {
@@ -212,9 +96,12 @@ func TestMarshalIndexMessage(t *testing.T) {
}
}
}
if len(f.Version.Counters) == 0 {
m1.Files[i].Version.Counters = nil
}
}
return testMarshal(t, "index", &m1, &IndexMessage{})
return testMarshal(t, "index", &m1, &Index{})
}
if err := quick.Check(f, quickCfg); err != nil {
@@ -227,14 +114,11 @@ func TestMarshalRequestMessage(t *testing.T) {
quickCfg.MaxCount = 10
}
f := func(m1 RequestMessage) bool {
if len(m1.Options) == 0 {
m1.Options = nil
}
f := func(m1 Request) bool {
if len(m1.Hash) == 0 {
m1.Hash = nil
}
return testMarshal(t, "request", &m1, &RequestMessage{})
return testMarshal(t, "request", &m1, &Request{})
}
if err := quick.Check(f, quickCfg); err != nil {
@@ -247,11 +131,11 @@ func TestMarshalResponseMessage(t *testing.T) {
quickCfg.MaxCount = 10
}
f := func(m1 ResponseMessage) bool {
f := func(m1 Response) bool {
if len(m1.Data) == 0 {
m1.Data = nil
}
return testMarshal(t, "response", &m1, &ResponseMessage{})
return testMarshal(t, "response", &m1, &Response{})
}
if err := quick.Check(f, quickCfg); err != nil {
@@ -264,10 +148,7 @@ func TestMarshalClusterConfigMessage(t *testing.T) {
quickCfg.MaxCount = 10
}
f := func(m1 ClusterConfigMessage) bool {
if len(m1.Options) == 0 {
m1.Options = nil
}
f := func(m1 ClusterConfig) bool {
if len(m1.Folders) == 0 {
m1.Folders = nil
}
@@ -275,11 +156,8 @@ func TestMarshalClusterConfigMessage(t *testing.T) {
if len(m1.Folders[i].Devices) == 0 {
m1.Folders[i].Devices = nil
}
if len(m1.Folders[i].Options) == 0 {
m1.Folders[i].Options = nil
}
}
return testMarshal(t, "clusterconfig", &m1, &ClusterConfigMessage{})
return testMarshal(t, "clusterconfig", &m1, &ClusterConfig{})
}
if err := quick.Check(f, quickCfg); err != nil {
@@ -292,8 +170,8 @@ func TestMarshalCloseMessage(t *testing.T) {
quickCfg.MaxCount = 10
}
f := func(m1 CloseMessage) bool {
return testMarshal(t, "close", &m1, &CloseMessage{})
f := func(m1 Close) bool {
return testMarshal(t, "close", &m1, &Close{})
}
if err := quick.Check(f, quickCfg); err != nil {
@@ -301,108 +179,97 @@ func TestMarshalCloseMessage(t *testing.T) {
}
}
type message interface {
MarshalXDR() ([]byte, error)
UnmarshalXDR([]byte) error
}
func testMarshal(t *testing.T, prefix string, m1, m2 message) bool {
failed := func(bc []byte) {
bs, _ := json.MarshalIndent(m1, "", " ")
ioutil.WriteFile(prefix+"-1.txt", bs, 0644)
bs, _ = json.MarshalIndent(m2, "", " ")
ioutil.WriteFile(prefix+"-2.txt", bs, 0644)
if len(bc) > 0 {
f, _ := os.Create(prefix + "-data.txt")
fmt.Fprint(f, hex.Dump(bc))
f.Close()
buf, err := m1.Marshal()
if err != nil {
t.Fatal(err)
}
err = m2.Unmarshal(buf)
if err != nil {
t.Fatal(err)
}
bs1, _ := json.MarshalIndent(m1, "", " ")
bs2, _ := json.MarshalIndent(m2, "", " ")
if !bytes.Equal(bs1, bs2) {
ioutil.WriteFile(prefix+"-1.txt", bs1, 0644)
ioutil.WriteFile(prefix+"-2.txt", bs1, 0644)
return false
}
return true
}
func TestMarshalledIndexMessageSize(t *testing.T) {
// We should be able to handle a 1 TiB file without
// blowing the default max message size.
if testing.Short() {
t.Skip("this test requires a lot of memory")
return
}
const (
maxMessageSize = MaxMessageLen
fileSize = 1 << 40
blockSize = BlockSize
)
f := FileInfo{
Name: "a normal length file name withoout any weird stuff.txt",
Type: FileInfoTypeFile,
Size: fileSize,
Permissions: 0666,
Modified: time.Now().Unix(),
Version: Vector{Counters: []Counter{{ID: 1 << 60, Value: 1}, {ID: 2 << 60, Value: 1}}},
Blocks: make([]BlockInfo, fileSize/blockSize),
}
for i := 0; i < fileSize/blockSize; i++ {
f.Blocks[i].Offset = int64(i) * blockSize
f.Blocks[i].Size = blockSize
f.Blocks[i].Hash = []byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 1, 2, 3, 4, 5, 6, 7, 8, 9, 20, 1, 2, 3, 4, 5, 6, 7, 8, 9, 30, 1, 2}
}
idx := Index{
Folder: "some folder ID",
Files: []FileInfo{f},
}
msgSize := idx.ProtoSize()
if msgSize > maxMessageSize {
t.Errorf("Message size %d bytes is larger than max %d", msgSize, maxMessageSize)
}
}
func TestLZ4Compression(t *testing.T) {
c := new(rawConnection)
for i := 0; i < 10; i++ {
dataLen := 150 + rand.Intn(150)
data := make([]byte, dataLen)
_, err := io.ReadFull(rand.Reader, data[100:])
if err != nil {
t.Fatal(err)
}
comp, err := c.lz4Compress(data)
if err != nil {
t.Errorf("compressing %d bytes: %v", dataLen, err)
continue
}
}
buf, err := m1.MarshalXDR()
if err != nil && strings.Contains(err.Error(), "exceeds size") {
return true
}
if err != nil {
failed(nil)
t.Fatal(err)
}
err = m2.UnmarshalXDR(buf)
if err != nil {
failed(buf)
t.Fatal(err)
}
ok := reflect.DeepEqual(m1, m2)
if !ok {
failed(buf)
}
return ok
}
func timeoutWriteHeader(w io.Writer, hdr header) {
// This tries to write a message header to w, but times out after a while.
// This is useful because in testing, with a PipeWriter, it will block
// forever if the other side isn't reading any more. On the other hand we
// can't just "go" it into the background, because if the other side is
// still there we should wait for the write to complete. Yay.
var buf [8]byte // header and message length
binary.BigEndian.PutUint32(buf[:], encodeHeader(hdr))
binary.BigEndian.PutUint32(buf[4:], 0) // zero message length, explicitly
done := make(chan struct{})
go func() {
w.Write(buf[:])
close(done)
}()
select {
case <-done:
case <-time.After(250 * time.Millisecond):
}
}
func TestFileInfoSize(t *testing.T) {
fi := FileInfo{
Blocks: []BlockInfo{
{Size: 42},
{Offset: 42, Size: 23},
{Offset: 42 + 23, Size: 34},
},
}
size := fi.Size()
want := int64(42 + 23 + 34)
if size != want {
t.Errorf("Incorrect size reported, got %d, want %d", size, want)
}
size = fi.Size() // Cached, this time
if size != want {
t.Errorf("Incorrect cached size reported, got %d, want %d", size, want)
}
fi.CachedSize = 8
want = 8
size = fi.Size() // Ensure it came from the cache
if size != want {
t.Errorf("Incorrect cached size reported, got %d, want %d", size, want)
}
fi.CachedSize = 0
fi.Flags = FlagDirectory
want = 128
size = fi.Size() // Directories are 128 bytes large
if size != want {
t.Errorf("Incorrect cached size reported, got %d, want %d", size, want)
}
fi.CachedSize = 0
fi.Flags = FlagDeleted
want = 128
size = fi.Size() // Also deleted files
if size != want {
t.Errorf("Incorrect cached size reported, got %d, want %d", size, want)
res, err := c.lz4Decompress(comp)
if err != nil {
t.Errorf("decompressing %d bytes to %d: %v", len(comp), dataLen, err)
continue
}
if len(res) != len(data) {
t.Errorf("Incorrect len %d != expected %d", len(res), len(data))
}
if !bytes.Equal(data, res) {
t.Error("Incorrect decompressed data")
}
t.Logf("OK #%d, %d -> %d -> %d", i, dataLen, len(comp), dataLen)
}
}