diff --git a/cmd/syncthing/gui.go b/cmd/syncthing/gui.go index 49c20e4d..84cae94f 100644 --- a/cmd/syncthing/gui.go +++ b/cmd/syncthing/gui.go @@ -22,7 +22,6 @@ import ( "runtime" "strconv" "strings" - "sync" "time" "github.com/calmh/logger" @@ -34,6 +33,7 @@ import ( "github.com/syncthing/syncthing/internal/events" "github.com/syncthing/syncthing/internal/model" "github.com/syncthing/syncthing/internal/osutil" + "github.com/syncthing/syncthing/internal/sync" "github.com/syncthing/syncthing/internal/upgrade" "github.com/vitrun/qart/qr" "golang.org/x/crypto/bcrypt" @@ -45,16 +45,16 @@ type guiError struct { } var ( - configInSync = true - guiErrors = []guiError{} - guiErrorsMut sync.Mutex - startTime = time.Now() + configInSync = true + guiErrors = []guiError{} + guiErrorsMut sync.Mutex = sync.NewMutex() + startTime = time.Now() eventSub *events.BufferedSubscription ) var ( lastEventRequest time.Time - lastEventRequestMut sync.Mutex + lastEventRequestMut sync.Mutex = sync.NewMutex() ) func startGUI(cfg config.GUIConfiguration, assetDir string, m *model.Model) error { @@ -522,7 +522,7 @@ func flushResponse(s string, w http.ResponseWriter) { } var cpuUsagePercent [10]float64 // The last ten seconds -var cpuUsageLock sync.RWMutex +var cpuUsageLock sync.RWMutex = sync.NewRWMutex() func restGetSystemStatus(w http.ResponseWriter, r *http.Request) { var m runtime.MemStats diff --git a/cmd/syncthing/gui_auth.go b/cmd/syncthing/gui_auth.go index 1fe62b58..21ad06dc 100644 --- a/cmd/syncthing/gui_auth.go +++ b/cmd/syncthing/gui_auth.go @@ -12,16 +12,16 @@ import ( "math/rand" "net/http" "strings" - "sync" "time" "github.com/syncthing/syncthing/internal/config" + "github.com/syncthing/syncthing/internal/sync" "golang.org/x/crypto/bcrypt" ) var ( - sessions = make(map[string]bool) - sessionsMut sync.Mutex + sessions = make(map[string]bool) + sessionsMut sync.Mutex = sync.NewMutex() ) func basicAuthAndSessionMiddleware(cfg config.GUIConfiguration, next http.Handler) http.Handler { diff --git a/cmd/syncthing/gui_csrf.go b/cmd/syncthing/gui_csrf.go index f9116ec3..721adf30 100644 --- a/cmd/syncthing/gui_csrf.go +++ b/cmd/syncthing/gui_csrf.go @@ -12,14 +12,14 @@ import ( "net/http" "os" "strings" - "sync" "time" "github.com/syncthing/syncthing/internal/osutil" + "github.com/syncthing/syncthing/internal/sync" ) var csrfTokens []string -var csrfMut sync.Mutex +var csrfMut sync.Mutex = sync.NewMutex() // Check for CSRF token on /rest/ URLs. If a correct one is not given, reject // the request with 403. For / and /index.html, set a new CSRF cookie if none diff --git a/cmd/syncthing/monitor.go b/cmd/syncthing/monitor.go index ced8b0ad..df1e5137 100644 --- a/cmd/syncthing/monitor.go +++ b/cmd/syncthing/monitor.go @@ -14,17 +14,17 @@ import ( "os/signal" "runtime" "strings" - "sync" "syscall" "time" "github.com/syncthing/syncthing/internal/osutil" + "github.com/syncthing/syncthing/internal/sync" ) var ( - stdoutFirstLines []string // The first 10 lines of stdout - stdoutLastLines []string // The last 50 lines of stdout - stdoutMut sync.Mutex + stdoutFirstLines []string // The first 10 lines of stdout + stdoutLastLines []string // The last 50 lines of stdout + stdoutMut sync.Mutex = sync.NewMutex() ) const ( diff --git a/cmd/syncthing/summarysvc.go b/cmd/syncthing/summarysvc.go index 44ab7fc3..983cf445 100644 --- a/cmd/syncthing/summarysvc.go +++ b/cmd/syncthing/summarysvc.go @@ -7,11 +7,11 @@ package main import ( - "sync" "time" "github.com/syncthing/syncthing/internal/events" "github.com/syncthing/syncthing/internal/model" + "github.com/syncthing/syncthing/internal/sync" "github.com/thejerf/suture" ) @@ -37,6 +37,7 @@ func (c *folderSummarySvc) Serve() { c.stop = make(chan struct{}) c.folders = make(map[string]struct{}) c.srv = srv + c.foldersMut = sync.NewMutex() srv.Serve() } diff --git a/internal/config/wrapper.go b/internal/config/wrapper.go index bb6e8e15..aa7d96aa 100644 --- a/internal/config/wrapper.go +++ b/internal/config/wrapper.go @@ -10,11 +10,11 @@ import ( "io/ioutil" "os" "path/filepath" - "sync" "github.com/syncthing/protocol" "github.com/syncthing/syncthing/internal/events" "github.com/syncthing/syncthing/internal/osutil" + "github.com/syncthing/syncthing/internal/sync" ) // An interface to handle configuration changes, and a wrapper type รก la @@ -49,7 +49,12 @@ type Wrapper struct { // Wrap wraps an existing Configuration structure and ties it to a file on // disk. func Wrap(path string, cfg Configuration) *Wrapper { - w := &Wrapper{cfg: cfg, path: path} + w := &Wrapper{ + cfg: cfg, + path: path, + mut: sync.NewMutex(), + sMut: sync.NewMutex(), + } w.replaces = make(chan Configuration) go w.Serve() return w diff --git a/internal/db/blockmap.go b/internal/db/blockmap.go index c7f41bd5..1be8c1bb 100644 --- a/internal/db/blockmap.go +++ b/internal/db/blockmap.go @@ -17,11 +17,11 @@ import ( "bytes" "encoding/binary" "sort" - "sync" "github.com/syncthing/protocol" "github.com/syncthing/syncthing/internal/config" "github.com/syncthing/syncthing/internal/osutil" + "github.com/syncthing/syncthing/internal/sync" "github.com/syndtr/goleveldb/leveldb" "github.com/syndtr/goleveldb/leveldb/util" @@ -123,7 +123,8 @@ func NewBlockFinder(db *leveldb.DB, cfg *config.Wrapper) *BlockFinder { } f := &BlockFinder{ - db: db, + db: db, + mut: sync.NewRWMutex(), } f.Changed(cfg.Raw()) cfg.Subscribe(f) diff --git a/internal/db/concurrency_test.go b/internal/db/concurrency_test.go index 8a392555..00e2e610 100644 --- a/internal/db/concurrency_test.go +++ b/internal/db/concurrency_test.go @@ -10,10 +10,11 @@ import ( "crypto/rand" "log" "os" - "sync" "testing" "time" + "github.com/syncthing/syncthing/internal/sync" + "github.com/syndtr/goleveldb/leveldb" "github.com/syndtr/goleveldb/leveldb/opt" "github.com/syndtr/goleveldb/leveldb/util" @@ -132,7 +133,7 @@ func TestConcurrentSetClear(t *testing.T) { dur := 30 * time.Second t0 := time.Now() - var wg sync.WaitGroup + wg := sync.NewWaitGroup() os.RemoveAll("testdata/concurrent-set-clear.db") db, err := leveldb.OpenFile("testdata/concurrent-set-clear.db", &opt.Options{OpenFilesCacheCapacity: 10}) @@ -188,7 +189,7 @@ func TestConcurrentSetOnly(t *testing.T) { dur := 30 * time.Second t0 := time.Now() - var wg sync.WaitGroup + wg := sync.NewWaitGroup() os.RemoveAll("testdata/concurrent-set-only.db") db, err := leveldb.OpenFile("testdata/concurrent-set-only.db", &opt.Options{OpenFilesCacheCapacity: 10}) diff --git a/internal/db/leveldb.go b/internal/db/leveldb.go index cb5280bd..241899cd 100644 --- a/internal/db/leveldb.go +++ b/internal/db/leveldb.go @@ -14,9 +14,9 @@ import ( "fmt" "runtime" "sort" - "sync" "github.com/syncthing/protocol" + "github.com/syncthing/syncthing/internal/sync" "github.com/syndtr/goleveldb/leveldb" "github.com/syndtr/goleveldb/leveldb/iterator" "github.com/syndtr/goleveldb/leveldb/opt" @@ -25,7 +25,7 @@ import ( var ( clockTick int64 - clockMut sync.Mutex + clockMut sync.Mutex = sync.NewMutex() ) func clock(v int64) int64 { diff --git a/internal/db/set.go b/internal/db/set.go index b4629ba5..ce247ac9 100644 --- a/internal/db/set.go +++ b/internal/db/set.go @@ -13,10 +13,9 @@ package db import ( - "sync" - "github.com/syncthing/protocol" "github.com/syncthing/syncthing/internal/osutil" + "github.com/syncthing/syncthing/internal/sync" "github.com/syndtr/goleveldb/leveldb" ) @@ -50,6 +49,7 @@ func NewFileSet(folder string, db *leveldb.DB) *FileSet { folder: folder, db: db, blockmap: NewBlockMap(db, folder), + mutex: sync.NewMutex(), } ldbCheckGlobals(db, []byte(folder)) diff --git a/internal/discover/client_test.go b/internal/discover/client_test.go index 47856c87..5b55c175 100644 --- a/internal/discover/client_test.go +++ b/internal/discover/client_test.go @@ -9,12 +9,13 @@ package discover import ( "fmt" "net" - "sync" "time" "testing" "github.com/syncthing/protocol" + + "github.com/syncthing/syncthing/internal/sync" ) var device protocol.DeviceID @@ -97,7 +98,7 @@ func TestUDP4Success(t *testing.T) { // Do a lookup in a separate routine addrs := []string{} - wg := sync.WaitGroup{} + wg := sync.NewWaitGroup() wg.Add(1) go func() { addrs = client.Lookup(device) @@ -193,7 +194,7 @@ func TestUDP4Failure(t *testing.T) { // Do a lookup in a separate routine addrs := []string{} - wg := sync.WaitGroup{} + wg := sync.NewWaitGroup() wg.Add(1) go func() { addrs = client.Lookup(device) diff --git a/internal/discover/client_udp.go b/internal/discover/client_udp.go index e73d6ab0..a76bfd3b 100644 --- a/internal/discover/client_udp.go +++ b/internal/discover/client_udp.go @@ -12,16 +12,19 @@ import ( "net" "net/url" "strconv" - "sync" "time" "github.com/syncthing/protocol" + "github.com/syncthing/syncthing/internal/sync" ) func init() { for _, proto := range []string{"udp", "udp4", "udp6"} { Register(proto, func(uri *url.URL, pkt *Announce) (Client, error) { - c := &UDPClient{} + c := &UDPClient{ + wg: sync.NewWaitGroup(), + mut: sync.NewRWMutex(), + } err := c.Start(uri, pkt) if err != nil { return nil, err diff --git a/internal/discover/discover.go b/internal/discover/discover.go index 9c7e7b98..b05acafa 100644 --- a/internal/discover/discover.go +++ b/internal/discover/discover.go @@ -13,12 +13,12 @@ import ( "io" "net" "strconv" - "sync" "time" "github.com/syncthing/protocol" "github.com/syncthing/syncthing/internal/beacon" "github.com/syncthing/syncthing/internal/events" + "github.com/syncthing/syncthing/internal/sync" ) type Discoverer struct { @@ -59,6 +59,8 @@ func NewDiscoverer(id protocol.DeviceID, addresses []string) *Discoverer { negCacheCutoff: 3 * time.Minute, registry: make(map[protocol.DeviceID][]CacheEntry), lastLookup: make(map[protocol.DeviceID]time.Time), + registryLock: sync.NewRWMutex(), + mut: sync.NewRWMutex(), } } @@ -140,7 +142,7 @@ func (d *Discoverer) StartGlobal(servers []string, extPort uint16) { d.extPort = extPort pkt := d.announcementPkt() - wg := sync.WaitGroup{} + wg := sync.NewWaitGroup() clients := make(chan Client, len(servers)) for _, address := range servers { wg.Add(1) @@ -216,7 +218,7 @@ func (d *Discoverer) Lookup(device protocol.DeviceID) []string { // server client and one local announcement interval has passed. This is // to avoid finding local peers on their remote address at startup. results := make(chan []string, len(d.clients)) - wg := sync.WaitGroup{} + wg := sync.NewWaitGroup() for _, client := range d.clients { wg.Add(1) go func(c Client) { diff --git a/internal/events/events.go b/internal/events/events.go index 3f22c4f4..ca49ec87 100644 --- a/internal/events/events.go +++ b/internal/events/events.go @@ -9,8 +9,10 @@ package events import ( "errors" - "sync" + stdsync "sync" "time" + + "github.com/syncthing/syncthing/internal/sync" ) type EventType int @@ -113,7 +115,8 @@ var ( func NewLogger() *Logger { return &Logger{ - subs: make(map[int]*Subscription), + subs: make(map[int]*Subscription), + mutex: sync.NewMutex(), } } @@ -150,6 +153,7 @@ func (l *Logger) Subscribe(mask EventType) *Subscription { mask: mask, id: l.nextID, events: make(chan Event, BufferSize), + mutex: sync.NewMutex(), } l.nextID++ l.subs[s.id] = s @@ -197,15 +201,16 @@ type BufferedSubscription struct { next int cur int mut sync.Mutex - cond *sync.Cond + cond *stdsync.Cond } func NewBufferedSubscription(s *Subscription, size int) *BufferedSubscription { bs := &BufferedSubscription{ sub: s, buf: make([]Event, size), + mut: sync.NewMutex(), } - bs.cond = sync.NewCond(&bs.mut) + bs.cond = stdsync.NewCond(bs.mut) go bs.pollingLoop() return bs } diff --git a/internal/ignore/ignore.go b/internal/ignore/ignore.go index 866d0a74..ee50048f 100644 --- a/internal/ignore/ignore.go +++ b/internal/ignore/ignore.go @@ -16,10 +16,10 @@ import ( "path/filepath" "regexp" "strings" - "sync" "time" "github.com/syncthing/syncthing/internal/fnmatch" + "github.com/syncthing/syncthing/internal/sync" ) type Pattern struct { @@ -48,6 +48,7 @@ func New(withCache bool) *Matcher { m := &Matcher{ withCache: withCache, stop: make(chan struct{}), + mut: sync.NewMutex(), } if withCache { go m.clean(2 * time.Hour) diff --git a/internal/model/deviceactivity.go b/internal/model/deviceactivity.go index 5bb950f9..e69620be 100644 --- a/internal/model/deviceactivity.go +++ b/internal/model/deviceactivity.go @@ -7,9 +7,8 @@ package model import ( - "sync" - "github.com/syncthing/protocol" + "github.com/syncthing/syncthing/internal/sync" ) // deviceActivity tracks the number of outstanding requests per device and can @@ -23,6 +22,7 @@ type deviceActivity struct { func newDeviceActivity() *deviceActivity { return &deviceActivity{ act: make(map[protocol.DeviceID]int), + mut: sync.NewMutex(), } } diff --git a/internal/model/folderstate.go b/internal/model/folderstate.go index 089326fc..e56274da 100644 --- a/internal/model/folderstate.go +++ b/internal/model/folderstate.go @@ -7,10 +7,10 @@ package model import ( - "sync" "time" "github.com/syncthing/syncthing/internal/events" + "github.com/syncthing/syncthing/internal/sync" ) type folderState int diff --git a/internal/model/model.go b/internal/model/model.go index c149c27e..6ba57f7b 100644 --- a/internal/model/model.go +++ b/internal/model/model.go @@ -18,7 +18,7 @@ import ( "os" "path/filepath" "strings" - "sync" + stdsync "sync" "time" "github.com/syncthing/protocol" @@ -30,6 +30,7 @@ import ( "github.com/syncthing/syncthing/internal/scanner" "github.com/syncthing/syncthing/internal/stats" "github.com/syncthing/syncthing/internal/symlinks" + "github.com/syncthing/syncthing/internal/sync" "github.com/syncthing/syncthing/internal/versioner" "github.com/syndtr/goleveldb/leveldb" ) @@ -85,7 +86,7 @@ type Model struct { } var ( - SymlinkWarning = sync.Once{} + SymlinkWarning = stdsync.Once{} ) // NewModel creates and starts a new model. The model starts in read-only mode, @@ -113,6 +114,9 @@ func NewModel(cfg *config.Wrapper, id protocol.DeviceID, deviceName, clientName, protoConn: make(map[protocol.DeviceID]protocol.Connection), rawConn: make(map[protocol.DeviceID]io.Closer), deviceVer: make(map[protocol.DeviceID]string), + + fmut: sync.NewRWMutex(), + pmut: sync.NewRWMutex(), } if cfg.Options().ProgressUpdateIntervalS > -1 { go m.progressEmitter.Serve() @@ -125,8 +129,8 @@ func NewModel(cfg *config.Wrapper, id protocol.DeviceID, deviceName, clientName, // the locks cannot be acquired in the given timeout period. func (m *Model) StartDeadlockDetector(timeout time.Duration) { l.Infof("Starting deadlock detector with %v timeout", timeout) - deadlockDetect(&m.fmut, timeout) - deadlockDetect(&m.pmut, timeout) + deadlockDetect(m.fmut, timeout) + deadlockDetect(m.pmut, timeout) } // StartRW starts read/write processing on the current model. When in @@ -1099,9 +1103,9 @@ func (m *Model) ScanFolders() map[string]error { m.fmut.RUnlock() errors := make(map[string]error, len(m.folderCfgs)) - var errorsMut sync.Mutex + errorsMut := sync.NewMutex() - var wg sync.WaitGroup + wg := sync.NewWaitGroup() wg.Add(len(folders)) for _, folder := range folders { folder := folder diff --git a/internal/model/progressemitter.go b/internal/model/progressemitter.go index 506ee974..ca2d374e 100755 --- a/internal/model/progressemitter.go +++ b/internal/model/progressemitter.go @@ -9,11 +9,11 @@ package model import ( "path/filepath" "reflect" - "sync" "time" "github.com/syncthing/syncthing/internal/config" "github.com/syncthing/syncthing/internal/events" + "github.com/syncthing/syncthing/internal/sync" ) type ProgressEmitter struct { @@ -35,6 +35,7 @@ func NewProgressEmitter(cfg *config.Wrapper) *ProgressEmitter { registry: make(map[string]*sharedPullerState), last: make(map[string]map[string]*pullerProgress), timer: time.NewTimer(time.Millisecond), + mut: sync.NewMutex(), } t.Changed(cfg.Raw()) cfg.Subscribe(t) diff --git a/internal/model/progressemitter_test.go b/internal/model/progressemitter_test.go index 947dea3e..8c073d40 100644 --- a/internal/model/progressemitter_test.go +++ b/internal/model/progressemitter_test.go @@ -12,6 +12,7 @@ import ( "github.com/syncthing/syncthing/internal/config" "github.com/syncthing/syncthing/internal/events" + "github.com/syncthing/syncthing/internal/sync" ) var timeout = 10 * time.Millisecond @@ -50,7 +51,9 @@ func TestProgressEmitter(t *testing.T) { expectTimeout(w, t) - s := sharedPullerState{} + s := sharedPullerState{ + mut: sync.NewMutex(), + } p.Register(&s) expectEvent(w, t, 1) diff --git a/internal/model/queue.go b/internal/model/queue.go index b4b6f05c..2d382a62 100644 --- a/internal/model/queue.go +++ b/internal/model/queue.go @@ -6,7 +6,7 @@ package model -import "sync" +import "github.com/syncthing/syncthing/internal/sync" type jobQueue struct { progress []string @@ -15,7 +15,9 @@ type jobQueue struct { } func newJobQueue() *jobQueue { - return &jobQueue{} + return &jobQueue{ + mut: sync.NewMutex(), + } } func (q *jobQueue) Push(file string) { diff --git a/internal/model/rofolder.go b/internal/model/rofolder.go index 8d1ea777..ac867273 100644 --- a/internal/model/rofolder.go +++ b/internal/model/rofolder.go @@ -10,6 +10,8 @@ import ( "fmt" "math/rand" "time" + + "github.com/syncthing/syncthing/internal/sync" ) type roFolder struct { @@ -23,11 +25,14 @@ type roFolder struct { func newROFolder(model *Model, folder string, interval time.Duration) *roFolder { return &roFolder{ - stateTracker: stateTracker{folder: folder}, - folder: folder, - intv: interval, - model: model, - stop: make(chan struct{}), + stateTracker: stateTracker{ + folder: folder, + mut: sync.NewMutex(), + }, + folder: folder, + intv: interval, + model: model, + stop: make(chan struct{}), } } diff --git a/internal/model/rwfolder.go b/internal/model/rwfolder.go index 69df99b5..b63acecc 100644 --- a/internal/model/rwfolder.go +++ b/internal/model/rwfolder.go @@ -13,7 +13,6 @@ import ( "math/rand" "os" "path/filepath" - "sync" "time" "github.com/syncthing/protocol" @@ -24,6 +23,7 @@ import ( "github.com/syncthing/syncthing/internal/osutil" "github.com/syncthing/syncthing/internal/scanner" "github.com/syncthing/syncthing/internal/symlinks" + "github.com/syncthing/syncthing/internal/sync" "github.com/syncthing/syncthing/internal/versioner" ) @@ -77,7 +77,10 @@ type rwFolder struct { func newRWFolder(m *Model, shortID uint64, cfg config.FolderConfiguration) *rwFolder { return &rwFolder{ - stateTracker: stateTracker{folder: cfg.ID}, + stateTracker: stateTracker{ + folder: cfg.ID, + mut: sync.NewMutex(), + }, model: m, progressEmitter: m.progressEmitter, @@ -279,10 +282,10 @@ func (p *rwFolder) pullerIteration(ignores *ignore.Matcher) int { copyChan := make(chan copyBlocksState) finisherChan := make(chan *sharedPullerState) - var updateWg sync.WaitGroup - var copyWg sync.WaitGroup - var pullWg sync.WaitGroup - var doneWg sync.WaitGroup + updateWg := sync.NewWaitGroup() + copyWg := sync.NewWaitGroup() + pullWg := sync.NewWaitGroup() + doneWg := sync.NewWaitGroup() if debug { l.Debugln(p, "c", p.copiers, "p", p.pullers) @@ -799,6 +802,7 @@ func (p *rwFolder) handleFile(file protocol.FileInfo, copyChan chan<- copyBlocks reused: reused, ignorePerms: p.ignorePerms, version: curFile.Version, + mut: sync.NewMutex(), } if debug { diff --git a/internal/model/sharedpullerstate.go b/internal/model/sharedpullerstate.go index caa48028..ba3dcd7b 100644 --- a/internal/model/sharedpullerstate.go +++ b/internal/model/sharedpullerstate.go @@ -10,10 +10,10 @@ import ( "io" "os" "path/filepath" - "sync" "github.com/syncthing/protocol" "github.com/syncthing/syncthing/internal/db" + "github.com/syncthing/syncthing/internal/sync" ) // A sharedPullerState is kept for each file that is being synced and is kept @@ -59,8 +59,8 @@ type lockedWriterAt struct { } func (w lockedWriterAt) WriteAt(p []byte, off int64) (n int, err error) { - w.mut.Lock() - defer w.mut.Unlock() + (*w.mut).Lock() + defer (*w.mut).Unlock() return w.wr.WriteAt(p, off) } diff --git a/internal/model/sharedpullerstate_test.go b/internal/model/sharedpullerstate_test.go index a385f999..7b6d90aa 100644 --- a/internal/model/sharedpullerstate_test.go +++ b/internal/model/sharedpullerstate_test.go @@ -9,11 +9,14 @@ package model import ( "os" "testing" + + "github.com/syncthing/syncthing/internal/sync" ) func TestSourceFileOK(t *testing.T) { s := sharedPullerState{ realName: "testdata/foo", + mut: sync.NewMutex(), } fd, err := s.sourceFile() @@ -42,6 +45,7 @@ func TestSourceFileOK(t *testing.T) { func TestSourceFileBad(t *testing.T) { s := sharedPullerState{ realName: "nonexistent", + mut: sync.NewMutex(), } fd, err := s.sourceFile() @@ -67,6 +71,7 @@ func TestReadOnlyDir(t *testing.T) { s := sharedPullerState{ tempName: "testdata/read_only_dir/.temp_name", + mut: sync.NewMutex(), } fd, err := s.tempFile() diff --git a/internal/osutil/osutil.go b/internal/osutil/osutil.go index 6639ba53..fb313440 100644 --- a/internal/osutil/osutil.go +++ b/internal/osutil/osutil.go @@ -15,14 +15,15 @@ import ( "path/filepath" "runtime" "strings" - "sync" + + "github.com/syncthing/syncthing/internal/sync" ) var ErrNoHome = errors.New("No home directory found - set $HOME (or the platform equivalent).") // Try to keep this entire operation atomic-like. We shouldn't be doing this // often enough that there is any contention on this lock. -var renameLock sync.Mutex +var renameLock sync.Mutex = sync.NewMutex() // TryRename renames a file, leaving source file intact in case of failure. // Tries hard to succeed on various systems by temporarily tweaking directory diff --git a/internal/scanner/blockqueue.go b/internal/scanner/blockqueue.go index 4a7d6d46..dc08a677 100644 --- a/internal/scanner/blockqueue.go +++ b/internal/scanner/blockqueue.go @@ -9,9 +9,9 @@ package scanner import ( "os" "path/filepath" - "sync" "github.com/syncthing/protocol" + "github.com/syncthing/syncthing/internal/sync" ) // The parallell hasher reads FileInfo structures from the inbox, hashes the @@ -20,7 +20,7 @@ import ( // is closed and all items handled. func newParallelHasher(dir string, blockSize, workers int, outbox, inbox chan protocol.FileInfo) { - var wg sync.WaitGroup + wg := sync.NewWaitGroup() wg.Add(workers) for i := 0; i < workers; i++ { diff --git a/internal/sync/debug.go b/internal/sync/debug.go new file mode 100644 index 00000000..4c3c84e2 --- /dev/null +++ b/internal/sync/debug.go @@ -0,0 +1,31 @@ +// Copyright (C) 2015 The Syncthing Authors. +// +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this file, +// You can obtain one at http://mozilla.org/MPL/2.0/. + +package sync + +import ( + "os" + "strconv" + "strings" + "time" + + "github.com/calmh/logger" +) + +var ( + debug = strings.Contains(os.Getenv("STTRACE"), "locks") || os.Getenv("STTRACE") == "all" + threshold = time.Duration(100 * time.Millisecond) + l = logger.DefaultLogger +) + +func init() { + if n, err := strconv.Atoi(os.Getenv("STLOCKTHRESHOLD")); debug && err == nil { + threshold = time.Duration(n) * time.Millisecond + } + if debug { + l.Debugf("Enabling lock logging at %v threshold", threshold) + } +} diff --git a/internal/sync/sync.go b/internal/sync/sync.go new file mode 100644 index 00000000..dbfb5063 --- /dev/null +++ b/internal/sync/sync.go @@ -0,0 +1,123 @@ +// Copyright (C) 2015 The Syncthing Authors. +// +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this file, +// You can obtain one at http://mozilla.org/MPL/2.0/. + +package sync + +import ( + "fmt" + "path/filepath" + "runtime" + "sync" + "time" +) + +type Mutex interface { + Lock() + Unlock() +} + +type RWMutex interface { + Mutex + RLock() + RUnlock() +} + +type WaitGroup interface { + Add(int) + Done() + Wait() +} + +func NewMutex() Mutex { + if debug { + return &loggedMutex{} + } + return &sync.Mutex{} +} + +func NewRWMutex() RWMutex { + if debug { + return &loggedRWMutex{} + } + return &sync.RWMutex{} +} + +func NewWaitGroup() WaitGroup { + if debug { + return &loggedWaitGroup{} + } + return &sync.WaitGroup{} +} + +type loggedMutex struct { + sync.Mutex + start time.Time + lockedAt string +} + +func (m *loggedMutex) Lock() { + m.Mutex.Lock() + m.start = time.Now() + m.lockedAt = getCaller() +} + +func (m *loggedMutex) Unlock() { + duration := time.Now().Sub(m.start) + if duration >= threshold { + l.Debugf("Mutex held for %v. Locked at %s unlocked at %s", duration, m.lockedAt, getCaller()) + } + m.Mutex.Unlock() +} + +type loggedRWMutex struct { + sync.RWMutex + start time.Time + lockedAt string +} + +func (m *loggedRWMutex) Lock() { + start := time.Now() + + m.RWMutex.Lock() + + m.start = time.Now() + duration := m.start.Sub(start) + + m.lockedAt = getCaller() + if duration > threshold { + l.Debugf("RWMutex took %v to lock. Locked at %s", duration, m.lockedAt) + } +} + +func (m *loggedRWMutex) Unlock() { + duration := time.Now().Sub(m.start) + if duration >= threshold { + l.Debugf("RWMutex held for %v. Locked at %s: unlocked at %s", duration, m.lockedAt, getCaller()) + } + m.RWMutex.Unlock() +} + +type loggedWaitGroup struct { + sync.WaitGroup +} + +func (wg *loggedWaitGroup) Done() { + start := time.Now() + wg.WaitGroup.Done() + duration := time.Now().Sub(start) + if duration > threshold { + l.Debugf("WaitGroup took %v at %s", duration, getCaller()) + } +} + +func getCaller() string { + pc := make([]uintptr, 10) + runtime.Callers(3, pc) + f := runtime.FuncForPC(pc[0]) + file, line := f.FileLine(pc[0]) + file = filepath.Join(filepath.Base(filepath.Dir(file)), filepath.Base(file)) + return fmt.Sprintf("%s:%d", file, line) +} diff --git a/internal/upnp/upnp.go b/internal/upnp/upnp.go index ee813bd2..ea8ad721 100644 --- a/internal/upnp/upnp.go +++ b/internal/upnp/upnp.go @@ -22,8 +22,9 @@ import ( "net/url" "regexp" "strings" - "sync" "time" + + "github.com/syncthing/syncthing/internal/sync" ) // A container for relevant properties of a UPnP InternetGatewayDevice. @@ -129,7 +130,7 @@ func Discover(timeout time.Duration) []IGD { } }() - var wg sync.WaitGroup + wg := sync.NewWaitGroup() for _, intf := range interfaces { for _, deviceType := range []string{"urn:schemas-upnp-org:device:InternetGatewayDevice:1", "urn:schemas-upnp-org:device:InternetGatewayDevice:2"} { wg.Add(1) diff --git a/internal/versioner/staggered.go b/internal/versioner/staggered.go index cf122e6a..bf328de8 100644 --- a/internal/versioner/staggered.go +++ b/internal/versioner/staggered.go @@ -11,10 +11,10 @@ import ( "path/filepath" "strconv" "strings" - "sync" "time" "github.com/syncthing/syncthing/internal/osutil" + "github.com/syncthing/syncthing/internal/sync" ) func init() { @@ -33,7 +33,7 @@ type Staggered struct { cleanInterval int64 folderPath string interval [4]Interval - mutex *sync.Mutex + mutex sync.Mutex } // Rename versions with old version format @@ -87,7 +87,6 @@ func NewStaggered(folderID, folderPath string, params map[string]string) Version versionsDir = params["versionsPath"] } - var mutex sync.Mutex s := Staggered{ versionsPath: versionsDir, cleanInterval: cleanInterval, @@ -98,7 +97,7 @@ func NewStaggered(folderID, folderPath string, params map[string]string) Version {86400, 592000}, // next 30 days -> 1 day between versions {604800, maxAge}, // next year -> 1 week between versions }, - mutex: &mutex, + mutex: sync.NewMutex(), } if debug {