Limit number of connections (fixes #23)
This commit is contained in:
parent
567aaf87c6
commit
20b925abec
@ -107,6 +107,16 @@ func protocolConnectionHandler(tcpConn net.Conn, config *tls.Config) {
|
|||||||
|
|
||||||
switch msg := message.(type) {
|
switch msg := message.(type) {
|
||||||
case protocol.JoinRelayRequest:
|
case protocol.JoinRelayRequest:
|
||||||
|
if atomic.LoadInt32(&overLimit) > 0 {
|
||||||
|
protocol.WriteMessage(conn, protocol.RelayFull{})
|
||||||
|
if debug {
|
||||||
|
log.Println("Refusing join request from", id, "due to being over limits")
|
||||||
|
}
|
||||||
|
conn.Close()
|
||||||
|
limitCheckTimer.Reset(time.Second)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
outboxesMut.RLock()
|
outboxesMut.RLock()
|
||||||
_, ok := outboxes[id]
|
_, ok := outboxes[id]
|
||||||
outboxesMut.RUnlock()
|
outboxesMut.RUnlock()
|
||||||
@ -223,6 +233,16 @@ func protocolConnectionHandler(tcpConn net.Conn, config *tls.Config) {
|
|||||||
conn.Close()
|
conn.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if atomic.LoadInt32(&overLimit) > 0 && !hasSessions(id) {
|
||||||
|
if debug {
|
||||||
|
log.Println("Dropping", id, "as it has no sessions and we are over our limits")
|
||||||
|
}
|
||||||
|
protocol.WriteMessage(conn, protocol.RelayFull{})
|
||||||
|
conn.Close()
|
||||||
|
|
||||||
|
limitCheckTimer.Reset(time.Second)
|
||||||
|
}
|
||||||
|
|
||||||
case <-timeoutTicker.C:
|
case <-timeoutTicker.C:
|
||||||
// We should receive a error from the reader loop, which will cause
|
// We should receive a error from the reader loop, which will cause
|
||||||
// us to quit this loop.
|
// us to quit this loop.
|
||||||
|
|||||||
@ -10,10 +10,13 @@ import (
|
|||||||
"net"
|
"net"
|
||||||
"net/url"
|
"net/url"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
|
"runtime"
|
||||||
"strings"
|
"strings"
|
||||||
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/juju/ratelimit"
|
"github.com/juju/ratelimit"
|
||||||
|
"github.com/syncthing/syncthing/lib/osutil"
|
||||||
"github.com/syncthing/syncthing/lib/relay/protocol"
|
"github.com/syncthing/syncthing/lib/relay/protocol"
|
||||||
"github.com/syncthing/syncthing/lib/tlsutil"
|
"github.com/syncthing/syncthing/lib/tlsutil"
|
||||||
|
|
||||||
@ -31,8 +34,12 @@ var (
|
|||||||
pingInterval time.Duration = time.Minute
|
pingInterval time.Duration = time.Minute
|
||||||
messageTimeout time.Duration = time.Minute
|
messageTimeout time.Duration = time.Minute
|
||||||
|
|
||||||
|
limitCheckTimer *time.Timer
|
||||||
|
|
||||||
sessionLimitBps int
|
sessionLimitBps int
|
||||||
globalLimitBps int
|
globalLimitBps int
|
||||||
|
overLimit int32
|
||||||
|
descriptorLimit int64
|
||||||
sessionLimiter *ratelimit.Bucket
|
sessionLimiter *ratelimit.Bucket
|
||||||
globalLimiter *ratelimit.Bucket
|
globalLimiter *ratelimit.Bucket
|
||||||
|
|
||||||
@ -72,6 +79,17 @@ func main() {
|
|||||||
log.Fatal(err)
|
log.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
maxDescriptors, err := osutil.MaximizeOpenFileLimit()
|
||||||
|
if maxDescriptors > 0 {
|
||||||
|
// Assume that 20% of FD's are leaked/unaccounted for.
|
||||||
|
descriptorLimit = int64(maxDescriptors*80) / 100
|
||||||
|
log.Println("Connection limit", descriptorLimit)
|
||||||
|
|
||||||
|
go monitorLimits()
|
||||||
|
} else if err != nil && runtime.GOOS != "windows" {
|
||||||
|
log.Println("Assuming no connection limit, due to error retrievign rlimits:", err)
|
||||||
|
}
|
||||||
|
|
||||||
sessionAddress = addr.IP[:]
|
sessionAddress = addr.IP[:]
|
||||||
sessionPort = uint16(addr.Port)
|
sessionPort = uint16(addr.Port)
|
||||||
|
|
||||||
@ -142,3 +160,16 @@ func main() {
|
|||||||
|
|
||||||
listener(listen, tlsCfg)
|
listener(listen, tlsCfg)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func monitorLimits() {
|
||||||
|
limitCheckTimer = time.NewTimer(time.Minute)
|
||||||
|
for range limitCheckTimer.C {
|
||||||
|
if atomic.LoadInt64(&numConnections)+atomic.LoadInt64(&numProxies) > descriptorLimit {
|
||||||
|
atomic.StoreInt32(&overLimit, 1)
|
||||||
|
log.Println("Gone past our connection limits. Starting to refuse new/drop idle connections.")
|
||||||
|
} else if atomic.CompareAndSwapInt32(&overLimit, 1, 0) {
|
||||||
|
log.Println("Dropped below our connection limits. Accepting new connections.")
|
||||||
|
}
|
||||||
|
limitCheckTimer.Reset(time.Minute)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
@ -86,6 +86,19 @@ func dropSessions(id syncthingprotocol.DeviceID) {
|
|||||||
sessionMut.RUnlock()
|
sessionMut.RUnlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func hasSessions(id syncthingprotocol.DeviceID) bool {
|
||||||
|
sessionMut.RLock()
|
||||||
|
has := false
|
||||||
|
for _, session := range activeSessions {
|
||||||
|
if session.HasParticipant(id) {
|
||||||
|
has = true
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
sessionMut.RUnlock()
|
||||||
|
return has
|
||||||
|
}
|
||||||
|
|
||||||
type session struct {
|
type session struct {
|
||||||
mut sync.Mutex
|
mut sync.Mutex
|
||||||
|
|
||||||
@ -127,7 +140,7 @@ func (s *session) Serve() {
|
|||||||
s.mut.Lock()
|
s.mut.Lock()
|
||||||
s.conns = append(s.conns, conn)
|
s.conns = append(s.conns, conn)
|
||||||
s.mut.Unlock()
|
s.mut.Unlock()
|
||||||
// We're the only ones mutating% s.conns, hence we are free to read it.
|
// We're the only ones mutating s.conns, hence we are free to read it.
|
||||||
if len(s.conns) < 2 {
|
if len(s.conns) < 2 {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user