lib/connections: Refactor
1. Removes separate relay lists and relay clients/services, just makes it a listen address 2. Easier plugging-in of other transports 3. Allows "hot" disabling and enabling NAT services 4. Allows "hot" listen address changes 5. Changes listen address list with a preferable "default" value just like for discovery 6. Debounces global discovery announcements as external addresses change (which it might alot upon starting) 7. Stops this whole "pick other peers relay by latency". This information is no longer available, but I don't think it matters as most of the time other peer only has one relay. 8. Rename ListenAddress to ListenAddresses, as well as in javascript land. 9. Stop serializing deprecated values to JSON GitHub-Pull-Request: https://github.com/syncthing/syncthing/pull/2982
This commit is contained in:
committed by
Jakob Borg
parent
09832abe50
commit
674fc566bb
@@ -24,7 +24,7 @@ var (
|
||||
type RelayClient interface {
|
||||
Serve()
|
||||
Stop()
|
||||
StatusOK() bool
|
||||
Error() error
|
||||
Latency() time.Duration
|
||||
String() string
|
||||
Invitations() chan protocol.SessionInvitation
|
||||
|
||||
@@ -25,6 +25,7 @@ type dynamicClient struct {
|
||||
timeout time.Duration
|
||||
|
||||
mut sync.RWMutex
|
||||
err error
|
||||
client RelayClient
|
||||
stop chan struct{}
|
||||
}
|
||||
@@ -61,6 +62,7 @@ func (c *dynamicClient) Serve() {
|
||||
data, err := http.Get(uri.String())
|
||||
if err != nil {
|
||||
l.Debugln(c, "failed to lookup dynamic relays", err)
|
||||
c.setError(err)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -69,6 +71,7 @@ func (c *dynamicClient) Serve() {
|
||||
data.Body.Close()
|
||||
if err != nil {
|
||||
l.Debugln(c, "failed to lookup dynamic relays", err)
|
||||
c.setError(err)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -89,6 +92,7 @@ func (c *dynamicClient) Serve() {
|
||||
select {
|
||||
case <-c.stop:
|
||||
l.Debugln(c, "stopping")
|
||||
c.setError(nil)
|
||||
return
|
||||
default:
|
||||
ruri, err := url.Parse(addr)
|
||||
@@ -112,6 +116,7 @@ func (c *dynamicClient) Serve() {
|
||||
}
|
||||
}
|
||||
l.Debugln(c, "could not find a connectable relay")
|
||||
c.setError(fmt.Errorf("could not find a connectable relay"))
|
||||
}
|
||||
|
||||
func (c *dynamicClient) Stop() {
|
||||
@@ -124,13 +129,13 @@ func (c *dynamicClient) Stop() {
|
||||
c.client.Stop()
|
||||
}
|
||||
|
||||
func (c *dynamicClient) StatusOK() bool {
|
||||
func (c *dynamicClient) Error() error {
|
||||
c.mut.RLock()
|
||||
defer c.mut.RUnlock()
|
||||
if c.client == nil {
|
||||
return false
|
||||
return c.err
|
||||
}
|
||||
return c.client.StatusOK()
|
||||
return c.client.Error()
|
||||
}
|
||||
|
||||
func (c *dynamicClient) Latency() time.Duration {
|
||||
@@ -150,7 +155,7 @@ func (c *dynamicClient) URI() *url.URL {
|
||||
c.mut.RLock()
|
||||
defer c.mut.RUnlock()
|
||||
if c.client == nil {
|
||||
return c.pooladdr
|
||||
return nil
|
||||
}
|
||||
return c.client.URI()
|
||||
}
|
||||
@@ -171,6 +176,12 @@ func (c *dynamicClient) cleanup() {
|
||||
c.mut.Unlock()
|
||||
}
|
||||
|
||||
func (c *dynamicClient) setError(err error) {
|
||||
c.mut.Lock()
|
||||
c.err = err
|
||||
c.mut.Unlock()
|
||||
}
|
||||
|
||||
// This is the announcement recieved from the relay server;
|
||||
// {"relays": [{"url": "relay://10.20.30.40:5060"}, ...]}
|
||||
type dynamicAnnouncement struct {
|
||||
|
||||
@@ -32,6 +32,7 @@ type staticClient struct {
|
||||
conn *tls.Conn
|
||||
|
||||
mut sync.RWMutex
|
||||
err error
|
||||
connected bool
|
||||
latency time.Duration
|
||||
}
|
||||
@@ -57,8 +58,7 @@ func newStaticClient(uri *url.URL, certs []tls.Certificate, invitations chan pro
|
||||
stop: make(chan struct{}),
|
||||
stopped: make(chan struct{}),
|
||||
|
||||
mut: sync.NewRWMutex(),
|
||||
connected: false,
|
||||
mut: sync.NewRWMutex(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -69,6 +69,7 @@ func (c *staticClient) Serve() {
|
||||
|
||||
if err := c.connect(); err != nil {
|
||||
l.Infof("Could not connect to relay %s: %s", c.uri, err)
|
||||
c.setError(err)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -77,12 +78,14 @@ func (c *staticClient) Serve() {
|
||||
if err := c.join(); err != nil {
|
||||
c.conn.Close()
|
||||
l.Infof("Could not join relay %s: %s", c.uri, err)
|
||||
c.setError(err)
|
||||
return
|
||||
}
|
||||
|
||||
if err := c.conn.SetDeadline(time.Time{}); err != nil {
|
||||
c.conn.Close()
|
||||
l.Infoln("Relay set deadline:", err)
|
||||
c.setError(err)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -109,6 +112,7 @@ func (c *staticClient) Serve() {
|
||||
case protocol.Ping:
|
||||
if err := protocol.WriteMessage(c.conn, protocol.Pong{}); err != nil {
|
||||
l.Infoln("Relay write:", err)
|
||||
c.setError(err)
|
||||
c.disconnect()
|
||||
} else {
|
||||
l.Debugln(c, "sent pong")
|
||||
@@ -123,15 +127,18 @@ func (c *staticClient) Serve() {
|
||||
|
||||
case protocol.RelayFull:
|
||||
l.Infof("Disconnected from relay %s due to it becoming full.", c.uri)
|
||||
c.setError(fmt.Errorf("Relay full"))
|
||||
c.disconnect()
|
||||
|
||||
default:
|
||||
l.Infoln("Relay: protocol error: unexpected message %v", msg)
|
||||
c.setError(fmt.Errorf("protocol error: unexpected message %v", msg))
|
||||
c.disconnect()
|
||||
}
|
||||
|
||||
case <-c.stop:
|
||||
l.Debugln(c, "stopping")
|
||||
c.setError(nil)
|
||||
c.disconnect()
|
||||
|
||||
// We always exit via this branch of the select, to make sure the
|
||||
@@ -144,6 +151,9 @@ func (c *staticClient) Serve() {
|
||||
c.conn.Close()
|
||||
c.connected = false
|
||||
l.Infof("Disconnecting from relay %s due to error: %s", c.uri, err)
|
||||
c.err = err
|
||||
} else {
|
||||
c.err = nil
|
||||
}
|
||||
if c.closeInvitationsOnFinish {
|
||||
close(c.invitations)
|
||||
@@ -155,6 +165,7 @@ func (c *staticClient) Serve() {
|
||||
case <-timeout.C:
|
||||
l.Debugln(c, "timed out")
|
||||
c.disconnect()
|
||||
c.setError(fmt.Errorf("timed out"))
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -237,6 +248,19 @@ func (c *staticClient) disconnect() {
|
||||
c.conn.Close()
|
||||
}
|
||||
|
||||
func (c *staticClient) setError(err error) {
|
||||
c.mut.Lock()
|
||||
c.err = err
|
||||
c.mut.Unlock()
|
||||
}
|
||||
|
||||
func (c *staticClient) Error() error {
|
||||
c.mut.RLock()
|
||||
err := c.err
|
||||
c.mut.RUnlock()
|
||||
return err
|
||||
}
|
||||
|
||||
func (c *staticClient) join() error {
|
||||
if err := protocol.WriteMessage(c.conn, protocol.JoinRelayRequest{}); err != nil {
|
||||
return err
|
||||
|
||||
@@ -1,22 +0,0 @@
|
||||
// Copyright (C) 2015 The Syncthing Authors.
|
||||
//
|
||||
// This Source Code Form is subject to the terms of the Mozilla Public
|
||||
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
|
||||
// You can obtain one at http://mozilla.org/MPL/2.0/.
|
||||
|
||||
package relay
|
||||
|
||||
import (
|
||||
"os"
|
||||
"strings"
|
||||
|
||||
"github.com/syncthing/syncthing/lib/logger"
|
||||
)
|
||||
|
||||
var (
|
||||
l = logger.DefaultLogger.NewFacility("relay", "Relay connection handling")
|
||||
)
|
||||
|
||||
func init() {
|
||||
l.SetDebug("relay", strings.Contains(os.Getenv("STTRACE"), "relay") || os.Getenv("STTRACE") == "all")
|
||||
}
|
||||
@@ -1,286 +0,0 @@
|
||||
// Copyright (C) 2015 The Syncthing Authors.
|
||||
//
|
||||
// This Source Code Form is subject to the terms of the Mozilla Public
|
||||
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
|
||||
// You can obtain one at http://mozilla.org/MPL/2.0/.
|
||||
|
||||
package relay
|
||||
|
||||
import (
|
||||
"crypto/tls"
|
||||
|
||||
"net/url"
|
||||
"sort"
|
||||
"time"
|
||||
|
||||
"github.com/syncthing/syncthing/lib/config"
|
||||
"github.com/syncthing/syncthing/lib/events"
|
||||
"github.com/syncthing/syncthing/lib/relay/client"
|
||||
"github.com/syncthing/syncthing/lib/relay/protocol"
|
||||
"github.com/syncthing/syncthing/lib/sync"
|
||||
|
||||
"github.com/thejerf/suture"
|
||||
)
|
||||
|
||||
const (
|
||||
eventBroadcasterCheckInterval = 10 * time.Second
|
||||
)
|
||||
|
||||
type Service interface {
|
||||
suture.Service
|
||||
Accept() *tls.Conn
|
||||
Relays() []string
|
||||
RelayStatus(uri string) (time.Duration, bool)
|
||||
}
|
||||
|
||||
type service struct {
|
||||
*suture.Supervisor
|
||||
cfg *config.Wrapper
|
||||
tlsCfg *tls.Config
|
||||
|
||||
tokens map[string]suture.ServiceToken
|
||||
clients map[string]client.RelayClient
|
||||
mut sync.RWMutex
|
||||
invitations chan protocol.SessionInvitation
|
||||
conns chan *tls.Conn
|
||||
}
|
||||
|
||||
func NewService(cfg *config.Wrapper, tlsCfg *tls.Config) Service {
|
||||
conns := make(chan *tls.Conn)
|
||||
|
||||
service := &service{
|
||||
Supervisor: suture.New("Service", suture.Spec{
|
||||
Log: func(log string) {
|
||||
l.Debugln(log)
|
||||
},
|
||||
FailureBackoff: 5 * time.Minute,
|
||||
FailureDecay: float64((10 * time.Minute) / time.Second),
|
||||
FailureThreshold: 5,
|
||||
}),
|
||||
cfg: cfg,
|
||||
tlsCfg: tlsCfg,
|
||||
|
||||
tokens: make(map[string]suture.ServiceToken),
|
||||
clients: make(map[string]client.RelayClient),
|
||||
mut: sync.NewRWMutex(),
|
||||
invitations: make(chan protocol.SessionInvitation),
|
||||
conns: conns,
|
||||
}
|
||||
|
||||
rcfg := cfg.Raw()
|
||||
service.CommitConfiguration(rcfg, rcfg)
|
||||
cfg.Subscribe(service)
|
||||
|
||||
receiver := &invitationReceiver{
|
||||
tlsCfg: tlsCfg,
|
||||
conns: conns,
|
||||
invitations: service.invitations,
|
||||
stop: make(chan struct{}),
|
||||
}
|
||||
|
||||
eventBc := &eventBroadcaster{
|
||||
Service: service,
|
||||
stop: make(chan struct{}),
|
||||
}
|
||||
|
||||
service.Add(receiver)
|
||||
service.Add(eventBc)
|
||||
|
||||
return service
|
||||
}
|
||||
|
||||
func (s *service) VerifyConfiguration(from, to config.Configuration) error {
|
||||
for _, addr := range to.Options.RelayServers {
|
||||
_, err := url.Parse(addr)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *service) CommitConfiguration(from, to config.Configuration) bool {
|
||||
existing := make(map[string]*url.URL, len(to.Options.RelayServers))
|
||||
|
||||
for _, addr := range to.Options.RelayServers {
|
||||
uri, err := url.Parse(addr)
|
||||
if err != nil {
|
||||
l.Debugln("Failed to parse relay address", addr, err)
|
||||
continue
|
||||
}
|
||||
existing[uri.String()] = uri
|
||||
}
|
||||
|
||||
s.mut.Lock()
|
||||
|
||||
for key, uri := range existing {
|
||||
_, ok := s.tokens[key]
|
||||
if !ok {
|
||||
l.Debugln("Connecting to relay", uri)
|
||||
c, err := client.NewClient(uri, s.tlsCfg.Certificates, s.invitations, 10*time.Second)
|
||||
if err != nil {
|
||||
l.Infoln("Failed to connect to relay", uri, err)
|
||||
continue
|
||||
}
|
||||
s.tokens[key] = s.Add(c)
|
||||
s.clients[key] = c
|
||||
}
|
||||
}
|
||||
|
||||
for key, token := range s.tokens {
|
||||
_, ok := existing[key]
|
||||
if !ok {
|
||||
err := s.Remove(token)
|
||||
delete(s.tokens, key)
|
||||
delete(s.clients, key)
|
||||
l.Debugln("Disconnecting from relay", key, err)
|
||||
}
|
||||
}
|
||||
|
||||
s.mut.Unlock()
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
type Status struct {
|
||||
URL string
|
||||
OK bool
|
||||
Latency int
|
||||
}
|
||||
|
||||
// Relays return the list of relays that currently have an OK status.
|
||||
func (s *service) Relays() []string {
|
||||
if s == nil {
|
||||
// A nil client does not have a status, really. Yet we may be called
|
||||
// this way, for raisins...
|
||||
return nil
|
||||
}
|
||||
|
||||
s.mut.RLock()
|
||||
relays := make([]string, 0, len(s.clients))
|
||||
for _, client := range s.clients {
|
||||
relays = append(relays, client.URI().String())
|
||||
}
|
||||
s.mut.RUnlock()
|
||||
|
||||
sort.Strings(relays)
|
||||
|
||||
return relays
|
||||
}
|
||||
|
||||
// RelayStatus returns the latency and OK status for a given relay.
|
||||
func (s *service) RelayStatus(uri string) (time.Duration, bool) {
|
||||
if s == nil {
|
||||
// A nil client does not have a status, really. Yet we may be called
|
||||
// this way, for raisins...
|
||||
return time.Hour, false
|
||||
}
|
||||
|
||||
s.mut.RLock()
|
||||
defer s.mut.RUnlock()
|
||||
|
||||
for _, client := range s.clients {
|
||||
if client.URI().String() == uri {
|
||||
return client.Latency(), client.StatusOK()
|
||||
}
|
||||
}
|
||||
|
||||
return time.Hour, false
|
||||
}
|
||||
|
||||
// Accept returns a new *tls.Conn. The connection is already handshaken.
|
||||
func (s *service) Accept() *tls.Conn {
|
||||
return <-s.conns
|
||||
}
|
||||
|
||||
type invitationReceiver struct {
|
||||
invitations chan protocol.SessionInvitation
|
||||
tlsCfg *tls.Config
|
||||
conns chan<- *tls.Conn
|
||||
stop chan struct{}
|
||||
}
|
||||
|
||||
func (r *invitationReceiver) Serve() {
|
||||
for {
|
||||
select {
|
||||
case inv := <-r.invitations:
|
||||
l.Debugln("Received relay invitation", inv)
|
||||
conn, err := client.JoinSession(inv)
|
||||
if err != nil {
|
||||
l.Debugf("Failed to join relay session %s: %v", inv, err)
|
||||
continue
|
||||
}
|
||||
|
||||
var tc *tls.Conn
|
||||
|
||||
if inv.ServerSocket {
|
||||
tc = tls.Server(conn, r.tlsCfg)
|
||||
} else {
|
||||
tc = tls.Client(conn, r.tlsCfg)
|
||||
}
|
||||
err = tc.Handshake()
|
||||
if err != nil {
|
||||
l.Infof("TLS handshake (BEP/relay %s): %v", inv, err)
|
||||
tc.Close()
|
||||
continue
|
||||
}
|
||||
r.conns <- tc
|
||||
|
||||
case <-r.stop:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (r *invitationReceiver) Stop() {
|
||||
close(r.stop)
|
||||
}
|
||||
|
||||
// The eventBroadcaster sends a RelayStateChanged event when the relay status
|
||||
// changes. We need this somewhat ugly polling mechanism as there's currently
|
||||
// no way to get the event feed directly from the relay lib. This may be
|
||||
// something to revisit later, possibly.
|
||||
type eventBroadcaster struct {
|
||||
Service Service
|
||||
stop chan struct{}
|
||||
}
|
||||
|
||||
func (e *eventBroadcaster) Serve() {
|
||||
timer := time.NewTicker(eventBroadcasterCheckInterval)
|
||||
defer timer.Stop()
|
||||
|
||||
var prevOKRelays []string
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-timer.C:
|
||||
curOKRelays := e.Service.Relays()
|
||||
|
||||
changed := len(curOKRelays) != len(prevOKRelays)
|
||||
if !changed {
|
||||
for i := range curOKRelays {
|
||||
if curOKRelays[i] != prevOKRelays[i] {
|
||||
changed = true
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if changed {
|
||||
events.Default.Log(events.RelayStateChanged, map[string][]string{
|
||||
"old": prevOKRelays,
|
||||
"new": curOKRelays,
|
||||
})
|
||||
}
|
||||
|
||||
prevOKRelays = curOKRelays
|
||||
|
||||
case <-e.stop:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (e *eventBroadcaster) Stop() {
|
||||
close(e.stop)
|
||||
}
|
||||
Reference in New Issue
Block a user