diff --git a/lib/connections/service.go b/lib/connections/service.go index 256fce93..d98265d9 100644 --- a/lib/connections/service.go +++ b/lib/connections/service.go @@ -54,9 +54,11 @@ type Service struct { natService *nat.Service natServiceToken *suture.ServiceToken - mut sync.RWMutex - listeners map[string]genericListener - listenerTokens map[string]suture.ServiceToken + listenersMut sync.RWMutex + listeners map[string]genericListener + listenerTokens map[string]suture.ServiceToken + + curConMut sync.Mutex currentConnection map[protocol.DeviceID]Connection } @@ -76,9 +78,11 @@ func NewService(cfg *config.Wrapper, myID protocol.DeviceID, mdl Model, tlsCfg * lans: lans, natService: nat.NewService(myID, cfg), - mut: sync.NewRWMutex(), - listeners: make(map[string]genericListener), - listenerTokens: make(map[string]suture.ServiceToken), + listenersMut: sync.NewRWMutex(), + listeners: make(map[string]genericListener), + listenerTokens: make(map[string]suture.ServiceToken), + + curConMut: sync.NewMutex(), currentConnection: make(map[protocol.DeviceID]Connection), } cfg.Subscribe(service) @@ -153,9 +157,9 @@ next: // If we have a relay connection, and the new incoming connection is // not a relay connection, we should drop that, and prefer the this one. - s.mut.RLock() - skip := false + s.curConMut.Lock() ct, ok := s.currentConnection[remoteID] + s.curConMut.Unlock() // Lower priority is better, just like nice etc. if ok && ct.Priority > c.Priority { @@ -170,14 +174,10 @@ next: // connections still established... l.Infof("Connected to already connected device (%s)", remoteID) c.Close() - skip = true + continue } else if s.model.IsPaused(remoteID) { l.Infof("Connection from paused device (%s)", remoteID) c.Close() - skip = true - } - s.mut.RUnlock() - if skip { continue } @@ -222,10 +222,10 @@ next: l.Infof("Established secure connection to %s at %s", remoteID, name) l.Debugf("cipher suite: %04X in lan: %t", c.ConnectionState().CipherSuite, !limit) - s.mut.Lock() s.model.AddConnection(modelConn, hello) + s.curConMut.Lock() s.currentConnection[remoteID] = modelConn - s.mut.Unlock() + s.curConMut.Unlock() continue next } } @@ -239,6 +239,7 @@ func (s *Service) connect() { nextDial := make(map[string]time.Time) delay := time.Second sleep := time.Second + for { l.Debugln("Reconnect loop") @@ -251,18 +252,18 @@ func (s *Service) connect() { continue } - l.Debugln("Reconnect loop for", deviceID) - - s.mut.RLock() paused := s.model.IsPaused(deviceID) - connected := s.model.ConnectedTo(deviceID) - ct := s.currentConnection[deviceID] - s.mut.RUnlock() - if paused { continue } + l.Debugln("Reconnect loop for", deviceID) + + connected := s.model.ConnectedTo(deviceID) + s.curConMut.Lock() + ct := s.currentConnection[deviceID] + s.curConMut.Unlock() + var addrs []string for _, addr := range deviceCfg.Addresses { if addr == "dynamic" { @@ -354,6 +355,7 @@ func (s *Service) shouldLimit(addr net.Addr) bool { } func (s *Service) createListener(addr string) { + // must be called with listenerMut held uri, err := url.Parse(addr) if err != nil { l.Infoln("Failed to parse listen address:", addr, err) @@ -368,10 +370,8 @@ func (s *Service) createListener(addr string) { listener := listenerFactory(uri, s.tlsCfg, s.conns, s.natService) listener.OnAddressesChanged(s.logListenAddressesChangedEvent) - s.mut.Lock() s.listeners[addr] = listener s.listenerTokens[addr] = s.Add(listener) - s.mut.Unlock() } func (s *Service) logListenAddressesChangedEvent(l genericListener) { @@ -402,21 +402,16 @@ func (s *Service) CommitConfiguration(from, to config.Configuration) bool { } } - s.mut.RLock() - existingListeners := s.listeners - s.mut.RUnlock() - + s.listenersMut.Lock() seen := make(map[string]struct{}) - for _, addr := range config.Wrap("", to).ListenAddresses() { - if _, ok := existingListeners[addr]; !ok { + if _, ok := s.listeners[addr]; !ok { l.Debugln("Staring listener", addr) s.createListener(addr) } seen[addr] = struct{}{} } - s.mut.Lock() for addr := range s.listeners { if _, ok := seen[addr]; !ok { l.Debugln("Stopping listener", addr) @@ -425,7 +420,7 @@ func (s *Service) CommitConfiguration(from, to config.Configuration) bool { delete(s.listeners, addr) } } - s.mut.Unlock() + s.listenersMut.Unlock() if to.Options.NATEnabled && s.natServiceToken == nil { l.Debugln("Starting NAT service") @@ -441,7 +436,7 @@ func (s *Service) CommitConfiguration(from, to config.Configuration) bool { } func (s *Service) AllAddresses() []string { - s.mut.RLock() + s.listenersMut.RLock() var addrs []string for _, listener := range s.listeners { for _, lanAddr := range listener.LANAddresses() { @@ -451,24 +446,24 @@ func (s *Service) AllAddresses() []string { addrs = append(addrs, wanAddr.String()) } } - s.mut.RUnlock() + s.listenersMut.RUnlock() return util.UniqueStrings(addrs) } func (s *Service) ExternalAddresses() []string { - s.mut.RLock() + s.listenersMut.RLock() var addrs []string for _, listener := range s.listeners { for _, wanAddr := range listener.WANAddresses() { addrs = append(addrs, wanAddr.String()) } } - s.mut.RUnlock() + s.listenersMut.RUnlock() return util.UniqueStrings(addrs) } func (s *Service) Status() map[string]interface{} { - s.mut.RLock() + s.listenersMut.RLock() result := make(map[string]interface{}) for addr, listener := range s.listeners { status := make(map[string]interface{}) @@ -483,7 +478,7 @@ func (s *Service) Status() map[string]interface{} { result[addr] = status } - s.mut.RUnlock() + s.listenersMut.RUnlock() return result }