From 283f39ae5f5739ba15780cc28b4d0e710dce6c24 Mon Sep 17 00:00:00 2001 From: Simon Frei Date: Wed, 8 May 2019 08:08:26 +0200 Subject: [PATCH] lib/protocol: Revert unreleased changes related to closing connections (#5688) This reverts commits: ec7c88ca55bcc6827abab339defe774a1cf6cea1 19b51c9b92aae2997e6471e001f01ab419555244 5da41f75fab1b8cf07907789c738c59c960ace4b 04b927104fe2983875016fe565c352ee9517036a --- lib/protocol/protocol.go | 152 +++++++++++++-------------------------- 1 file changed, 51 insertions(+), 101 deletions(-) diff --git a/lib/protocol/protocol.go b/lib/protocol/protocol.go index e7ce12e3..d5ec3115 100644 --- a/lib/protocol/protocol.go +++ b/lib/protocol/protocol.go @@ -182,13 +182,11 @@ type rawConnection struct { nextID int32 nextIDMut sync.Mutex - sentClusterConfig chan struct{} - outbox chan asyncMessage - closed chan struct{} - closeOnce sync.Once - sendCloseOnce sync.Once - wg sync.WaitGroup - compression Compression + outbox chan asyncMessage + closed chan struct{} + closeOnce sync.Once + sendCloseOnce sync.Once + compression Compression } type asyncResult struct { @@ -222,16 +220,15 @@ func NewConnection(deviceID DeviceID, reader io.Reader, writer io.Writer, receiv cw := &countingWriter{Writer: writer} c := rawConnection{ - id: deviceID, - name: name, - receiver: nativeModel{receiver}, - cr: cr, - cw: cw, - awaiting: make(map[int32]chan asyncResult), - sentClusterConfig: make(chan struct{}), - outbox: make(chan asyncMessage), - closed: make(chan struct{}), - compression: compress, + id: deviceID, + name: name, + receiver: nativeModel{receiver}, + cr: cr, + cw: cw, + awaiting: make(map[int32]chan asyncResult), + outbox: make(chan asyncMessage), + closed: make(chan struct{}), + compression: compress, } return wireFormatConnection{&c} @@ -240,21 +237,13 @@ func NewConnection(deviceID DeviceID, reader io.Reader, writer io.Writer, receiv // Start creates the goroutines for sending and receiving of messages. It must // be called exactly once after creating a connection. func (c *rawConnection) Start() { - c.startGoroutine(c.readerLoop) - c.startGoroutine(c.writerLoop) - c.startGoroutine(c.pingSender) - c.startGoroutine(c.pingReceiver) -} - -func (c *rawConnection) startGoroutine(loop func() error) { - c.wg.Add(1) go func() { - err := loop() - c.wg.Done() - if err != nil && err != ErrClosed { - c.internalClose(err) - } + err := c.readerLoop() + c.internalClose(err) }() + go c.writerLoop() + go c.pingSender() + go c.pingReceiver() } func (c *rawConnection) ID() DeviceID { @@ -333,20 +322,9 @@ func (c *rawConnection) Request(folder string, name string, offset int64, size i return res.val, res.err } -// ClusterConfig sends the cluster configuration message to the peer. -// It must be called just once (as per BEP). +// ClusterConfig send the cluster configuration message to the peer and returns any error func (c *rawConnection) ClusterConfig(config ClusterConfig) { - select { - case <-c.sentClusterConfig: - return - case <-c.closed: - return - default: - } - if err := c.writeMessage(&config); err != nil { - c.internalClose(err) - } - close(c.sentClusterConfig) + c.send(&config, nil) } func (c *rawConnection) Closed() bool { @@ -370,44 +348,26 @@ func (c *rawConnection) ping() bool { return c.send(&Ping{}, nil) } -type messageWithError struct { - msg message - err error -} - -func (c *rawConnection) readerLoop() error { +func (c *rawConnection) readerLoop() (err error) { fourByteBuf := make([]byte, 4) - inbox := make(chan messageWithError) - - // Reading from the wire may block until the underlying connection is closed. - go func() { - for { - msg, err := c.readMessage(fourByteBuf) - select { - case inbox <- messageWithError{msg: msg, err: err}: - case <-c.closed: - return - } - } - }() - state := stateInitial - var msgWithErr messageWithError for { select { - case msgWithErr = <-inbox: case <-c.closed: return ErrClosed - } - if msgWithErr.err != nil { - if msgWithErr.err == errUnknownMessage { - // Unknown message types are skipped, for future extensibility. - continue - } - return msgWithErr.err + default: } - switch msg := msgWithErr.msg.(type) { + msg, err := c.readMessage(fourByteBuf) + if err == errUnknownMessage { + // Unknown message types are skipped, for future extensibility. + continue + } + if err != nil { + return err + } + + switch msg := msg.(type) { case *ClusterConfig: l.Debugln("read ClusterConfig message") if state != stateInitial { @@ -667,26 +627,19 @@ func (c *rawConnection) handleResponse(resp Response) { c.awaitingMut.Unlock() } -func (c *rawConnection) send(msg message, done chan struct{}) (sent bool) { - defer func() { - if !sent && done != nil { - close(done) - } - }() - select { - case <-c.sentClusterConfig: - case <-c.closed: - return false - } +func (c *rawConnection) send(msg message, done chan struct{}) bool { select { case c.outbox <- asyncMessage{msg, done}: return true case <-c.closed: + if done != nil { + close(done) + } return false } } -func (c *rawConnection) writerLoop() error { +func (c *rawConnection) writerLoop() { for { select { case hm := <-c.outbox: @@ -695,11 +648,12 @@ func (c *rawConnection) writerLoop() error { close(hm.done) } if err != nil { - return err + c.internalClose(err) + return } case <-c.closed: - return ErrClosed + return } } } @@ -872,7 +826,10 @@ func (c *rawConnection) Close(err error) { } }) - c.internalClose(err) + // No more sends are necessary, therefore further steps to close the + // connection outside of this package can proceed immediately. + // And this prevents a potential deadlock due to calling c.receiver.Closed + go c.internalClose(err) } // internalClose is called if there is an unexpected error during normal operation. @@ -890,14 +847,7 @@ func (c *rawConnection) internalClose(err error) { } c.awaitingMut.Unlock() - // Wait for all our operations to terminate before signaling - // to the receiver that the connection was closed. - c.wg.Wait() - - // No more sends are necessary, therefore further steps to close the - // connection outside of this package can proceed immediately. - // And this prevents a potential deadlock. - go c.receiver.Closed(c, err) + c.receiver.Closed(c, err) }) } @@ -906,7 +856,7 @@ func (c *rawConnection) internalClose(err error) { // PingSendInterval/2, we do nothing. Otherwise we send a ping message. This // results in an effecting ping interval of somewhere between // PingSendInterval/2 and PingSendInterval. -func (c *rawConnection) pingSender() error { +func (c *rawConnection) pingSender() { ticker := time.NewTicker(PingSendInterval / 2) defer ticker.Stop() @@ -923,7 +873,7 @@ func (c *rawConnection) pingSender() error { c.ping() case <-c.closed: - return ErrClosed + return } } } @@ -931,7 +881,7 @@ func (c *rawConnection) pingSender() error { // The pingReceiver checks that we've received a message (any message will do, // but we expect pings in the absence of other messages) within the last // ReceiveTimeout. If not, we close the connection with an ErrTimeout. -func (c *rawConnection) pingReceiver() error { +func (c *rawConnection) pingReceiver() { ticker := time.NewTicker(ReceiveTimeout / 2) defer ticker.Stop() @@ -941,13 +891,13 @@ func (c *rawConnection) pingReceiver() error { d := time.Since(c.cr.Last()) if d > ReceiveTimeout { l.Debugln(c.id, "ping timeout", d) - return ErrTimeout + c.internalClose(ErrTimeout) } l.Debugln(c.id, "last read within", d) case <-c.closed: - return ErrClosed + return } } }