lib/protocol: Prioritize close msg and add close timeout (#5746)
This commit is contained in:
parent
6e8aa0ec25
commit
e39d3f95dd
@ -3266,6 +3266,12 @@ func TestSanitizePath(t *testing.T) {
|
|||||||
// on a protocol connection that has a blocking reader (blocking writer can't
|
// on a protocol connection that has a blocking reader (blocking writer can't
|
||||||
// be done as the test requires clusterconfigs to go through).
|
// be done as the test requires clusterconfigs to go through).
|
||||||
func TestConnCloseOnRestart(t *testing.T) {
|
func TestConnCloseOnRestart(t *testing.T) {
|
||||||
|
oldCloseTimeout := protocol.CloseTimeout
|
||||||
|
protocol.CloseTimeout = 100 * time.Millisecond
|
||||||
|
defer func() {
|
||||||
|
protocol.CloseTimeout = oldCloseTimeout
|
||||||
|
}()
|
||||||
|
|
||||||
w, fcfg := tmpDefaultWrapper()
|
w, fcfg := tmpDefaultWrapper()
|
||||||
m := setupModel(w)
|
m := setupModel(w)
|
||||||
defer cleanupModelAndRemoveDir(m, fcfg.Filesystem().URI())
|
defer cleanupModelAndRemoveDir(m, fcfg.Filesystem().URI())
|
||||||
|
|||||||
@ -184,6 +184,7 @@ type rawConnection struct {
|
|||||||
|
|
||||||
inbox chan message
|
inbox chan message
|
||||||
outbox chan asyncMessage
|
outbox chan asyncMessage
|
||||||
|
closeBox chan asyncMessage
|
||||||
clusterConfigBox chan *ClusterConfig
|
clusterConfigBox chan *ClusterConfig
|
||||||
dispatcherLoopStopped chan struct{}
|
dispatcherLoopStopped chan struct{}
|
||||||
closed chan struct{}
|
closed chan struct{}
|
||||||
@ -218,6 +219,11 @@ const (
|
|||||||
ReceiveTimeout = 300 * time.Second
|
ReceiveTimeout = 300 * time.Second
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// CloseTimeout is the longest we'll wait when trying to send the close
|
||||||
|
// message before just closing the connection.
|
||||||
|
// Should not be modified in production code, just for testing.
|
||||||
|
var CloseTimeout = 10 * time.Second
|
||||||
|
|
||||||
func NewConnection(deviceID DeviceID, reader io.Reader, writer io.Writer, receiver Model, name string, compress Compression) Connection {
|
func NewConnection(deviceID DeviceID, reader io.Reader, writer io.Writer, receiver Model, name string, compress Compression) Connection {
|
||||||
cr := &countingReader{Reader: reader}
|
cr := &countingReader{Reader: reader}
|
||||||
cw := &countingWriter{Writer: writer}
|
cw := &countingWriter{Writer: writer}
|
||||||
@ -231,6 +237,7 @@ func NewConnection(deviceID DeviceID, reader io.Reader, writer io.Writer, receiv
|
|||||||
awaiting: make(map[int32]chan asyncResult),
|
awaiting: make(map[int32]chan asyncResult),
|
||||||
inbox: make(chan message),
|
inbox: make(chan message),
|
||||||
outbox: make(chan asyncMessage),
|
outbox: make(chan asyncMessage),
|
||||||
|
closeBox: make(chan asyncMessage),
|
||||||
clusterConfigBox: make(chan *ClusterConfig),
|
clusterConfigBox: make(chan *ClusterConfig),
|
||||||
dispatcherLoopStopped: make(chan struct{}),
|
dispatcherLoopStopped: make(chan struct{}),
|
||||||
closed: make(chan struct{}),
|
closed: make(chan struct{}),
|
||||||
@ -671,6 +678,10 @@ func (c *rawConnection) writerLoop() {
|
|||||||
c.internalClose(err)
|
c.internalClose(err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
case hm := <-c.closeBox:
|
||||||
|
_ = c.writeMessage(hm.msg)
|
||||||
|
close(hm.done)
|
||||||
|
return
|
||||||
case <-c.closed:
|
case <-c.closed:
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -686,6 +697,11 @@ func (c *rawConnection) writerLoop() {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
case hm := <-c.closeBox:
|
||||||
|
_ = c.writeMessage(hm.msg)
|
||||||
|
close(hm.done)
|
||||||
|
return
|
||||||
|
|
||||||
case <-c.closed:
|
case <-c.closed:
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -853,17 +869,20 @@ func (c *rawConnection) shouldCompressMessage(msg message) bool {
|
|||||||
func (c *rawConnection) Close(err error) {
|
func (c *rawConnection) Close(err error) {
|
||||||
c.sendCloseOnce.Do(func() {
|
c.sendCloseOnce.Do(func() {
|
||||||
done := make(chan struct{})
|
done := make(chan struct{})
|
||||||
c.send(&Close{err.Error()}, done)
|
timeout := time.NewTimer(CloseTimeout)
|
||||||
|
select {
|
||||||
|
case c.closeBox <- asyncMessage{&Close{err.Error()}, done}:
|
||||||
select {
|
select {
|
||||||
case <-done:
|
case <-done:
|
||||||
|
case <-timeout.C:
|
||||||
|
case <-c.closed:
|
||||||
|
}
|
||||||
|
case <-timeout.C:
|
||||||
case <-c.closed:
|
case <-c.closed:
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
// No more sends are necessary, therefore further steps to close the
|
c.internalClose(err)
|
||||||
// connection outside of this package can proceed immediately.
|
|
||||||
// And this prevents a potential deadlock due to calling c.receiver.Closed
|
|
||||||
go c.internalClose(err)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// internalClose is called if there is an unexpected error during normal operation.
|
// internalClose is called if there is an unexpected error during normal operation.
|
||||||
|
|||||||
@ -86,6 +86,12 @@ func TestClose(t *testing.T) {
|
|||||||
// Close is called while the underlying connection is broken (send blocks).
|
// Close is called while the underlying connection is broken (send blocks).
|
||||||
// https://github.com/syncthing/syncthing/pull/5442
|
// https://github.com/syncthing/syncthing/pull/5442
|
||||||
func TestCloseOnBlockingSend(t *testing.T) {
|
func TestCloseOnBlockingSend(t *testing.T) {
|
||||||
|
oldCloseTimeout := CloseTimeout
|
||||||
|
CloseTimeout = 100 * time.Millisecond
|
||||||
|
defer func() {
|
||||||
|
CloseTimeout = oldCloseTimeout
|
||||||
|
}()
|
||||||
|
|
||||||
m := newTestModel()
|
m := newTestModel()
|
||||||
|
|
||||||
c := NewConnection(c0ID, &testutils.BlockingRW{}, &testutils.BlockingRW{}, m, "name", CompressAlways).(wireFormatConnection).Connection.(*rawConnection)
|
c := NewConnection(c0ID, &testutils.BlockingRW{}, &testutils.BlockingRW{}, m, "name", CompressAlways).(wireFormatConnection).Connection.(*rawConnection)
|
||||||
@ -214,6 +220,33 @@ func TestClusterConfigFirst(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TestCloseTimeout checks that calling Close times out and proceeds, if sending
|
||||||
|
// the close message does not succeed.
|
||||||
|
func TestCloseTimeout(t *testing.T) {
|
||||||
|
oldCloseTimeout := CloseTimeout
|
||||||
|
CloseTimeout = 100 * time.Millisecond
|
||||||
|
defer func() {
|
||||||
|
CloseTimeout = oldCloseTimeout
|
||||||
|
}()
|
||||||
|
|
||||||
|
m := newTestModel()
|
||||||
|
|
||||||
|
c := NewConnection(c0ID, &testutils.BlockingRW{}, &testutils.BlockingRW{}, m, "name", CompressAlways).(wireFormatConnection).Connection.(*rawConnection)
|
||||||
|
c.Start()
|
||||||
|
|
||||||
|
done := make(chan struct{})
|
||||||
|
go func() {
|
||||||
|
c.Close(errManual)
|
||||||
|
close(done)
|
||||||
|
}()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-done:
|
||||||
|
case <-time.After(5 * CloseTimeout):
|
||||||
|
t.Fatal("timed out before Close returned")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestMarshalIndexMessage(t *testing.T) {
|
func TestMarshalIndexMessage(t *testing.T) {
|
||||||
if testing.Short() {
|
if testing.Short() {
|
||||||
quickCfg.MaxCount = 10
|
quickCfg.MaxCount = 10
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user