lib/protocol: Don't send any messages before cluster config (#5646)

* lib/model: Send cluster config before releasing pmut

* reshuffle

* add model.connReady to track cluster-config status

* Corrected comments/strings

* do it in protocol
This commit is contained in:
Simon Frei 2019-04-23 21:47:11 +02:00 committed by Audrius Butkevicius
parent 110806842c
commit 04b927104f

View File

@ -182,11 +182,12 @@ type rawConnection struct {
nextID int32 nextID int32
nextIDMut sync.Mutex nextIDMut sync.Mutex
outbox chan asyncMessage sentClusterConfig chan struct{}
closed chan struct{} outbox chan asyncMessage
closeOnce sync.Once closed chan struct{}
sendCloseOnce sync.Once closeOnce sync.Once
compression Compression sendCloseOnce sync.Once
compression Compression
} }
type asyncResult struct { type asyncResult struct {
@ -220,15 +221,16 @@ func NewConnection(deviceID DeviceID, reader io.Reader, writer io.Writer, receiv
cw := &countingWriter{Writer: writer} cw := &countingWriter{Writer: writer}
c := rawConnection{ c := rawConnection{
id: deviceID, id: deviceID,
name: name, name: name,
receiver: nativeModel{receiver}, receiver: nativeModel{receiver},
cr: cr, cr: cr,
cw: cw, cw: cw,
awaiting: make(map[int32]chan asyncResult), awaiting: make(map[int32]chan asyncResult),
outbox: make(chan asyncMessage), sentClusterConfig: make(chan struct{}),
closed: make(chan struct{}), outbox: make(chan asyncMessage),
compression: compress, closed: make(chan struct{}),
compression: compress,
} }
return wireFormatConnection{&c} return wireFormatConnection{&c}
@ -322,9 +324,20 @@ func (c *rawConnection) Request(folder string, name string, offset int64, size i
return res.val, res.err return res.val, res.err
} }
// ClusterConfig send the cluster configuration message to the peer and returns any error // ClusterConfig sends the cluster configuration message to the peer.
// It must be called just once (as per BEP).
func (c *rawConnection) ClusterConfig(config ClusterConfig) { func (c *rawConnection) ClusterConfig(config ClusterConfig) {
c.send(&config, nil) select {
case <-c.sentClusterConfig:
return
case <-c.closed:
return
default:
}
if err := c.writeMessage(asyncMessage{&config, nil}); err != nil {
c.internalClose(err)
}
close(c.sentClusterConfig)
} }
func (c *rawConnection) Closed() bool { func (c *rawConnection) Closed() bool {
@ -628,13 +641,20 @@ func (c *rawConnection) handleResponse(resp Response) {
} }
func (c *rawConnection) send(msg message, done chan struct{}) bool { func (c *rawConnection) send(msg message, done chan struct{}) bool {
defer func() {
if done != nil {
close(done)
}
}()
select {
case <-c.sentClusterConfig:
case <-c.closed:
return false
}
select { select {
case c.outbox <- asyncMessage{msg, done}: case c.outbox <- asyncMessage{msg, done}:
return true return true
case <-c.closed: case <-c.closed:
if done != nil {
close(done)
}
return false return false
} }
} }