diff --git a/cmd/syncthing/main.go b/cmd/syncthing/main.go index 40152558..794fa239 100644 --- a/cmd/syncthing/main.go +++ b/cmd/syncthing/main.go @@ -539,7 +539,7 @@ func syncthingMain(runtimeOptions RuntimeOptions) { errors := logger.NewRecorder(l, logger.LevelWarn, maxSystemErrors, 0) systemLog := logger.NewRecorder(l, logger.LevelDebug, maxSystemLog, initialSystemLog) - // Event subscription for the API; must start early to catch the early events. The LocalDiskUpdated + // Event subscription for the API; must start early to catch the early events. The LocalChangeDetected // event might overwhelm the event reciever in some situations so we will not subscribe to it here. apiSub := events.NewBufferedSubscription(events.Default.Subscribe(events.AllEvents&^events.LocalChangeDetected), 1000) diff --git a/lib/events/events.go b/lib/events/events.go index 5d44bb43..f0187b9b 100644 --- a/lib/events/events.go +++ b/lib/events/events.go @@ -111,16 +111,20 @@ func (t EventType) MarshalText() ([]byte, error) { const BufferSize = 64 type Logger struct { - subs []*Subscription - nextID int - mutex sync.Mutex + subs []*Subscription + nextSubscriptionIDs []int + nextGlobalID int + mutex sync.Mutex } type Event struct { - ID int `json:"id"` - Time time.Time `json:"time"` - Type EventType `json:"type"` - Data interface{} `json:"data"` + // Per-subscription sequential event ID. Named "id" for backwards compatibility with the REST API + SubscriptionID int `json:"id"` + // Global ID of the event across all subscriptions + GlobalID int `json:"globalID"` + Time time.Time `json:"time"` + Type EventType `json:"type"` + Data interface{} `json:"data"` } type Subscription struct { @@ -144,16 +148,21 @@ func NewLogger() *Logger { func (l *Logger) Log(t EventType, data interface{}) { l.mutex.Lock() - dl.Debugln("log", l.nextID, t, data) - l.nextID++ + dl.Debugln("log", l.nextGlobalID, t, data) + l.nextGlobalID++ + e := Event{ - ID: l.nextID, - Time: time.Now(), - Type: t, - Data: data, + GlobalID: l.nextGlobalID, + Time: time.Now(), + Type: t, + Data: data, } - for _, s := range l.subs { + + for i, s := range l.subs { if s.mask&t != 0 { + e.SubscriptionID = l.nextSubscriptionIDs[i] + l.nextSubscriptionIDs[i]++ + select { case s.events <- e: default: @@ -182,6 +191,7 @@ func (l *Logger) Subscribe(mask EventType) *Subscription { } l.subs = append(l.subs, s) + l.nextSubscriptionIDs = append(l.nextSubscriptionIDs, 1) l.mutex.Unlock() return s } @@ -192,9 +202,15 @@ func (l *Logger) Unsubscribe(s *Subscription) { for i, ss := range l.subs { if s == ss { last := len(l.subs) - 1 + l.subs[i] = l.subs[last] l.subs[last] = nil l.subs = l.subs[:last] + + l.nextSubscriptionIDs[i] = l.nextSubscriptionIDs[last] + l.nextSubscriptionIDs[last] = 0 + l.nextSubscriptionIDs = l.nextSubscriptionIDs[:last] + break } } @@ -234,7 +250,7 @@ type bufferedSubscription struct { sub *Subscription buf []Event next int - cur int + cur int // Current SubscriptionID mut sync.Mutex cond *stdsync.Cond } @@ -270,7 +286,7 @@ func (s *bufferedSubscription) pollingLoop() { s.mut.Lock() s.buf[s.next] = ev s.next = (s.next + 1) % len(s.buf) - s.cur = ev.ID + s.cur = ev.SubscriptionID s.cond.Broadcast() s.mut.Unlock() } @@ -285,12 +301,12 @@ func (s *bufferedSubscription) Since(id int, into []Event) []Event { } for i := s.next; i < len(s.buf); i++ { - if s.buf[i].ID > id { + if s.buf[i].SubscriptionID > id { into = append(into, s.buf[i]) } } for i := 0; i < s.next; i++ { - if s.buf[i].ID > id { + if s.buf[i].SubscriptionID > id { into = append(into, s.buf[i]) } } diff --git a/lib/events/events_test.go b/lib/events/events_test.go index e67e0df5..deac7d91 100644 --- a/lib/events/events_test.go +++ b/lib/events/events_test.go @@ -128,7 +128,7 @@ func TestUnsubscribe(t *testing.T) { } } -func TestIDs(t *testing.T) { +func TestGlobalIDs(t *testing.T) { l := events.NewLogger() s := l.Subscribe(events.AllEvents) @@ -144,7 +144,7 @@ func TestIDs(t *testing.T) { if ev.Data.(string) != "foo" { t.Fatal("Incorrect event:", ev) } - id := ev.ID + id := ev.GlobalID ev, err = s.Poll(timeout) if err != nil { @@ -153,8 +153,48 @@ func TestIDs(t *testing.T) { if ev.Data.(string) != "bar" { t.Fatal("Incorrect event:", ev) } - if ev.ID != id+1 { - t.Fatalf("ID not incremented (%d != %d)", ev.ID, id+1) + if ev.GlobalID != id+1 { + t.Fatalf("ID not incremented (%d != %d)", ev.GlobalID, id+1) + } +} + +func TestSubscriptionIDs(t *testing.T) { + l := events.NewLogger() + + s := l.Subscribe(events.DeviceConnected) + defer l.Unsubscribe(s) + + l.Log(events.DeviceDisconnected, "a") + l.Log(events.DeviceConnected, "b") + l.Log(events.DeviceConnected, "c") + l.Log(events.DeviceDisconnected, "d") + + ev, err := s.Poll(timeout) + if err != nil { + t.Fatal("Unexpected error:", err) + } + + if ev.GlobalID != 2 { + t.Fatal("Incorrect GlobalID:", ev.GlobalID) + } + if ev.SubscriptionID != 1 { + t.Fatal("Incorrect SubscriptionID:", ev.SubscriptionID) + } + + ev, err = s.Poll(timeout) + if err != nil { + t.Fatal("Unexpected error:", err) + } + if ev.GlobalID != 3 { + t.Fatal("Incorrect GlobalID:", ev.GlobalID) + } + if ev.SubscriptionID != 2 { + t.Fatal("Incorrect SubscriptionID:", ev.SubscriptionID) + } + + ev, err = s.Poll(timeout) + if err != events.ErrTimeout { + t.Fatal("Unexpected error:", err) } } @@ -179,10 +219,10 @@ func TestBufferedSub(t *testing.T) { for recv < 10*events.BufferSize { evs := bs.Since(recv, nil) for _, ev := range evs { - if ev.ID != recv+1 { - t.Fatalf("Incorrect ID; %d != %d", ev.ID, recv+1) + if ev.GlobalID != recv+1 { + t.Fatalf("Incorrect ID; %d != %d", ev.GlobalID, recv+1) } - recv = ev.ID + recv = ev.GlobalID } } } @@ -213,10 +253,10 @@ func BenchmarkBufferedSub(b *testing.B) { for i := 0; i < b.N; { evs = bs.Since(recv, evs[:0]) for _, ev := range evs { - if ev.ID != recv+1 { - b.Fatal("skipped event", ev.ID, recv) + if ev.GlobalID != recv+1 { + b.Fatal("skipped event", ev.GlobalID, recv) } - recv = ev.ID + recv = ev.GlobalID coord <- struct{}{} } i += len(evs) @@ -237,3 +277,26 @@ func BenchmarkBufferedSub(b *testing.B) { <-done b.ReportAllocs() } + +func TestSinceUsesSubscriptionId(t *testing.T) { + l := events.NewLogger() + + s := l.Subscribe(events.DeviceConnected) + defer l.Unsubscribe(s) + bs := events.NewBufferedSubscription(s, 10*events.BufferSize) + + l.Log(events.DeviceConnected, "a") // SubscriptionID = 1 + l.Log(events.DeviceDisconnected, "b") + l.Log(events.DeviceDisconnected, "c") + l.Log(events.DeviceConnected, "d") // SubscriptionID = 2 + + events := bs.Since(0, nil) + if len(events) != 2 { + t.Fatal("Incorrect number of events:", len(events)) + } + + events = bs.Since(1, nil) + if len(events) != 1 { + t.Fatal("Incorrect number of events:", len(events)) + } +}