lib/model: Optimise locking around conn-close and puller states (#5954)

This commit is contained in:
Simon Frei 2019-08-16 09:35:19 +02:00 committed by Jakob Borg
parent b1c74860e8
commit b677464dfa
3 changed files with 64 additions and 42 deletions

View File

@ -254,6 +254,7 @@ func TestCopierFinder(t *testing.T) {
pulls := []pullBlockState{<-pullChan, <-pullChan, <-pullChan, <-pullChan} pulls := []pullBlockState{<-pullChan, <-pullChan, <-pullChan, <-pullChan}
finish := <-finisherChan finish := <-finisherChan
defer cleanupSharedPullerState(finish)
select { select {
case <-pullChan: case <-pullChan:
@ -293,7 +294,6 @@ func TestCopierFinder(t *testing.T) {
t.Errorf("Block %d mismatch: %s != %s", eq, blks[eq-1].String(), blocks[eq].String()) t.Errorf("Block %d mismatch: %s != %s", eq, blks[eq-1].String(), blocks[eq].String())
} }
} }
finish.fd.Close()
} }
func TestWeakHash(t *testing.T) { func TestWeakHash(t *testing.T) {
@ -389,7 +389,7 @@ func TestWeakHash(t *testing.T) {
default: default:
} }
finish.fd.Close() cleanupSharedPullerState(finish)
if err := ffs.Remove(tempFile); err != nil { if err := ffs.Remove(tempFile); err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -409,7 +409,7 @@ func TestWeakHash(t *testing.T) {
} }
finish = <-finisherChan finish = <-finisherChan
finish.fd.Close() cleanupSharedPullerState(finish)
expectShifted := expectBlocks - expectPulls expectShifted := expectBlocks - expectPulls
if finish.copyOriginShifted != expectShifted { if finish.copyOriginShifted != expectShifted {
@ -516,9 +516,9 @@ func TestDeregisterOnFailInCopy(t *testing.T) {
t.Log("event took", time.Since(t0)) t.Log("event took", time.Since(t0))
state.mut.Lock() state.mut.Lock()
stateFd := state.fd stateWriter := state.writer
state.mut.Unlock() state.mut.Unlock()
if stateFd != nil { if stateWriter != nil {
t.Fatal("File not closed?") t.Fatal("File not closed?")
} }
@ -594,9 +594,9 @@ func TestDeregisterOnFailInPull(t *testing.T) {
t.Log("event took", time.Since(t0)) t.Log("event took", time.Since(t0))
state.mut.Lock() state.mut.Lock()
stateFd := state.fd stateWriter := state.writer
state.mut.Unlock() state.mut.Unlock()
if stateFd != nil { if stateWriter != nil {
t.Fatal("File not closed?") t.Fatal("File not closed?")
} }
@ -906,3 +906,14 @@ func TestSRConflictReplaceFileByLink(t *testing.T) {
t.Fatal("Expected request to scan", confls[0], "got", scan) t.Fatal("Expected request to scan", confls[0], "got", scan)
} }
} }
func cleanupSharedPullerState(s *sharedPullerState) {
s.mut.Lock()
defer s.mut.Unlock()
if s.writer == nil {
return
}
s.writer.mut.Lock()
s.writer.fd.Close()
s.writer.mut.Unlock()
}

View File

@ -1420,12 +1420,11 @@ func (m *model) Closed(conn protocol.Connection, err error) {
device := conn.ID() device := conn.ID()
m.pmut.Lock() m.pmut.Lock()
defer m.pmut.Unlock()
conn, ok := m.conn[device] conn, ok := m.conn[device]
if !ok { if !ok {
m.pmut.Unlock()
return return
} }
m.progressEmitter.temporaryIndexUnsubscribe(conn)
delete(m.conn, device) delete(m.conn, device)
delete(m.connRequestLimiters, device) delete(m.connRequestLimiters, device)
delete(m.helloMessages, device) delete(m.helloMessages, device)
@ -1433,6 +1432,9 @@ func (m *model) Closed(conn protocol.Connection, err error) {
delete(m.remotePausedFolders, device) delete(m.remotePausedFolders, device)
closed := m.closed[device] closed := m.closed[device]
delete(m.closed, device) delete(m.closed, device)
m.pmut.Unlock()
m.progressEmitter.temporaryIndexUnsubscribe(conn)
l.Infof("Connection to %s at %s closed: %v", device, conn.Name(), err) l.Infof("Connection to %s at %s closed: %v", device, conn.Name(), err)
m.evLogger.Log(events.DeviceDisconnected, map[string]string{ m.evLogger.Log(events.DeviceDisconnected, map[string]string{

View File

@ -35,7 +35,7 @@ type sharedPullerState struct {
// Mutable, must be locked for access // Mutable, must be locked for access
err error // The first error we hit err error // The first error we hit
fd fs.File // The fd of the temp file writer *lockedWriterAt // Wraps fd to prevent fd closing at the same time as writing
copyTotal int // Total number of copy actions for the whole job copyTotal int // Total number of copy actions for the whole job
pullTotal int // Total number of pull actions for the whole job pullTotal int // Total number of pull actions for the whole job
copyOrigin int // Number of blocks copied from the original file copyOrigin int // Number of blocks copied from the original file
@ -62,17 +62,32 @@ type pullerProgress struct {
BytesTotal int64 `json:"bytesTotal"` BytesTotal int64 `json:"bytesTotal"`
} }
// A lockedWriterAt synchronizes WriteAt calls with an external mutex. // lockedWriterAt adds a lock to protect from closing the fd at the same time as writing.
// WriteAt() is goroutine safe by itself, but not against for example Close(). // WriteAt() is goroutine safe by itself, but not against for example Close().
type lockedWriterAt struct { type lockedWriterAt struct {
mut *sync.RWMutex mut sync.RWMutex
wr io.WriterAt fd fs.File
} }
func (w lockedWriterAt) WriteAt(p []byte, off int64) (n int, err error) { // WriteAt itself is goroutine safe, thus just needs to acquire a read-lock to
(*w.mut).Lock() // prevent closing concurrently (see SyncClose).
defer (*w.mut).Unlock() func (w *lockedWriterAt) WriteAt(p []byte, off int64) (n int, err error) {
return w.wr.WriteAt(p, off) w.mut.RLock()
defer w.mut.RUnlock()
return w.fd.WriteAt(p, off)
}
// SyncClose ensures that no more writes are happening before going ahead and
// syncing and closing the fd, thus needs to acquire a write-lock.
func (w *lockedWriterAt) SyncClose() error {
w.mut.Lock()
defer w.mut.Unlock()
if err := w.fd.Sync(); err != nil {
// Sync() is nice if it works but not worth failing the
// operation over if it fails.
l.Debugf("fsync failed: %v", err)
}
return w.fd.Close()
} }
// tempFile returns the fd for the temporary file, reusing an open fd // tempFile returns the fd for the temporary file, reusing an open fd
@ -87,8 +102,8 @@ func (s *sharedPullerState) tempFile() (io.WriterAt, error) {
} }
// If the temp file is already open, return the file descriptor // If the temp file is already open, return the file descriptor
if s.fd != nil { if s.writer != nil {
return lockedWriterAt{&s.mut, s.fd}, nil return s.writer, nil
} }
if err := inWritableDir(s.tempFileInWritableDir, s.fs, s.tempName, s.ignorePerms); err != nil { if err := inWritableDir(s.tempFileInWritableDir, s.fs, s.tempName, s.ignorePerms); err != nil {
@ -96,7 +111,7 @@ func (s *sharedPullerState) tempFile() (io.WriterAt, error) {
return nil, err return nil, err
} }
return lockedWriterAt{&s.mut, s.fd}, nil return s.writer, nil
} }
// tempFileInWritableDir should only be called from tempFile. // tempFileInWritableDir should only be called from tempFile.
@ -171,7 +186,7 @@ func (s *sharedPullerState) tempFileInWritableDir(_ string) error {
} }
// Same fd will be used by all writers // Same fd will be used by all writers
s.fd = fd s.writer = &lockedWriterAt{sync.NewRWMutex(), fd}
return nil return nil
} }
@ -265,18 +280,12 @@ func (s *sharedPullerState) finalClose() (bool, error) {
return false, nil return false, nil
} }
if s.fd != nil { if s.writer != nil {
if err := s.fd.Sync(); err != nil { if err := s.writer.SyncClose(); err != nil && s.err == nil {
// Sync() is nice if it works but not worth failing the
// operation over if it fails.
l.Debugf("fsync %q failed: %v", s.tempName, err)
}
if err := s.fd.Close(); err != nil && s.err == nil {
// This is our error as we weren't errored before. // This is our error as we weren't errored before.
s.err = err s.err = err
} }
s.fd = nil s.writer = nil
} }
s.closed = true s.closed = true