Add mutex logging
This commit is contained in:
@@ -7,9 +7,8 @@
|
||||
package model
|
||||
|
||||
import (
|
||||
"sync"
|
||||
|
||||
"github.com/syncthing/protocol"
|
||||
"github.com/syncthing/syncthing/internal/sync"
|
||||
)
|
||||
|
||||
// deviceActivity tracks the number of outstanding requests per device and can
|
||||
@@ -23,6 +22,7 @@ type deviceActivity struct {
|
||||
func newDeviceActivity() *deviceActivity {
|
||||
return &deviceActivity{
|
||||
act: make(map[protocol.DeviceID]int),
|
||||
mut: sync.NewMutex(),
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -7,10 +7,10 @@
|
||||
package model
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/syncthing/syncthing/internal/events"
|
||||
"github.com/syncthing/syncthing/internal/sync"
|
||||
)
|
||||
|
||||
type folderState int
|
||||
|
||||
@@ -18,7 +18,7 @@ import (
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"sync"
|
||||
stdsync "sync"
|
||||
"time"
|
||||
|
||||
"github.com/syncthing/protocol"
|
||||
@@ -30,6 +30,7 @@ import (
|
||||
"github.com/syncthing/syncthing/internal/scanner"
|
||||
"github.com/syncthing/syncthing/internal/stats"
|
||||
"github.com/syncthing/syncthing/internal/symlinks"
|
||||
"github.com/syncthing/syncthing/internal/sync"
|
||||
"github.com/syncthing/syncthing/internal/versioner"
|
||||
"github.com/syndtr/goleveldb/leveldb"
|
||||
)
|
||||
@@ -85,7 +86,7 @@ type Model struct {
|
||||
}
|
||||
|
||||
var (
|
||||
SymlinkWarning = sync.Once{}
|
||||
SymlinkWarning = stdsync.Once{}
|
||||
)
|
||||
|
||||
// NewModel creates and starts a new model. The model starts in read-only mode,
|
||||
@@ -113,6 +114,9 @@ func NewModel(cfg *config.Wrapper, id protocol.DeviceID, deviceName, clientName,
|
||||
protoConn: make(map[protocol.DeviceID]protocol.Connection),
|
||||
rawConn: make(map[protocol.DeviceID]io.Closer),
|
||||
deviceVer: make(map[protocol.DeviceID]string),
|
||||
|
||||
fmut: sync.NewRWMutex(),
|
||||
pmut: sync.NewRWMutex(),
|
||||
}
|
||||
if cfg.Options().ProgressUpdateIntervalS > -1 {
|
||||
go m.progressEmitter.Serve()
|
||||
@@ -125,8 +129,8 @@ func NewModel(cfg *config.Wrapper, id protocol.DeviceID, deviceName, clientName,
|
||||
// the locks cannot be acquired in the given timeout period.
|
||||
func (m *Model) StartDeadlockDetector(timeout time.Duration) {
|
||||
l.Infof("Starting deadlock detector with %v timeout", timeout)
|
||||
deadlockDetect(&m.fmut, timeout)
|
||||
deadlockDetect(&m.pmut, timeout)
|
||||
deadlockDetect(m.fmut, timeout)
|
||||
deadlockDetect(m.pmut, timeout)
|
||||
}
|
||||
|
||||
// StartRW starts read/write processing on the current model. When in
|
||||
@@ -1099,9 +1103,9 @@ func (m *Model) ScanFolders() map[string]error {
|
||||
m.fmut.RUnlock()
|
||||
|
||||
errors := make(map[string]error, len(m.folderCfgs))
|
||||
var errorsMut sync.Mutex
|
||||
errorsMut := sync.NewMutex()
|
||||
|
||||
var wg sync.WaitGroup
|
||||
wg := sync.NewWaitGroup()
|
||||
wg.Add(len(folders))
|
||||
for _, folder := range folders {
|
||||
folder := folder
|
||||
|
||||
@@ -9,11 +9,11 @@ package model
|
||||
import (
|
||||
"path/filepath"
|
||||
"reflect"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/syncthing/syncthing/internal/config"
|
||||
"github.com/syncthing/syncthing/internal/events"
|
||||
"github.com/syncthing/syncthing/internal/sync"
|
||||
)
|
||||
|
||||
type ProgressEmitter struct {
|
||||
@@ -35,6 +35,7 @@ func NewProgressEmitter(cfg *config.Wrapper) *ProgressEmitter {
|
||||
registry: make(map[string]*sharedPullerState),
|
||||
last: make(map[string]map[string]*pullerProgress),
|
||||
timer: time.NewTimer(time.Millisecond),
|
||||
mut: sync.NewMutex(),
|
||||
}
|
||||
t.Changed(cfg.Raw())
|
||||
cfg.Subscribe(t)
|
||||
|
||||
@@ -12,6 +12,7 @@ import (
|
||||
|
||||
"github.com/syncthing/syncthing/internal/config"
|
||||
"github.com/syncthing/syncthing/internal/events"
|
||||
"github.com/syncthing/syncthing/internal/sync"
|
||||
)
|
||||
|
||||
var timeout = 10 * time.Millisecond
|
||||
@@ -50,7 +51,9 @@ func TestProgressEmitter(t *testing.T) {
|
||||
|
||||
expectTimeout(w, t)
|
||||
|
||||
s := sharedPullerState{}
|
||||
s := sharedPullerState{
|
||||
mut: sync.NewMutex(),
|
||||
}
|
||||
p.Register(&s)
|
||||
|
||||
expectEvent(w, t, 1)
|
||||
|
||||
@@ -6,7 +6,7 @@
|
||||
|
||||
package model
|
||||
|
||||
import "sync"
|
||||
import "github.com/syncthing/syncthing/internal/sync"
|
||||
|
||||
type jobQueue struct {
|
||||
progress []string
|
||||
@@ -15,7 +15,9 @@ type jobQueue struct {
|
||||
}
|
||||
|
||||
func newJobQueue() *jobQueue {
|
||||
return &jobQueue{}
|
||||
return &jobQueue{
|
||||
mut: sync.NewMutex(),
|
||||
}
|
||||
}
|
||||
|
||||
func (q *jobQueue) Push(file string) {
|
||||
|
||||
@@ -10,6 +10,8 @@ import (
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"time"
|
||||
|
||||
"github.com/syncthing/syncthing/internal/sync"
|
||||
)
|
||||
|
||||
type roFolder struct {
|
||||
@@ -23,11 +25,14 @@ type roFolder struct {
|
||||
|
||||
func newROFolder(model *Model, folder string, interval time.Duration) *roFolder {
|
||||
return &roFolder{
|
||||
stateTracker: stateTracker{folder: folder},
|
||||
folder: folder,
|
||||
intv: interval,
|
||||
model: model,
|
||||
stop: make(chan struct{}),
|
||||
stateTracker: stateTracker{
|
||||
folder: folder,
|
||||
mut: sync.NewMutex(),
|
||||
},
|
||||
folder: folder,
|
||||
intv: interval,
|
||||
model: model,
|
||||
stop: make(chan struct{}),
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -13,7 +13,6 @@ import (
|
||||
"math/rand"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/syncthing/protocol"
|
||||
@@ -24,6 +23,7 @@ import (
|
||||
"github.com/syncthing/syncthing/internal/osutil"
|
||||
"github.com/syncthing/syncthing/internal/scanner"
|
||||
"github.com/syncthing/syncthing/internal/symlinks"
|
||||
"github.com/syncthing/syncthing/internal/sync"
|
||||
"github.com/syncthing/syncthing/internal/versioner"
|
||||
)
|
||||
|
||||
@@ -77,7 +77,10 @@ type rwFolder struct {
|
||||
|
||||
func newRWFolder(m *Model, shortID uint64, cfg config.FolderConfiguration) *rwFolder {
|
||||
return &rwFolder{
|
||||
stateTracker: stateTracker{folder: cfg.ID},
|
||||
stateTracker: stateTracker{
|
||||
folder: cfg.ID,
|
||||
mut: sync.NewMutex(),
|
||||
},
|
||||
|
||||
model: m,
|
||||
progressEmitter: m.progressEmitter,
|
||||
@@ -279,10 +282,10 @@ func (p *rwFolder) pullerIteration(ignores *ignore.Matcher) int {
|
||||
copyChan := make(chan copyBlocksState)
|
||||
finisherChan := make(chan *sharedPullerState)
|
||||
|
||||
var updateWg sync.WaitGroup
|
||||
var copyWg sync.WaitGroup
|
||||
var pullWg sync.WaitGroup
|
||||
var doneWg sync.WaitGroup
|
||||
updateWg := sync.NewWaitGroup()
|
||||
copyWg := sync.NewWaitGroup()
|
||||
pullWg := sync.NewWaitGroup()
|
||||
doneWg := sync.NewWaitGroup()
|
||||
|
||||
if debug {
|
||||
l.Debugln(p, "c", p.copiers, "p", p.pullers)
|
||||
@@ -799,6 +802,7 @@ func (p *rwFolder) handleFile(file protocol.FileInfo, copyChan chan<- copyBlocks
|
||||
reused: reused,
|
||||
ignorePerms: p.ignorePerms,
|
||||
version: curFile.Version,
|
||||
mut: sync.NewMutex(),
|
||||
}
|
||||
|
||||
if debug {
|
||||
|
||||
@@ -10,10 +10,10 @@ import (
|
||||
"io"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
|
||||
"github.com/syncthing/protocol"
|
||||
"github.com/syncthing/syncthing/internal/db"
|
||||
"github.com/syncthing/syncthing/internal/sync"
|
||||
)
|
||||
|
||||
// A sharedPullerState is kept for each file that is being synced and is kept
|
||||
@@ -59,8 +59,8 @@ type lockedWriterAt struct {
|
||||
}
|
||||
|
||||
func (w lockedWriterAt) WriteAt(p []byte, off int64) (n int, err error) {
|
||||
w.mut.Lock()
|
||||
defer w.mut.Unlock()
|
||||
(*w.mut).Lock()
|
||||
defer (*w.mut).Unlock()
|
||||
return w.wr.WriteAt(p, off)
|
||||
}
|
||||
|
||||
|
||||
@@ -9,11 +9,14 @@ package model
|
||||
import (
|
||||
"os"
|
||||
"testing"
|
||||
|
||||
"github.com/syncthing/syncthing/internal/sync"
|
||||
)
|
||||
|
||||
func TestSourceFileOK(t *testing.T) {
|
||||
s := sharedPullerState{
|
||||
realName: "testdata/foo",
|
||||
mut: sync.NewMutex(),
|
||||
}
|
||||
|
||||
fd, err := s.sourceFile()
|
||||
@@ -42,6 +45,7 @@ func TestSourceFileOK(t *testing.T) {
|
||||
func TestSourceFileBad(t *testing.T) {
|
||||
s := sharedPullerState{
|
||||
realName: "nonexistent",
|
||||
mut: sync.NewMutex(),
|
||||
}
|
||||
|
||||
fd, err := s.sourceFile()
|
||||
@@ -67,6 +71,7 @@ func TestReadOnlyDir(t *testing.T) {
|
||||
|
||||
s := sharedPullerState{
|
||||
tempName: "testdata/read_only_dir/.temp_name",
|
||||
mut: sync.NewMutex(),
|
||||
}
|
||||
|
||||
fd, err := s.tempFile()
|
||||
|
||||
Reference in New Issue
Block a user