lib/model, lib/protocol: Use error handling to avoid panic on non-started folder (fixes #6174) (#6212)
This adds error returns to model methods called by the protocol layer. Returning an error will cause the connection to be torn down as the message couldn't be handled. Using this to signal that a folder isn't currently available will then cause a reconnection a few moments later, when it'll hopefully work better. Tested manually by running with STRECHECKDBEVERY=0 on a nontrivially sized setup. This panics reliably before this patch, but just causes a disconnect/reconnect now.
This commit is contained in:
@@ -166,10 +166,12 @@ func negotiateTLS(cert tls.Certificate, conn0, conn1 net.Conn) (net.Conn, net.Co
|
||||
|
||||
type fakeModel struct{}
|
||||
|
||||
func (m *fakeModel) Index(deviceID DeviceID, folder string, files []FileInfo) {
|
||||
func (m *fakeModel) Index(deviceID DeviceID, folder string, files []FileInfo) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *fakeModel) IndexUpdate(deviceID DeviceID, folder string, files []FileInfo) {
|
||||
func (m *fakeModel) IndexUpdate(deviceID DeviceID, folder string, files []FileInfo) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *fakeModel) Request(deviceID DeviceID, folder, name string, size int32, offset int64, hash []byte, weakHash uint32, fromTemporary bool) (RequestResponse, error) {
|
||||
@@ -181,11 +183,13 @@ func (m *fakeModel) Request(deviceID DeviceID, folder, name string, size int32,
|
||||
return &fakeRequestResponse{buf}, nil
|
||||
}
|
||||
|
||||
func (m *fakeModel) ClusterConfig(deviceID DeviceID, config ClusterConfig) {
|
||||
func (m *fakeModel) ClusterConfig(deviceID DeviceID, config ClusterConfig) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *fakeModel) Closed(conn Connection, err error) {
|
||||
}
|
||||
|
||||
func (m *fakeModel) DownloadProgress(deviceID DeviceID, folder string, updates []FileDownloadProgressUpdate) {
|
||||
func (m *fakeModel) DownloadProgress(deviceID DeviceID, folder string, updates []FileDownloadProgressUpdate) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -25,13 +25,15 @@ func newTestModel() *TestModel {
|
||||
}
|
||||
}
|
||||
|
||||
func (t *TestModel) Index(deviceID DeviceID, folder string, files []FileInfo) {
|
||||
func (t *TestModel) Index(deviceID DeviceID, folder string, files []FileInfo) error {
|
||||
if t.indexFn != nil {
|
||||
t.indexFn(deviceID, folder, files)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *TestModel) IndexUpdate(deviceID DeviceID, folder string, files []FileInfo) {
|
||||
func (t *TestModel) IndexUpdate(deviceID DeviceID, folder string, files []FileInfo) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *TestModel) Request(deviceID DeviceID, folder, name string, size int32, offset int64, hash []byte, weakHash uint32, fromTemporary bool) (RequestResponse, error) {
|
||||
@@ -52,13 +54,15 @@ func (t *TestModel) Closed(conn Connection, err error) {
|
||||
close(t.closedCh)
|
||||
}
|
||||
|
||||
func (t *TestModel) ClusterConfig(deviceID DeviceID, config ClusterConfig) {
|
||||
func (t *TestModel) ClusterConfig(deviceID DeviceID, config ClusterConfig) error {
|
||||
if t.ccFn != nil {
|
||||
t.ccFn(deviceID, config)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *TestModel) DownloadProgress(DeviceID, string, []FileDownloadProgressUpdate) {
|
||||
func (t *TestModel) DownloadProgress(DeviceID, string, []FileDownloadProgressUpdate) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *TestModel) closedError() error {
|
||||
|
||||
@@ -12,18 +12,18 @@ type nativeModel struct {
|
||||
Model
|
||||
}
|
||||
|
||||
func (m nativeModel) Index(deviceID DeviceID, folder string, files []FileInfo) {
|
||||
func (m nativeModel) Index(deviceID DeviceID, folder string, files []FileInfo) error {
|
||||
for i := range files {
|
||||
files[i].Name = norm.NFD.String(files[i].Name)
|
||||
}
|
||||
m.Model.Index(deviceID, folder, files)
|
||||
return m.Model.Index(deviceID, folder, files)
|
||||
}
|
||||
|
||||
func (m nativeModel) IndexUpdate(deviceID DeviceID, folder string, files []FileInfo) {
|
||||
func (m nativeModel) IndexUpdate(deviceID DeviceID, folder string, files []FileInfo) error {
|
||||
for i := range files {
|
||||
files[i].Name = norm.NFD.String(files[i].Name)
|
||||
}
|
||||
m.Model.IndexUpdate(deviceID, folder, files)
|
||||
return m.Model.IndexUpdate(deviceID, folder, files)
|
||||
}
|
||||
|
||||
func (m nativeModel) Request(deviceID DeviceID, folder, name string, size int32, offset int64, hash []byte, weakHash uint32, fromTemporary bool) (RequestResponse, error) {
|
||||
|
||||
@@ -15,14 +15,14 @@ type nativeModel struct {
|
||||
Model
|
||||
}
|
||||
|
||||
func (m nativeModel) Index(deviceID DeviceID, folder string, files []FileInfo) {
|
||||
func (m nativeModel) Index(deviceID DeviceID, folder string, files []FileInfo) error {
|
||||
files = fixupFiles(files)
|
||||
m.Model.Index(deviceID, folder, files)
|
||||
return m.Model.Index(deviceID, folder, files)
|
||||
}
|
||||
|
||||
func (m nativeModel) IndexUpdate(deviceID DeviceID, folder string, files []FileInfo) {
|
||||
func (m nativeModel) IndexUpdate(deviceID DeviceID, folder string, files []FileInfo) error {
|
||||
files = fixupFiles(files)
|
||||
m.Model.IndexUpdate(deviceID, folder, files)
|
||||
return m.Model.IndexUpdate(deviceID, folder, files)
|
||||
}
|
||||
|
||||
func (m nativeModel) Request(deviceID DeviceID, folder, name string, size int32, offset int64, hash []byte, weakHash uint32, fromTemporary bool) (RequestResponse, error) {
|
||||
|
||||
@@ -111,17 +111,17 @@ var (
|
||||
|
||||
type Model interface {
|
||||
// An index was received from the peer device
|
||||
Index(deviceID DeviceID, folder string, files []FileInfo)
|
||||
Index(deviceID DeviceID, folder string, files []FileInfo) error
|
||||
// An index update was received from the peer device
|
||||
IndexUpdate(deviceID DeviceID, folder string, files []FileInfo)
|
||||
IndexUpdate(deviceID DeviceID, folder string, files []FileInfo) error
|
||||
// A request was made by the peer device
|
||||
Request(deviceID DeviceID, folder, name string, size int32, offset int64, hash []byte, weakHash uint32, fromTemporary bool) (RequestResponse, error)
|
||||
// A cluster configuration message was received
|
||||
ClusterConfig(deviceID DeviceID, config ClusterConfig)
|
||||
ClusterConfig(deviceID DeviceID, config ClusterConfig) error
|
||||
// The peer device closed the connection
|
||||
Closed(conn Connection, err error)
|
||||
// The peer device sent progress updates for the files it is currently downloading
|
||||
DownloadProgress(deviceID DeviceID, folder string, updates []FileDownloadProgressUpdate)
|
||||
DownloadProgress(deviceID DeviceID, folder string, updates []FileDownloadProgressUpdate) error
|
||||
}
|
||||
|
||||
type RequestResponse interface {
|
||||
@@ -388,7 +388,9 @@ func (c *rawConnection) dispatcherLoop() (err error) {
|
||||
if state != stateInitial {
|
||||
return fmt.Errorf("protocol error: cluster config message in state %d", state)
|
||||
}
|
||||
c.receiver.ClusterConfig(c.id, *msg)
|
||||
if err := c.receiver.ClusterConfig(c.id, *msg); err != nil {
|
||||
return errors.Wrap(err, "receiver error")
|
||||
}
|
||||
state = stateReady
|
||||
|
||||
case *Index:
|
||||
@@ -399,7 +401,9 @@ func (c *rawConnection) dispatcherLoop() (err error) {
|
||||
if err := checkIndexConsistency(msg.Files); err != nil {
|
||||
return errors.Wrap(err, "protocol error: index")
|
||||
}
|
||||
c.handleIndex(*msg)
|
||||
if err := c.handleIndex(*msg); err != nil {
|
||||
return errors.Wrap(err, "receiver error")
|
||||
}
|
||||
state = stateReady
|
||||
|
||||
case *IndexUpdate:
|
||||
@@ -410,7 +414,9 @@ func (c *rawConnection) dispatcherLoop() (err error) {
|
||||
if err := checkIndexConsistency(msg.Files); err != nil {
|
||||
return errors.Wrap(err, "protocol error: index update")
|
||||
}
|
||||
c.handleIndexUpdate(*msg)
|
||||
if err := c.handleIndexUpdate(*msg); err != nil {
|
||||
return errors.Wrap(err, "receiver error")
|
||||
}
|
||||
state = stateReady
|
||||
|
||||
case *Request:
|
||||
@@ -435,7 +441,9 @@ func (c *rawConnection) dispatcherLoop() (err error) {
|
||||
if state != stateReady {
|
||||
return fmt.Errorf("protocol error: response message in state %d", state)
|
||||
}
|
||||
c.receiver.DownloadProgress(c.id, msg.Folder, msg.Updates)
|
||||
if err := c.receiver.DownloadProgress(c.id, msg.Folder, msg.Updates); err != nil {
|
||||
return errors.Wrap(err, "receiver error")
|
||||
}
|
||||
|
||||
case *Ping:
|
||||
l.Debugln("read Ping message")
|
||||
@@ -543,14 +551,14 @@ func (c *rawConnection) readHeader(fourByteBuf []byte) (Header, error) {
|
||||
return hdr, nil
|
||||
}
|
||||
|
||||
func (c *rawConnection) handleIndex(im Index) {
|
||||
func (c *rawConnection) handleIndex(im Index) error {
|
||||
l.Debugf("Index(%v, %v, %d file)", c.id, im.Folder, len(im.Files))
|
||||
c.receiver.Index(c.id, im.Folder, im.Files)
|
||||
return c.receiver.Index(c.id, im.Folder, im.Files)
|
||||
}
|
||||
|
||||
func (c *rawConnection) handleIndexUpdate(im IndexUpdate) {
|
||||
func (c *rawConnection) handleIndexUpdate(im IndexUpdate) error {
|
||||
l.Debugf("queueing IndexUpdate(%v, %v, %d files)", c.id, im.Folder, len(im.Files))
|
||||
c.receiver.IndexUpdate(c.id, im.Folder, im.Files)
|
||||
return c.receiver.IndexUpdate(c.id, im.Folder, im.Files)
|
||||
}
|
||||
|
||||
// checkIndexConsistency verifies a number of invariants on FileInfos received in
|
||||
|
||||
Reference in New Issue
Block a user