From b677464dfa72095ee6b577e32ab00140b8de2cae Mon Sep 17 00:00:00 2001
From: Simon Frei
Date: Fri, 16 Aug 2019 09:35:19 +0200
Subject: [PATCH] lib/model: Optimise locking around conn-close and puller
 states (#5954)

---
 lib/model/folder_sendrecv_test.go | 25 ++++++++---
 lib/model/model.go                |  6 ++-
 lib/model/sharedpullerstate.go    | 75 +++++++++++++++++--------------
 3 files changed, 64 insertions(+), 42 deletions(-)

diff --git a/lib/model/folder_sendrecv_test.go b/lib/model/folder_sendrecv_test.go
index 2dbda2b4..6deb8712 100644
--- a/lib/model/folder_sendrecv_test.go
+++ b/lib/model/folder_sendrecv_test.go
@@ -254,6 +254,7 @@ func TestCopierFinder(t *testing.T) {
 	pulls := []pullBlockState{<-pullChan, <-pullChan, <-pullChan, <-pullChan}
 	finish := <-finisherChan
+	defer cleanupSharedPullerState(finish)
 
 	select {
 	case <-pullChan:
@@ -293,7 +294,6 @@ func TestCopierFinder(t *testing.T) {
 			t.Errorf("Block %d mismatch: %s != %s", eq, blks[eq-1].String(), blocks[eq].String())
 		}
 	}
-	finish.fd.Close()
 }
 
 func TestWeakHash(t *testing.T) {
@@ -389,7 +389,7 @@ func TestWeakHash(t *testing.T) {
 	default:
 	}
 
-	finish.fd.Close()
+	cleanupSharedPullerState(finish)
 	if err := ffs.Remove(tempFile); err != nil {
 		t.Fatal(err)
 	}
@@ -409,7 +409,7 @@ func TestWeakHash(t *testing.T) {
 	}
 
 	finish = <-finisherChan
-	finish.fd.Close()
+	cleanupSharedPullerState(finish)
 
 	expectShifted := expectBlocks - expectPulls
 	if finish.copyOriginShifted != expectShifted {
@@ -516,9 +516,9 @@ func TestDeregisterOnFailInCopy(t *testing.T) {
 		t.Log("event took", time.Since(t0))
 
 		state.mut.Lock()
-		stateFd := state.fd
+		stateWriter := state.writer
 		state.mut.Unlock()
-		if stateFd != nil {
+		if stateWriter != nil {
 			t.Fatal("File not closed?")
 		}
@@ -594,9 +594,9 @@ func TestDeregisterOnFailInPull(t *testing.T) {
 		t.Log("event took", time.Since(t0))
 
 		state.mut.Lock()
-		stateFd := state.fd
+		stateWriter := state.writer
 		state.mut.Unlock()
-		if stateFd != nil {
+		if stateWriter != nil {
 			t.Fatal("File not closed?")
 		}
@@ -906,3 +906,14 @@ func TestSRConflictReplaceFileByLink(t *testing.T) {
 		t.Fatal("Expected request to scan", confls[0], "got", scan)
 	}
 }
+
+func cleanupSharedPullerState(s *sharedPullerState) {
+	s.mut.Lock()
+	defer s.mut.Unlock()
+	if s.writer == nil {
+		return
+	}
+	s.writer.mut.Lock()
+	s.writer.fd.Close()
+	s.writer.mut.Unlock()
+}
diff --git a/lib/model/model.go b/lib/model/model.go
index b63a13e1..9b929f6f 100644
--- a/lib/model/model.go
+++ b/lib/model/model.go
@@ -1420,12 +1420,11 @@ func (m *model) Closed(conn protocol.Connection, err error) {
 	device := conn.ID()
 
 	m.pmut.Lock()
-	defer m.pmut.Unlock()
 	conn, ok := m.conn[device]
 	if !ok {
+		m.pmut.Unlock()
 		return
 	}
-	m.progressEmitter.temporaryIndexUnsubscribe(conn)
 	delete(m.conn, device)
 	delete(m.connRequestLimiters, device)
 	delete(m.helloMessages, device)
@@ -1433,6 +1432,9 @@ func (m *model) Closed(conn protocol.Connection, err error) {
 	delete(m.remotePausedFolders, device)
 	closed := m.closed[device]
 	delete(m.closed, device)
+	m.pmut.Unlock()
+
+	m.progressEmitter.temporaryIndexUnsubscribe(conn)
 
 	l.Infof("Connection to %s at %s closed: %v", device, conn.Name(), err)
 	m.evLogger.Log(events.DeviceDisconnected, map[string]string{
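The model.go hunks above are the conn-close half of the optimisation: instead of holding pmut for the whole function via defer, Closed() now mutates the connection maps under the lock, releases it, and only then runs the slow tail (progress-emitter unsubscription, logging, event emission). A minimal sketch of that pattern, with a hypothetical registry type standing in for model:

    package main

    import (
        "fmt"
        "sync"
    )

    type registry struct {
        mut   sync.Mutex
        conns map[string]string
    }

    func (r *registry) closed(device string) {
        r.mut.Lock()
        addr, ok := r.conns[device]
        if !ok {
            // Every early return must now unlock explicitly;
            // that is the usual price of giving up defer.
            r.mut.Unlock()
            return
        }
        delete(r.conns, device)
        // Copy out everything needed later, then release the lock
        // before the slow part so other goroutines are not blocked.
        r.mut.Unlock()

        // Slow work (callbacks, logging, events) runs unlocked.
        fmt.Printf("connection to %s at %s closed\n", device, addr)
    }

    func main() {
        r := &registry{conns: map[string]string{"device1": "192.0.2.1:22000"}}
        r.closed("device1")
    }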
diff --git a/lib/model/sharedpullerstate.go b/lib/model/sharedpullerstate.go
index 2ef491ad..cb2313dd 100644
--- a/lib/model/sharedpullerstate.go
+++ b/lib/model/sharedpullerstate.go
@@ -34,19 +34,19 @@ type sharedPullerState struct {
 	created           time.Time
 
 	// Mutable, must be locked for access
-	err               error        // The first error we hit
-	fd                fs.File      // The fd of the temp file
-	copyTotal         int          // Total number of copy actions for the whole job
-	pullTotal         int          // Total number of pull actions for the whole job
-	copyOrigin        int          // Number of blocks copied from the original file
-	copyOriginShifted int          // Number of blocks copied from the original file but shifted
-	copyNeeded        int          // Number of copy actions still pending
-	pullNeeded        int          // Number of block pulls still pending
-	updated           time.Time    // Time when any of the counters above were last updated
-	closed            bool         // True if the file has been finalClosed.
-	available         []int32      // Indexes of the blocks that are available in the temporary file
-	availableUpdated  time.Time    // Time when list of available blocks was last updated
-	mut               sync.RWMutex // Protects the above
+	err               error           // The first error we hit
+	writer            *lockedWriterAt // Wraps fd to prevent fd closing at the same time as writing
+	copyTotal         int             // Total number of copy actions for the whole job
+	pullTotal         int             // Total number of pull actions for the whole job
+	copyOrigin        int             // Number of blocks copied from the original file
+	copyOriginShifted int             // Number of blocks copied from the original file but shifted
+	copyNeeded        int             // Number of copy actions still pending
+	pullNeeded        int             // Number of block pulls still pending
+	updated           time.Time       // Time when any of the counters above were last updated
+	closed            bool            // True if the file has been finalClosed.
+	available         []int32         // Indexes of the blocks that are available in the temporary file
+	availableUpdated  time.Time       // Time when list of available blocks was last updated
+	mut               sync.RWMutex    // Protects the above
 }
 
 // A momentary state representing the progress of the puller
@@ -62,17 +62,32 @@ type pullerProgress struct {
 	BytesTotal          int64 `json:"bytesTotal"`
 }
 
-// A lockedWriterAt synchronizes WriteAt calls with an external mutex.
+// lockedWriterAt adds a lock to protect from closing the fd at the same time as writing.
 // WriteAt() is goroutine safe by itself, but not against for example Close().
 type lockedWriterAt struct {
-	mut *sync.RWMutex
-	wr  io.WriterAt
+	mut sync.RWMutex
+	fd  fs.File
 }
 
-func (w lockedWriterAt) WriteAt(p []byte, off int64) (n int, err error) {
-	(*w.mut).Lock()
-	defer (*w.mut).Unlock()
-	return w.wr.WriteAt(p, off)
+// WriteAt itself is goroutine safe, thus just needs to acquire a read-lock to
+// prevent closing concurrently (see SyncClose).
+func (w *lockedWriterAt) WriteAt(p []byte, off int64) (n int, err error) {
+	w.mut.RLock()
+	defer w.mut.RUnlock()
+	return w.fd.WriteAt(p, off)
+}
+
+// SyncClose ensures that no more writes are happening before going ahead and
+// syncing and closing the fd, thus needs to acquire a write-lock.
+func (w *lockedWriterAt) SyncClose() error {
+	w.mut.Lock()
+	defer w.mut.Unlock()
+	if err := w.fd.Sync(); err != nil {
+		// Sync() is nice if it works but not worth failing the
+		// operation over if it fails.
+		l.Debugf("fsync failed: %v", err)
+	}
+	return w.fd.Close()
 }
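The rewritten lockedWriterAt is the heart of the puller-state half: the RWMutex is used "inside out". Positioned writes are already safe against each other, so concurrent WriteAt calls only take the shared read-lock, and the exclusive write-lock is reserved for SyncClose, which therefore cannot run while any write is in flight. A self-contained sketch of the idea using only the standard library (plain sync.RWMutex and *os.File stand in for Syncthing's lib/sync and fs.File types):

    package main

    import (
        "log"
        "os"
        "sync"
    )

    type closableWriterAt struct {
        mut sync.RWMutex
        fd  *os.File
    }

    // WriteAt is goroutine safe by itself, so writers share the
    // read-lock: they exclude SyncClose, not each other.
    func (w *closableWriterAt) WriteAt(p []byte, off int64) (int, error) {
        w.mut.RLock()
        defer w.mut.RUnlock()
        return w.fd.WriteAt(p, off)
    }

    // SyncClose takes the exclusive lock: it cannot start while a
    // WriteAt is in flight, and no WriteAt can start until it returns.
    func (w *closableWriterAt) SyncClose() error {
        w.mut.Lock()
        defer w.mut.Unlock()
        if err := w.fd.Sync(); err != nil {
            log.Println("fsync failed (best effort):", err)
        }
        return w.fd.Close()
    }

    func main() {
        fd, err := os.CreateTemp("", "demo")
        if err != nil {
            log.Fatal(err)
        }
        defer os.Remove(fd.Name())

        w := &closableWriterAt{fd: fd}

        var wg sync.WaitGroup
        for i := int64(0); i < 4; i++ {
            wg.Add(1)
            go func(block int64) {
                defer wg.Done()
                // Concurrent writers to disjoint offsets.
                if _, err := w.WriteAt([]byte("data"), block*4); err != nil {
                    log.Println("write failed:", err)
                }
            }(i)
        }
        wg.Wait()

        if err := w.SyncClose(); err != nil {
            log.Fatal(err)
        }
    }

Compared with the old version, which took the exclusive lock around every WriteAt, writers no longer serialize against each other; only the close path pays for exclusivity.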
 
 // tempFile returns the fd for the temporary file, reusing an open fd
@@ -87,8 +102,8 @@ func (s *sharedPullerState) tempFile() (io.WriterAt, error) {
 	}
 
 	// If the temp file is already open, return the file descriptor
-	if s.fd != nil {
-		return lockedWriterAt{&s.mut, s.fd}, nil
+	if s.writer != nil {
+		return s.writer, nil
 	}
 
 	if err := inWritableDir(s.tempFileInWritableDir, s.fs, s.tempName, s.ignorePerms); err != nil {
@@ -96,7 +111,7 @@ func (s *sharedPullerState) tempFile() (io.WriterAt, error) {
 		return nil, err
 	}
 
-	return lockedWriterAt{&s.mut, s.fd}, nil
+	return s.writer, nil
 }
 
 // tempFileInWritableDir should only be called from tempFile.
@@ -171,7 +186,7 @@ func (s *sharedPullerState) tempFileInWritableDir(_ string) error {
 	}
 
 	// Same fd will be used by all writers
-	s.fd = fd
+	s.writer = &lockedWriterAt{sync.NewRWMutex(), fd}
 
 	return nil
 }
@@ -265,18 +280,12 @@ func (s *sharedPullerState) finalClose() (bool, error) {
 		return false, nil
 	}
 
-	if s.fd != nil {
-		if err := s.fd.Sync(); err != nil {
-			// Sync() is nice if it works but not worth failing the
-			// operation over if it fails.
-			l.Debugf("fsync %q failed: %v", s.tempName, err)
-		}
-
-		if err := s.fd.Close(); err != nil && s.err == nil {
+	if s.writer != nil {
+		if err := s.writer.SyncClose(); err != nil && s.err == nil {
 			// This is our error as we weren't errored before.
 			s.err = err
 		}
-		s.fd = nil
+		s.writer = nil
 	}
 
 	s.closed = true
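Note the two-level locking the patch leaves in place: sharedPullerState.mut guards the writer pointer, while lockedWriterAt.mut guards the fd itself, and both finalClose() above and the cleanupSharedPullerState() test helper take them in the same order, state lock first, then writer lock. Keeping one global order is what makes the pair deadlock-free. A minimal sketch of that discipline, with hypothetical names in place of the real types:

    package main

    import (
        "fmt"
        "sync"
    )

    type resource struct {
        mut    sync.RWMutex // inner lock: guards the resource itself
        closed bool
    }

    type state struct {
        mut sync.RWMutex // outer lock: guards the pointer below
        res *resource
    }

    // cleanup takes the outer lock before the inner one, the same
    // order used on every other code path, so they cannot deadlock.
    func (s *state) cleanup() {
        s.mut.Lock()
        defer s.mut.Unlock()
        if s.res == nil {
            return
        }
        s.res.mut.Lock()
        s.res.closed = true
        s.res.mut.Unlock()
        s.res = nil
    }

    func main() {
        s := &state{res: &resource{}}
        s.cleanup()
        fmt.Println("writer cleared:", s.res == nil)
    }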