diff --git a/cmd/stdiscosrv/querysrv.go b/cmd/stdiscosrv/querysrv.go index 1bd8739f..61c32117 100644 --- a/cmd/stdiscosrv/querysrv.go +++ b/cmd/stdiscosrv/querysrv.go @@ -19,9 +19,9 @@ import ( "time" "github.com/golang/groupcache/lru" - "github.com/juju/ratelimit" "github.com/syncthing/syncthing/lib/protocol" "golang.org/x/net/context" + "golang.org/x/time/rate" ) type querysrv struct { @@ -373,14 +373,14 @@ func (s *querysrv) limit(remote net.IP) bool { bkt, ok := s.limiter.Get(key) if ok { - bkt := bkt.(*ratelimit.Bucket) - if bkt.TakeAvailable(1) != 1 { + bkt := bkt.(*rate.Limiter) + if !bkt.Allow() { // Rate limit exceeded; ignore packet return true } } else { - // One packet per ten seconds average rate, burst ten packets - s.limiter.Add(key, ratelimit.NewBucket(10*time.Second/time.Duration(limitAvg), int64(limitBurst))) + // limitAvg is in packets per ten seconds. + s.limiter.Add(key, rate.NewLimiter(rate.Limit(limitAvg)/10, limitBurst)) } return false diff --git a/cmd/strelaypoolsrv/main.go b/cmd/strelaypoolsrv/main.go index 9f349822..5fc792bb 100644 --- a/cmd/strelaypoolsrv/main.go +++ b/cmd/strelaypoolsrv/main.go @@ -23,14 +23,12 @@ import ( "time" "github.com/golang/groupcache/lru" - "github.com/juju/ratelimit" - "github.com/oschwald/geoip2-golang" - "github.com/syncthing/syncthing/cmd/strelaypoolsrv/auto" "github.com/syncthing/syncthing/lib/relay/client" "github.com/syncthing/syncthing/lib/sync" "github.com/syncthing/syncthing/lib/tlsutil" + "golang.org/x/time/rate" ) type location struct { @@ -65,12 +63,12 @@ var ( dir string evictionTime = time.Hour debug bool - getLRUSize = 10 << 10 - getLimitBurst int64 = 10 - getLimitAvg = 1 - postLRUSize = 1 << 10 - postLimitBurst int64 = 2 - postLimitAvg = 1 + getLRUSize = 10 << 10 + getLimitBurst = 10 + getLimitAvg = 1 + postLRUSize = 1 << 10 + postLimitBurst = 2 + postLimitAvg = 1 getLimit time.Duration postLimit time.Duration permRelaysFile string @@ -99,10 +97,10 @@ func main() { flag.DurationVar(&evictionTime, "eviction", evictionTime, "After how long the relay is evicted") flag.IntVar(&getLRUSize, "get-limit-cache", getLRUSize, "Get request limiter cache size") flag.IntVar(&getLimitAvg, "get-limit-avg", 2, "Allowed average get request rate, per 10 s") - flag.Int64Var(&getLimitBurst, "get-limit-burst", getLimitBurst, "Allowed burst get requests") + flag.IntVar(&getLimitBurst, "get-limit-burst", getLimitBurst, "Allowed burst get requests") flag.IntVar(&postLRUSize, "post-limit-cache", postLRUSize, "Post request limiter cache size") flag.IntVar(&postLimitAvg, "post-limit-avg", 2, "Allowed average post request rate, per minute") - flag.Int64Var(&postLimitBurst, "post-limit-burst", postLimitBurst, "Allowed burst post requests") + flag.IntVar(&postLimitBurst, "post-limit-burst", postLimitBurst, "Allowed burst post requests") flag.StringVar(&permRelaysFile, "perm-relays", "", "Path to list of permanent relays") flag.StringVar(&ipHeader, "ip-header", "", "Name of header which holds clients ip:port. Only meaningful when running behind a reverse proxy.") flag.StringVar(&geoipPath, "geoip", "GeoLite2-City.mmdb", "Path to GeoLite2-City database") @@ -446,7 +444,7 @@ func evict(relay relay) func() { } } -func limit(addr string, cache *lru.Cache, lock sync.RWMutex, rate time.Duration, burst int64) bool { +func limit(addr string, cache *lru.Cache, lock sync.RWMutex, intv time.Duration, burst int) bool { host, _, err := net.SplitHostPort(addr) if err != nil { return false @@ -456,14 +454,14 @@ func limit(addr string, cache *lru.Cache, lock sync.RWMutex, rate time.Duration, bkt, ok := cache.Get(host) lock.RUnlock() if ok { - bkt := bkt.(*ratelimit.Bucket) - if bkt.TakeAvailable(1) != 1 { + bkt := bkt.(*rate.Limiter) + if !bkt.Allow() { // Rate limit return true } } else { lock.Lock() - cache.Add(host, ratelimit.NewBucket(rate, burst)) + cache.Add(host, rate.NewLimiter(rate.Every(intv), burst)) lock.Unlock() } return false diff --git a/cmd/strelaysrv/main.go b/cmd/strelaysrv/main.go index 6767d65b..80738cc9 100644 --- a/cmd/strelaysrv/main.go +++ b/cmd/strelaysrv/main.go @@ -20,10 +20,10 @@ import ( "syscall" "time" - "github.com/juju/ratelimit" "github.com/syncthing/syncthing/lib/osutil" "github.com/syncthing/syncthing/lib/relay/protocol" "github.com/syncthing/syncthing/lib/tlsutil" + "golang.org/x/time/rate" "github.com/syncthing/syncthing/lib/config" "github.com/syncthing/syncthing/lib/nat" @@ -68,8 +68,8 @@ var ( globalLimitBps int overLimit int32 descriptorLimit int64 - sessionLimiter *ratelimit.Bucket - globalLimiter *ratelimit.Bucket + sessionLimiter *rate.Limiter + globalLimiter *rate.Limiter statusAddr string poolAddrs string @@ -215,10 +215,10 @@ func main() { } if sessionLimitBps > 0 { - sessionLimiter = ratelimit.NewBucketWithRate(float64(sessionLimitBps), int64(2*sessionLimitBps)) + sessionLimiter = rate.NewLimiter(rate.Limit(sessionLimitBps), 2*sessionLimitBps) } if globalLimitBps > 0 { - globalLimiter = ratelimit.NewBucketWithRate(float64(globalLimitBps), int64(2*globalLimitBps)) + globalLimiter = rate.NewLimiter(rate.Limit(globalLimitBps), 2*globalLimitBps) } if statusAddr != "" { diff --git a/cmd/strelaysrv/session.go b/cmd/strelaysrv/session.go index 7a54d14d..0ed0d2a9 100644 --- a/cmd/strelaysrv/session.go +++ b/cmd/strelaysrv/session.go @@ -7,15 +7,16 @@ import ( "encoding/hex" "fmt" "log" + "math" "net" "sync" "sync/atomic" "time" - "github.com/juju/ratelimit" - "github.com/syncthing/syncthing/lib/relay/protocol" + "golang.org/x/time/rate" syncthingprotocol "github.com/syncthing/syncthing/lib/protocol" + "github.com/syncthing/syncthing/lib/relay/protocol" ) var ( @@ -26,7 +27,7 @@ var ( bytesProxied int64 ) -func newSession(serverid, clientid syncthingprotocol.DeviceID, sessionRateLimit, globalRateLimit *ratelimit.Bucket) *session { +func newSession(serverid, clientid syncthingprotocol.DeviceID, sessionRateLimit, globalRateLimit *rate.Limiter) *session { serverkey := make([]byte, 32) _, err := rand.Read(serverkey) if err != nil { @@ -108,7 +109,7 @@ type session struct { clientkey []byte clientid syncthingprotocol.DeviceID - rateLimit func(bytes int64) + rateLimit func(bytes int) connsChan chan net.Conn conns []net.Conn @@ -268,7 +269,7 @@ func (s *session) proxy(c1, c2 net.Conn) error { } if s.rateLimit != nil { - s.rateLimit(int64(n)) + s.rateLimit(n) } c2.SetWriteDeadline(time.Now().Add(networkTimeout)) @@ -283,7 +284,7 @@ func (s *session) String() string { return fmt.Sprintf("<%s/%s>", hex.EncodeToString(s.clientkey)[:5], hex.EncodeToString(s.serverkey)[:5]) } -func makeRateLimitFunc(sessionRateLimit, globalRateLimit *ratelimit.Bucket) func(int64) { +func makeRateLimitFunc(sessionRateLimit, globalRateLimit *rate.Limiter) func(int) { // This may be a case of super duper premature optimization... We build an // optimized function to do the rate limiting here based on what we need // to do and then use it in the loop. @@ -298,29 +299,55 @@ func makeRateLimitFunc(sessionRateLimit, globalRateLimit *ratelimit.Bucket) func if sessionRateLimit == nil { // We only have a global limiter - return func(bytes int64) { - globalRateLimit.Wait(bytes) + return func(bytes int) { + take(bytes, globalRateLimit) } } if globalRateLimit == nil { // We only have a session limiter - return func(bytes int64) { - sessionRateLimit.Wait(bytes) + return func(bytes int) { + take(bytes, sessionRateLimit) } } // We have both. Queue the bytes on both the global and session specific - // rate limiters. Wait for both in parallell, so that the actual send - // happens when both conditions are satisfied. In practice this just means - // wait the longer of the two times. - return func(bytes int64) { - t0 := sessionRateLimit.Take(bytes) - t1 := globalRateLimit.Take(bytes) - if t0 > t1 { - time.Sleep(t0) - } else { - time.Sleep(t1) - } + // rate limiters. + return func(bytes int) { + take(bytes, sessionRateLimit, globalRateLimit) + } +} + +// take is a utility function to consume tokens from a set of rate.Limiters. +// Tokens are consumed in parallel on all limiters, respecting their +// individual burst sizes. +func take(tokens int, ls ...*rate.Limiter) { + // minBurst is the smallest burst size supported by all limiters. + minBurst := int(math.MaxInt32) + for _, l := range ls { + if burst := l.Burst(); burst < minBurst { + minBurst = burst + } + } + + for tokens > 0 { + // chunk is how many tokens we can consume at a time + chunk := tokens + if chunk > minBurst { + chunk = minBurst + } + + // maxDelay is the longest delay mandated by any of the limiters for + // the chosen chunk size. + var maxDelay time.Duration + for _, l := range ls { + res := l.ReserveN(time.Now(), chunk) + if del := res.Delay(); del > maxDelay { + maxDelay = del + } + } + + time.Sleep(maxDelay) + tokens -= chunk } } diff --git a/cmd/syncthing/main.go b/cmd/syncthing/main.go index 8619367c..d3d37881 100644 --- a/cmd/syncthing/main.go +++ b/cmd/syncthing/main.go @@ -637,9 +637,6 @@ func syncthingMain(runtimeOptions RuntimeOptions) { }, } - // If the read or write rate should be limited, set up a rate limiter for it. - // This will be used on connections created in the connect and listen routines. - opts := cfg.Options() if !opts.SymlinksEnabled { diff --git a/gui/default/syncthing/core/aboutModalView.html b/gui/default/syncthing/core/aboutModalView.html index e490e430..89f5d539 100644 --- a/gui/default/syncthing/core/aboutModalView.html +++ b/gui/default/syncthing/core/aboutModalView.html @@ -23,7 +23,6 @@ Jakob Borg, Audrius Butkevicius, Alexander Graf, Anderson Mesquita, Antony Male,
  • bkaradzic/go-lz4, Copyright © 2011-2012 Branimir Karadzic, 2013 Damian Gryski.
  • kardianos/osext, Copyright © 2012 Daniel Theophanes.
  • golang/snappy, Copyright © 2011 The Snappy-Go Authors.
  • -
  • juju/ratelimit, Copyright © 2015 Canonical Ltd.
  • thejerf/suture, Copyright © 2014-2015 Barracuda Networks, Inc.
  • syndtr/goleveldb, Copyright © 2012, Suryandaru Triandana
  • vitrun/qart, Copyright © The Go Authors.
  • diff --git a/lib/connections/limitedreader.go b/lib/connections/limitedreader.go deleted file mode 100644 index e35a1595..00000000 --- a/lib/connections/limitedreader.go +++ /dev/null @@ -1,33 +0,0 @@ -// Copyright (C) 2014 The Syncthing Authors. -// -// This Source Code Form is subject to the terms of the Mozilla Public -// License, v. 2.0. If a copy of the MPL was not distributed with this file, -// You can obtain one at http://mozilla.org/MPL/2.0/. - -package connections - -import ( - "io" - - "github.com/juju/ratelimit" -) - -type LimitedReader struct { - reader io.Reader - bucket *ratelimit.Bucket -} - -func NewReadLimiter(r io.Reader, b *ratelimit.Bucket) *LimitedReader { - return &LimitedReader{ - reader: r, - bucket: b, - } -} - -func (r *LimitedReader) Read(buf []byte) (int, error) { - n, err := r.reader.Read(buf) - if r.bucket != nil { - r.bucket.Wait(int64(n)) - } - return n, err -} diff --git a/lib/connections/limitedwriter.go b/lib/connections/limitedwriter.go deleted file mode 100644 index 95bef5eb..00000000 --- a/lib/connections/limitedwriter.go +++ /dev/null @@ -1,32 +0,0 @@ -// Copyright (C) 2014 The Syncthing Authors. -// -// This Source Code Form is subject to the terms of the Mozilla Public -// License, v. 2.0. If a copy of the MPL was not distributed with this file, -// You can obtain one at http://mozilla.org/MPL/2.0/. - -package connections - -import ( - "io" - - "github.com/juju/ratelimit" -) - -type LimitedWriter struct { - writer io.Writer - bucket *ratelimit.Bucket -} - -func NewWriteLimiter(w io.Writer, b *ratelimit.Bucket) *LimitedWriter { - return &LimitedWriter{ - writer: w, - bucket: b, - } -} - -func (w *LimitedWriter) Write(buf []byte) (int, error) { - if w.bucket != nil { - w.bucket.Wait(int64(len(buf))) - } - return w.writer.Write(buf) -} diff --git a/lib/connections/limiter.go b/lib/connections/limiter.go new file mode 100644 index 00000000..f2a50eee --- /dev/null +++ b/lib/connections/limiter.go @@ -0,0 +1,164 @@ +// Copyright (C) 2017 The Syncthing Authors. +// +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this file, +// You can obtain one at http://mozilla.org/MPL/2.0/. + +package connections + +import ( + "fmt" + "io" + "sync/atomic" + + "github.com/syncthing/syncthing/lib/config" + "golang.org/x/net/context" + "golang.org/x/time/rate" +) + +// limiter manages a read and write rate limit, reacting to config changes +// as appropriate. +type limiter struct { + write *rate.Limiter + read *rate.Limiter + limitsLAN atomicBool +} + +const limiterBurstSize = 4 * 128 << 10 + +func newLimiter(cfg *config.Wrapper) *limiter { + l := &limiter{ + write: rate.NewLimiter(rate.Inf, limiterBurstSize), + read: rate.NewLimiter(rate.Inf, limiterBurstSize), + } + cfg.Subscribe(l) + prev := config.Configuration{Options: config.OptionsConfiguration{MaxRecvKbps: -1, MaxSendKbps: -1}} + l.CommitConfiguration(prev, cfg.RawCopy()) + return l +} + +func (lim *limiter) newReadLimiter(r io.Reader, isLAN bool) io.Reader { + return &limitedReader{reader: r, limiter: lim, isLAN: isLAN} +} + +func (lim *limiter) newWriteLimiter(w io.Writer, isLAN bool) io.Writer { + return &limitedWriter{writer: w, limiter: lim, isLAN: isLAN} +} + +func (lim *limiter) VerifyConfiguration(from, to config.Configuration) error { + return nil +} + +func (lim *limiter) CommitConfiguration(from, to config.Configuration) bool { + if from.Options.MaxRecvKbps == to.Options.MaxRecvKbps && + from.Options.MaxSendKbps == to.Options.MaxSendKbps && + from.Options.LimitBandwidthInLan == to.Options.LimitBandwidthInLan { + return true + } + + // The rate variables are in KiB/s in the config (despite the camel casing + // of the name). We multiply by 1024 to get bytes/s. + + if to.Options.MaxRecvKbps <= 0 { + lim.read.SetLimit(rate.Inf) + } else { + lim.read.SetLimit(1024 * rate.Limit(to.Options.MaxRecvKbps)) + } + + if to.Options.MaxSendKbps < 0 { + lim.write.SetLimit(rate.Inf) + } else { + lim.write.SetLimit(1024 * rate.Limit(to.Options.MaxSendKbps)) + } + + lim.limitsLAN.set(to.Options.LimitBandwidthInLan) + + sendLimitStr := "is unlimited" + recvLimitStr := "is unlimited" + if to.Options.MaxSendKbps > 0 { + sendLimitStr = fmt.Sprintf("limit is %d KiB/s", to.Options.MaxSendKbps) + } + if to.Options.MaxRecvKbps > 0 { + recvLimitStr = fmt.Sprintf("limit is %d KiB/s", to.Options.MaxRecvKbps) + } + l.Infof("Send rate %s, receive rate %s", sendLimitStr, recvLimitStr) + + if to.Options.LimitBandwidthInLan { + l.Infoln("Rate limits apply to LAN connections") + } else { + l.Infoln("Rate limits do not apply to LAN connections") + } + + return true +} + +func (lim *limiter) String() string { + // required by config.Committer interface + return "connections.limiter" +} + +// limitedReader is a rate limited io.Reader +type limitedReader struct { + reader io.Reader + limiter *limiter + isLAN bool +} + +func (r *limitedReader) Read(buf []byte) (int, error) { + n, err := r.reader.Read(buf) + if !r.isLAN || r.limiter.limitsLAN.get() { + take(r.limiter.read, n) + } + return n, err +} + +// limitedWriter is a rate limited io.Writer +type limitedWriter struct { + writer io.Writer + limiter *limiter + isLAN bool +} + +func (w *limitedWriter) Write(buf []byte) (int, error) { + if !w.isLAN || w.limiter.limitsLAN.get() { + take(w.limiter.write, len(buf)) + } + return w.writer.Write(buf) +} + +// take is a utility function to consume tokens from a rate.Limiter. No call +// to WaitN can be larger than the limiter burst size so we split it up into +// several calls when necessary. +func take(l *rate.Limiter, tokens int) { + if tokens < limiterBurstSize { + // This is the by far more common case so we get it out of the way + // early. + l.WaitN(context.TODO(), tokens) + return + } + + for tokens > 0 { + // Consume limiterBurstSize tokens at a time until we're done. + if tokens > limiterBurstSize { + l.WaitN(context.TODO(), limiterBurstSize) + tokens -= limiterBurstSize + } else { + l.WaitN(context.TODO(), tokens) + tokens = 0 + } + } +} + +type atomicBool int32 + +func (b *atomicBool) set(v bool) { + if v { + atomic.StoreInt32((*int32)(b), 1) + } else { + atomic.StoreInt32((*int32)(b), 0) + } +} + +func (b *atomicBool) get() bool { + return atomic.LoadInt32((*int32)(b)) != 0 +} diff --git a/lib/connections/service.go b/lib/connections/service.go index 1b2a5076..16999214 100644 --- a/lib/connections/service.go +++ b/lib/connections/service.go @@ -10,12 +10,10 @@ import ( "crypto/tls" "errors" "fmt" - "io" "net" "net/url" "time" - "github.com/juju/ratelimit" "github.com/syncthing/syncthing/lib/config" "github.com/syncthing/syncthing/lib/discover" "github.com/syncthing/syncthing/lib/events" @@ -29,6 +27,7 @@ import ( _ "github.com/syncthing/syncthing/lib/upnp" "github.com/thejerf/suture" + "golang.org/x/time/rate" ) var ( @@ -37,7 +36,7 @@ var ( ) const ( - perDeviceWarningRate = 1.0 / (15 * 60) // Once per 15 minutes + perDeviceWarningIntv = 15 * time.Minute tlsHandshakeTimeout = 10 * time.Second ) @@ -80,8 +79,7 @@ type Service struct { bepProtocolName string tlsDefaultCommonName string lans []*net.IPNet - writeRateLimit *ratelimit.Bucket - readRateLimit *ratelimit.Bucket + limiter *limiter natService *nat.Service natServiceToken *suture.ServiceToken @@ -112,6 +110,7 @@ func NewService(cfg *config.Wrapper, myID protocol.DeviceID, mdl Model, tlsCfg * bepProtocolName: bepProtocolName, tlsDefaultCommonName: tlsDefaultCommonName, lans: lans, + limiter: newLimiter(cfg), natService: nat.NewService(myID, cfg), listenersMut: sync.NewRWMutex(), @@ -135,17 +134,6 @@ func NewService(cfg *config.Wrapper, myID protocol.DeviceID, mdl Model, tlsCfg * } cfg.Subscribe(service) - // The rate variables are in KiB/s in the UI (despite the camel casing - // of the name). We multiply by 1024 here to get B/s. - options := service.cfg.Options() - if options.MaxSendKbps > 0 { - service.writeRateLimit = ratelimit.NewBucketWithRate(float64(1024*options.MaxSendKbps), int64(5*1024*options.MaxSendKbps)) - } - - if options.MaxRecvKbps > 0 { - service.readRateLimit = ratelimit.NewBucketWithRate(float64(1024*options.MaxRecvKbps), int64(5*1024*options.MaxRecvKbps)) - } - // There are several moving parts here; one routine per listening address // (handled in configuration changing) to handle incoming connections, // one routine to periodically attempt outgoing connections, one routine to @@ -279,20 +267,12 @@ next: continue next } - // If rate limiting is set, and based on the address we should - // limit the connection, then we wrap it in a limiter. - - limit := s.shouldLimit(c.RemoteAddr()) - - wr := io.Writer(c) - if limit && s.writeRateLimit != nil { - wr = NewWriteLimiter(c, s.writeRateLimit) - } - - rd := io.Reader(c) - if limit && s.readRateLimit != nil { - rd = NewReadLimiter(c, s.readRateLimit) - } + // Wrap the connection in rate limiters. The limiter itself will + // keep up with config changes to the rate and whether or not LAN + // connections are limited. + isLAN := s.isLAN(c.RemoteAddr()) + wr := s.limiter.newWriteLimiter(c, isLAN) + rd := s.limiter.newReadLimiter(c, isLAN) name := fmt.Sprintf("%s-%s (%s)", c.LocalAddr(), c.RemoteAddr(), c.Type()) protoConn := protocol.NewConnection(remoteID, rd, wr, s.model, name, deviceCfg.Compression) @@ -434,21 +414,17 @@ func (s *Service) connect() { } } -func (s *Service) shouldLimit(addr net.Addr) bool { - if s.cfg.Options().LimitBandwidthInLan { - return true - } - +func (s *Service) isLAN(addr net.Addr) bool { tcpaddr, ok := addr.(*net.TCPAddr) if !ok { - return true + return false } for _, lan := range s.lans { if lan.Contains(tcpaddr.IP) { - return false + return true } } - return !tcpaddr.IP.IsLoopback() + return tcpaddr.IP.IsLoopback() } func (s *Service) createListener(factory listenerFactory, uri *url.URL) bool { @@ -644,7 +620,7 @@ func urlsToStrings(urls []*url.URL) []string { return strings } -var warningLimiters = make(map[protocol.DeviceID]*ratelimit.Bucket) +var warningLimiters = make(map[protocol.DeviceID]*rate.Limiter) var warningLimitersMut = sync.NewMutex() func warningFor(dev protocol.DeviceID, msg string) { @@ -652,10 +628,10 @@ func warningFor(dev protocol.DeviceID, msg string) { defer warningLimitersMut.Unlock() lim, ok := warningLimiters[dev] if !ok { - lim = ratelimit.NewBucketWithRate(perDeviceWarningRate, 1) + lim = rate.NewLimiter(rate.Every(perDeviceWarningIntv), 1) warningLimiters[dev] = lim } - if lim.TakeAvailable(1) == 1 { + if lim.Allow() { l.Warnln(msg) } } diff --git a/lib/model/model.go b/lib/model/model.go index 574915c8..b6267a2f 100644 --- a/lib/model/model.go +++ b/lib/model/model.go @@ -2438,6 +2438,9 @@ func (m *Model) CommitConfiguration(from, to config.Configuration) bool { from.Options.ListenAddresses = to.Options.ListenAddresses from.Options.RelaysEnabled = to.Options.RelaysEnabled from.Options.UnackedNotificationIDs = to.Options.UnackedNotificationIDs + from.Options.MaxRecvKbps = to.Options.MaxRecvKbps + from.Options.MaxSendKbps = to.Options.MaxSendKbps + from.Options.LimitBandwidthInLan = to.Options.LimitBandwidthInLan // All of the other generic options require restart. Or at least they may; // removing this check requires going through those options carefully and // making sure there are individual services that handle them correctly. diff --git a/vendor/github.com/juju/ratelimit/LICENSE b/vendor/github.com/juju/ratelimit/LICENSE deleted file mode 100644 index ade9307b..00000000 --- a/vendor/github.com/juju/ratelimit/LICENSE +++ /dev/null @@ -1,191 +0,0 @@ -All files in this repository are licensed as follows. If you contribute -to this repository, it is assumed that you license your contribution -under the same license unless you state otherwise. - -All files Copyright (C) 2015 Canonical Ltd. unless otherwise specified in the file. - -This software is licensed under the LGPLv3, included below. - -As a special exception to the GNU Lesser General Public License version 3 -("LGPL3"), the copyright holders of this Library give you permission to -convey to a third party a Combined Work that links statically or dynamically -to this Library without providing any Minimal Corresponding Source or -Minimal Application Code as set out in 4d or providing the installation -information set out in section 4e, provided that you comply with the other -provisions of LGPL3 and provided that you meet, for the Application the -terms and conditions of the license(s) which apply to the Application. - -Except as stated in this special exception, the provisions of LGPL3 will -continue to comply in full to this Library. If you modify this Library, you -may apply this exception to your version of this Library, but you are not -obliged to do so. If you do not wish to do so, delete this exception -statement from your version. This exception does not (and cannot) modify any -license terms which apply to the Application, with which you must still -comply. - - - GNU LESSER GENERAL PUBLIC LICENSE - Version 3, 29 June 2007 - - Copyright (C) 2007 Free Software Foundation, Inc. - Everyone is permitted to copy and distribute verbatim copies - of this license document, but changing it is not allowed. - - - This version of the GNU Lesser General Public License incorporates -the terms and conditions of version 3 of the GNU General Public -License, supplemented by the additional permissions listed below. - - 0. Additional Definitions. - - As used herein, "this License" refers to version 3 of the GNU Lesser -General Public License, and the "GNU GPL" refers to version 3 of the GNU -General Public License. - - "The Library" refers to a covered work governed by this License, -other than an Application or a Combined Work as defined below. - - An "Application" is any work that makes use of an interface provided -by the Library, but which is not otherwise based on the Library. -Defining a subclass of a class defined by the Library is deemed a mode -of using an interface provided by the Library. - - A "Combined Work" is a work produced by combining or linking an -Application with the Library. The particular version of the Library -with which the Combined Work was made is also called the "Linked -Version". - - The "Minimal Corresponding Source" for a Combined Work means the -Corresponding Source for the Combined Work, excluding any source code -for portions of the Combined Work that, considered in isolation, are -based on the Application, and not on the Linked Version. - - The "Corresponding Application Code" for a Combined Work means the -object code and/or source code for the Application, including any data -and utility programs needed for reproducing the Combined Work from the -Application, but excluding the System Libraries of the Combined Work. - - 1. Exception to Section 3 of the GNU GPL. - - You may convey a covered work under sections 3 and 4 of this License -without being bound by section 3 of the GNU GPL. - - 2. Conveying Modified Versions. - - If you modify a copy of the Library, and, in your modifications, a -facility refers to a function or data to be supplied by an Application -that uses the facility (other than as an argument passed when the -facility is invoked), then you may convey a copy of the modified -version: - - a) under this License, provided that you make a good faith effort to - ensure that, in the event an Application does not supply the - function or data, the facility still operates, and performs - whatever part of its purpose remains meaningful, or - - b) under the GNU GPL, with none of the additional permissions of - this License applicable to that copy. - - 3. Object Code Incorporating Material from Library Header Files. - - The object code form of an Application may incorporate material from -a header file that is part of the Library. You may convey such object -code under terms of your choice, provided that, if the incorporated -material is not limited to numerical parameters, data structure -layouts and accessors, or small macros, inline functions and templates -(ten or fewer lines in length), you do both of the following: - - a) Give prominent notice with each copy of the object code that the - Library is used in it and that the Library and its use are - covered by this License. - - b) Accompany the object code with a copy of the GNU GPL and this license - document. - - 4. Combined Works. - - You may convey a Combined Work under terms of your choice that, -taken together, effectively do not restrict modification of the -portions of the Library contained in the Combined Work and reverse -engineering for debugging such modifications, if you also do each of -the following: - - a) Give prominent notice with each copy of the Combined Work that - the Library is used in it and that the Library and its use are - covered by this License. - - b) Accompany the Combined Work with a copy of the GNU GPL and this license - document. - - c) For a Combined Work that displays copyright notices during - execution, include the copyright notice for the Library among - these notices, as well as a reference directing the user to the - copies of the GNU GPL and this license document. - - d) Do one of the following: - - 0) Convey the Minimal Corresponding Source under the terms of this - License, and the Corresponding Application Code in a form - suitable for, and under terms that permit, the user to - recombine or relink the Application with a modified version of - the Linked Version to produce a modified Combined Work, in the - manner specified by section 6 of the GNU GPL for conveying - Corresponding Source. - - 1) Use a suitable shared library mechanism for linking with the - Library. A suitable mechanism is one that (a) uses at run time - a copy of the Library already present on the user's computer - system, and (b) will operate properly with a modified version - of the Library that is interface-compatible with the Linked - Version. - - e) Provide Installation Information, but only if you would otherwise - be required to provide such information under section 6 of the - GNU GPL, and only to the extent that such information is - necessary to install and execute a modified version of the - Combined Work produced by recombining or relinking the - Application with a modified version of the Linked Version. (If - you use option 4d0, the Installation Information must accompany - the Minimal Corresponding Source and Corresponding Application - Code. If you use option 4d1, you must provide the Installation - Information in the manner specified by section 6 of the GNU GPL - for conveying Corresponding Source.) - - 5. Combined Libraries. - - You may place library facilities that are a work based on the -Library side by side in a single library together with other library -facilities that are not Applications and are not covered by this -License, and convey such a combined library under terms of your -choice, if you do both of the following: - - a) Accompany the combined library with a copy of the same work based - on the Library, uncombined with any other library facilities, - conveyed under the terms of this License. - - b) Give prominent notice with the combined library that part of it - is a work based on the Library, and explaining where to find the - accompanying uncombined form of the same work. - - 6. Revised Versions of the GNU Lesser General Public License. - - The Free Software Foundation may publish revised and/or new versions -of the GNU Lesser General Public License from time to time. Such new -versions will be similar in spirit to the present version, but may -differ in detail to address new problems or concerns. - - Each version is given a distinguishing version number. If the -Library as you received it specifies that a certain numbered version -of the GNU Lesser General Public License "or any later version" -applies to it, you have the option of following the terms and -conditions either of that published version or of any later version -published by the Free Software Foundation. If the Library as you -received it does not specify a version number of the GNU Lesser -General Public License, you may choose any version of the GNU Lesser -General Public License ever published by the Free Software Foundation. - - If the Library as you received it specifies that a proxy can decide -whether future versions of the GNU Lesser General Public License shall -apply, that proxy's public statement of acceptance of any version is -permanent authorization for you to choose that version for the -Library. diff --git a/vendor/github.com/juju/ratelimit/README.md b/vendor/github.com/juju/ratelimit/README.md deleted file mode 100644 index a0fdfe2b..00000000 --- a/vendor/github.com/juju/ratelimit/README.md +++ /dev/null @@ -1,117 +0,0 @@ -# ratelimit --- - import "github.com/juju/ratelimit" - -The ratelimit package provides an efficient token bucket implementation. See -http://en.wikipedia.org/wiki/Token_bucket. - -## Usage - -#### func Reader - -```go -func Reader(r io.Reader, bucket *Bucket) io.Reader -``` -Reader returns a reader that is rate limited by the given token bucket. Each -token in the bucket represents one byte. - -#### func Writer - -```go -func Writer(w io.Writer, bucket *Bucket) io.Writer -``` -Writer returns a writer that is rate limited by the given token bucket. Each -token in the bucket represents one byte. - -#### type Bucket - -```go -type Bucket struct { -} -``` - -Bucket represents a token bucket that fills at a predetermined rate. Methods on -Bucket may be called concurrently. - -#### func NewBucket - -```go -func NewBucket(fillInterval time.Duration, capacity int64) *Bucket -``` -NewBucket returns a new token bucket that fills at the rate of one token every -fillInterval, up to the given maximum capacity. Both arguments must be positive. -The bucket is initially full. - -#### func NewBucketWithQuantum - -```go -func NewBucketWithQuantum(fillInterval time.Duration, capacity, quantum int64) *Bucket -``` -NewBucketWithQuantum is similar to NewBucket, but allows the specification of -the quantum size - quantum tokens are added every fillInterval. - -#### func NewBucketWithRate - -```go -func NewBucketWithRate(rate float64, capacity int64) *Bucket -``` -NewBucketWithRate returns a token bucket that fills the bucket at the rate of -rate tokens per second up to the given maximum capacity. Because of limited -clock resolution, at high rates, the actual rate may be up to 1% different from -the specified rate. - -#### func (*Bucket) Rate - -```go -func (tb *Bucket) Rate() float64 -``` -Rate returns the fill rate of the bucket, in tokens per second. - -#### func (*Bucket) Take - -```go -func (tb *Bucket) Take(count int64) time.Duration -``` -Take takes count tokens from the bucket without blocking. It returns the time -that the caller should wait until the tokens are actually available. - -Note that if the request is irrevocable - there is no way to return tokens to -the bucket once this method commits us to taking them. - -#### func (*Bucket) TakeAvailable - -```go -func (tb *Bucket) TakeAvailable(count int64) int64 -``` -TakeAvailable takes up to count immediately available tokens from the bucket. It -returns the number of tokens removed, or zero if there are no available tokens. -It does not block. - -#### func (*Bucket) TakeMaxDuration - -```go -func (tb *Bucket) TakeMaxDuration(count int64, maxWait time.Duration) (time.Duration, bool) -``` -TakeMaxDuration is like Take, except that it will only take tokens from the -bucket if the wait time for the tokens is no greater than maxWait. - -If it would take longer than maxWait for the tokens to become available, it does -nothing and reports false, otherwise it returns the time that the caller should -wait until the tokens are actually available, and reports true. - -#### func (*Bucket) Wait - -```go -func (tb *Bucket) Wait(count int64) -``` -Wait takes count tokens from the bucket, waiting until they are available. - -#### func (*Bucket) WaitMaxDuration - -```go -func (tb *Bucket) WaitMaxDuration(count int64, maxWait time.Duration) bool -``` -WaitMaxDuration is like Wait except that it will only take tokens from the -bucket if it needs to wait for no greater than maxWait. It reports whether any -tokens have been removed from the bucket If no tokens have been removed, it -returns immediately. diff --git a/vendor/github.com/juju/ratelimit/ratelimit.go b/vendor/github.com/juju/ratelimit/ratelimit.go deleted file mode 100644 index 3ef32fbc..00000000 --- a/vendor/github.com/juju/ratelimit/ratelimit.go +++ /dev/null @@ -1,245 +0,0 @@ -// Copyright 2014 Canonical Ltd. -// Licensed under the LGPLv3 with static-linking exception. -// See LICENCE file for details. - -// The ratelimit package provides an efficient token bucket implementation -// that can be used to limit the rate of arbitrary things. -// See http://en.wikipedia.org/wiki/Token_bucket. -package ratelimit - -import ( - "math" - "strconv" - "sync" - "time" -) - -// Bucket represents a token bucket that fills at a predetermined rate. -// Methods on Bucket may be called concurrently. -type Bucket struct { - startTime time.Time - capacity int64 - quantum int64 - fillInterval time.Duration - - // The mutex guards the fields following it. - mu sync.Mutex - - // avail holds the number of available tokens - // in the bucket, as of availTick ticks from startTime. - // It will be negative when there are consumers - // waiting for tokens. - avail int64 - availTick int64 -} - -// NewBucket returns a new token bucket that fills at the -// rate of one token every fillInterval, up to the given -// maximum capacity. Both arguments must be -// positive. The bucket is initially full. -func NewBucket(fillInterval time.Duration, capacity int64) *Bucket { - return NewBucketWithQuantum(fillInterval, capacity, 1) -} - -// rateMargin specifes the allowed variance of actual -// rate from specified rate. 1% seems reasonable. -const rateMargin = 0.01 - -// NewBucketWithRate returns a token bucket that fills the bucket -// at the rate of rate tokens per second up to the given -// maximum capacity. Because of limited clock resolution, -// at high rates, the actual rate may be up to 1% different from the -// specified rate. -func NewBucketWithRate(rate float64, capacity int64) *Bucket { - for quantum := int64(1); quantum < 1<<50; quantum = nextQuantum(quantum) { - fillInterval := time.Duration(1e9 * float64(quantum) / rate) - if fillInterval <= 0 { - continue - } - tb := NewBucketWithQuantum(fillInterval, capacity, quantum) - if diff := math.Abs(tb.Rate() - rate); diff/rate <= rateMargin { - return tb - } - } - panic("cannot find suitable quantum for " + strconv.FormatFloat(rate, 'g', -1, 64)) -} - -// nextQuantum returns the next quantum to try after q. -// We grow the quantum exponentially, but slowly, so we -// get a good fit in the lower numbers. -func nextQuantum(q int64) int64 { - q1 := q * 11 / 10 - if q1 == q { - q1++ - } - return q1 -} - -// NewBucketWithQuantum is similar to NewBucket, but allows -// the specification of the quantum size - quantum tokens -// are added every fillInterval. -func NewBucketWithQuantum(fillInterval time.Duration, capacity, quantum int64) *Bucket { - if fillInterval <= 0 { - panic("token bucket fill interval is not > 0") - } - if capacity <= 0 { - panic("token bucket capacity is not > 0") - } - if quantum <= 0 { - panic("token bucket quantum is not > 0") - } - return &Bucket{ - startTime: time.Now(), - capacity: capacity, - quantum: quantum, - avail: capacity, - fillInterval: fillInterval, - } -} - -// Wait takes count tokens from the bucket, waiting until they are -// available. -func (tb *Bucket) Wait(count int64) { - if d := tb.Take(count); d > 0 { - time.Sleep(d) - } -} - -// WaitMaxDuration is like Wait except that it will -// only take tokens from the bucket if it needs to wait -// for no greater than maxWait. It reports whether -// any tokens have been removed from the bucket -// If no tokens have been removed, it returns immediately. -func (tb *Bucket) WaitMaxDuration(count int64, maxWait time.Duration) bool { - d, ok := tb.TakeMaxDuration(count, maxWait) - if d > 0 { - time.Sleep(d) - } - return ok -} - -const infinityDuration time.Duration = 0x7fffffffffffffff - -// Take takes count tokens from the bucket without blocking. It returns -// the time that the caller should wait until the tokens are actually -// available. -// -// Note that if the request is irrevocable - there is no way to return -// tokens to the bucket once this method commits us to taking them. -func (tb *Bucket) Take(count int64) time.Duration { - d, _ := tb.take(time.Now(), count, infinityDuration) - return d -} - -// TakeMaxDuration is like Take, except that -// it will only take tokens from the bucket if the wait -// time for the tokens is no greater than maxWait. -// -// If it would take longer than maxWait for the tokens -// to become available, it does nothing and reports false, -// otherwise it returns the time that the caller should -// wait until the tokens are actually available, and reports -// true. -func (tb *Bucket) TakeMaxDuration(count int64, maxWait time.Duration) (time.Duration, bool) { - return tb.take(time.Now(), count, maxWait) -} - -// TakeAvailable takes up to count immediately available tokens from the -// bucket. It returns the number of tokens removed, or zero if there are -// no available tokens. It does not block. -func (tb *Bucket) TakeAvailable(count int64) int64 { - return tb.takeAvailable(time.Now(), count) -} - -// takeAvailable is the internal version of TakeAvailable - it takes the -// current time as an argument to enable easy testing. -func (tb *Bucket) takeAvailable(now time.Time, count int64) int64 { - if count <= 0 { - return 0 - } - tb.mu.Lock() - defer tb.mu.Unlock() - - tb.adjust(now) - if tb.avail <= 0 { - return 0 - } - if count > tb.avail { - count = tb.avail - } - tb.avail -= count - return count -} - -// Available returns the number of available tokens. It will be negative -// when there are consumers waiting for tokens. Note that if this -// returns greater than zero, it does not guarantee that calls that take -// tokens from the buffer will succeed, as the number of available -// tokens could have changed in the meantime. This method is intended -// primarily for metrics reporting and debugging. -func (tb *Bucket) Available() int64 { - return tb.available(time.Now()) -} - -// available is the internal version of available - it takes the current time as -// an argument to enable easy testing. -func (tb *Bucket) available(now time.Time) int64 { - tb.mu.Lock() - defer tb.mu.Unlock() - tb.adjust(now) - return tb.avail -} - -// Capacity returns the capacity that the bucket was created with. -func (tb *Bucket) Capacity() int64 { - return tb.capacity -} - -// Rate returns the fill rate of the bucket, in tokens per second. -func (tb *Bucket) Rate() float64 { - return 1e9 * float64(tb.quantum) / float64(tb.fillInterval) -} - -// take is the internal version of Take - it takes the current time as -// an argument to enable easy testing. -func (tb *Bucket) take(now time.Time, count int64, maxWait time.Duration) (time.Duration, bool) { - if count <= 0 { - return 0, true - } - tb.mu.Lock() - defer tb.mu.Unlock() - - currentTick := tb.adjust(now) - avail := tb.avail - count - if avail >= 0 { - tb.avail = avail - return 0, true - } - // Round up the missing tokens to the nearest multiple - // of quantum - the tokens won't be available until - // that tick. - endTick := currentTick + (-avail+tb.quantum-1)/tb.quantum - endTime := tb.startTime.Add(time.Duration(endTick) * tb.fillInterval) - waitTime := endTime.Sub(now) - if waitTime > maxWait { - return 0, false - } - tb.avail = avail - return waitTime, true -} - -// adjust adjusts the current bucket capacity based on the current time. -// It returns the current tick. -func (tb *Bucket) adjust(now time.Time) (currentTick int64) { - currentTick = int64(now.Sub(tb.startTime) / tb.fillInterval) - - if tb.avail >= tb.capacity { - return - } - tb.avail += (currentTick - tb.availTick) * tb.quantum - if tb.avail > tb.capacity { - tb.avail = tb.capacity - } - tb.availTick = currentTick - return -} diff --git a/vendor/github.com/juju/ratelimit/ratelimit_test.go b/vendor/github.com/juju/ratelimit/ratelimit_test.go deleted file mode 100644 index 62d88ded..00000000 --- a/vendor/github.com/juju/ratelimit/ratelimit_test.go +++ /dev/null @@ -1,389 +0,0 @@ -// Copyright 2014 Canonical Ltd. -// Licensed under the LGPLv3 with static-linking exception. -// See LICENCE file for details. - -package ratelimit - -import ( - "math" - "testing" - "time" - - gc "gopkg.in/check.v1" -) - -func TestPackage(t *testing.T) { - gc.TestingT(t) -} - -type rateLimitSuite struct{} - -var _ = gc.Suite(rateLimitSuite{}) - -type takeReq struct { - time time.Duration - count int64 - expectWait time.Duration -} - -var takeTests = []struct { - about string - fillInterval time.Duration - capacity int64 - reqs []takeReq -}{{ - about: "serial requests", - fillInterval: 250 * time.Millisecond, - capacity: 10, - reqs: []takeReq{{ - time: 0, - count: 0, - expectWait: 0, - }, { - time: 0, - count: 10, - expectWait: 0, - }, { - time: 0, - count: 1, - expectWait: 250 * time.Millisecond, - }, { - time: 250 * time.Millisecond, - count: 1, - expectWait: 250 * time.Millisecond, - }}, -}, { - about: "concurrent requests", - fillInterval: 250 * time.Millisecond, - capacity: 10, - reqs: []takeReq{{ - time: 0, - count: 10, - expectWait: 0, - }, { - time: 0, - count: 2, - expectWait: 500 * time.Millisecond, - }, { - time: 0, - count: 2, - expectWait: 1000 * time.Millisecond, - }, { - time: 0, - count: 1, - expectWait: 1250 * time.Millisecond, - }}, -}, { - about: "more than capacity", - fillInterval: 1 * time.Millisecond, - capacity: 10, - reqs: []takeReq{{ - time: 0, - count: 10, - expectWait: 0, - }, { - time: 20 * time.Millisecond, - count: 15, - expectWait: 5 * time.Millisecond, - }}, -}, { - about: "sub-quantum time", - fillInterval: 10 * time.Millisecond, - capacity: 10, - reqs: []takeReq{{ - time: 0, - count: 10, - expectWait: 0, - }, { - time: 7 * time.Millisecond, - count: 1, - expectWait: 3 * time.Millisecond, - }, { - time: 8 * time.Millisecond, - count: 1, - expectWait: 12 * time.Millisecond, - }}, -}, { - about: "within capacity", - fillInterval: 10 * time.Millisecond, - capacity: 5, - reqs: []takeReq{{ - time: 0, - count: 5, - expectWait: 0, - }, { - time: 60 * time.Millisecond, - count: 5, - expectWait: 0, - }, { - time: 60 * time.Millisecond, - count: 1, - expectWait: 10 * time.Millisecond, - }, { - time: 80 * time.Millisecond, - count: 2, - expectWait: 10 * time.Millisecond, - }}, -}} - -var availTests = []struct { - about string - capacity int64 - fillInterval time.Duration - take int64 - sleep time.Duration - - expectCountAfterTake int64 - expectCountAfterSleep int64 -}{{ - about: "should fill tokens after interval", - capacity: 5, - fillInterval: time.Second, - take: 5, - sleep: time.Second, - expectCountAfterTake: 0, - expectCountAfterSleep: 1, -}, { - about: "should fill tokens plus existing count", - capacity: 2, - fillInterval: time.Second, - take: 1, - sleep: time.Second, - expectCountAfterTake: 1, - expectCountAfterSleep: 2, -}, { - about: "shouldn't fill before interval", - capacity: 2, - fillInterval: 2 * time.Second, - take: 1, - sleep: time.Second, - expectCountAfterTake: 1, - expectCountAfterSleep: 1, -}, { - about: "should fill only once after 1*interval before 2*interval", - capacity: 2, - fillInterval: 2 * time.Second, - take: 1, - sleep: 3 * time.Second, - expectCountAfterTake: 1, - expectCountAfterSleep: 2, -}} - -func (rateLimitSuite) TestTake(c *gc.C) { - for i, test := range takeTests { - tb := NewBucket(test.fillInterval, test.capacity) - for j, req := range test.reqs { - d, ok := tb.take(tb.startTime.Add(req.time), req.count, infinityDuration) - c.Assert(ok, gc.Equals, true) - if d != req.expectWait { - c.Fatalf("test %d.%d, %s, got %v want %v", i, j, test.about, d, req.expectWait) - } - } - } -} - -func (rateLimitSuite) TestTakeMaxDuration(c *gc.C) { - for i, test := range takeTests { - tb := NewBucket(test.fillInterval, test.capacity) - for j, req := range test.reqs { - if req.expectWait > 0 { - d, ok := tb.take(tb.startTime.Add(req.time), req.count, req.expectWait-1) - c.Assert(ok, gc.Equals, false) - c.Assert(d, gc.Equals, time.Duration(0)) - } - d, ok := tb.take(tb.startTime.Add(req.time), req.count, req.expectWait) - c.Assert(ok, gc.Equals, true) - if d != req.expectWait { - c.Fatalf("test %d.%d, %s, got %v want %v", i, j, test.about, d, req.expectWait) - } - } - } -} - -type takeAvailableReq struct { - time time.Duration - count int64 - expect int64 -} - -var takeAvailableTests = []struct { - about string - fillInterval time.Duration - capacity int64 - reqs []takeAvailableReq -}{{ - about: "serial requests", - fillInterval: 250 * time.Millisecond, - capacity: 10, - reqs: []takeAvailableReq{{ - time: 0, - count: 0, - expect: 0, - }, { - time: 0, - count: 10, - expect: 10, - }, { - time: 0, - count: 1, - expect: 0, - }, { - time: 250 * time.Millisecond, - count: 1, - expect: 1, - }}, -}, { - about: "concurrent requests", - fillInterval: 250 * time.Millisecond, - capacity: 10, - reqs: []takeAvailableReq{{ - time: 0, - count: 5, - expect: 5, - }, { - time: 0, - count: 2, - expect: 2, - }, { - time: 0, - count: 5, - expect: 3, - }, { - time: 0, - count: 1, - expect: 0, - }}, -}, { - about: "more than capacity", - fillInterval: 1 * time.Millisecond, - capacity: 10, - reqs: []takeAvailableReq{{ - time: 0, - count: 10, - expect: 10, - }, { - time: 20 * time.Millisecond, - count: 15, - expect: 10, - }}, -}, { - about: "within capacity", - fillInterval: 10 * time.Millisecond, - capacity: 5, - reqs: []takeAvailableReq{{ - time: 0, - count: 5, - expect: 5, - }, { - time: 60 * time.Millisecond, - count: 5, - expect: 5, - }, { - time: 70 * time.Millisecond, - count: 1, - expect: 1, - }}, -}} - -func (rateLimitSuite) TestTakeAvailable(c *gc.C) { - for i, test := range takeAvailableTests { - tb := NewBucket(test.fillInterval, test.capacity) - for j, req := range test.reqs { - d := tb.takeAvailable(tb.startTime.Add(req.time), req.count) - if d != req.expect { - c.Fatalf("test %d.%d, %s, got %v want %v", i, j, test.about, d, req.expect) - } - } - } -} - -func (rateLimitSuite) TestPanics(c *gc.C) { - c.Assert(func() { NewBucket(0, 1) }, gc.PanicMatches, "token bucket fill interval is not > 0") - c.Assert(func() { NewBucket(-2, 1) }, gc.PanicMatches, "token bucket fill interval is not > 0") - c.Assert(func() { NewBucket(1, 0) }, gc.PanicMatches, "token bucket capacity is not > 0") - c.Assert(func() { NewBucket(1, -2) }, gc.PanicMatches, "token bucket capacity is not > 0") -} - -func isCloseTo(x, y, tolerance float64) bool { - return math.Abs(x-y)/y < tolerance -} - -func (rateLimitSuite) TestRate(c *gc.C) { - tb := NewBucket(1, 1) - if !isCloseTo(tb.Rate(), 1e9, 0.00001) { - c.Fatalf("got %v want 1e9", tb.Rate()) - } - tb = NewBucket(2*time.Second, 1) - if !isCloseTo(tb.Rate(), 0.5, 0.00001) { - c.Fatalf("got %v want 0.5", tb.Rate()) - } - tb = NewBucketWithQuantum(100*time.Millisecond, 1, 5) - if !isCloseTo(tb.Rate(), 50, 0.00001) { - c.Fatalf("got %v want 50", tb.Rate()) - } -} - -func checkRate(c *gc.C, rate float64) { - tb := NewBucketWithRate(rate, 1<<62) - if !isCloseTo(tb.Rate(), rate, rateMargin) { - c.Fatalf("got %g want %v", tb.Rate(), rate) - } - d, ok := tb.take(tb.startTime, 1<<62, infinityDuration) - c.Assert(ok, gc.Equals, true) - c.Assert(d, gc.Equals, time.Duration(0)) - - // Check that the actual rate is as expected by - // asking for a not-quite multiple of the bucket's - // quantum and checking that the wait time - // correct. - d, ok = tb.take(tb.startTime, tb.quantum*2-tb.quantum/2, infinityDuration) - c.Assert(ok, gc.Equals, true) - expectTime := 1e9 * float64(tb.quantum) * 2 / rate - if !isCloseTo(float64(d), expectTime, rateMargin) { - c.Fatalf("rate %g: got %g want %v", rate, float64(d), expectTime) - } -} - -func (rateLimitSuite) TestNewWithRate(c *gc.C) { - for rate := float64(1); rate < 1e6; rate += 7 { - checkRate(c, rate) - } - for _, rate := range []float64{ - 1024 * 1024 * 1024, - 1e-5, - 0.9e-5, - 0.5, - 0.9, - 0.9e8, - 3e12, - 4e18, - } { - checkRate(c, rate) - checkRate(c, rate/3) - checkRate(c, rate*1.3) - } -} - -func TestAvailable(t *testing.T) { - for i, tt := range availTests { - tb := NewBucket(tt.fillInterval, tt.capacity) - if c := tb.takeAvailable(tb.startTime, tt.take); c != tt.take { - t.Fatalf("#%d: %s, take = %d, want = %d", i, tt.about, c, tt.take) - } - if c := tb.available(tb.startTime); c != tt.expectCountAfterTake { - t.Fatalf("#%d: %s, after take, available = %d, want = %d", i, tt.about, c, tt.expectCountAfterTake) - } - if c := tb.available(tb.startTime.Add(tt.sleep)); c != tt.expectCountAfterSleep { - t.Fatalf("#%d: %s, after some time it should fill in new tokens, available = %d, want = %d", - i, tt.about, c, tt.expectCountAfterSleep) - } - } - -} - -func BenchmarkWait(b *testing.B) { - tb := NewBucket(1, 16*1024) - for i := b.N - 1; i >= 0; i-- { - tb.Wait(1) - } -} diff --git a/vendor/github.com/juju/ratelimit/reader.go b/vendor/github.com/juju/ratelimit/reader.go deleted file mode 100644 index 6403bf78..00000000 --- a/vendor/github.com/juju/ratelimit/reader.go +++ /dev/null @@ -1,51 +0,0 @@ -// Copyright 2014 Canonical Ltd. -// Licensed under the LGPLv3 with static-linking exception. -// See LICENCE file for details. - -package ratelimit - -import "io" - -type reader struct { - r io.Reader - bucket *Bucket -} - -// Reader returns a reader that is rate limited by -// the given token bucket. Each token in the bucket -// represents one byte. -func Reader(r io.Reader, bucket *Bucket) io.Reader { - return &reader{ - r: r, - bucket: bucket, - } -} - -func (r *reader) Read(buf []byte) (int, error) { - n, err := r.r.Read(buf) - if n <= 0 { - return n, err - } - r.bucket.Wait(int64(n)) - return n, err -} - -type writer struct { - w io.Writer - bucket *Bucket -} - -// Writer returns a reader that is rate limited by -// the given token bucket. Each token in the bucket -// represents one byte. -func Writer(w io.Writer, bucket *Bucket) io.Writer { - return &writer{ - w: w, - bucket: bucket, - } -} - -func (w *writer) Write(buf []byte) (int, error) { - w.bucket.Wait(int64(len(buf))) - return w.w.Write(buf) -} diff --git a/vendor/golang.org/x/time/rate/LICENSE b/vendor/golang.org/x/time/rate/LICENSE new file mode 100644 index 00000000..6a66aea5 --- /dev/null +++ b/vendor/golang.org/x/time/rate/LICENSE @@ -0,0 +1,27 @@ +Copyright (c) 2009 The Go Authors. All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are +met: + + * Redistributions of source code must retain the above copyright +notice, this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above +copyright notice, this list of conditions and the following disclaimer +in the documentation and/or other materials provided with the +distribution. + * Neither the name of Google Inc. nor the names of its +contributors may be used to endorse or promote products derived from +this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/vendor/golang.org/x/time/rate/rate.go b/vendor/golang.org/x/time/rate/rate.go new file mode 100644 index 00000000..938feaff --- /dev/null +++ b/vendor/golang.org/x/time/rate/rate.go @@ -0,0 +1,371 @@ +// Copyright 2015 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// Package rate provides a rate limiter. +package rate + +import ( + "fmt" + "math" + "sync" + "time" + + "golang.org/x/net/context" +) + +// Limit defines the maximum frequency of some events. +// Limit is represented as number of events per second. +// A zero Limit allows no events. +type Limit float64 + +// Inf is the infinite rate limit; it allows all events (even if burst is zero). +const Inf = Limit(math.MaxFloat64) + +// Every converts a minimum time interval between events to a Limit. +func Every(interval time.Duration) Limit { + if interval <= 0 { + return Inf + } + return 1 / Limit(interval.Seconds()) +} + +// A Limiter controls how frequently events are allowed to happen. +// It implements a "token bucket" of size b, initially full and refilled +// at rate r tokens per second. +// Informally, in any large enough time interval, the Limiter limits the +// rate to r tokens per second, with a maximum burst size of b events. +// As a special case, if r == Inf (the infinite rate), b is ignored. +// See https://en.wikipedia.org/wiki/Token_bucket for more about token buckets. +// +// The zero value is a valid Limiter, but it will reject all events. +// Use NewLimiter to create non-zero Limiters. +// +// Limiter has three main methods, Allow, Reserve, and Wait. +// Most callers should use Wait. +// +// Each of the three methods consumes a single token. +// They differ in their behavior when no token is available. +// If no token is available, Allow returns false. +// If no token is available, Reserve returns a reservation for a future token +// and the amount of time the caller must wait before using it. +// If no token is available, Wait blocks until one can be obtained +// or its associated context.Context is canceled. +// +// The methods AllowN, ReserveN, and WaitN consume n tokens. +type Limiter struct { + limit Limit + burst int + + mu sync.Mutex + tokens float64 + // last is the last time the limiter's tokens field was updated + last time.Time + // lastEvent is the latest time of a rate-limited event (past or future) + lastEvent time.Time +} + +// Limit returns the maximum overall event rate. +func (lim *Limiter) Limit() Limit { + lim.mu.Lock() + defer lim.mu.Unlock() + return lim.limit +} + +// Burst returns the maximum burst size. Burst is the maximum number of tokens +// that can be consumed in a single call to Allow, Reserve, or Wait, so higher +// Burst values allow more events to happen at once. +// A zero Burst allows no events, unless limit == Inf. +func (lim *Limiter) Burst() int { + return lim.burst +} + +// NewLimiter returns a new Limiter that allows events up to rate r and permits +// bursts of at most b tokens. +func NewLimiter(r Limit, b int) *Limiter { + return &Limiter{ + limit: r, + burst: b, + } +} + +// Allow is shorthand for AllowN(time.Now(), 1). +func (lim *Limiter) Allow() bool { + return lim.AllowN(time.Now(), 1) +} + +// AllowN reports whether n events may happen at time now. +// Use this method if you intend to drop / skip events that exceed the rate limit. +// Otherwise use Reserve or Wait. +func (lim *Limiter) AllowN(now time.Time, n int) bool { + return lim.reserveN(now, n, 0).ok +} + +// A Reservation holds information about events that are permitted by a Limiter to happen after a delay. +// A Reservation may be canceled, which may enable the Limiter to permit additional events. +type Reservation struct { + ok bool + lim *Limiter + tokens int + timeToAct time.Time + // This is the Limit at reservation time, it can change later. + limit Limit +} + +// OK returns whether the limiter can provide the requested number of tokens +// within the maximum wait time. If OK is false, Delay returns InfDuration, and +// Cancel does nothing. +func (r *Reservation) OK() bool { + return r.ok +} + +// Delay is shorthand for DelayFrom(time.Now()). +func (r *Reservation) Delay() time.Duration { + return r.DelayFrom(time.Now()) +} + +// InfDuration is the duration returned by Delay when a Reservation is not OK. +const InfDuration = time.Duration(1<<63 - 1) + +// DelayFrom returns the duration for which the reservation holder must wait +// before taking the reserved action. Zero duration means act immediately. +// InfDuration means the limiter cannot grant the tokens requested in this +// Reservation within the maximum wait time. +func (r *Reservation) DelayFrom(now time.Time) time.Duration { + if !r.ok { + return InfDuration + } + delay := r.timeToAct.Sub(now) + if delay < 0 { + return 0 + } + return delay +} + +// Cancel is shorthand for CancelAt(time.Now()). +func (r *Reservation) Cancel() { + r.CancelAt(time.Now()) + return +} + +// CancelAt indicates that the reservation holder will not perform the reserved action +// and reverses the effects of this Reservation on the rate limit as much as possible, +// considering that other reservations may have already been made. +func (r *Reservation) CancelAt(now time.Time) { + if !r.ok { + return + } + + r.lim.mu.Lock() + defer r.lim.mu.Unlock() + + if r.lim.limit == Inf || r.tokens == 0 || r.timeToAct.Before(now) { + return + } + + // calculate tokens to restore + // The duration between lim.lastEvent and r.timeToAct tells us how many tokens were reserved + // after r was obtained. These tokens should not be restored. + restoreTokens := float64(r.tokens) - r.limit.tokensFromDuration(r.lim.lastEvent.Sub(r.timeToAct)) + if restoreTokens <= 0 { + return + } + // advance time to now + now, _, tokens := r.lim.advance(now) + // calculate new number of tokens + tokens += restoreTokens + if burst := float64(r.lim.burst); tokens > burst { + tokens = burst + } + // update state + r.lim.last = now + r.lim.tokens = tokens + if r.timeToAct == r.lim.lastEvent { + prevEvent := r.timeToAct.Add(r.limit.durationFromTokens(float64(-r.tokens))) + if !prevEvent.Before(now) { + r.lim.lastEvent = prevEvent + } + } + + return +} + +// Reserve is shorthand for ReserveN(time.Now(), 1). +func (lim *Limiter) Reserve() *Reservation { + return lim.ReserveN(time.Now(), 1) +} + +// ReserveN returns a Reservation that indicates how long the caller must wait before n events happen. +// The Limiter takes this Reservation into account when allowing future events. +// ReserveN returns false if n exceeds the Limiter's burst size. +// Usage example: +// r := lim.ReserveN(time.Now(), 1) +// if !r.OK() { +// // Not allowed to act! Did you remember to set lim.burst to be > 0 ? +// return +// } +// time.Sleep(r.Delay()) +// Act() +// Use this method if you wish to wait and slow down in accordance with the rate limit without dropping events. +// If you need to respect a deadline or cancel the delay, use Wait instead. +// To drop or skip events exceeding rate limit, use Allow instead. +func (lim *Limiter) ReserveN(now time.Time, n int) *Reservation { + r := lim.reserveN(now, n, InfDuration) + return &r +} + +// Wait is shorthand for WaitN(ctx, 1). +func (lim *Limiter) Wait(ctx context.Context) (err error) { + return lim.WaitN(ctx, 1) +} + +// WaitN blocks until lim permits n events to happen. +// It returns an error if n exceeds the Limiter's burst size, the Context is +// canceled, or the expected wait time exceeds the Context's Deadline. +// The burst limit is ignored if the rate limit is Inf. +func (lim *Limiter) WaitN(ctx context.Context, n int) (err error) { + if n > lim.burst && lim.limit != Inf { + return fmt.Errorf("rate: Wait(n=%d) exceeds limiter's burst %d", n, lim.burst) + } + // Check if ctx is already cancelled + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + // Determine wait limit + now := time.Now() + waitLimit := InfDuration + if deadline, ok := ctx.Deadline(); ok { + waitLimit = deadline.Sub(now) + } + // Reserve + r := lim.reserveN(now, n, waitLimit) + if !r.ok { + return fmt.Errorf("rate: Wait(n=%d) would exceed context deadline", n) + } + // Wait + t := time.NewTimer(r.DelayFrom(now)) + defer t.Stop() + select { + case <-t.C: + // We can proceed. + return nil + case <-ctx.Done(): + // Context was canceled before we could proceed. Cancel the + // reservation, which may permit other events to proceed sooner. + r.Cancel() + return ctx.Err() + } +} + +// SetLimit is shorthand for SetLimitAt(time.Now(), newLimit). +func (lim *Limiter) SetLimit(newLimit Limit) { + lim.SetLimitAt(time.Now(), newLimit) +} + +// SetLimitAt sets a new Limit for the limiter. The new Limit, and Burst, may be violated +// or underutilized by those which reserved (using Reserve or Wait) but did not yet act +// before SetLimitAt was called. +func (lim *Limiter) SetLimitAt(now time.Time, newLimit Limit) { + lim.mu.Lock() + defer lim.mu.Unlock() + + now, _, tokens := lim.advance(now) + + lim.last = now + lim.tokens = tokens + lim.limit = newLimit +} + +// reserveN is a helper method for AllowN, ReserveN, and WaitN. +// maxFutureReserve specifies the maximum reservation wait duration allowed. +// reserveN returns Reservation, not *Reservation, to avoid allocation in AllowN and WaitN. +func (lim *Limiter) reserveN(now time.Time, n int, maxFutureReserve time.Duration) Reservation { + lim.mu.Lock() + + if lim.limit == Inf { + lim.mu.Unlock() + return Reservation{ + ok: true, + lim: lim, + tokens: n, + timeToAct: now, + } + } + + now, last, tokens := lim.advance(now) + + // Calculate the remaining number of tokens resulting from the request. + tokens -= float64(n) + + // Calculate the wait duration + var waitDuration time.Duration + if tokens < 0 { + waitDuration = lim.limit.durationFromTokens(-tokens) + } + + // Decide result + ok := n <= lim.burst && waitDuration <= maxFutureReserve + + // Prepare reservation + r := Reservation{ + ok: ok, + lim: lim, + limit: lim.limit, + } + if ok { + r.tokens = n + r.timeToAct = now.Add(waitDuration) + } + + // Update state + if ok { + lim.last = now + lim.tokens = tokens + lim.lastEvent = r.timeToAct + } else { + lim.last = last + } + + lim.mu.Unlock() + return r +} + +// advance calculates and returns an updated state for lim resulting from the passage of time. +// lim is not changed. +func (lim *Limiter) advance(now time.Time) (newNow time.Time, newLast time.Time, newTokens float64) { + last := lim.last + if now.Before(last) { + last = now + } + + // Avoid making delta overflow below when last is very old. + maxElapsed := lim.limit.durationFromTokens(float64(lim.burst) - lim.tokens) + elapsed := now.Sub(last) + if elapsed > maxElapsed { + elapsed = maxElapsed + } + + // Calculate the new number of tokens, due to time that passed. + delta := lim.limit.tokensFromDuration(elapsed) + tokens := lim.tokens + delta + if burst := float64(lim.burst); tokens > burst { + tokens = burst + } + + return now, last, tokens +} + +// durationFromTokens is a unit conversion function from the number of tokens to the duration +// of time it takes to accumulate them at a rate of limit tokens per second. +func (limit Limit) durationFromTokens(tokens float64) time.Duration { + seconds := tokens / float64(limit) + return time.Nanosecond * time.Duration(1e9*seconds) +} + +// tokensFromDuration is a unit conversion function from a time duration to the number of tokens +// which could be accumulated during that duration at a rate of limit tokens per second. +func (limit Limit) tokensFromDuration(d time.Duration) float64 { + return d.Seconds() * float64(limit) +} diff --git a/vendor/manifest b/vendor/manifest index ca62695b..28d1afa1 100644 --- a/vendor/manifest +++ b/vendor/manifest @@ -181,13 +181,6 @@ "revision": "3e333950771011fed13be63e62b9f473c5e0d9bf", "branch": "master" }, - { - "importpath": "github.com/juju/ratelimit", - "repository": "https://github.com/juju/ratelimit", - "vcs": "", - "revision": "77ed1c8a01217656d2080ad51981f6e99adaa177", - "branch": "master" - }, { "importpath": "github.com/kardianos/osext", "repository": "https://github.com/kardianos/osext", @@ -408,6 +401,15 @@ "revision": "a71fd10341b064c10f4a81ceac72bcf70f26ea34", "branch": "master", "path": "/unicode/norm" + }, + { + "importpath": "golang.org/x/time/rate", + "repository": "https://go.googlesource.com/time", + "vcs": "git", + "revision": "f51c12702a4d776e4c1fa9b0fabab841babae631", + "branch": "master", + "path": "/rate", + "notests": true } ] } \ No newline at end of file