New Cluster Configuration message replaces Options (fixes #63)

This commit is contained in:
Jakob Borg
2014-04-13 15:28:26 +02:00
parent 41c228cb56
commit 5064f846fc
17 changed files with 767 additions and 209 deletions

View File

@@ -79,10 +79,14 @@ version, type and ID.
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
For BEP v1 the Version field is set to zero. Future versions with
incompatible message formats will increment the Version field.
incompatible message formats will increment the Version field. A message
with an unknown version is a protocol error and MUST result in the
connection being terminated. A client supporting multiple versions MAY
retry with a different protcol version upon disconnection.
The Type field indicates the type of data following the message header
and is one of the integers defined below.
and is one of the integers defined below. A message of an unknown type
is a protocol error and MUST result in the connection being terminated.
The Message ID is set to a unique value for each transmitted message. In
request messages the Reply To is set to zero. In response messages it is
@@ -110,6 +114,183 @@ Opaque data should not be interpreted but can be compared bytewise to
other opaque data. All strings MUST use the Unicode UTF-8 encoding,
normalization form C.
### Cluster Config (Type = 0)
This informational message provides information about the cluster
configuration, as it pertains to the current connection. It is sent by
both sides after connection establishment.
#### Graphical Representation
ClusterConfigMessage Structure:
0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| Length of ClientName |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
/ /
\ ClientName (variable length) \
/ /
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| Length of ClientVersion |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
/ /
\ ClientVersion (variable length) \
/ /
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| Number of Repositories |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
/ /
\ Zero or more Repository Structures \
/ /
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| Number of Options |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
/ /
\ Zero or more Option Structures \
/ /
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
Repository Structure:
0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| Length of ID |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
/ /
\ ID (variable length) \
/ /
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| Number of Nodes |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
/ /
\ Zero or more Node Structures \
/ /
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
Node Structure:
0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| Length of ID |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
/ /
\ ID (variable length) \
/ /
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| Flags |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
Option Structure:
0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| Length of Key |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
/ /
\ Key (variable length) \
/ /
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| Length of Value |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
/ /
\ Value (variable length) \
/ /
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
#### Fields
The ClientName and ClientVersion fields identify the implementation. The
values SHOULD be simple strings identifying the implementation name, as
a user would expect to see it, and the version string in the same
manner. An example ClientName is "syncthing" and an example
ClientVersion is "v0.7.2". The ClientVersion field SHOULD follow the
patterns laid out in the [Semantic Versioning](http://semver.org/)
standard.
The Repositories field lists all repositories that will be synchronized
over the current connection. Each repository has a list of participating
Nodes. Each node has an associated Flags field to indicate the sharing
mode of that node for the repository in question. See the discussion on
Sharing Modes.
The Node Flags field contains the following single bit flags:
0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| Reserved |Pri| Reserved |R|T|
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
- Bit 31 ("T", Trusted) is set for nodes that participate in trusted
mode.
- Bit 30 ("R", Read Only) is set for nodes that participate in read
only mode.
- Bits 16 through 28 are reserved and MUST be set to zero.
- Bits 14-15 ("Pri) indicate the node's upload priority for this
repository. Possible values are:
- 00: The default. Normal priority.
- 01: High priority. Other nodes SHOULD favour requesting files from
this node over nodes with normal or low priority.
- 10: Low priority. Other nodes SHOULD avoid requesting files from
this node when they are available from other nodes.
- 11: Sharing disabled. Other nodes SHOULD NOT request files from
this node.
- Bits 0 through 14 are reserved and MUST be set to zero.
Exactly one of the T, R or S bits MUST be set.
The Options field contain option values to be used in an implementation
specific manner. The options list is conceptually a map of Key => Value
items, although it is transmitted in the form of a list of (Key, Value)
pairs, both of string type. Key ID:s are implementation specific. An
implementation MUST ignore unknown keys. An implementation MAY impose
limits on the length keys and values. The options list may be used to
inform nodes of relevant local configuration options such as rate
limiting or make recommendations about request parallellism, node
priorities, etc. An empty options list is valid for nodes not having any
such information to share. Nodes MAY NOT make any assumptions about
peers acting in a specific manner as a result of sent options.
#### XDR
struct ClusterConfigMessage {
string ClientName<>;
string ClientVersion<>;
Repository Repositories<>;
Option Options<>;
}
struct Repository {
string ID<>;
Node Nodes<>;
}
struct Node {
string ID<>;
unsigned int Flags;
}
struct Option {
string Key<>;
string Value<>;
}
### Index (Type = 1)
The Index message defines the contents of the senders repository. An
@@ -356,65 +537,32 @@ model, the Index Update merely amends it with new or updated file
information. Any files not mentioned in an Index Update are left
unchanged.
### Options (Type = 7)
Sharing Modes
-------------
This informational message provides information about the client
configuration, version, etc. It is sent at connection initiation and,
optionally, when any of the sent parameters have changed. The message is
in the form of a list of (key, value) pairs, both of string type.
### Trusted
Key ID:s apart from the well known ones are implementation specific. An
implementation is expected to ignore unknown keys. An implementation may
impose limits on key and value size.
Trusted mode is the default sharing mode. Updates are exchanged in both
directions.
Well known keys:
+------------+ Updates /---------\
| | -----------> / \
| Node | | Cluster |
| | <----------- \ /
+------------+ Updates \---------/
- "clientId" -- The name of the implementation. Example: "syncthing".
### Read Only
- "clientVersion" -- The version of the client. Example: "v1.0.33-47".
The Following the SemVer 2.0 specification for version strings is
encouraged but not enforced.
In read only mode a node does not synchronize the local repository to
the cluster, but publishes changes to it's local repository contents as
usual. The local repository can be seen as a "master copy" that is never
affected by the actions of other cluster nodes.
#### Graphical Representation
0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| Number of Options |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
/ /
\ Zero or more KeyValue Structures \
/ /
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
KeyValue Structure:
0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| Length of Key |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
/ /
\ Key (variable length) \
/ /
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| Length of Value |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
/ /
\ Value (variable length) \
/ /
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
#### XDR
struct OptionsMessage {
KeyValue Options<>;
}
struct KeyValue {
string Key<>;
string Value<>;
}
+------------+ Updates /---------\
| | -----------> / \
| Node | | Cluster |
| | \ /
+------------+ \---------/
Message Limits
--------------

View File

@@ -38,6 +38,9 @@ func (t *TestModel) Close(nodeID string, err error) {
close(t.closedCh)
}
func (t *TestModel) ClusterConfig(nodeID string, config ClusterConfigMessage) {
}
func (t *TestModel) isClosed() bool {
select {
case <-t.closedCh:

View File

@@ -25,8 +25,21 @@ type RequestMessage struct {
Size uint32
}
type OptionsMessage struct {
Options []Option // max:64
type ClusterConfigMessage struct {
ClientName string // max:64
ClientVersion string // max:64
Repositories []Repository // max:64
Options []Option // max:64
}
type Repository struct {
ID string // max:64
Nodes []Node // max:64
}
type Node struct {
ID string // max:64
Flags uint32
}
type Option struct {

View File

@@ -198,19 +198,34 @@ func (o *RequestMessage) decodeXDR(xr *xdr.Reader) error {
return xr.Error()
}
func (o OptionsMessage) EncodeXDR(w io.Writer) (int, error) {
func (o ClusterConfigMessage) EncodeXDR(w io.Writer) (int, error) {
var xw = xdr.NewWriter(w)
return o.encodeXDR(xw)
}
func (o OptionsMessage) MarshalXDR() []byte {
func (o ClusterConfigMessage) MarshalXDR() []byte {
var buf bytes.Buffer
var xw = xdr.NewWriter(&buf)
o.encodeXDR(xw)
return buf.Bytes()
}
func (o OptionsMessage) encodeXDR(xw *xdr.Writer) (int, error) {
func (o ClusterConfigMessage) encodeXDR(xw *xdr.Writer) (int, error) {
if len(o.ClientName) > 64 {
return xw.Tot(), xdr.ErrElementSizeExceeded
}
xw.WriteString(o.ClientName)
if len(o.ClientVersion) > 64 {
return xw.Tot(), xdr.ErrElementSizeExceeded
}
xw.WriteString(o.ClientVersion)
if len(o.Repositories) > 64 {
return xw.Tot(), xdr.ErrElementSizeExceeded
}
xw.WriteUint32(uint32(len(o.Repositories)))
for i := range o.Repositories {
o.Repositories[i].encodeXDR(xw)
}
if len(o.Options) > 64 {
return xw.Tot(), xdr.ErrElementSizeExceeded
}
@@ -221,18 +236,28 @@ func (o OptionsMessage) encodeXDR(xw *xdr.Writer) (int, error) {
return xw.Tot(), xw.Error()
}
func (o *OptionsMessage) DecodeXDR(r io.Reader) error {
func (o *ClusterConfigMessage) DecodeXDR(r io.Reader) error {
xr := xdr.NewReader(r)
return o.decodeXDR(xr)
}
func (o *OptionsMessage) UnmarshalXDR(bs []byte) error {
func (o *ClusterConfigMessage) UnmarshalXDR(bs []byte) error {
var buf = bytes.NewBuffer(bs)
var xr = xdr.NewReader(buf)
return o.decodeXDR(xr)
}
func (o *OptionsMessage) decodeXDR(xr *xdr.Reader) error {
func (o *ClusterConfigMessage) decodeXDR(xr *xdr.Reader) error {
o.ClientName = xr.ReadStringMax(64)
o.ClientVersion = xr.ReadStringMax(64)
_RepositoriesSize := int(xr.ReadUint32())
if _RepositoriesSize > 64 {
return xdr.ErrElementSizeExceeded
}
o.Repositories = make([]Repository, _RepositoriesSize)
for i := range o.Repositories {
(&o.Repositories[i]).decodeXDR(xr)
}
_OptionsSize := int(xr.ReadUint32())
if _OptionsSize > 64 {
return xdr.ErrElementSizeExceeded
@@ -244,6 +269,95 @@ func (o *OptionsMessage) decodeXDR(xr *xdr.Reader) error {
return xr.Error()
}
func (o Repository) EncodeXDR(w io.Writer) (int, error) {
var xw = xdr.NewWriter(w)
return o.encodeXDR(xw)
}
func (o Repository) MarshalXDR() []byte {
var buf bytes.Buffer
var xw = xdr.NewWriter(&buf)
o.encodeXDR(xw)
return buf.Bytes()
}
func (o Repository) encodeXDR(xw *xdr.Writer) (int, error) {
if len(o.ID) > 64 {
return xw.Tot(), xdr.ErrElementSizeExceeded
}
xw.WriteString(o.ID)
if len(o.Nodes) > 64 {
return xw.Tot(), xdr.ErrElementSizeExceeded
}
xw.WriteUint32(uint32(len(o.Nodes)))
for i := range o.Nodes {
o.Nodes[i].encodeXDR(xw)
}
return xw.Tot(), xw.Error()
}
func (o *Repository) DecodeXDR(r io.Reader) error {
xr := xdr.NewReader(r)
return o.decodeXDR(xr)
}
func (o *Repository) UnmarshalXDR(bs []byte) error {
var buf = bytes.NewBuffer(bs)
var xr = xdr.NewReader(buf)
return o.decodeXDR(xr)
}
func (o *Repository) decodeXDR(xr *xdr.Reader) error {
o.ID = xr.ReadStringMax(64)
_NodesSize := int(xr.ReadUint32())
if _NodesSize > 64 {
return xdr.ErrElementSizeExceeded
}
o.Nodes = make([]Node, _NodesSize)
for i := range o.Nodes {
(&o.Nodes[i]).decodeXDR(xr)
}
return xr.Error()
}
func (o Node) EncodeXDR(w io.Writer) (int, error) {
var xw = xdr.NewWriter(w)
return o.encodeXDR(xw)
}
func (o Node) MarshalXDR() []byte {
var buf bytes.Buffer
var xw = xdr.NewWriter(&buf)
o.encodeXDR(xw)
return buf.Bytes()
}
func (o Node) encodeXDR(xw *xdr.Writer) (int, error) {
if len(o.ID) > 64 {
return xw.Tot(), xdr.ErrElementSizeExceeded
}
xw.WriteString(o.ID)
xw.WriteUint32(o.Flags)
return xw.Tot(), xw.Error()
}
func (o *Node) DecodeXDR(r io.Reader) error {
xr := xdr.NewReader(r)
return o.decodeXDR(xr)
}
func (o *Node) UnmarshalXDR(bs []byte) error {
var buf = bytes.NewBuffer(bs)
var xr = xdr.NewReader(buf)
return o.decodeXDR(xr)
}
func (o *Node) decodeXDR(xr *xdr.Reader) error {
o.ID = xr.ReadStringMax(64)
o.Flags = xr.ReadUint32()
return xr.Error()
}
func (o Option) EncodeXDR(w io.Writer) (int, error) {
var xw = xdr.NewWriter(w)
return o.encodeXDR(xw)

View File

@@ -29,6 +29,10 @@ func (m nativeModel) Request(nodeID, repo string, name string, offset int64, siz
return m.next.Request(nodeID, repo, name, offset, size)
}
func (m nativeModel) ClusterConfig(nodeID string, config ClusterConfigMessage) {
m.next.ClusterConfig(nodeID, config)
}
func (m nativeModel) Close(nodeID string, err error) {
m.next.Close(nodeID, err)
}

View File

@@ -20,6 +20,10 @@ func (m nativeModel) Request(nodeID, repo string, name string, offset int64, siz
return m.next.Request(nodeID, repo, name, offset, size)
}
func (m nativeModel) ClusterConfig(nodeID string, config ClusterConfigMessage) {
m.next.ClusterConfig(nodeID, config)
}
func (m nativeModel) Close(nodeID string, err error) {
m.next.Close(nodeID, err)
}

View File

@@ -29,6 +29,10 @@ func (m nativeModel) Request(nodeID, repo string, name string, offset int64, siz
return m.next.Request(nodeID, repo, name, offset, size)
}
func (m nativeModel) ClusterConfig(nodeID string, config ClusterConfigMessage) {
m.next.ClusterConfig(nodeID, config)
}
func (m nativeModel) Close(nodeID string, err error) {
m.next.Close(nodeID, err)
}

View File

@@ -6,7 +6,6 @@ import (
"errors"
"fmt"
"io"
"log"
"sync"
"time"
@@ -17,13 +16,13 @@ import (
const BlockSize = 128 * 1024
const (
messageTypeIndex = 1
messageTypeRequest = 2
messageTypeResponse = 3
messageTypePing = 4
messageTypePong = 5
messageTypeIndexUpdate = 6
messageTypeOptions = 7
messageTypeClusterConfig = 0
messageTypeIndex = 1
messageTypeRequest = 2
messageTypeResponse = 3
messageTypePing = 4
messageTypePong = 5
messageTypeIndexUpdate = 6
)
const (
@@ -32,6 +31,12 @@ const (
FlagDirectory = 1 << 14
)
const (
FlagShareTrusted uint32 = 1 << 0
FlagShareReadOnly = 1 << 1
FlagShareBits = 0x000000ff
)
var (
ErrClusterHash = fmt.Errorf("configuration error: mismatched cluster hash")
ErrClosed = errors.New("connection closed")
@@ -44,6 +49,8 @@ type Model interface {
IndexUpdate(nodeID string, repo string, files []FileInfo)
// A request was made by the peer node
Request(nodeID string, repo string, name string, offset int64, size int) ([]byte, error)
// A cluster configuration message was received
ClusterConfig(nodeID string, config ClusterConfigMessage)
// The peer node closed the connection
Close(nodeID string, err error)
}
@@ -52,27 +59,24 @@ type Connection interface {
ID() string
Index(repo string, files []FileInfo)
Request(repo string, name string, offset int64, size int) ([]byte, error)
ClusterConfig(config ClusterConfigMessage)
Statistics() Statistics
Option(key string) string
}
type rawConnection struct {
sync.RWMutex
id string
receiver Model
reader io.ReadCloser
xr *xdr.Reader
writer io.WriteCloser
wb *bufio.Writer
xw *xdr.Writer
closed chan struct{}
awaiting map[int]chan asyncResult
nextID int
indexSent map[string]map[string][2]int64
peerOptions map[string]string
myOptions map[string]string
optionsLock sync.Mutex
id string
receiver Model
reader io.ReadCloser
xr *xdr.Reader
writer io.WriteCloser
wb *bufio.Writer
xw *xdr.Writer
closed chan struct{}
awaiting map[int]chan asyncResult
nextID int
indexSent map[string]map[string][2]int64
hasSentIndex bool
hasRecvdIndex bool
@@ -88,7 +92,7 @@ const (
pingIdleTime = 5 * time.Minute
)
func NewConnection(nodeID string, reader io.Reader, writer io.Writer, receiver Model, options map[string]string) Connection {
func NewConnection(nodeID string, reader io.Reader, writer io.Writer, receiver Model) Connection {
flrd := flate.NewReader(reader)
flwr, err := flate.NewWriter(writer, flate.BestSpeed)
if err != nil {
@@ -112,28 +116,6 @@ func NewConnection(nodeID string, reader io.Reader, writer io.Writer, receiver M
go c.readerLoop()
go c.pingerLoop()
if options != nil {
c.myOptions = options
go func() {
c.Lock()
header{0, c.nextID, messageTypeOptions}.encodeXDR(c.xw)
var om OptionsMessage
for k, v := range options {
om.Options = append(om.Options, Option{k, v})
}
om.encodeXDR(c.xw)
err := c.xw.Error()
if err == nil {
err = c.flush()
}
if err != nil {
log.Println("Warning: Write error during initial handshake:", err)
}
c.nextID++
c.Unlock()
}()
}
return wireFormatConnection{&c}
}
@@ -217,6 +199,27 @@ func (c *rawConnection) Request(repo string, name string, offset int64, size int
return res.val, res.err
}
// ClusterConfig send the cluster configuration message to the peer and returns any error
func (c *rawConnection) ClusterConfig(config ClusterConfigMessage) {
c.Lock()
defer c.Unlock()
if c.isClosed() {
return
}
header{0, c.nextID, messageTypeClusterConfig}.encodeXDR(c.xw)
c.nextID = (c.nextID + 1) & 0xfff
_, err := config.encodeXDR(c.xw)
if err == nil {
err = c.flush()
}
if err != nil {
c.close(err)
}
}
func (c *rawConnection) ping() bool {
c.Lock()
if c.isClosed() {
@@ -386,24 +389,14 @@ loop:
c.Unlock()
}
case messageTypeOptions:
var om OptionsMessage
om.decodeXDR(c.xr)
case messageTypeClusterConfig:
var cm ClusterConfigMessage
cm.decodeXDR(c.xr)
if c.xr.Error() != nil {
c.close(c.xr.Error())
break loop
}
c.optionsLock.Lock()
c.peerOptions = make(map[string]string, len(om.Options))
for _, opt := range om.Options {
c.peerOptions[opt.Key] = opt.Value
}
c.optionsLock.Unlock()
if mh, rh := c.myOptions["clusterHash"], c.peerOptions["clusterHash"]; len(mh) > 0 && len(rh) > 0 && mh != rh {
c.close(ErrClusterHash)
break loop
} else {
go c.receiver.ClusterConfig(c.id, cm)
}
default:
@@ -472,9 +465,3 @@ func (c *rawConnection) Statistics() Statistics {
OutBytesTotal: int(c.xw.Tot()),
}
}
func (c *rawConnection) Option(key string) string {
c.optionsLock.Lock()
defer c.optionsLock.Unlock()
return c.peerOptions[key]
}

View File

@@ -25,8 +25,8 @@ func TestPing(t *testing.T) {
ar, aw := io.Pipe()
br, bw := io.Pipe()
c0 := NewConnection("c0", ar, bw, nil, nil).(wireFormatConnection).next.(*rawConnection)
c1 := NewConnection("c1", br, aw, nil, nil).(wireFormatConnection).next.(*rawConnection)
c0 := NewConnection("c0", ar, bw, nil).(wireFormatConnection).next.(*rawConnection)
c1 := NewConnection("c1", br, aw, nil).(wireFormatConnection).next.(*rawConnection)
if ok := c0.ping(); !ok {
t.Error("c0 ping failed")
@@ -49,8 +49,8 @@ func TestPingErr(t *testing.T) {
eaw := &ErrPipe{PipeWriter: *aw, max: i, err: e}
ebw := &ErrPipe{PipeWriter: *bw, max: j, err: e}
c0 := NewConnection("c0", ar, ebw, m0, nil).(wireFormatConnection).next.(*rawConnection)
NewConnection("c1", br, eaw, m1, nil)
c0 := NewConnection("c0", ar, ebw, m0).(wireFormatConnection).next.(*rawConnection)
NewConnection("c1", br, eaw, m1)
res := c0.ping()
if (i < 4 || j < 4) && res {
@@ -125,8 +125,8 @@ func TestVersionErr(t *testing.T) {
ar, aw := io.Pipe()
br, bw := io.Pipe()
c0 := NewConnection("c0", ar, bw, m0, nil).(wireFormatConnection).next.(*rawConnection)
NewConnection("c1", br, aw, m1, nil)
c0 := NewConnection("c0", ar, bw, m0).(wireFormatConnection).next.(*rawConnection)
NewConnection("c1", br, aw, m1)
c0.xw.WriteUint32(encodeHeader(header{
version: 2,
@@ -147,8 +147,8 @@ func TestTypeErr(t *testing.T) {
ar, aw := io.Pipe()
br, bw := io.Pipe()
c0 := NewConnection("c0", ar, bw, m0, nil).(wireFormatConnection).next.(*rawConnection)
NewConnection("c1", br, aw, m1, nil)
c0 := NewConnection("c0", ar, bw, m0).(wireFormatConnection).next.(*rawConnection)
NewConnection("c1", br, aw, m1)
c0.xw.WriteUint32(encodeHeader(header{
version: 0,
@@ -169,8 +169,8 @@ func TestClose(t *testing.T) {
ar, aw := io.Pipe()
br, bw := io.Pipe()
c0 := NewConnection("c0", ar, bw, m0, nil).(wireFormatConnection).next.(*rawConnection)
NewConnection("c1", br, aw, m1, nil)
c0 := NewConnection("c0", ar, bw, m0).(wireFormatConnection).next.(*rawConnection)
NewConnection("c1", br, aw, m1)
c0.close(nil)

View File

@@ -30,10 +30,10 @@ func (c wireFormatConnection) Request(repo, name string, offset int64, size int)
return c.next.Request(repo, name, offset, size)
}
func (c wireFormatConnection) ClusterConfig(config ClusterConfigMessage) {
c.next.ClusterConfig(config)
}
func (c wireFormatConnection) Statistics() Statistics {
return c.next.Statistics()
}
func (c wireFormatConnection) Option(key string) string {
return c.next.Option(key)
}