Expose connection type and relay status in the UI

This commit is contained in:
Audrius Butkevicius
2015-07-17 21:22:07 +01:00
parent 2c0f8dc546
commit 8f2db99c86
15 changed files with 183 additions and 123 deletions

File diff suppressed because one or more lines are too long

View File

@@ -22,13 +22,14 @@ import (
"github.com/syncthing/syncthing/lib/beacon"
"github.com/syncthing/syncthing/lib/events"
"github.com/syncthing/syncthing/lib/osutil"
"github.com/syncthing/syncthing/lib/relay"
"github.com/syncthing/syncthing/lib/sync"
)
type Discoverer struct {
myID protocol.DeviceID
listenAddrs []string
relays []Relay
relaySvc *relay.Svc
localBcastIntv time.Duration
localBcastStart time.Time
cacheLifetime time.Duration
@@ -56,11 +57,11 @@ var (
ErrIncorrectMagic = errors.New("incorrect magic number")
)
func NewDiscoverer(id protocol.DeviceID, addresses []string, relayAdresses []string) *Discoverer {
func NewDiscoverer(id protocol.DeviceID, addresses []string, relaySvc *relay.Svc) *Discoverer {
return &Discoverer{
myID: id,
listenAddrs: addresses,
relays: measureLatency(relayAdresses),
relaySvc: relaySvc,
localBcastIntv: 30 * time.Second,
cacheLifetime: 5 * time.Minute,
negCacheCutoff: 3 * time.Minute,
@@ -143,7 +144,7 @@ func (d *Discoverer) StartGlobal(servers []string, extPort uint16) {
}
d.extPort = extPort
pkt := d.announcementPkt()
pkt := d.announcementPkt(true)
wg := sync.NewWaitGroup()
clients := make(chan Client, len(servers))
for _, address := range servers {
@@ -317,49 +318,32 @@ func (d *Discoverer) All() map[protocol.DeviceID][]CacheEntry {
return devices
}
func (d *Discoverer) announcementPkt() *Announce {
func (d *Discoverer) announcementPkt(allowExternal bool) *Announce {
var addrs []string
if d.extPort != 0 {
if d.extPort != 0 && allowExternal {
addrs = []string{fmt.Sprintf("tcp://:%d", d.extPort)}
} else {
for _, aurl := range d.listenAddrs {
uri, err := url.Parse(aurl)
if err != nil {
if debug {
l.Debugf("discovery: failed to parse listen address %s: %s", aurl, err)
}
continue
addrs = resolveAddrs(d.listenAddrs)
}
relayAddrs := make([]string, 0)
if d.relaySvc != nil {
status := d.relaySvc.ClientStatus()
for uri, ok := range status {
if ok {
relayAddrs = append(relayAddrs, uri)
}
addr, err := net.ResolveTCPAddr("tcp", uri.Host)
if err != nil {
l.Warnln("discover: %v: not announcing %s", err, aurl)
continue
} else if debug {
l.Debugf("discover: resolved %s as %#v", aurl, uri.Host)
}
if len(addr.IP) == 0 || addr.IP.IsUnspecified() {
uri.Host = fmt.Sprintf(":%d", addr.Port)
} else if bs := addr.IP.To4(); bs != nil {
uri.Host = fmt.Sprintf("%s:%d", bs.String(), addr.Port)
} else if bs := addr.IP.To16(); bs != nil {
uri.Host = fmt.Sprintf("[%s]:%d", bs.String(), addr.Port)
}
addrs = append(addrs, uri.String())
}
}
return &Announce{
Magic: AnnouncementMagic,
This: Device{d.myID[:], addrs, d.relays},
This: Device{d.myID[:], addrs, measureLatency(relayAddrs)},
}
}
func (d *Discoverer) sendLocalAnnouncements() {
var addrs = resolveAddrs(d.listenAddrs)
var pkt = Announce{
Magic: AnnouncementMagic,
This: Device{d.myID[:], addrs, d.relays},
}
var pkt = d.announcementPkt(false)
msg := pkt.MustMarshalXDR()
for {

View File

@@ -7,11 +7,17 @@
package model
import (
"crypto/tls"
"net"
"github.com/syncthing/protocol"
)
type IntermediateConnection struct {
Conn *tls.Conn
ConnType ConnectionType
}
type Connection struct {
net.Conn
protocol.Connection

View File

@@ -219,6 +219,7 @@ type ConnectionInfo struct {
protocol.Statistics
Address string
ClientVersion string
Type ConnectionType
}
func (info ConnectionInfo) MarshalJSON() ([]byte, error) {
@@ -227,6 +228,7 @@ func (info ConnectionInfo) MarshalJSON() ([]byte, error) {
"inBytesTotal": info.InBytesTotal,
"outBytesTotal": info.OutBytesTotal,
"address": info.Address,
"type": info.Type.String(),
"clientVersion": info.ClientVersion,
})
}
@@ -249,6 +251,7 @@ func (m *Model) ConnectionStats() map[string]interface{} {
}
if addr := m.conn[device].RemoteAddr(); addr != nil {
ci.Address = addr.String()
ci.Type = conn.Type
}
conns[device.String()] = ci
@@ -585,6 +588,7 @@ func (m *Model) ClusterConfig(deviceID protocol.DeviceID, cm protocol.ClusterCon
}
if conn, ok := m.conn[deviceID]; ok {
event["type"] = conn.Type.String()
addr := conn.RemoteAddr()
if addr != nil {
event["addr"] = addr.String()

View File

@@ -11,10 +11,12 @@ import (
"errors"
"fmt"
"io"
"net"
"os"
"path/filepath"
"runtime"
"strings"
"time"
"github.com/calmh/du"
"github.com/syncthing/syncthing/lib/sync"
@@ -221,3 +223,21 @@ func DiskFreePercentage(path string) (freePct float64, err error) {
u, err := du.Get(path)
return (float64(u.FreeBytes) / float64(u.TotalBytes)) * 100, err
}
// SetTCPOptions sets syncthings default TCP options on a TCP connection
func SetTCPOptions(conn *net.TCPConn) error {
var err error
if err = conn.SetLinger(0); err != nil {
return err
}
if err = conn.SetNoDelay(false); err != nil {
return err
}
if err = conn.SetKeepAlivePeriod(60 * time.Second); err != nil {
return err
}
if err = conn.SetKeepAlive(true); err != nil {
return err
}
return nil
}

19
lib/relay/debug.go Normal file
View File

@@ -0,0 +1,19 @@
// Copyright (C) 2015 The Syncthing Authors.
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
// You can obtain one at http://mozilla.org/MPL/2.0/.
package relay
import (
"os"
"strings"
"github.com/calmh/logger"
)
var (
debug = strings.Contains(os.Getenv("STTRACE"), "relay") || os.Getenv("STTRACE") == "all"
l = logger.DefaultLogger
)

197
lib/relay/relay.go Normal file
View File

@@ -0,0 +1,197 @@
// Copyright (C) 2015 The Syncthing Authors.
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
// You can obtain one at http://mozilla.org/MPL/2.0/.
package relay
import (
"crypto/tls"
"net"
"net/url"
"time"
"github.com/syncthing/relaysrv/client"
"github.com/syncthing/relaysrv/protocol"
"github.com/syncthing/syncthing/lib/config"
"github.com/syncthing/syncthing/lib/model"
"github.com/syncthing/syncthing/lib/osutil"
"github.com/syncthing/syncthing/lib/sync"
"github.com/thejerf/suture"
)
func NewSvc(cfg *config.Wrapper, tlsCfg *tls.Config, conns chan<- model.IntermediateConnection) *Svc {
svc := &Svc{
Supervisor: suture.New("Svc", suture.Spec{
Log: func(log string) {
if debug {
l.Infoln(log)
}
},
FailureBackoff: 5 * time.Minute,
FailureDecay: float64((10 * time.Minute) / time.Second),
FailureThreshold: 5,
}),
cfg: cfg,
tlsCfg: tlsCfg,
tokens: make(map[string]suture.ServiceToken),
clients: make(map[string]*client.ProtocolClient),
mut: sync.NewRWMutex(),
invitations: make(chan protocol.SessionInvitation),
}
rcfg := cfg.Raw()
svc.CommitConfiguration(rcfg, rcfg)
cfg.Subscribe(svc)
receiver := &invitationReceiver{
tlsCfg: tlsCfg,
conns: conns,
invitations: svc.invitations,
}
svc.receiverToken = svc.Add(receiver)
return svc
}
type Svc struct {
*suture.Supervisor
cfg *config.Wrapper
tlsCfg *tls.Config
receiverToken suture.ServiceToken
tokens map[string]suture.ServiceToken
clients map[string]*client.ProtocolClient
mut sync.RWMutex
invitations chan protocol.SessionInvitation
}
func (s *Svc) VerifyConfiguration(from, to config.Configuration) error {
for _, addr := range to.Options.RelayServers {
_, err := url.Parse(addr)
if err != nil {
return err
}
}
return nil
}
func (s *Svc) CommitConfiguration(from, to config.Configuration) bool {
existing := make(map[string]struct{}, len(to.Options.RelayServers))
for _, addr := range to.Options.RelayServers {
uri, err := url.Parse(addr)
if err != nil {
if debug {
l.Debugln("Failed to parse relay address", addr, err)
}
continue
}
existing[uri.String()] = struct{}{}
_, ok := s.tokens[uri.String()]
if !ok {
if debug {
l.Debugln("Connecting to relay", uri)
}
c := client.NewProtocolClient(uri, s.tlsCfg.Certificates, s.invitations)
s.tokens[uri.String()] = s.Add(c)
s.mut.Lock()
s.clients[uri.String()] = c
s.mut.Unlock()
}
}
for uri, token := range s.tokens {
_, ok := existing[uri]
if !ok {
err := s.Remove(token)
delete(s.tokens, uri)
s.mut.Lock()
delete(s.clients, uri)
s.mut.Unlock()
if debug {
l.Debugln("Disconnecting from relay", uri, err)
}
}
}
return true
}
func (s *Svc) ClientStatus() map[string]bool {
s.mut.RLock()
status := make(map[string]bool, len(s.clients))
for uri, client := range s.clients {
status[uri] = client.StatusOK()
}
s.mut.RUnlock()
return status
}
type invitationReceiver struct {
invitations chan protocol.SessionInvitation
tlsCfg *tls.Config
conns chan<- model.IntermediateConnection
stop chan struct{}
}
func (r *invitationReceiver) Serve() {
if r.stop != nil {
return
}
r.stop = make(chan struct{})
for {
select {
case inv := <-r.invitations:
if debug {
l.Debugln("Received relay invitation", inv)
}
conn, err := client.JoinSession(inv)
if err != nil {
if debug {
l.Debugf("Failed to join relay session %s: %v", inv, err)
}
continue
}
err = osutil.SetTCPOptions(conn.(*net.TCPConn))
if err != nil {
l.Infoln(err)
}
var tc *tls.Conn
if inv.ServerSocket {
tc = tls.Server(conn, r.tlsCfg)
} else {
tc = tls.Client(conn, r.tlsCfg)
}
err = tc.Handshake()
if err != nil {
l.Infof("TLS handshake (BEP/relay %s): %v", inv, err)
tc.Close()
continue
}
r.conns <- model.IntermediateConnection{
tc, model.ConnectionTypeRelayAccept,
}
case <-r.stop:
return
}
}
}
func (r *invitationReceiver) Stop() {
if r.stop == nil {
return
}
r.stop <- struct{}{}
r.stop = nil
}