diff --git a/cmd/syncthing/connections.go b/cmd/syncthing/connections.go index 161e06a7..60098ecc 100644 --- a/cmd/syncthing/connections.go +++ b/cmd/syncthing/connections.go @@ -60,36 +60,14 @@ func newConnectionSvc(cfg *config.Wrapper, myID protocol.DeviceID, mdl *model.Mo connType: make(map[protocol.DeviceID]model.ConnectionType), } + cfg.Subscribe(svc) // There are several moving parts here; one routine per listening address // to handle incoming connections, one routine to periodically attempt - // outgoing connections, and lastly one routine to the the common handling - // regardless of whether the connection was incoming or outgoing. It ends - // up as in the diagram below. We embed a Supervisor to manage the - // routines (i.e. log and restart if they crash or exit, etc). - // - // +-----------------+ - // Incoming | +---------------+-+ +-----------------+ - // Connections | | | | | - // -------------->| | listener | | | Outgoing connections via dialers - // | | (1 per listen | | svc.connect |-----------------------------------> - // | | address) | | | - // +-+ | | | - // +-----------------+ +-----------------+ - // v v - // | | - // | | - // +------------+-----------+ - // | - // | svc.conns - // v - // +-----------------+ - // | | - // | | - // | svc.handle |------> model.AddConnection() - // | | - // | | - // +-----------------+ + // outgoing connections, one routine to the the common handling + // regardless of whether the connection was incoming or outgoing. + // Furthermore, a relay service which handles incoming requests to connect + // via the relays. // // TODO: Clean shutdown, and/or handling config changes on the fly. We // partly do this now - new devices and addresses will be picked up, but diff --git a/cmd/syncthing/main.go b/cmd/syncthing/main.go index ffa0dc8d..15e35567 100644 --- a/cmd/syncthing/main.go +++ b/cmd/syncthing/main.go @@ -668,6 +668,13 @@ func syncthingMain() { l.Fatalln("Bad listen address:", err) } + // Start the relevant services + + connectionSvc := newConnectionSvc(cfg, myID, m, tlsCfg) + relaySvc := newRelaySvc(cfg, tlsCfg, connectionSvc.conns) + connectionSvc.Add(relaySvc) + mainSvc.Add(connectionSvc) + // Start discovery localPort := addr.Port @@ -681,10 +688,6 @@ func syncthingMain() { mainSvc.Add(upnpSvc) } - connectionSvc := newConnectionSvc(cfg, myID, m, tlsCfg) - cfg.Subscribe(connectionSvc) - mainSvc.Add(connectionSvc) - if cpuProfile { f, err := os.Create(fmt.Sprintf("cpu-%d.pprof", os.Getpid())) if err != nil { diff --git a/cmd/syncthing/relays.go b/cmd/syncthing/relays.go new file mode 100644 index 00000000..af3da4bd --- /dev/null +++ b/cmd/syncthing/relays.go @@ -0,0 +1,193 @@ +// Copyright (C) 2015 The Syncthing Authors. +// +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this file, +// You can obtain one at http://mozilla.org/MPL/2.0/. + +package main + +import ( + "crypto/tls" + "net" + "net/url" + "time" + + "github.com/syncthing/relaysrv/client" + "github.com/syncthing/relaysrv/protocol" + "github.com/syncthing/syncthing/lib/config" + "github.com/syncthing/syncthing/lib/model" + "github.com/syncthing/syncthing/lib/sync" + + "github.com/thejerf/suture" +) + +func newRelaySvc(cfg *config.Wrapper, tlsCfg *tls.Config, conns chan<- intermediateConnection) *relaySvc { + svc := &relaySvc{ + Supervisor: suture.New("relaySvc", suture.Spec{ + Log: func(log string) { + if debugNet { + l.Infoln(log) + } + }, + FailureBackoff: 5 * time.Minute, + FailureDecay: float64((10 * time.Minute) / time.Second), + FailureThreshold: 5, + }), + cfg: cfg, + tlsCfg: tlsCfg, + + tokens: make(map[string]suture.ServiceToken), + clients: make(map[string]*client.ProtocolClient), + mut: sync.NewRWMutex(), + + invitations: make(chan protocol.SessionInvitation), + } + + rcfg := cfg.Raw() + svc.CommitConfiguration(rcfg, rcfg) + cfg.Subscribe(svc) + + receiver := &invitationReceiver{ + tlsCfg: tlsCfg, + conns: conns, + invitations: svc.invitations, + } + + svc.receiverToken = svc.Add(receiver) + + return svc +} + +type relaySvc struct { + *suture.Supervisor + cfg *config.Wrapper + tlsCfg *tls.Config + + receiverToken suture.ServiceToken + tokens map[string]suture.ServiceToken + clients map[string]*client.ProtocolClient + mut sync.RWMutex + invitations chan protocol.SessionInvitation +} + +func (s *relaySvc) VerifyConfiguration(from, to config.Configuration) error { + for _, addr := range to.Options.RelayServers { + _, err := url.Parse(addr) + if err != nil { + return err + } + } + return nil +} + +func (s *relaySvc) CommitConfiguration(from, to config.Configuration) bool { + existing := make(map[string]struct{}, len(to.Options.RelayServers)) + for _, addr := range to.Options.RelayServers { + uri, err := url.Parse(addr) + if err != nil { + if debugNet { + l.Debugln("Failed to parse relay address", addr, err) + } + continue + } + + existing[uri.String()] = struct{}{} + + _, ok := s.tokens[uri.String()] + if !ok { + if debugNet { + l.Debugln("Connecting to relay", uri) + } + c := client.NewProtocolClient(uri, s.tlsCfg.Certificates, s.invitations) + s.tokens[uri.String()] = s.Add(c) + s.mut.Lock() + s.clients[uri.String()] = c + s.mut.Unlock() + } + } + + for uri, token := range s.tokens { + _, ok := existing[uri] + if !ok { + err := s.Remove(token) + delete(s.tokens, uri) + s.mut.Lock() + delete(s.clients, uri) + s.mut.Unlock() + if debugNet { + l.Debugln("Disconnecting from relay", uri, err) + } + } + } + + return true +} + +func (s *relaySvc) ClientStatus() map[string]bool { + s.mut.RLock() + status := make(map[string]bool, len(s.clients)) + for uri, client := range s.clients { + status[uri] = client.StatusOK() + } + s.mut.RUnlock() + return status +} + +type invitationReceiver struct { + invitations chan protocol.SessionInvitation + tlsCfg *tls.Config + conns chan<- intermediateConnection + stop chan struct{} +} + +func (r *invitationReceiver) Serve() { + if r.stop != nil { + return + } + r.stop = make(chan struct{}) + + for { + select { + case inv := <-r.invitations: + if debugNet { + l.Debugln("Received relay invitation", inv) + } + conn, err := client.JoinSession(inv) + if err != nil { + if debugNet { + l.Debugf("Failed to join relay session %s: %v", inv, err) + } + continue + } + + setTCPOptions(conn.(*net.TCPConn)) + + var tc *tls.Conn + + if inv.ServerSocket { + tc = tls.Server(conn, r.tlsCfg) + } else { + tc = tls.Client(conn, r.tlsCfg) + } + err = tc.Handshake() + if err != nil { + l.Infof("TLS handshake (BEP/relay %s): %v", inv, err) + tc.Close() + continue + } + r.conns <- intermediateConnection{ + tc, model.ConnectionTypeRelayAccept, + } + case <-r.stop: + return + } + } +} + +func (r *invitationReceiver) Stop() { + if r.stop == nil { + return + } + r.stop <- struct{}{} + r.stop = nil +} diff --git a/lib/model/connection.go b/lib/model/connection.go index 6489e015..9fb79923 100644 --- a/lib/model/connection.go +++ b/lib/model/connection.go @@ -21,6 +21,8 @@ type Connection struct { const ( ConnectionTypeBasicAccept ConnectionType = iota ConnectionTypeBasicDial + ConnectionTypeRelayAccept + ConnectionTypeRelayDial ) type ConnectionType int @@ -31,6 +33,14 @@ func (t ConnectionType) String() string { return "basic-accept" case ConnectionTypeBasicDial: return "basic-dial" + case ConnectionTypeRelayAccept: + return "relay-accept" + case ConnectionTypeRelayDial: + return "relay-dial" } return "unknown" } + +func (t ConnectionType) IsDirect() bool { + return t == ConnectionTypeBasicAccept || t == ConnectionTypeBasicDial +}