diff --git a/main.go b/main.go index 24978eb8..cd6d97c8 100644 --- a/main.go +++ b/main.go @@ -295,6 +295,11 @@ func listen(myID string, addr string, m *model.Model, cfg *tls.Config) { l, err := tls.Listen("tcp", addr, cfg) fatalErr(err) + connOpts := map[string]string{ + "clientId": "syncthing", + "clientVersion": Version, + } + listen: for { conn, err := l.Accept() @@ -329,7 +334,7 @@ listen: for nodeID := range nodeAddrs { if nodeID == remoteID { - protoConn := protocol.NewConnection(remoteID, conn, conn, m) + protoConn := protocol.NewConnection(remoteID, conn, conn, m, connOpts) m.AddConnection(conn, protoConn) continue listen } @@ -361,6 +366,11 @@ func connect(myID string, addr string, nodeAddrs map[string][]string, m *model.M warnf("No discovery possible (%v)", err) } + connOpts := map[string]string{ + "clientId": "syncthing", + "clientVersion": Version, + } + for { nextNode: for nodeID, addrs := range nodeAddrs { @@ -399,7 +409,7 @@ func connect(myID string, addr string, nodeAddrs map[string][]string, m *model.M continue } - protoConn := protocol.NewConnection(remoteID, conn, conn, m) + protoConn := protocol.NewConnection(remoteID, conn, conn, m, connOpts) m.AddConnection(conn, protoConn) continue nextNode } diff --git a/model/model.go b/model/model.go index 0b710eff..1426b511 100644 --- a/model/model.go +++ b/model/model.go @@ -59,6 +59,7 @@ type Connection interface { Index([]protocol.FileInfo) Request(name string, offset int64, size uint32, hash []byte) ([]byte, error) Statistics() protocol.Statistics + Option(key string) string } const ( @@ -155,7 +156,9 @@ func (m *Model) LocalAge() float64 { type ConnectionInfo struct { protocol.Statistics - Address string + Address string + ClientID string + ClientVersion string } // ConnectionStats returns a map with connection statistics for each connected node. @@ -169,7 +172,9 @@ func (m *Model) ConnectionStats() map[string]ConnectionInfo { var res = make(map[string]ConnectionInfo) for node, conn := range m.protoConn { ci := ConnectionInfo{ - Statistics: conn.Statistics(), + Statistics: conn.Statistics(), + ClientID: conn.Option("clientId"), + ClientVersion: conn.Option("clientVersion"), } if nc, ok := m.rawConn[node].(remoteAddrer); ok { ci.Address = nc.RemoteAddr().String() diff --git a/model/model_test.go b/model/model_test.go index 8e9c0496..d864c118 100644 --- a/model/model_test.go +++ b/model/model_test.go @@ -12,7 +12,7 @@ import ( ) func TestNewModel(t *testing.T) { - m := NewModel("foo") + m := NewModel("foo", 1e6) if m == nil { t.Fatalf("NewModel returned nil") @@ -53,7 +53,7 @@ func init() { } func TestUpdateLocal(t *testing.T) { - m := NewModel("testdata") + m := NewModel("testdata", 1e6) fs, _ := m.Walk(false) m.ReplaceLocal(fs) @@ -95,7 +95,7 @@ func TestUpdateLocal(t *testing.T) { } func TestRemoteUpdateExisting(t *testing.T) { - m := NewModel("testdata") + m := NewModel("testdata", 1e6) fs, _ := m.Walk(false) m.ReplaceLocal(fs) @@ -112,7 +112,7 @@ func TestRemoteUpdateExisting(t *testing.T) { } func TestRemoteAddNew(t *testing.T) { - m := NewModel("testdata") + m := NewModel("testdata", 1e6) fs, _ := m.Walk(false) m.ReplaceLocal(fs) @@ -129,7 +129,7 @@ func TestRemoteAddNew(t *testing.T) { } func TestRemoteUpdateOld(t *testing.T) { - m := NewModel("testdata") + m := NewModel("testdata", 1e6) fs, _ := m.Walk(false) m.ReplaceLocal(fs) @@ -147,7 +147,7 @@ func TestRemoteUpdateOld(t *testing.T) { } func TestRemoteIndexUpdate(t *testing.T) { - m := NewModel("testdata") + m := NewModel("testdata", 1e6) fs, _ := m.Walk(false) m.ReplaceLocal(fs) @@ -180,7 +180,7 @@ func TestRemoteIndexUpdate(t *testing.T) { } func TestDelete(t *testing.T) { - m := NewModel("testdata") + m := NewModel("testdata", 1e6) fs, _ := m.Walk(false) m.ReplaceLocal(fs) @@ -282,7 +282,7 @@ func TestDelete(t *testing.T) { } func TestForgetNode(t *testing.T) { - m := NewModel("testdata") + m := NewModel("testdata", 1e6) fs, _ := m.Walk(false) m.ReplaceLocal(fs) @@ -335,7 +335,7 @@ func TestForgetNode(t *testing.T) { } func TestRequest(t *testing.T) { - m := NewModel("testdata") + m := NewModel("testdata", 1e6) fs, _ := m.Walk(false) m.ReplaceLocal(fs) @@ -357,7 +357,7 @@ func TestRequest(t *testing.T) { } func TestIgnoreWithUnknownFlags(t *testing.T) { - m := NewModel("testdata") + m := NewModel("testdata", 1e6) fs, _ := m.Walk(false) m.ReplaceLocal(fs) @@ -404,7 +404,7 @@ func prepareModel(n int, m *Model) []protocol.FileInfo { } func BenchmarkRecomputeGlobal10k(b *testing.B) { - m := NewModel("testdata") + m := NewModel("testdata", 1e6) prepareModel(10000, m) b.ResetTimer() @@ -414,7 +414,7 @@ func BenchmarkRecomputeGlobal10k(b *testing.B) { } func BenchmarkRecomputeNeed10K(b *testing.B) { - m := NewModel("testdata") + m := NewModel("testdata", 1e6) prepareModel(10000, m) b.ResetTimer() @@ -424,7 +424,7 @@ func BenchmarkRecomputeNeed10K(b *testing.B) { } func BenchmarkIndexUpdate10000(b *testing.B) { - m := NewModel("testdata") + m := NewModel("testdata", 1e6) files := prepareModel(10000, m) b.ResetTimer() @@ -446,6 +446,10 @@ func (f FakeConnection) ID() string { return string(f.id) } +func (f FakeConnection) Option(string) string { + return "" +} + func (FakeConnection) Index([]protocol.FileInfo) {} func (f FakeConnection) Request(name string, offset int64, size uint32, hash []byte) ([]byte, error) { @@ -461,7 +465,7 @@ func (FakeConnection) Statistics() protocol.Statistics { } func BenchmarkRequest(b *testing.B) { - m := NewModel("testdata") + m := NewModel("testdata", 1e6) fs, _ := m.Walk(false) m.ReplaceLocal(fs) diff --git a/model/walk_test.go b/model/walk_test.go index 331c8698..93265c07 100644 --- a/model/walk_test.go +++ b/model/walk_test.go @@ -21,7 +21,7 @@ var correctIgnores = map[string][]string{ } func TestWalk(t *testing.T) { - m := NewModel("testdata") + m := NewModel("testdata", 1e6) files, ignores := m.Walk(false) if l1, l2 := len(files), len(testdata); l1 != l2 { diff --git a/protocol/PROTOCOL.md b/protocol/PROTOCOL.md index 8fb090ab..aed7823d 100644 --- a/protocol/PROTOCOL.md +++ b/protocol/PROTOCOL.md @@ -193,6 +193,33 @@ model, the Index Update merely amends it with new or updated file information. Any files not mentioned in an Index Update are left unchanged. +### Options (Type = 7) + +This informational message provides information about the client +configuration, version, etc. It is sent at connection initiation and, +optionally, when any of the sent parameters have changed. The message is +in the form of a list of (key, value) pairs, both of string type. + + struct OptionsMessage { + KeyValue Options<>; + } + + struct KeyValue { + string Key; + string Value; + } + +Key ID:s apart from the well known ones are implementation +specific. An implementation is expected to ignore unknown keys. An +implementation may impose limits on key and value size. + +Well known keys: + + - "clientId" -- The name of the implementation. Example: "syncthing". + - "clientVersion" -- The version of the client. Example: "v1.0.33-47". The + Following the SemVer 2.0 specification for version strings is + encouraged but not enforced. + Example Exchange ---------------- diff --git a/protocol/messages.go b/protocol/messages.go index 9b6fd6da..9a294609 100644 --- a/protocol/messages.go +++ b/protocol/messages.go @@ -65,6 +65,14 @@ func (w *marshalWriter) writeResponse(data []byte) { w.writeBytes(data) } +func (w *marshalWriter) writeOptions(opts map[string]string) { + w.writeUint32(uint32(len(opts))) + for k, v := range opts { + w.writeString(k) + w.writeString(v) + } +} + func (r *marshalReader) readHeader() header { return decodeHeader(r.readUint32()) } @@ -109,3 +117,14 @@ func (r *marshalReader) readRequest() request { func (r *marshalReader) readResponse() []byte { return r.readBytes() } + +func (r *marshalReader) readOptions() map[string]string { + n := r.readUint32() + opts := make(map[string]string, n) + for i := 0; i < int(n); i++ { + k := r.readString() + v := r.readString() + opts[k] = v + } + return opts +} diff --git a/protocol/messages_test.go b/protocol/messages_test.go index d1c4ee83..3be31b7e 100644 --- a/protocol/messages_test.go +++ b/protocol/messages_test.go @@ -117,3 +117,23 @@ func BenchmarkWriteRequest(b *testing.B) { wr.writeRequest(req) } } + +func TestOptions(t *testing.T) { + opts := map[string]string{ + "foo": "bar", + "someKey": "otherValue", + "hello": "", + "": "42", + } + + var buf = new(bytes.Buffer) + var wr = marshalWriter{w: buf} + wr.writeOptions(opts) + + var rd = marshalReader{r: buf} + var ropts = rd.readOptions() + + if !reflect.DeepEqual(opts, ropts) { + t.Error("Incorrect options marshal/demarshal") + } +} diff --git a/protocol/protocol.go b/protocol/protocol.go index 993df877..d919a06d 100644 --- a/protocol/protocol.go +++ b/protocol/protocol.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "io" + "log" "sync" "time" @@ -18,6 +19,7 @@ const ( messageTypePing = 4 messageTypePong = 5 messageTypeIndexUpdate = 6 + messageTypeOptions = 7 ) const ( @@ -52,16 +54,18 @@ type Model interface { type Connection struct { sync.RWMutex - id string - receiver Model - reader io.Reader - mreader *marshalReader - writer io.Writer - mwriter *marshalWriter - closed bool - awaiting map[int]chan asyncResult - nextId int - indexSent map[string][2]int64 + id string + receiver Model + reader io.Reader + mreader *marshalReader + writer io.Writer + mwriter *marshalWriter + closed bool + awaiting map[int]chan asyncResult + nextId int + indexSent map[string][2]int64 + options map[string]string + optionsLock sync.Mutex hasSentIndex bool hasRecvdIndex bool @@ -81,7 +85,7 @@ const ( pingIdleTime = 5 * time.Minute ) -func NewConnection(nodeID string, reader io.Reader, writer io.Writer, receiver Model) *Connection { +func NewConnection(nodeID string, reader io.Reader, writer io.Writer, receiver Model, options map[string]string) *Connection { flrd := flate.NewReader(reader) flwr, err := flate.NewWriter(writer, flate.BestSpeed) if err != nil { @@ -101,6 +105,20 @@ func NewConnection(nodeID string, reader io.Reader, writer io.Writer, receiver M go c.readerLoop() go c.pingerLoop() + if options != nil { + go func() { + c.Lock() + c.mwriter.writeHeader(header{0, c.nextId, messageTypeOptions}) + c.mwriter.writeOptions(options) + err := c.flush() + if err != nil { + log.Printf("Warning:", err) + } + c.nextId++ + c.Unlock() + }() + } + return &c } @@ -328,6 +346,11 @@ loop: c.Unlock() } + case messageTypeOptions: + c.optionsLock.Lock() + c.options = c.mreader.readOptions() + c.optionsLock.Unlock() + default: c.close(fmt.Errorf("Protocol error: %s: unknown message type %#x", c.ID, hdr.msgType)) break loop @@ -396,3 +419,9 @@ func (c *Connection) Statistics() Statistics { return stats } + +func (c *Connection) Option(key string) string { + c.optionsLock.Lock() + defer c.optionsLock.Unlock() + return c.options[key] +} diff --git a/protocol/protocol_test.go b/protocol/protocol_test.go index 3cd0f720..b9434588 100644 --- a/protocol/protocol_test.go +++ b/protocol/protocol_test.go @@ -43,8 +43,8 @@ func TestPing(t *testing.T) { ar, aw := io.Pipe() br, bw := io.Pipe() - c0 := NewConnection("c0", ar, bw, nil) - c1 := NewConnection("c1", br, aw, nil) + c0 := NewConnection("c0", ar, bw, nil, nil) + c1 := NewConnection("c1", br, aw, nil, nil) if ok := c0.ping(); !ok { t.Error("c0 ping failed") @@ -67,8 +67,8 @@ func TestPingErr(t *testing.T) { eaw := &ErrPipe{PipeWriter: *aw, max: i, err: e} ebw := &ErrPipe{PipeWriter: *bw, max: j, err: e} - c0 := NewConnection("c0", ar, ebw, m0) - NewConnection("c1", br, eaw, m1) + c0 := NewConnection("c0", ar, ebw, m0, nil) + NewConnection("c1", br, eaw, m1, nil) res := c0.ping() if (i < 4 || j < 4) && res { @@ -94,8 +94,8 @@ func TestRequestResponseErr(t *testing.T) { eaw := &ErrPipe{PipeWriter: *aw, max: i, err: e} ebw := &ErrPipe{PipeWriter: *bw, max: j, err: e} - NewConnection("c0", ar, ebw, m0) - c1 := NewConnection("c1", br, eaw, m1) + NewConnection("c0", ar, ebw, m0, nil) + c1 := NewConnection("c1", br, eaw, m1, nil) d, err := c1.Request("tn", 1234, 3456, []byte("hashbytes")) if err == e || err == ErrClosed { @@ -143,8 +143,8 @@ func TestVersionErr(t *testing.T) { ar, aw := io.Pipe() br, bw := io.Pipe() - c0 := NewConnection("c0", ar, bw, m0) - NewConnection("c1", br, aw, m1) + c0 := NewConnection("c0", ar, bw, m0, nil) + NewConnection("c1", br, aw, m1, nil) c0.mwriter.writeHeader(header{ version: 2, @@ -165,8 +165,8 @@ func TestTypeErr(t *testing.T) { ar, aw := io.Pipe() br, bw := io.Pipe() - c0 := NewConnection("c0", ar, bw, m0) - NewConnection("c1", br, aw, m1) + c0 := NewConnection("c0", ar, bw, m0, nil) + NewConnection("c1", br, aw, m1, nil) c0.mwriter.writeHeader(header{ version: 0, @@ -187,8 +187,8 @@ func TestClose(t *testing.T) { ar, aw := io.Pipe() br, bw := io.Pipe() - c0 := NewConnection("c0", ar, bw, m0) - NewConnection("c1", br, aw, m1) + c0 := NewConnection("c0", ar, bw, m0, nil) + NewConnection("c1", br, aw, m1, nil) c0.close(nil)