lib/connections: Allow on the fly changes to rate limits (fixes #3846)

Also replaces github.com/juju/ratelimit with golang.org/x/time/rate as
the latter supports changing the rate on the fly.

GitHub-Pull-Request: https://github.com/syncthing/syncthing/pull/3862
This commit is contained in:
Jakob Borg
2017-01-02 11:29:20 +00:00
committed by Audrius Butkevicius
parent 8c34a76f7a
commit ec62888539
19 changed files with 662 additions and 1156 deletions

View File

@@ -20,10 +20,10 @@ import (
"syscall"
"time"
"github.com/juju/ratelimit"
"github.com/syncthing/syncthing/lib/osutil"
"github.com/syncthing/syncthing/lib/relay/protocol"
"github.com/syncthing/syncthing/lib/tlsutil"
"golang.org/x/time/rate"
"github.com/syncthing/syncthing/lib/config"
"github.com/syncthing/syncthing/lib/nat"
@@ -68,8 +68,8 @@ var (
globalLimitBps int
overLimit int32
descriptorLimit int64
sessionLimiter *ratelimit.Bucket
globalLimiter *ratelimit.Bucket
sessionLimiter *rate.Limiter
globalLimiter *rate.Limiter
statusAddr string
poolAddrs string
@@ -215,10 +215,10 @@ func main() {
}
if sessionLimitBps > 0 {
sessionLimiter = ratelimit.NewBucketWithRate(float64(sessionLimitBps), int64(2*sessionLimitBps))
sessionLimiter = rate.NewLimiter(rate.Limit(sessionLimitBps), 2*sessionLimitBps)
}
if globalLimitBps > 0 {
globalLimiter = ratelimit.NewBucketWithRate(float64(globalLimitBps), int64(2*globalLimitBps))
globalLimiter = rate.NewLimiter(rate.Limit(globalLimitBps), 2*globalLimitBps)
}
if statusAddr != "" {

View File

@@ -7,15 +7,16 @@ import (
"encoding/hex"
"fmt"
"log"
"math"
"net"
"sync"
"sync/atomic"
"time"
"github.com/juju/ratelimit"
"github.com/syncthing/syncthing/lib/relay/protocol"
"golang.org/x/time/rate"
syncthingprotocol "github.com/syncthing/syncthing/lib/protocol"
"github.com/syncthing/syncthing/lib/relay/protocol"
)
var (
@@ -26,7 +27,7 @@ var (
bytesProxied int64
)
func newSession(serverid, clientid syncthingprotocol.DeviceID, sessionRateLimit, globalRateLimit *ratelimit.Bucket) *session {
func newSession(serverid, clientid syncthingprotocol.DeviceID, sessionRateLimit, globalRateLimit *rate.Limiter) *session {
serverkey := make([]byte, 32)
_, err := rand.Read(serverkey)
if err != nil {
@@ -108,7 +109,7 @@ type session struct {
clientkey []byte
clientid syncthingprotocol.DeviceID
rateLimit func(bytes int64)
rateLimit func(bytes int)
connsChan chan net.Conn
conns []net.Conn
@@ -268,7 +269,7 @@ func (s *session) proxy(c1, c2 net.Conn) error {
}
if s.rateLimit != nil {
s.rateLimit(int64(n))
s.rateLimit(n)
}
c2.SetWriteDeadline(time.Now().Add(networkTimeout))
@@ -283,7 +284,7 @@ func (s *session) String() string {
return fmt.Sprintf("<%s/%s>", hex.EncodeToString(s.clientkey)[:5], hex.EncodeToString(s.serverkey)[:5])
}
func makeRateLimitFunc(sessionRateLimit, globalRateLimit *ratelimit.Bucket) func(int64) {
func makeRateLimitFunc(sessionRateLimit, globalRateLimit *rate.Limiter) func(int) {
// This may be a case of super duper premature optimization... We build an
// optimized function to do the rate limiting here based on what we need
// to do and then use it in the loop.
@@ -298,29 +299,55 @@ func makeRateLimitFunc(sessionRateLimit, globalRateLimit *ratelimit.Bucket) func
if sessionRateLimit == nil {
// We only have a global limiter
return func(bytes int64) {
globalRateLimit.Wait(bytes)
return func(bytes int) {
take(bytes, globalRateLimit)
}
}
if globalRateLimit == nil {
// We only have a session limiter
return func(bytes int64) {
sessionRateLimit.Wait(bytes)
return func(bytes int) {
take(bytes, sessionRateLimit)
}
}
// We have both. Queue the bytes on both the global and session specific
// rate limiters. Wait for both in parallell, so that the actual send
// happens when both conditions are satisfied. In practice this just means
// wait the longer of the two times.
return func(bytes int64) {
t0 := sessionRateLimit.Take(bytes)
t1 := globalRateLimit.Take(bytes)
if t0 > t1 {
time.Sleep(t0)
} else {
time.Sleep(t1)
}
// rate limiters.
return func(bytes int) {
take(bytes, sessionRateLimit, globalRateLimit)
}
}
// take is a utility function to consume tokens from a set of rate.Limiters.
// Tokens are consumed in parallel on all limiters, respecting their
// individual burst sizes.
func take(tokens int, ls ...*rate.Limiter) {
// minBurst is the smallest burst size supported by all limiters.
minBurst := int(math.MaxInt32)
for _, l := range ls {
if burst := l.Burst(); burst < minBurst {
minBurst = burst
}
}
for tokens > 0 {
// chunk is how many tokens we can consume at a time
chunk := tokens
if chunk > minBurst {
chunk = minBurst
}
// maxDelay is the longest delay mandated by any of the limiters for
// the chosen chunk size.
var maxDelay time.Duration
for _, l := range ls {
res := l.ReserveN(time.Now(), chunk)
if del := res.Delay(); del > maxDelay {
maxDelay = del
}
}
time.Sleep(maxDelay)
tokens -= chunk
}
}