Merge remote-tracking branch 'syncthing/pr/1995'

* syncthing/pr/1995:
  Add switch to disable relays
  Do not start relay service unless explicitly asked for, or global announcement server is running
  Add dynamic relay lookup (DDoS relays.syncthing.net!)
  Discovery clients now take an announcer, global discovery is delayed
  Expose connection type and relay status in the UI
  Add dependencies (fixes #1364)
  Check relays for available devices
  Add incoming connection relay service
  Add unsubscribe to config
  Connections have types
  Large refactoring/feature commit
This commit is contained in:
Jakob Borg
2015-08-20 09:13:37 +02:00
37 changed files with 2493 additions and 457 deletions

View File

@@ -11,16 +11,28 @@ import (
"fmt"
"io"
"net"
"strings"
"net/url"
"sync"
"time"
"github.com/syncthing/protocol"
"github.com/syncthing/relaysrv/client"
"github.com/syncthing/syncthing/lib/config"
"github.com/syncthing/syncthing/lib/events"
"github.com/syncthing/syncthing/lib/model"
"github.com/syncthing/syncthing/lib/osutil"
"github.com/thejerf/suture"
)
type DialerFactory func(*url.URL, *tls.Config) (*tls.Conn, error)
type ListenerFactory func(*url.URL, *tls.Config, chan<- model.IntermediateConnection)
var (
dialers = make(map[string]DialerFactory, 0)
listeners = make(map[string]ListenerFactory, 0)
)
// The connection service listens on TLS and dials configured unconnected
// devices. Successful connections are handed to the model.
type connectionSvc struct {
@@ -29,48 +41,36 @@ type connectionSvc struct {
myID protocol.DeviceID
model *model.Model
tlsCfg *tls.Config
conns chan *tls.Conn
conns chan model.IntermediateConnection
lastRelayCheck map[protocol.DeviceID]time.Time
mut sync.RWMutex
connType map[protocol.DeviceID]model.ConnectionType
relaysEnabled bool
}
func newConnectionSvc(cfg *config.Wrapper, myID protocol.DeviceID, model *model.Model, tlsCfg *tls.Config) *connectionSvc {
func newConnectionSvc(cfg *config.Wrapper, myID protocol.DeviceID, mdl *model.Model, tlsCfg *tls.Config) *connectionSvc {
svc := &connectionSvc{
Supervisor: suture.NewSimple("connectionSvc"),
cfg: cfg,
myID: myID,
model: model,
model: mdl,
tlsCfg: tlsCfg,
conns: make(chan *tls.Conn),
conns: make(chan model.IntermediateConnection),
connType: make(map[protocol.DeviceID]model.ConnectionType),
relaysEnabled: cfg.Options().RelaysEnabled,
lastRelayCheck: make(map[protocol.DeviceID]time.Time),
}
cfg.Subscribe(svc)
// There are several moving parts here; one routine per listening address
// to handle incoming connections, one routine to periodically attempt
// outgoing connections, and lastly one routine to the the common handling
// regardless of whether the connection was incoming or outgoing. It ends
// up as in the diagram below. We embed a Supervisor to manage the
// routines (i.e. log and restart if they crash or exit, etc).
//
// +-----------------+
// Incoming | +---------------+-+ +-----------------+
// Connections | | | | | Outgoing
// -------------->| | svc.listen | | | Connections
// | | (1 per listen | | svc.connect |-------------->
// | | address) | | |
// +-+ | | |
// +-----------------+ +-----------------+
// v v
// | |
// | |
// +------------+-----------+
// |
// | svc.conns
// v
// +-----------------+
// | |
// | |
// | svc.handle |------> model.AddConnection()
// | |
// | |
// +-----------------+
// outgoing connections, one routine to the the common handling
// regardless of whether the connection was incoming or outgoing.
// Furthermore, a relay service which handles incoming requests to connect
// via the relays.
//
// TODO: Clean shutdown, and/or handling config changes on the fly. We
// partly do this now - new devices and addresses will be picked up, but
@@ -79,11 +79,25 @@ func newConnectionSvc(cfg *config.Wrapper, myID protocol.DeviceID, model *model.
svc.Add(serviceFunc(svc.connect))
for _, addr := range svc.cfg.Options().ListenAddress {
addr := addr
listener := serviceFunc(func() {
svc.listen(addr)
})
svc.Add(listener)
uri, err := url.Parse(addr)
if err != nil {
l.Infoln("Failed to parse listen address:", addr, err)
continue
}
listener, ok := listeners[uri.Scheme]
if !ok {
l.Infoln("Unknown listen address scheme:", uri.String())
continue
}
if debugNet {
l.Debugln("listening on", uri.String())
}
svc.Add(serviceFunc(func() {
listener(uri, svc.tlsCfg, svc.conns)
}))
}
svc.Add(serviceFunc(svc.handle))
@@ -92,15 +106,15 @@ func newConnectionSvc(cfg *config.Wrapper, myID protocol.DeviceID, model *model.
func (s *connectionSvc) handle() {
next:
for conn := range s.conns {
cs := conn.ConnectionState()
for c := range s.conns {
cs := c.Conn.ConnectionState()
// We should have negotiated the next level protocol "bep/1.0" as part
// of the TLS handshake. Unfortunately this can't be a hard error,
// because there are implementations out there that don't support
// protocol negotiation (iOS for one...).
if !cs.NegotiatedProtocolIsMutual || cs.NegotiatedProtocol != bepProtocolName {
l.Infof("Peer %s did not negotiate bep/1.0", conn.RemoteAddr())
l.Infof("Peer %s did not negotiate bep/1.0", c.Conn.RemoteAddr())
}
// We should have received exactly one certificate from the other
@@ -108,8 +122,8 @@ next:
// connection.
certs := cs.PeerCertificates
if cl := len(certs); cl != 1 {
l.Infof("Got peer certificate list of length %d != 1 from %s; protocol error", cl, conn.RemoteAddr())
conn.Close()
l.Infof("Got peer certificate list of length %d != 1 from %s; protocol error", cl, c.Conn.RemoteAddr())
c.Conn.Close()
continue
}
remoteCert := certs[0]
@@ -120,19 +134,29 @@ next:
// clients between the same NAT gateway, and global discovery.
if remoteID == myID {
l.Infof("Connected to myself (%s) - should not happen", remoteID)
conn.Close()
c.Conn.Close()
continue
}
// We should not already be connected to the other party. TODO: This
// could use some better handling. If the old connection is dead but
// hasn't timed out yet we may want to drop *that* connection and keep
// this one. But in case we are two devices connecting to each other
// in parallel we don't want to do that or we end up with no
// connections still established...
if s.model.ConnectedTo(remoteID) {
// If we have a relay connection, and the new incoming connection is
// not a relay connection, we should drop that, and prefer the this one.
s.mut.RLock()
ct, ok := s.connType[remoteID]
s.mut.RUnlock()
if ok && !ct.IsDirect() && c.ConnType.IsDirect() {
if debugNet {
l.Debugln("Switching connections", remoteID)
}
s.model.Close(remoteID, fmt.Errorf("switching connections"))
} else if s.model.ConnectedTo(remoteID) {
// We should not already be connected to the other party. TODO: This
// could use some better handling. If the old connection is dead but
// hasn't timed out yet we may want to drop *that* connection and keep
// this one. But in case we are two devices connecting to each other
// in parallel we don't want to do that or we end up with no
// connections still established...
l.Infof("Connected to already connected device (%s)", remoteID)
conn.Close()
c.Conn.Close()
continue
}
@@ -150,35 +174,42 @@ next:
// Incorrect certificate name is something the user most
// likely wants to know about, since it's an advanced
// config. Warn instead of Info.
l.Warnf("Bad certificate from %s (%v): %v", remoteID, conn.RemoteAddr(), err)
conn.Close()
l.Warnf("Bad certificate from %s (%v): %v", remoteID, c.Conn.RemoteAddr(), err)
c.Conn.Close()
continue next
}
// If rate limiting is set, and based on the address we should
// limit the connection, then we wrap it in a limiter.
limit := s.shouldLimit(conn.RemoteAddr())
limit := s.shouldLimit(c.Conn.RemoteAddr())
wr := io.Writer(conn)
wr := io.Writer(c.Conn)
if limit && writeRateLimit != nil {
wr = &limitedWriter{conn, writeRateLimit}
wr = &limitedWriter{c.Conn, writeRateLimit}
}
rd := io.Reader(conn)
rd := io.Reader(c.Conn)
if limit && readRateLimit != nil {
rd = &limitedReader{conn, readRateLimit}
rd = &limitedReader{c.Conn, readRateLimit}
}
name := fmt.Sprintf("%s-%s", conn.LocalAddr(), conn.RemoteAddr())
name := fmt.Sprintf("%s-%s (%s)", c.Conn.LocalAddr(), c.Conn.RemoteAddr(), c.ConnType)
protoConn := protocol.NewConnection(remoteID, rd, wr, s.model, name, deviceCfg.Compression)
l.Infof("Established secure connection to %s at %s", remoteID, name)
if debugNet {
l.Debugf("cipher suite: %04X in lan: %t", conn.ConnectionState().CipherSuite, !limit)
l.Debugf("cipher suite: %04X in lan: %t", c.Conn.ConnectionState().CipherSuite, !limit)
}
s.model.AddConnection(conn, protoConn)
s.model.AddConnection(model.Connection{
c.Conn,
protoConn,
c.ConnType,
})
s.mut.Lock()
s.connType[remoteID] = c.ConnType
s.mut.Unlock()
continue next
}
}
@@ -186,54 +217,14 @@ next:
if !s.cfg.IgnoredDevice(remoteID) {
events.Default.Log(events.DeviceRejected, map[string]string{
"device": remoteID.String(),
"address": conn.RemoteAddr().String(),
"address": c.Conn.RemoteAddr().String(),
})
l.Infof("Connection from %s with unknown device ID %s", conn.RemoteAddr(), remoteID)
l.Infof("Connection from %s (%s) with unknown device ID %s", c.Conn.RemoteAddr(), c.ConnType, remoteID)
} else {
l.Infof("Connection from %s with ignored device ID %s", conn.RemoteAddr(), remoteID)
l.Infof("Connection from %s (%s) with ignored device ID %s", c.Conn.RemoteAddr(), c.ConnType, remoteID)
}
conn.Close()
}
}
func (s *connectionSvc) listen(addr string) {
if debugNet {
l.Debugln("listening on", addr)
}
tcaddr, err := net.ResolveTCPAddr("tcp", addr)
if err != nil {
l.Fatalln("listen (BEP):", err)
}
listener, err := net.ListenTCP("tcp", tcaddr)
if err != nil {
l.Fatalln("listen (BEP):", err)
}
for {
conn, err := listener.Accept()
if err != nil {
l.Warnln("Accepting connection:", err)
continue
}
if debugNet {
l.Debugln("connect from", conn.RemoteAddr())
}
tcpConn := conn.(*net.TCPConn)
s.setTCPOptions(tcpConn)
tc := tls.Server(conn, s.tlsCfg)
err = tc.Handshake()
if err != nil {
l.Infoln("TLS handshake:", err)
tc.Close()
continue
}
s.conns <- tc
c.Conn.Close()
}
}
@@ -246,19 +237,24 @@ func (s *connectionSvc) connect() {
continue
}
if s.model.ConnectedTo(deviceID) {
connected := s.model.ConnectedTo(deviceID)
s.mut.RLock()
ct, ok := s.connType[deviceID]
relaysEnabled := s.relaysEnabled
s.mut.RUnlock()
if connected && ok && ct.IsDirect() {
continue
}
var addrs []string
var relays []string
for _, addr := range deviceCfg.Addresses {
if addr == "dynamic" {
if discoverer != nil {
t := discoverer.Lookup(deviceID)
if len(t) == 0 {
continue
}
t, r := discoverer.Lookup(deviceID)
addrs = append(addrs, t...)
relays = append(relays, r...)
}
} else {
addrs = append(addrs, addr)
@@ -266,45 +262,108 @@ func (s *connectionSvc) connect() {
}
for _, addr := range addrs {
host, port, err := net.SplitHostPort(addr)
if err != nil && strings.HasPrefix(err.Error(), "missing port") {
// addr is on the form "1.2.3.4"
addr = net.JoinHostPort(addr, "22000")
} else if err == nil && port == "" {
// addr is on the form "1.2.3.4:"
addr = net.JoinHostPort(host, "22000")
uri, err := url.Parse(addr)
if err != nil {
l.Infoln("Failed to parse connection url:", addr, err)
continue
}
dialer, ok := dialers[uri.Scheme]
if !ok {
l.Infoln("Unknown address schema", uri.String())
continue
}
if debugNet {
l.Debugln("dial", deviceCfg.DeviceID, addr)
l.Debugln("dial", deviceCfg.DeviceID, uri.String())
}
raddr, err := net.ResolveTCPAddr("tcp", addr)
conn, err := dialer(uri, s.tlsCfg)
if err != nil {
if debugNet {
l.Debugln(err)
l.Debugln("dial failed", deviceCfg.DeviceID, uri.String(), err)
}
continue
}
conn, err := net.DialTCP("tcp", nil, raddr)
if connected {
s.model.Close(deviceID, fmt.Errorf("switching connections"))
}
s.conns <- model.IntermediateConnection{
conn, model.ConnectionTypeBasicDial,
}
continue nextDevice
}
// Only connect via relays if not already connected
// Also, do not set lastRelayCheck time if we have no relays,
// as otherwise when we do discover relays, we might have to
// wait up to RelayReconnectIntervalM to connect again.
// Also, do not try relays if we are explicitly told not to.
if connected || len(relays) == 0 || !relaysEnabled {
continue nextDevice
}
reconIntv := time.Duration(s.cfg.Options().RelayReconnectIntervalM) * time.Minute
if last, ok := s.lastRelayCheck[deviceID]; ok && time.Since(last) < reconIntv {
if debugNet {
l.Debugln("Skipping connecting via relay to", deviceID, "last checked at", last)
}
continue nextDevice
} else if debugNet {
l.Debugln("Trying relay connections to", deviceID, relays)
}
s.lastRelayCheck[deviceID] = time.Now()
for _, addr := range relays {
uri, err := url.Parse(addr)
if err != nil {
if debugNet {
l.Debugln(err)
}
l.Infoln("Failed to parse relay connection url:", addr, err)
continue
}
s.setTCPOptions(conn)
inv, err := client.GetInvitationFromRelay(uri, deviceID, s.tlsCfg.Certificates)
if err != nil {
if debugNet {
l.Debugf("Failed to get invitation for %s from %s: %v", deviceID, uri, err)
}
continue
} else if debugNet {
l.Debugln("Succesfully retrieved relay invitation", inv, "from", uri)
}
tc := tls.Client(conn, s.tlsCfg)
conn, err := client.JoinSession(inv)
if err != nil {
if debugNet {
l.Debugf("Failed to join relay session %s: %v", inv, err)
}
continue
} else if debugNet {
l.Debugln("Sucessfully joined relay session", inv)
}
err = osutil.SetTCPOptions(conn.(*net.TCPConn))
if err != nil {
l.Infoln(err)
}
var tc *tls.Conn
if inv.ServerSocket {
tc = tls.Server(conn, s.tlsCfg)
} else {
tc = tls.Client(conn, s.tlsCfg)
}
err = tc.Handshake()
if err != nil {
l.Infoln("TLS handshake:", err)
l.Infof("TLS handshake (BEP/relay %s): %v", inv, err)
tc.Close()
continue
}
s.conns <- tc
s.conns <- model.IntermediateConnection{
tc, model.ConnectionTypeRelayDial,
}
continue nextDevice
}
}
@@ -317,22 +376,6 @@ func (s *connectionSvc) connect() {
}
}
func (*connectionSvc) setTCPOptions(conn *net.TCPConn) {
var err error
if err = conn.SetLinger(0); err != nil {
l.Infoln(err)
}
if err = conn.SetNoDelay(false); err != nil {
l.Infoln(err)
}
if err = conn.SetKeepAlivePeriod(60 * time.Second); err != nil {
l.Infoln(err)
}
if err = conn.SetKeepAlive(true); err != nil {
l.Infoln(err)
}
}
func (s *connectionSvc) shouldLimit(addr net.Addr) bool {
if s.cfg.Options().LimitBandwidthInLan {
return true
@@ -355,6 +398,10 @@ func (s *connectionSvc) VerifyConfiguration(from, to config.Configuration) error
}
func (s *connectionSvc) CommitConfiguration(from, to config.Configuration) bool {
s.mut.Lock()
s.relaysEnabled = to.Options.RelaysEnabled
s.mut.Unlock()
// We require a restart if a device as been removed.
newDevices := make(map[protocol.DeviceID]bool, len(to.Devices))

View File

@@ -0,0 +1,105 @@
// Copyright (C) 2015 The Syncthing Authors.
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
// You can obtain one at http://mozilla.org/MPL/2.0/.
package main
import (
"crypto/tls"
"net"
"net/url"
"strings"
"github.com/syncthing/syncthing/lib/model"
"github.com/syncthing/syncthing/lib/osutil"
)
func init() {
dialers["tcp"] = tcpDialer
listeners["tcp"] = tcpListener
}
func tcpDialer(uri *url.URL, tlsCfg *tls.Config) (*tls.Conn, error) {
host, port, err := net.SplitHostPort(uri.Host)
if err != nil && strings.HasPrefix(err.Error(), "missing port") {
// addr is on the form "1.2.3.4"
uri.Host = net.JoinHostPort(uri.Host, "22000")
} else if err == nil && port == "" {
// addr is on the form "1.2.3.4:"
uri.Host = net.JoinHostPort(host, "22000")
}
raddr, err := net.ResolveTCPAddr("tcp", uri.Host)
if err != nil {
if debugNet {
l.Debugln(err)
}
return nil, err
}
conn, err := net.DialTCP("tcp", nil, raddr)
if err != nil {
if debugNet {
l.Debugln(err)
}
return nil, err
}
err = osutil.SetTCPOptions(conn)
if err != nil {
l.Infoln(err)
}
tc := tls.Client(conn, tlsCfg)
err = tc.Handshake()
if err != nil {
tc.Close()
return nil, err
}
return tc, nil
}
func tcpListener(uri *url.URL, tlsCfg *tls.Config, conns chan<- model.IntermediateConnection) {
tcaddr, err := net.ResolveTCPAddr("tcp", uri.Host)
if err != nil {
l.Fatalln("listen (BEP/tcp):", err)
return
}
listener, err := net.ListenTCP("tcp", tcaddr)
if err != nil {
l.Fatalln("listen (BEP/tcp):", err)
return
}
for {
conn, err := listener.Accept()
if err != nil {
l.Warnln("Accepting connection (BEP/tcp):", err)
continue
}
if debugNet {
l.Debugln("connect from", conn.RemoteAddr())
}
err = osutil.SetTCPOptions(conn.(*net.TCPConn))
if err != nil {
l.Infoln(err)
}
tc := tls.Server(conn, tlsCfg)
err = tc.Handshake()
if err != nil {
l.Infoln("TLS handshake (BEP/tcp):", err)
tc.Close()
continue
}
conns <- model.IntermediateConnection{
tc, model.ConnectionTypeBasicAccept,
}
}
}

View File

@@ -628,6 +628,9 @@ func (s *apiSvc) getSystemStatus(w http.ResponseWriter, r *http.Request) {
if cfg.Options().GlobalAnnEnabled && discoverer != nil {
res["extAnnounceOK"] = discoverer.ExtAnnounceOK()
}
if relaySvc != nil {
res["relayClientStatus"] = relaySvc.ClientStatus()
}
cpuUsageLock.RLock()
var cpusum float64
for _, p := range cpuUsagePercent {

View File

@@ -34,8 +34,10 @@ import (
"github.com/syncthing/syncthing/lib/events"
"github.com/syncthing/syncthing/lib/model"
"github.com/syncthing/syncthing/lib/osutil"
"github.com/syncthing/syncthing/lib/relay"
"github.com/syncthing/syncthing/lib/symlinks"
"github.com/syncthing/syncthing/lib/upgrade"
"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/errors"
"github.com/syndtr/goleveldb/leveldb/opt"
@@ -110,6 +112,7 @@ var (
readRateLimit *ratelimit.Bucket
stop = make(chan int)
discoverer *discover.Discoverer
relaySvc *relay.Svc
cert tls.Certificate
lans []*net.IPNet
)
@@ -627,7 +630,7 @@ func syncthingMain() {
m.StartDeadlockDetector(time.Duration(it) * time.Second)
}
} else if !IsRelease || IsBeta {
m.StartDeadlockDetector(20 * 60 * time.Second)
m.StartDeadlockDetector(20 * time.Minute)
}
// Clear out old indexes for other devices. Otherwise we'll start up and
@@ -658,15 +661,30 @@ func syncthingMain() {
// The default port we announce, possibly modified by setupUPnP next.
addr, err := net.ResolveTCPAddr("tcp", opts.ListenAddress[0])
uri, err := url.Parse(opts.ListenAddress[0])
if err != nil {
l.Fatalf("Failed to parse listen address %s: %v", opts.ListenAddress[0], err)
}
addr, err := net.ResolveTCPAddr("tcp", uri.Host)
if err != nil {
l.Fatalln("Bad listen address:", err)
}
// Start the relevant services
connectionSvc := newConnectionSvc(cfg, myID, m, tlsCfg)
mainSvc.Add(connectionSvc)
if opts.RelaysEnabled && (opts.GlobalAnnEnabled || opts.RelayWithoutGlobalAnn) {
relaySvc = relay.NewSvc(cfg, tlsCfg, connectionSvc.conns)
connectionSvc.Add(relaySvc)
}
// Start discovery
localPort := addr.Port
discoverer = discovery(localPort)
discoverer = discovery(localPort, relaySvc)
// Start UPnP. The UPnP service will restart global discovery if the
// external port changes.
@@ -676,10 +694,6 @@ func syncthingMain() {
mainSvc.Add(upnpSvc)
}
connectionSvc := newConnectionSvc(cfg, myID, m, tlsCfg)
cfg.Subscribe(connectionSvc)
mainSvc.Add(connectionSvc)
if cpuProfile {
f, err := os.Create(fmt.Sprintf("cpu-%d.pprof", os.Getpid()))
if err != nil {
@@ -900,18 +914,23 @@ func shutdown() {
stop <- exitSuccess
}
func discovery(extPort int) *discover.Discoverer {
func discovery(extPort int, relaySvc *relay.Svc) *discover.Discoverer {
opts := cfg.Options()
disc := discover.NewDiscoverer(myID, opts.ListenAddress)
disc := discover.NewDiscoverer(myID, opts.ListenAddress, relaySvc)
if opts.LocalAnnEnabled {
l.Infoln("Starting local discovery announcements")
disc.StartLocal(opts.LocalAnnPort, opts.LocalAnnMCAddr)
}
if opts.GlobalAnnEnabled {
l.Infoln("Starting global discovery announcements")
disc.StartGlobal(opts.GlobalAnnServers, uint16(extPort))
go func() {
// Defer starting global announce server, giving time to connect
// to relay servers.
time.Sleep(5 * time.Second)
l.Infoln("Starting global discovery announcements")
disc.StartGlobal(opts.GlobalAnnServers, uint16(extPort))
}()
}
return disc

View File

@@ -74,7 +74,7 @@ func (s *verboseSvc) formatEvent(ev events.Event) string {
return fmt.Sprintf("Discovered device %v at %v", data["device"], data["addrs"])
case events.DeviceConnected:
data := ev.Data.(map[string]string)
return fmt.Sprintf("Connected to device %v at %v", data["id"], data["addr"])
return fmt.Sprintf("Connected to device %v at %v (type %s)", data["id"], data["addr"], data["type"])
case events.DeviceDisconnected:
data := ev.Data.(map[string]string)
return fmt.Sprintf("Disconnected from device %v", data["id"])