Add timeouts to relay methods

This commit is contained in:
Audrius Butkevicius
2015-11-23 21:14:46 +00:00
parent 45c1357bab
commit 431d51f5c4
6 changed files with 22 additions and 18 deletions

View File

@@ -11,7 +11,7 @@ import (
"github.com/syncthing/syncthing/lib/relay/protocol"
)
type relayClientFactory func(uri *url.URL, certs []tls.Certificate, invitations chan protocol.SessionInvitation) RelayClient
type relayClientFactory func(uri *url.URL, certs []tls.Certificate, invitations chan protocol.SessionInvitation, timeout time.Duration) RelayClient
var (
supportedSchemes = map[string]relayClientFactory{
@@ -31,11 +31,11 @@ type RelayClient interface {
URI() *url.URL
}
func NewClient(uri *url.URL, certs []tls.Certificate, invitations chan protocol.SessionInvitation) (RelayClient, error) {
func NewClient(uri *url.URL, certs []tls.Certificate, invitations chan protocol.SessionInvitation, timeout time.Duration) (RelayClient, error) {
factory, ok := supportedSchemes[uri.Scheme]
if !ok {
return nil, fmt.Errorf("Unsupported scheme: %s", uri.Scheme)
}
return factory(uri, certs, invitations), nil
return factory(uri, certs, invitations, timeout), nil
}

View File

@@ -22,13 +22,14 @@ type dynamicClient struct {
certs []tls.Certificate
invitations chan protocol.SessionInvitation
closeInvitationsOnFinish bool
timeout time.Duration
mut sync.RWMutex
client RelayClient
stop chan struct{}
}
func newDynamicClient(uri *url.URL, certs []tls.Certificate, invitations chan protocol.SessionInvitation) RelayClient {
func newDynamicClient(uri *url.URL, certs []tls.Certificate, invitations chan protocol.SessionInvitation, timeout time.Duration) RelayClient {
closeInvitationsOnFinish := false
if invitations == nil {
closeInvitationsOnFinish = true
@@ -39,6 +40,7 @@ func newDynamicClient(uri *url.URL, certs []tls.Certificate, invitations chan pr
certs: certs,
invitations: invitations,
closeInvitationsOnFinish: closeInvitationsOnFinish,
timeout: timeout,
mut: sync.NewRWMutex(),
}
@@ -94,7 +96,7 @@ func (c *dynamicClient) Serve() {
l.Debugln(c, "skipping relay", addr, err)
continue
}
client, err := NewClient(ruri, c.certs, c.invitations)
client, err := NewClient(ruri, c.certs, c.invitations, c.timeout)
if err != nil {
continue
}

View File

@@ -16,7 +16,7 @@ import (
"github.com/syncthing/syncthing/lib/relay/protocol"
)
func GetInvitationFromRelay(uri *url.URL, id syncthingprotocol.DeviceID, certs []tls.Certificate) (protocol.SessionInvitation, error) {
func GetInvitationFromRelay(uri *url.URL, id syncthingprotocol.DeviceID, certs []tls.Certificate, timeout time.Duration) (protocol.SessionInvitation, error) {
if uri.Scheme != "relay" {
return protocol.SessionInvitation{}, fmt.Errorf("Unsupported relay scheme: %v", uri.Scheme)
}
@@ -27,7 +27,7 @@ func GetInvitationFromRelay(uri *url.URL, id syncthingprotocol.DeviceID, certs [
}
conn := tls.Client(rconn, configForCerts(certs))
conn.SetDeadline(time.Now().Add(10 * time.Second))
conn.SetDeadline(time.Now().Add(timeout))
if err := performHandshakeAndValidation(conn, uri); err != nil {
return protocol.SessionInvitation{}, err
@@ -99,10 +99,10 @@ func JoinSession(invitation protocol.SessionInvitation) (net.Conn, error) {
}
}
func TestRelay(uri *url.URL, certs []tls.Certificate, sleep time.Duration, times int) bool {
func TestRelay(uri *url.URL, certs []tls.Certificate, sleep, timeout time.Duration, times int) bool {
id := syncthingprotocol.NewDeviceID(certs[0].Certificate[0])
invs := make(chan protocol.SessionInvitation, 1)
c, err := NewClient(uri, certs, invs)
c, err := NewClient(uri, certs, invs, timeout)
if err != nil {
close(invs)
return false
@@ -114,7 +114,7 @@ func TestRelay(uri *url.URL, certs []tls.Certificate, sleep time.Duration, times
}()
for i := 0; i < times; i++ {
_, err := GetInvitationFromRelay(uri, id, certs)
_, err := GetInvitationFromRelay(uri, id, certs, timeout)
if err == nil {
return true
}

View File

@@ -22,7 +22,8 @@ type staticClient struct {
config *tls.Config
timeout time.Duration
messageTimeout time.Duration
connectTimeout time.Duration
stop chan struct{}
stopped chan struct{}
@@ -34,7 +35,7 @@ type staticClient struct {
latency time.Duration
}
func newStaticClient(uri *url.URL, certs []tls.Certificate, invitations chan protocol.SessionInvitation) RelayClient {
func newStaticClient(uri *url.URL, certs []tls.Certificate, invitations chan protocol.SessionInvitation, timeout time.Duration) RelayClient {
closeInvitationsOnFinish := false
if invitations == nil {
closeInvitationsOnFinish = true
@@ -49,7 +50,8 @@ func newStaticClient(uri *url.URL, certs []tls.Certificate, invitations chan pro
config: configForCerts(certs),
timeout: time.Minute * 2,
messageTimeout: time.Minute * 2,
connectTimeout: timeout,
stop: make(chan struct{}),
stopped: make(chan struct{}),
@@ -95,12 +97,12 @@ func (c *staticClient) Serve() {
go messageReader(c.conn, messages, errors)
timeout := time.NewTimer(c.timeout)
timeout := time.NewTimer(c.messageTimeout)
for {
select {
case message := <-messages:
timeout.Reset(c.timeout)
timeout.Reset(c.messageTimeout)
l.Debugf("%s received message %T", c, message)
switch msg := message.(type) {
@@ -201,7 +203,7 @@ func (c *staticClient) connect() error {
return err
}
if err := conn.SetDeadline(time.Now().Add(10 * time.Second)); err != nil {
if err := conn.SetDeadline(time.Now().Add(c.connectTimeout)); err != nil {
conn.Close()
return err
}