diff --git a/lib/config/config.go b/lib/config/config.go index a72c361c..9d7820b8 100644 --- a/lib/config/config.go +++ b/lib/config/config.go @@ -797,3 +797,12 @@ func filterURLSchemePrefix(addrs []string, prefix string) []string { } return addrs } + +// mapDeviceConfigs returns a map of device ID to device configuration for the given configuration. +func (cfg *Configuration) DeviceMap() map[protocol.DeviceID]DeviceConfiguration { + m := make(map[protocol.DeviceID]DeviceConfiguration, len(cfg.Devices)) + for _, dev := range cfg.Devices { + m[dev.DeviceID] = dev + } + return m +} diff --git a/lib/config/deviceconfiguration.go b/lib/config/deviceconfiguration.go index 9bb0e4d7..2011a14b 100644 --- a/lib/config/deviceconfiguration.go +++ b/lib/config/deviceconfiguration.go @@ -20,6 +20,8 @@ type DeviceConfiguration struct { Paused bool `xml:"paused" json:"paused"` AllowedNetworks []string `xml:"allowedNetwork,omitempty" json:"allowedNetworks"` AutoAcceptFolders bool `xml:"autoAcceptFolders" json:"autoAcceptFolders"` + MaxSendKbps int `xml:"maxSendKbps" json:"maxSendKbps"` + MaxRecvKbps int `xml:"maxRecvKbps" json:"maxRecvKbps"` } func NewDeviceConfiguration(id protocol.DeviceID, name string) DeviceConfiguration { diff --git a/lib/connections/limiter.go b/lib/connections/limiter.go index 4bd4c537..119aa39d 100644 --- a/lib/connections/limiter.go +++ b/lib/connections/limiter.go @@ -12,6 +12,8 @@ import ( "sync/atomic" "github.com/syncthing/syncthing/lib/config" + "github.com/syncthing/syncthing/lib/protocol" + "github.com/syncthing/syncthing/lib/sync" "golang.org/x/net/context" "golang.org/x/time/rate" ) @@ -19,30 +21,94 @@ import ( // limiter manages a read and write rate limit, reacting to config changes // as appropriate. type limiter struct { - write *rate.Limiter - read *rate.Limiter - limitsLAN atomicBool + write *rate.Limiter + read *rate.Limiter + limitsLAN atomicBool + deviceReadLimiters map[protocol.DeviceID]*rate.Limiter + deviceWriteLimiters map[protocol.DeviceID]*rate.Limiter + mu sync.Mutex } const limiterBurstSize = 4 * 128 << 10 func newLimiter(cfg *config.Wrapper) *limiter { l := &limiter{ - write: rate.NewLimiter(rate.Inf, limiterBurstSize), - read: rate.NewLimiter(rate.Inf, limiterBurstSize), + write: rate.NewLimiter(rate.Inf, limiterBurstSize), + read: rate.NewLimiter(rate.Inf, limiterBurstSize), + mu: sync.NewMutex(), + deviceReadLimiters: make(map[protocol.DeviceID]*rate.Limiter), + deviceWriteLimiters: make(map[protocol.DeviceID]*rate.Limiter), } + cfg.Subscribe(l) prev := config.Configuration{Options: config.OptionsConfiguration{MaxRecvKbps: -1, MaxSendKbps: -1}} + l.CommitConfiguration(prev, cfg.RawCopy()) return l } -func (lim *limiter) newReadLimiter(r io.Reader, isLAN bool) io.Reader { - return &limitedReader{reader: r, limiter: lim, isLAN: isLAN} +// This function sets limiters according to corresponding DeviceConfiguration +func (lim *limiter) setLimitsLocked(device config.DeviceConfiguration) bool { + readLimiter := lim.getReadLimiterLocked(device.DeviceID) + writeLimiter := lim.getWriteLimiterLocked(device.DeviceID) + + // limiters for this device are created so we can store previous rates for logging + previousReadLimit := readLimiter.Limit() + previousWriteLimit := writeLimiter.Limit() + currentReadLimit := rate.Limit(device.MaxRecvKbps) * 1024 + currentWriteLimit := rate.Limit(device.MaxSendKbps) * 1024 + if device.MaxSendKbps <= 0 { + currentWriteLimit = rate.Inf + } + if device.MaxRecvKbps <= 0 { + currentReadLimit = rate.Inf + } + // Nothing about this device has changed. Start processing next device + if previousWriteLimit == currentWriteLimit && previousReadLimit == currentReadLimit { + return false + } + + readLimiter.SetLimit(currentReadLimit) + writeLimiter.SetLimit(currentWriteLimit) + + return true } -func (lim *limiter) newWriteLimiter(w io.Writer, isLAN bool) io.Writer { - return &limitedWriter{writer: w, limiter: lim, isLAN: isLAN} +// This function handles removing, adding and updating of device limiters. +func (lim *limiter) processDevicesConfigurationLocked(from, to config.Configuration) { + seen := make(map[protocol.DeviceID]struct{}) + + // Mark devices which should not be removed, create new limiters if needed and assign new limiter rate + for _, dev := range to.Devices { + if dev.DeviceID == to.MyID { + // This limiter was created for local device. Should skip this device + continue + } + seen[dev.DeviceID] = struct{}{} + + if lim.setLimitsLocked(dev) { + readLimitStr := "is unlimited" + if dev.MaxRecvKbps > 0 { + readLimitStr = fmt.Sprintf("limit is %d KiB/s", dev.MaxRecvKbps) + } + writeLimitStr := "is unlimited" + if dev.MaxSendKbps > 0 { + writeLimitStr = fmt.Sprintf("limit is %d KiB/s", dev.MaxSendKbps) + } + + l.Infof("Device %s send rate %s, receive rate %s", dev.DeviceID, readLimitStr, writeLimitStr) + } + } + + // Delete remote devices which were removed in new configuration + for _, dev := range from.Devices { + if _, ok := seen[dev.DeviceID]; !ok { + l.Debugf("deviceID: %s should be removed", dev.DeviceID) + + delete(lim.deviceWriteLimiters, dev.DeviceID) + delete(lim.deviceReadLimiters, dev.DeviceID) + } + } } func (lim *limiter) VerifyConfiguration(from, to config.Configuration) error { @@ -50,6 +116,13 @@ func (lim *limiter) VerifyConfiguration(from, to config.Configuration) error { } func (lim *limiter) CommitConfiguration(from, to config.Configuration) bool { + // to ensure atomic update of configuration + lim.mu.Lock() + defer lim.mu.Unlock() + + // Delete, add or update limiters for devices + lim.processDevicesConfigurationLocked(from, to) + if from.Options.MaxRecvKbps == to.Options.MaxRecvKbps && from.Options.MaxSendKbps == to.Options.MaxSendKbps && from.Options.LimitBandwidthInLan == to.Options.LimitBandwidthInLan { @@ -58,7 +131,6 @@ func (lim *limiter) CommitConfiguration(from, to config.Configuration) bool { // The rate variables are in KiB/s in the config (despite the camel casing // of the name). We multiply by 1024 to get bytes/s. - if to.Options.MaxRecvKbps <= 0 { lim.read.SetLimit(rate.Inf) } else { @@ -81,7 +153,7 @@ func (lim *limiter) CommitConfiguration(from, to config.Configuration) bool { if to.Options.MaxRecvKbps > 0 { recvLimitStr = fmt.Sprintf("limit is %d KiB/s", to.Options.MaxRecvKbps) } - l.Infof("Send rate %s, receive rate %s", sendLimitStr, recvLimitStr) + l.Infof("Overall send rate %s, receive rate %s", sendLimitStr, recvLimitStr) if to.Options.LimitBandwidthInLan { l.Infoln("Rate limits apply to LAN connections") @@ -97,53 +169,74 @@ func (lim *limiter) String() string { return "connections.limiter" } +func (lim *limiter) getLimiters(remoteID protocol.DeviceID, c internalConn, isLAN bool) (io.Reader, io.Writer) { + lim.mu.Lock() + wr := lim.newLimitedWriterLocked(remoteID, c, isLAN) + rd := lim.newLimitedReaderLocked(remoteID, c, isLAN) + lim.mu.Unlock() + return rd, wr +} + +func (lim *limiter) newLimitedReaderLocked(remoteID protocol.DeviceID, r io.Reader, isLAN bool) io.Reader { + return &limitedReader{reader: r, limiter: lim, deviceLimiter: lim.getReadLimiterLocked(remoteID), isLAN: isLAN} +} + +func (lim *limiter) newLimitedWriterLocked(remoteID protocol.DeviceID, w io.Writer, isLAN bool) io.Writer { + return &limitedWriter{writer: w, limiter: lim, deviceLimiter: lim.getWriteLimiterLocked(remoteID), isLAN: isLAN} +} + // limitedReader is a rate limited io.Reader type limitedReader struct { - reader io.Reader - limiter *limiter - isLAN bool + reader io.Reader + limiter *limiter + deviceLimiter *rate.Limiter + isLAN bool } func (r *limitedReader) Read(buf []byte) (int, error) { n, err := r.reader.Read(buf) if !r.isLAN || r.limiter.limitsLAN.get() { - take(r.limiter.read, n) + take(r.limiter.read, r.deviceLimiter, n) } return n, err } // limitedWriter is a rate limited io.Writer type limitedWriter struct { - writer io.Writer - limiter *limiter - isLAN bool + writer io.Writer + limiter *limiter + deviceLimiter *rate.Limiter + isLAN bool } func (w *limitedWriter) Write(buf []byte) (int, error) { if !w.isLAN || w.limiter.limitsLAN.get() { - take(w.limiter.write, len(buf)) + take(w.limiter.write, w.deviceLimiter, len(buf)) } return w.writer.Write(buf) } -// take is a utility function to consume tokens from a rate.Limiter. No call -// to WaitN can be larger than the limiter burst size so we split it up into +// take is a utility function to consume tokens from a overall rate.Limiter and deviceLimiter. +// No call to WaitN can be larger than the limiter burst size so we split it up into // several calls when necessary. -func take(l *rate.Limiter, tokens int) { +func take(overallLimiter, deviceLimiter *rate.Limiter, tokens int) { if tokens < limiterBurstSize { // This is the by far more common case so we get it out of the way // early. - l.WaitN(context.TODO(), tokens) + deviceLimiter.WaitN(context.TODO(), tokens) + overallLimiter.WaitN(context.TODO(), tokens) return } for tokens > 0 { // Consume limiterBurstSize tokens at a time until we're done. if tokens > limiterBurstSize { - l.WaitN(context.TODO(), limiterBurstSize) + deviceLimiter.WaitN(context.TODO(), limiterBurstSize) + overallLimiter.WaitN(context.TODO(), limiterBurstSize) tokens -= limiterBurstSize } else { - l.WaitN(context.TODO(), tokens) + deviceLimiter.WaitN(context.TODO(), tokens) + overallLimiter.WaitN(context.TODO(), tokens) tokens = 0 } } @@ -162,3 +255,26 @@ func (b *atomicBool) set(v bool) { func (b *atomicBool) get() bool { return atomic.LoadInt32((*int32)(b)) != 0 } + +// Utility functions for atomic operations on device limiters map +func (lim *limiter) getWriteLimiterLocked(deviceID protocol.DeviceID) *rate.Limiter { + limiter, ok := lim.deviceWriteLimiters[deviceID] + + if !ok { + limiter = rate.NewLimiter(rate.Inf, limiterBurstSize) + lim.deviceWriteLimiters[deviceID] = limiter + } + + return limiter +} + +func (lim *limiter) getReadLimiterLocked(deviceID protocol.DeviceID) *rate.Limiter { + limiter, ok := lim.deviceReadLimiters[deviceID] + + if !ok { + limiter = rate.NewLimiter(rate.Inf, limiterBurstSize) + lim.deviceReadLimiters[deviceID] = limiter + } + + return limiter +} diff --git a/lib/connections/limiter_test.go b/lib/connections/limiter_test.go new file mode 100644 index 00000000..c8a4f654 --- /dev/null +++ b/lib/connections/limiter_test.go @@ -0,0 +1,205 @@ +// Copyright (C) 2017 The Syncthing Authors. +// +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this file, +// You can obtain one at https://mozilla.org/MPL/2.0/. + +package connections + +import ( + "github.com/syncthing/syncthing/lib/config" + "github.com/syncthing/syncthing/lib/protocol" + "golang.org/x/time/rate" + "math/rand" + "testing" +) + +var device1, device2, device3, device4 protocol.DeviceID +var dev1Conf, dev2Conf, dev3Conf, dev4Conf config.DeviceConfiguration + +func init() { + device1, _ = protocol.DeviceIDFromString("AIR6LPZ7K4PTTUXQSMUUCPQ5YWOEDFIIQJUG7772YQXXR5YD6AWQ") + device2, _ = protocol.DeviceIDFromString("GYRZZQB-IRNPV4Z-T7TC52W-EQYJ3TT-FDQW6MW-DFLMU42-SSSU6EM-FBK2VAY") + device3, _ = protocol.DeviceIDFromString("LGFPDIT-7SKNNJL-VJZA4FC-7QNCRKA-CE753K7-2BW5QDK-2FOZ7FR-FEP57QJ") + device4, _ = protocol.DeviceIDFromString("P56IOI7-MZJNU2Y-IQGDREY-DM2MGTI-MGL3BXN-PQ6W5BM-TBBZ4TJ-XZWICQ2") +} + +func initConfig() *config.Wrapper { + cfg := config.Wrap("/dev/null", config.New(device1)) + dev1Conf = config.NewDeviceConfiguration(device1, "device1") + dev2Conf = config.NewDeviceConfiguration(device2, "device2") + dev3Conf = config.NewDeviceConfiguration(device3, "device3") + dev4Conf = config.NewDeviceConfiguration(device4, "device4") + + dev2Conf.MaxRecvKbps = rand.Int() % 100000 + dev2Conf.MaxSendKbps = rand.Int() % 100000 + + waiter, _ := cfg.SetDevices([]config.DeviceConfiguration{dev1Conf, dev2Conf, dev3Conf, dev4Conf}) + waiter.Wait() + return cfg +} + +func TestLimiterInit(t *testing.T) { + cfg := initConfig() + lim := newLimiter(cfg) + + device2ReadLimit := dev2Conf.MaxRecvKbps + device2WriteLimit := dev2Conf.MaxSendKbps + + expectedR := map[protocol.DeviceID]*rate.Limiter{ + device2: rate.NewLimiter(rate.Limit(device2ReadLimit*1024), limiterBurstSize), + device3: rate.NewLimiter(rate.Inf, limiterBurstSize), + device4: rate.NewLimiter(rate.Inf, limiterBurstSize), + } + + expectedW := map[protocol.DeviceID]*rate.Limiter{ + device2: rate.NewLimiter(rate.Limit(device2WriteLimit*1024), limiterBurstSize), + device3: rate.NewLimiter(rate.Inf, limiterBurstSize), + device4: rate.NewLimiter(rate.Inf, limiterBurstSize), + } + + actualR := lim.deviceReadLimiters + actualW := lim.deviceWriteLimiters + + checkActualAndExpected(t, actualR, actualW, expectedR, expectedW) +} + +func TestSetDeviceLimits(t *testing.T) { + cfg := initConfig() + lim := newLimiter(cfg) + + // should still be inf/inf because this is local device + dev1ReadLimit := rand.Int() % 100000 + dev1WriteLimit := rand.Int() % 100000 + dev1Conf.MaxRecvKbps = dev1ReadLimit + dev1Conf.MaxSendKbps = dev1WriteLimit + + dev2ReadLimit := rand.Int() % 100000 + dev2WriteLimit := rand.Int() % 100000 + dev2Conf.MaxRecvKbps = dev2ReadLimit + dev2Conf.MaxSendKbps = dev2WriteLimit + + dev3ReadLimit := rand.Int() % 10000 + dev3Conf.MaxRecvKbps = dev3ReadLimit + + waiter, _ := cfg.SetDevices([]config.DeviceConfiguration{dev1Conf, dev2Conf, dev3Conf, dev4Conf}) + waiter.Wait() + + expectedR := map[protocol.DeviceID]*rate.Limiter{ + device2: rate.NewLimiter(rate.Limit(dev2ReadLimit*1024), limiterBurstSize), + device3: rate.NewLimiter(rate.Limit(dev3ReadLimit*1024), limiterBurstSize), + device4: rate.NewLimiter(rate.Inf, limiterBurstSize), + } + expectedW := map[protocol.DeviceID]*rate.Limiter{ + device2: rate.NewLimiter(rate.Limit(dev2WriteLimit*1024), limiterBurstSize), + device3: rate.NewLimiter(rate.Inf, limiterBurstSize), + device4: rate.NewLimiter(rate.Inf, limiterBurstSize), + } + + actualR := lim.deviceReadLimiters + actualW := lim.deviceWriteLimiters + + checkActualAndExpected(t, actualR, actualW, expectedR, expectedW) +} + +func TestRemoveDevice(t *testing.T) { + cfg := initConfig() + lim := newLimiter(cfg) + + waiter, _ := cfg.RemoveDevice(device3) + waiter.Wait() + expectedR := map[protocol.DeviceID]*rate.Limiter{ + device2: rate.NewLimiter(rate.Limit(dev2Conf.MaxRecvKbps*1024), limiterBurstSize), + device4: rate.NewLimiter(rate.Inf, limiterBurstSize), + } + expectedW := map[protocol.DeviceID]*rate.Limiter{ + device2: rate.NewLimiter(rate.Limit(dev2Conf.MaxSendKbps*1024), limiterBurstSize), + device4: rate.NewLimiter(rate.Inf, limiterBurstSize), + } + actualR := lim.deviceReadLimiters + actualW := lim.deviceWriteLimiters + + checkActualAndExpected(t, actualR, actualW, expectedR, expectedW) +} + +func TestAddDevice(t *testing.T) { + cfg := initConfig() + lim := newLimiter(cfg) + + addedDevice, _ := protocol.DeviceIDFromString("XZJ4UNS-ENI7QGJ-J45DT6G-QSGML2K-6I4XVOG-NAZ7BF5-2VAOWNT-TFDOMQU") + addDevConf := config.NewDeviceConfiguration(addedDevice, "addedDevice") + addDevConf.MaxRecvKbps = 120 + addDevConf.MaxSendKbps = 240 + + waiter, _ := cfg.SetDevice(addDevConf) + waiter.Wait() + + expectedR := map[protocol.DeviceID]*rate.Limiter{ + device2: rate.NewLimiter(rate.Limit(dev2Conf.MaxRecvKbps*1024), limiterBurstSize), + device3: rate.NewLimiter(rate.Inf, limiterBurstSize), + device4: rate.NewLimiter(rate.Inf, limiterBurstSize), + addedDevice: rate.NewLimiter(rate.Limit(addDevConf.MaxRecvKbps*1024), limiterBurstSize), + } + + expectedW := map[protocol.DeviceID]*rate.Limiter{ + device2: rate.NewLimiter(rate.Limit(dev2Conf.MaxSendKbps*1024), limiterBurstSize), + device3: rate.NewLimiter(rate.Inf, limiterBurstSize), + device4: rate.NewLimiter(rate.Inf, limiterBurstSize), + addedDevice: rate.NewLimiter(rate.Limit(addDevConf.MaxSendKbps*1024), limiterBurstSize), + } + actualR := lim.deviceReadLimiters + actualW := lim.deviceWriteLimiters + + checkActualAndExpected(t, actualR, actualW, expectedR, expectedW) +} + +func TestAddAndRemove(t *testing.T) { + cfg := initConfig() + lim := newLimiter(cfg) + + addedDevice, _ := protocol.DeviceIDFromString("XZJ4UNS-ENI7QGJ-J45DT6G-QSGML2K-6I4XVOG-NAZ7BF5-2VAOWNT-TFDOMQU") + addDevConf := config.NewDeviceConfiguration(addedDevice, "addedDevice") + addDevConf.MaxRecvKbps = 120 + addDevConf.MaxSendKbps = 240 + + waiter, _ := cfg.SetDevice(addDevConf) + waiter.Wait() + waiter, _ = cfg.RemoveDevice(device3) + waiter.Wait() + + expectedR := map[protocol.DeviceID]*rate.Limiter{ + device2: rate.NewLimiter(rate.Limit(dev2Conf.MaxRecvKbps*1024), limiterBurstSize), + device4: rate.NewLimiter(rate.Inf, limiterBurstSize), + addedDevice: rate.NewLimiter(rate.Limit(addDevConf.MaxRecvKbps*1024), limiterBurstSize), + } + + expectedW := map[protocol.DeviceID]*rate.Limiter{ + device2: rate.NewLimiter(rate.Limit(dev2Conf.MaxSendKbps*1024), limiterBurstSize), + device4: rate.NewLimiter(rate.Inf, limiterBurstSize), + addedDevice: rate.NewLimiter(rate.Limit(addDevConf.MaxSendKbps*1024), limiterBurstSize), + } + actualR := lim.deviceReadLimiters + actualW := lim.deviceWriteLimiters + + checkActualAndExpected(t, actualR, actualW, expectedR, expectedW) +} + +func checkActualAndExpected(t *testing.T, actualR, actualW, expectedR, expectedW map[protocol.DeviceID]*rate.Limiter) { + t.Helper() + if len(expectedW) != len(actualW) || len(expectedR) != len(actualR) { + t.Errorf("Map lengths differ!") + } + + for key, val := range expectedR { + if _, ok := actualR[key]; !ok { + t.Errorf("Device %s not found in limiter", key) + } + + if val.Limit() != actualR[key].Limit() { + t.Errorf("Read limits for device %s differ actual: %f, expected: %f", key, actualR[key].Limit(), val.Limit()) + } + if expectedW[key].Limit() != actualW[key].Limit() { + t.Errorf("Write limits for device %s differ actual: %f, expected: %f", key, actualW[key].Limit(), expectedW[key].Limit()) + } + } +} diff --git a/lib/connections/service.go b/lib/connections/service.go index 13e226ec..8bcd2f45 100644 --- a/lib/connections/service.go +++ b/lib/connections/service.go @@ -266,8 +266,7 @@ next: // keep up with config changes to the rate and whether or not LAN // connections are limited. isLAN := s.isLAN(c.RemoteAddr()) - wr := s.limiter.newWriteLimiter(c, isLAN) - rd := s.limiter.newReadLimiter(c, isLAN) + rd, wr := s.limiter.getLimiters(remoteID, c, isLAN) protoConn := protocol.NewConnection(remoteID, rd, wr, s.model, c.String(), deviceCfg.Compression) modelConn := completeConn{c, protoConn} diff --git a/lib/model/model.go b/lib/model/model.go index 19d707f5..79bc4078 100644 --- a/lib/model/model.go +++ b/lib/model/model.go @@ -2625,8 +2625,8 @@ func (m *Model) CommitConfiguration(from, to config.Configuration) bool { // clean residue device state that is not part of any folder. // Pausing a device, unpausing is handled by the connection service. - fromDevices := mapDeviceConfigs(from.Devices) - toDevices := mapDeviceConfigs(to.Devices) + fromDevices := from.DeviceMap() + toDevices := to.DeviceMap() for deviceID, toCfg := range toDevices { fromCfg, ok := fromDevices[deviceID] if !ok || fromCfg.Paused == toCfg.Paused { @@ -2715,16 +2715,6 @@ func mapDevices(devices []protocol.DeviceID) map[protocol.DeviceID]struct{} { return m } -// mapDeviceConfigs returns a map of device ID to device configuration for the given -// slice of folder configurations. -func mapDeviceConfigs(devices []config.DeviceConfiguration) map[protocol.DeviceID]config.DeviceConfiguration { - m := make(map[protocol.DeviceID]config.DeviceConfiguration, len(devices)) - for _, dev := range devices { - m[dev.DeviceID] = dev - } - return m -} - // Skips `skip` elements and retrieves up to `get` elements from a given slice. // Returns the resulting slice, plus how much elements are left to skip or // copy to satisfy the values which were provided, given the slice is not