lib/connections: Un-deprecate relaysEnabled (fixes #3074)

GitHub-Pull-Request: https://github.com/syncthing/syncthing/pull/3098
This commit is contained in:
Jakob Borg
2016-05-17 00:05:38 +00:00
parent adb7fb43cb
commit 2c1323ece6
13 changed files with 371 additions and 128 deletions

View File

@@ -70,10 +70,6 @@ func (d *relayDialer) RedialFrequency() time.Duration {
return time.Duration(d.cfg.Options().RelayReconnectIntervalM) * time.Minute
}
func (d *relayDialer) String() string {
return "Relay Dialer"
}
type relayDialerFactory struct{}
func (relayDialerFactory) New(cfg *config.Wrapper, tlsCfg *tls.Config) genericDialer {
@@ -86,3 +82,11 @@ func (relayDialerFactory) New(cfg *config.Wrapper, tlsCfg *tls.Config) genericDi
func (relayDialerFactory) Priority() int {
return relayPriority
}
func (relayDialerFactory) Enabled(cfg config.Configuration) bool {
return cfg.Options.RelaysEnabled
}
func (relayDialerFactory) String() string {
return "Relay Dialer"
}

View File

@@ -13,23 +13,26 @@ import (
"sync"
"time"
"github.com/syncthing/syncthing/lib/config"
"github.com/syncthing/syncthing/lib/dialer"
"github.com/syncthing/syncthing/lib/nat"
"github.com/syncthing/syncthing/lib/relay/client"
)
func init() {
listeners["relay"] = newRelayListener
listeners["dynamic+http"] = newRelayListener
listeners["dynamic+https"] = newRelayListener
factory := &relayListenerFactory{}
listeners["relay"] = factory
listeners["dynamic+http"] = factory
listeners["dynamic+https"] = factory
}
type relayListener struct {
onAddressesChangedNotifier
uri *url.URL
tlsCfg *tls.Config
conns chan IntermediateConnection
uri *url.URL
tlsCfg *tls.Config
conns chan IntermediateConnection
factory listenerFactory
err error
client client.RelayClient
@@ -154,14 +157,25 @@ func (t *relayListener) Error() error {
return cerr
}
func (t *relayListener) Factory() listenerFactory {
return t.factory
}
func (t *relayListener) String() string {
return t.uri.String()
}
func newRelayListener(uri *url.URL, tlsCfg *tls.Config, conns chan IntermediateConnection, natService *nat.Service) genericListener {
type relayListenerFactory struct{}
func (f *relayListenerFactory) New(uri *url.URL, cfg *config.Wrapper, tlsCfg *tls.Config, conns chan IntermediateConnection, natService *nat.Service) genericListener {
return &relayListener{
uri: uri,
tlsCfg: tlsCfg,
conns: conns,
uri: uri,
tlsCfg: tlsCfg,
conns: conns,
factory: f,
}
}
func (relayListenerFactory) Enabled(cfg config.Configuration) bool {
return cfg.Options.RelaysEnabled
}

View File

@@ -9,6 +9,7 @@ package connections
import (
"crypto/tls"
"encoding/binary"
"errors"
"fmt"
"io"
"net"
@@ -112,6 +113,10 @@ func NewService(cfg *config.Wrapper, myID protocol.DeviceID, mdl Model, tlsCfg *
return service
}
var (
errDisabled = errors.New("disabled by configuration")
)
func (s *Service) handle() {
next:
for c := range s.conns {
@@ -237,24 +242,35 @@ next:
func (s *Service) connect() {
nextDial := make(map[string]time.Time)
delay := time.Second
sleep := time.Second
bestDialerPrio := 1<<31 - 1 // worse prio won't build on 32 bit
for _, df := range dialers {
if prio := df.Priority(); prio < bestDialerPrio {
bestDialerPrio = prio
}
}
// Used as delay for the first few connection attempts, increases
// exponentially
initialRampup := time.Second
// Calculated from actual dialers reconnectInterval
var sleep time.Duration
for {
cfg := s.cfg.Raw()
bestDialerPrio := 1<<31 - 1 // worse prio won't build on 32 bit
for _, df := range dialers {
if !df.Enabled(cfg) {
continue
}
if prio := df.Priority(); prio < bestDialerPrio {
bestDialerPrio = prio
}
}
l.Debugln("Reconnect loop")
now := time.Now()
var seen []string
nextDevice:
for deviceID, deviceCfg := range s.cfg.Devices() {
for _, deviceCfg := range cfg.Devices {
deviceID := deviceCfg.DeviceID
if deviceID == s.myID {
continue
}
@@ -292,35 +308,40 @@ func (s *Service) connect() {
seen = append(seen, addrs...)
for _, addr := range addrs {
nextDialAt, ok := nextDial[addr]
if ok && initialRampup >= sleep && nextDialAt.After(now) {
l.Debugf("Not dialing %v as sleep is %v, next dial is at %s and current time is %s", addr, sleep, nextDialAt, now)
continue
}
// If we fail at any step before actually getting the dialer
// retry in a minute
nextDial[addr] = now.Add(time.Minute)
uri, err := url.Parse(addr)
if err != nil {
l.Infoln("Failed to parse connection url:", addr, err)
l.Infof("Dialer for %s: %v", addr, err)
continue
}
dialerFactory, ok := dialers[uri.Scheme]
if !ok {
l.Debugln("Unknown address schema", uri)
dialerFactory, err := s.getDialerFactory(cfg, uri)
if err == errDisabled {
l.Debugln("Dialer for", uri, "is disabled")
continue
}
if err != nil {
l.Infof("Dialer for %v: %v", uri, err)
continue
}
if connected && dialerFactory.Priority() >= ct.Priority {
l.Debugf("Not dialing using %s as priorty is less than current connection (%d >= %d)", dialerFactory, dialerFactory.Priority(), ct.Priority)
continue
}
dialer := dialerFactory.New(s.cfg, s.tlsCfg)
nextDialAt, ok := nextDial[uri.String()]
// See below for comments on this delay >= sleep check
if delay >= sleep && ok && nextDialAt.After(now) {
l.Debugf("Not dialing as next dial is at %s and current time is %s", nextDialAt, now)
continue
}
nextDial[uri.String()] = now.Add(dialer.RedialFrequency())
if connected && dialer.Priority() >= ct.Priority {
l.Debugf("Not dialing using %s as priorty is less than current connection (%d >= %d)", dialer, dialer.Priority(), ct.Priority)
continue
}
l.Debugln("dial", deviceCfg.DeviceID, uri)
nextDial[addr] = now.Add(dialer.RedialFrequency())
conn, err := dialer.Dial(deviceID, uri)
if err != nil {
l.Debugln("dial failed", deviceCfg.DeviceID, uri, err)
@@ -338,12 +359,12 @@ func (s *Service) connect() {
nextDial, sleep = filterAndFindSleepDuration(nextDial, seen, now)
// delay variable is used to trigger much more frequent dialing after
// initial startup, essentially causing redials every 1, 2, 4, 8... seconds
if delay < sleep {
time.Sleep(delay)
delay *= 2
if initialRampup < sleep {
l.Debugln("initial rampup; sleep", initialRampup, "and update to", initialRampup*2)
time.Sleep(initialRampup)
initialRampup *= 2
} else {
l.Debugln("sleep until next dial", sleep)
time.Sleep(sleep)
}
}
@@ -366,24 +387,16 @@ func (s *Service) shouldLimit(addr net.Addr) bool {
return !tcpaddr.IP.IsLoopback()
}
func (s *Service) createListener(addr string) {
func (s *Service) createListener(factory listenerFactory, uri *url.URL) bool {
// must be called with listenerMut held
uri, err := url.Parse(addr)
if err != nil {
l.Infoln("Failed to parse listen address:", addr, err)
return
}
listenerFactory, ok := listeners[uri.Scheme]
if !ok {
l.Infoln("Unknown listen address scheme:", uri.String())
return
}
l.Debugln("Starting listener", uri)
listener := listenerFactory(uri, s.tlsCfg, s.conns, s.natService)
listener := factory.New(uri, s.cfg, s.tlsCfg, s.conns, s.natService)
listener.OnAddressesChanged(s.logListenAddressesChangedEvent)
s.listeners[addr] = listener
s.listenerTokens[addr] = s.Add(listener)
s.listeners[uri.String()] = listener
s.listenerTokens[uri.String()] = s.Add(listener)
return true
}
func (s *Service) logListenAddressesChangedEvent(l genericListener) {
@@ -417,15 +430,33 @@ func (s *Service) CommitConfiguration(from, to config.Configuration) bool {
s.listenersMut.Lock()
seen := make(map[string]struct{})
for _, addr := range config.Wrap("", to).ListenAddresses() {
if _, ok := s.listeners[addr]; !ok {
l.Debugln("Staring listener", addr)
s.createListener(addr)
if _, ok := s.listeners[addr]; ok {
seen[addr] = struct{}{}
continue
}
uri, err := url.Parse(addr)
if err != nil {
l.Infof("Listener for %s: %v", addr, err)
continue
}
factory, err := s.getListenerFactory(to, uri)
if err == errDisabled {
l.Debugln("Listener for", uri, "is disabled")
continue
}
if err != nil {
l.Infof("Listener for %v: %v", uri, err)
continue
}
s.createListener(factory, uri)
seen[addr] = struct{}{}
}
for addr := range s.listeners {
if _, ok := seen[addr]; !ok {
for addr, listener := range s.listeners {
if _, ok := seen[addr]; !ok || !listener.Factory().Enabled(to) {
l.Debugln("Stopping listener", addr)
s.Remove(s.listenerTokens[addr])
delete(s.listenerTokens, addr)
@@ -494,6 +525,32 @@ func (s *Service) Status() map[string]interface{} {
return result
}
func (s *Service) getDialerFactory(cfg config.Configuration, uri *url.URL) (dialerFactory, error) {
dialerFactory, ok := dialers[uri.Scheme]
if !ok {
return nil, fmt.Errorf("unknown address scheme %q", uri.Scheme)
}
if !dialerFactory.Enabled(cfg) {
return nil, errDisabled
}
return dialerFactory, nil
}
func (s *Service) getListenerFactory(cfg config.Configuration, uri *url.URL) (listenerFactory, error) {
listenerFactory, ok := listeners[uri.Scheme]
if !ok {
return nil, fmt.Errorf("unknown address scheme %q", uri.Scheme)
}
if !listenerFactory.Enabled(cfg) {
return nil, errDisabled
}
return listenerFactory, nil
}
func exchangeHello(c net.Conn, h protocol.HelloMessage) (protocol.HelloMessage, error) {
if err := c.SetDeadline(time.Now().Add(2 * time.Second)); err != nil {
return protocol.HelloMessage{}, err

View File

@@ -31,16 +31,19 @@ type Connection struct {
type dialerFactory interface {
New(*config.Wrapper, *tls.Config) genericDialer
Priority() int
Enabled(config.Configuration) bool
String() string
}
type genericDialer interface {
Dial(protocol.DeviceID, *url.URL) (IntermediateConnection, error)
Priority() int
RedialFrequency() time.Duration
String() string
}
type listenerFactory func(*url.URL, *tls.Config, chan IntermediateConnection, *nat.Service) genericListener
type listenerFactory interface {
New(*url.URL, *config.Wrapper, *tls.Config, chan IntermediateConnection, *nat.Service) genericListener
Enabled(config.Configuration) bool
}
type genericListener interface {
Serve()
@@ -58,6 +61,7 @@ type genericListener interface {
Error() error
OnAddressesChanged(func(genericListener))
String() string
Factory() listenerFactory
}
type Model interface {

View File

@@ -20,8 +20,9 @@ import (
const tcpPriority = 10
func init() {
factory := &tcpDialerFactory{}
for _, scheme := range []string{"tcp", "tcp4", "tcp6"} {
dialers[scheme] = tcpDialerFactory{}
dialers[scheme] = factory
}
}
@@ -55,18 +56,10 @@ func (d *tcpDialer) Dial(id protocol.DeviceID, uri *url.URL) (IntermediateConnec
return IntermediateConnection{tc, "TCP (Client)", tcpPriority}, nil
}
func (tcpDialer) Priority() int {
return tcpPriority
}
func (d *tcpDialer) RedialFrequency() time.Duration {
return time.Duration(d.cfg.Options().ReconnectIntervalS) * time.Second
}
func (d *tcpDialer) String() string {
return "TCP Dialer"
}
type tcpDialerFactory struct{}
func (tcpDialerFactory) New(cfg *config.Wrapper, tlsCfg *tls.Config) genericDialer {
@@ -79,3 +72,11 @@ func (tcpDialerFactory) New(cfg *config.Wrapper, tlsCfg *tls.Config) genericDial
func (tcpDialerFactory) Priority() int {
return tcpPriority
}
func (tcpDialerFactory) Enabled(cfg config.Configuration) bool {
return true
}
func (tcpDialerFactory) String() string {
return "TCP Dialer"
}

View File

@@ -14,23 +14,26 @@ import (
"sync"
"time"
"github.com/syncthing/syncthing/lib/config"
"github.com/syncthing/syncthing/lib/dialer"
"github.com/syncthing/syncthing/lib/nat"
)
func init() {
factory := &tcpListenerFactory{}
for _, scheme := range []string{"tcp", "tcp4", "tcp6"} {
listeners[scheme] = newTCPListener
listeners[scheme] = factory
}
}
type tcpListener struct {
onAddressesChangedNotifier
uri *url.URL
tlsCfg *tls.Config
stop chan struct{}
conns chan IntermediateConnection
uri *url.URL
tlsCfg *tls.Config
stop chan struct{}
conns chan IntermediateConnection
factory listenerFactory
natService *nat.Service
mapping *nat.Mapping
@@ -63,6 +66,9 @@ func (t *tcpListener) Serve() {
}
defer listener.Close()
l.Infof("TCP listener (%v) starting", listener.Addr())
defer l.Infof("TCP listener (%v) shutting down", listener.Addr())
mapping := t.natService.NewMapping(nat.TCP, tcaddr.IP, tcaddr.Port)
mapping.OnChanged(func(_ *nat.Mapping, _, _ []nat.Address) {
t.notifyAddressesChanged(t)
@@ -152,16 +158,61 @@ func (t *tcpListener) String() string {
return t.uri.String()
}
func newTCPListener(uri *url.URL, tlsCfg *tls.Config, conns chan IntermediateConnection, natService *nat.Service) genericListener {
func (t *tcpListener) Factory() listenerFactory {
return t.factory
}
type tcpListenerFactory struct{}
func (f *tcpListenerFactory) New(uri *url.URL, cfg *config.Wrapper, tlsCfg *tls.Config, conns chan IntermediateConnection, natService *nat.Service) genericListener {
return &tcpListener{
uri: fixupPort(uri),
tlsCfg: tlsCfg,
conns: conns,
natService: natService,
stop: make(chan struct{}),
factory: f,
}
}
func (tcpListenerFactory) Enabled(cfg config.Configuration) bool {
return true
}
func isPublicIPv4(ip net.IP) bool {
ip = ip.To4()
if ip == nil {
// Not an IPv4 address (IPv6)
return false
}
// IsGlobalUnicast below only checks that it's not link local or
// multicast, and we want to exclude private (NAT:ed) addresses as well.
rfc1918 := []net.IPNet{
{IP: net.IP{10, 0, 0, 0}, Mask: net.IPMask{255, 0, 0, 0}},
{IP: net.IP{172, 16, 0, 0}, Mask: net.IPMask{255, 240, 0, 0}},
{IP: net.IP{192, 168, 0, 0}, Mask: net.IPMask{255, 255, 0, 0}},
}
for _, n := range rfc1918 {
if n.Contains(ip) {
return false
}
}
return ip.IsGlobalUnicast()
}
func isPublicIPv6(ip net.IP) bool {
if ip.To4() != nil {
// Not an IPv6 address (IPv4)
// (To16() returns a v6 mapped v4 address so can't be used to check
// that it's an actual v6 address)
return false
}
return ip.IsGlobalUnicast()
}
func fixupPort(uri *url.URL) *url.URL {
copyURI := *uri