Files
syncthing-arm/lib/events/events.go
T

448 lines
9.9 KiB
Go
Raw Normal View History

2014-11-16 21:13:20 +01:00
// Copyright (C) 2014 The Syncthing Authors.
2014-09-29 21:43:32 +02:00
//
2015-03-07 21:36:35 +01:00
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
// You can obtain one at https://mozilla.org/MPL/2.0/.
2014-07-25 14:50:14 +02:00
2014-07-13 21:07:24 +02:00
// Package events provides event subscription and polling functionality.
package events
import (
2017-11-22 23:25:55 +00:00
"encoding/json"
2014-07-13 21:07:24 +02:00
"errors"
"runtime"
2014-07-13 21:07:24 +02:00
"time"
2015-04-22 23:54:31 +01:00
2015-08-06 11:29:25 +02:00
"github.com/syncthing/syncthing/lib/sync"
2014-07-13 21:07:24 +02:00
)
2015-01-18 02:12:06 +01:00
// EventType identifies the kind of an event. Every concrete type occupies
// its own bit, so several types can be OR:ed together into a mask such as
// the one passed to Logger.Subscribe.
type EventType int

// The known event types. The declaration order is significant: each
// constant receives the next bit through iota, and AllEvents is derived
// from the number of constants declared before it.
const (
	Starting EventType = 1 << iota
	StartupComplete

	DeviceDiscovered
	DeviceConnected
	DeviceDisconnected
	DeviceRejected
	DevicePaused
	DeviceResumed

	LocalChangeDetected
	RemoteChangeDetected
	LocalIndexUpdated
	RemoteIndexUpdated

	ItemStarted
	ItemFinished
	StateChanged
	FolderRejected
	ConfigSaved

	DownloadProgress
	RemoteDownloadProgress

	FolderSummary
	FolderCompletion
	FolderErrors
	FolderScanProgress
	FolderPaused
	FolderResumed
	FolderWatchStateChanged

	ListenAddressesChanged
	LoginAttempt

	// AllEvents is an (untyped) mask with the bit of every event type
	// declared above set.
	AllEvents = (1 << iota) - 1
)
// runningTests, when true, makes Subscribe and Poll yield to the scheduler
// (and, in Poll, rearm the timer with a zero timeout) before stopping their
// timers, so that the timer-drain branch is taken predictably. It is false
// by default.
var runningTests = false

// eventLogTimeout is how long Log waits for a single subscription's channel
// to accept an event before dropping the event for that subscription.
const eventLogTimeout = 15 * time.Millisecond
2014-07-13 21:07:24 +02:00
func (t EventType) String() string {
switch t {
2014-07-17 13:38:36 +02:00
case Starting:
return "Starting"
2014-07-13 21:07:24 +02:00
case StartupComplete:
return "StartupComplete"
case DeviceDiscovered:
return "DeviceDiscovered"
case DeviceConnected:
return "DeviceConnected"
case DeviceDisconnected:
return "DeviceDisconnected"
case DeviceRejected:
return "DeviceRejected"
case LocalChangeDetected:
return "LocalChangeDetected"
2016-12-21 16:35:20 +00:00
case RemoteChangeDetected:
return "RemoteChangeDetected"
2014-07-13 21:07:24 +02:00
case LocalIndexUpdated:
return "LocalIndexUpdated"
case RemoteIndexUpdated:
return "RemoteIndexUpdated"
case ItemStarted:
return "ItemStarted"
2015-02-01 17:31:19 +00:00
case ItemFinished:
return "ItemFinished"
2014-07-17 13:38:36 +02:00
case StateChanged:
return "StateChanged"
case FolderRejected:
return "FolderRejected"
2014-09-06 16:31:23 +01:00
case ConfigSaved:
return "ConfigSaved"
2014-11-16 23:18:59 +00:00
case DownloadProgress:
return "DownloadProgress"
case RemoteDownloadProgress:
return "RemoteDownloadProgress"
case FolderSummary:
return "FolderSummary"
case FolderCompletion:
return "FolderCompletion"
case FolderErrors:
return "FolderErrors"
2015-08-23 21:56:10 +02:00
case DevicePaused:
return "DevicePaused"
case DeviceResumed:
return "DeviceResumed"
2015-08-26 23:49:06 +01:00
case FolderScanProgress:
return "FolderScanProgress"
case FolderPaused:
return "FolderPaused"
case FolderResumed:
return "FolderResumed"
2016-05-04 19:38:12 +00:00
case ListenAddressesChanged:
return "ListenAddressesChanged"
case LoginAttempt:
return "LoginAttempt"
case FolderWatchStateChanged:
return "FolderWatchStateChanged"
2014-07-13 21:07:24 +02:00
default:
return "Unknown"
}
}
// MarshalText renders the event type as its name, as returned by String.
// It never returns an error.
func (t EventType) MarshalText() ([]byte, error) {
	name := t.String()
	return []byte(name), nil
}
2017-11-22 23:25:55 +00:00
// UnmarshalJSON decodes a JSON string holding an event type name and stores
// the corresponding type in t. A name that is not recognized results in the
// zero EventType rather than an error; an error is returned only when the
// input cannot be decoded into a string.
func (t *EventType) UnmarshalJSON(b []byte) error {
	var name string
	err := json.Unmarshal(b, &name)
	if err != nil {
		return err
	}

	*t = UnmarshalEventType(name)
	return nil
}
// UnmarshalEventType maps an event type name, as produced by
// EventType.String, back to the event type. The match is exact and case
// sensitive; any other string yields the zero EventType.
func UnmarshalEventType(s string) EventType {
	// Cases are listed in the same order as the constants are declared.
	switch s {
	case "Starting":
		return Starting
	case "StartupComplete":
		return StartupComplete
	case "DeviceDiscovered":
		return DeviceDiscovered
	case "DeviceConnected":
		return DeviceConnected
	case "DeviceDisconnected":
		return DeviceDisconnected
	case "DeviceRejected":
		return DeviceRejected
	case "DevicePaused":
		return DevicePaused
	case "DeviceResumed":
		return DeviceResumed
	case "LocalChangeDetected":
		return LocalChangeDetected
	case "RemoteChangeDetected":
		return RemoteChangeDetected
	case "LocalIndexUpdated":
		return LocalIndexUpdated
	case "RemoteIndexUpdated":
		return RemoteIndexUpdated
	case "ItemStarted":
		return ItemStarted
	case "ItemFinished":
		return ItemFinished
	case "StateChanged":
		return StateChanged
	case "FolderRejected":
		return FolderRejected
	case "ConfigSaved":
		return ConfigSaved
	case "DownloadProgress":
		return DownloadProgress
	case "RemoteDownloadProgress":
		return RemoteDownloadProgress
	case "FolderSummary":
		return FolderSummary
	case "FolderCompletion":
		return FolderCompletion
	case "FolderErrors":
		return FolderErrors
	case "FolderScanProgress":
		return FolderScanProgress
	case "FolderPaused":
		return FolderPaused
	case "FolderResumed":
		return FolderResumed
	case "FolderWatchStateChanged":
		return FolderWatchStateChanged
	case "ListenAddressesChanged":
		return ListenAddressesChanged
	case "LoginAttempt":
		return LoginAttempt
	}
	return 0
}
2014-07-13 21:07:24 +02:00
// BufferSize is the capacity of the event channel that Logger.Subscribe
// allocates for each new Subscription.
const BufferSize = 64
type Logger struct {
subs []*Subscription
nextSubscriptionIDs []int
nextGlobalID int
timeout *time.Timer
mutex sync.Mutex
2014-07-13 21:07:24 +02:00
}
type Event struct {
// Per-subscription sequential event ID. Named "id" for backwards compatibility with the REST API
SubscriptionID int `json:"id"`
// Global ID of the event across all subscriptions
GlobalID int `json:"globalID"`
Time time.Time `json:"time"`
Type EventType `json:"type"`
Data interface{} `json:"data"`
2014-07-13 21:07:24 +02:00
}
type Subscription struct {
mask EventType
events chan Event
timeout *time.Timer
2014-07-13 21:07:24 +02:00
}
// Default is a package-level Logger created at package initialization.
var Default = NewLogger()
// Errors returned by Subscription.Poll.
var (
// ErrTimeout is returned when no event arrived before the poll timeout.
ErrTimeout = errors.New("timeout")
// ErrClosed is returned when the subscription's event channel is closed.
ErrClosed = errors.New("closed")
)
func NewLogger() *Logger {
l := &Logger{
mutex: sync.NewMutex(),
timeout: time.NewTimer(time.Second),
}
// Make sure the timer is in the stopped state and hasn't fired anything
// into the channel.
if !l.timeout.Stop() {
<-l.timeout.C
2014-07-13 21:07:24 +02:00
}
return l
2014-07-13 21:07:24 +02:00
}
func (l *Logger) Log(t EventType, data interface{}) {
l.mutex.Lock()
l.nextGlobalID++
dl.Debugln("log", l.nextGlobalID, t, data)
2014-07-13 21:07:24 +02:00
e := Event{
GlobalID: l.nextGlobalID,
Time: time.Now(),
Type: t,
Data: data,
2014-07-13 21:07:24 +02:00
}
for i, s := range l.subs {
2014-07-13 21:07:24 +02:00
if s.mask&t != 0 {
e.SubscriptionID = l.nextSubscriptionIDs[i]
l.nextSubscriptionIDs[i]++
l.timeout.Reset(eventLogTimeout)
timedOut := false
2014-07-13 21:07:24 +02:00
select {
case s.events <- e:
case <-l.timeout.C:
2014-10-06 23:03:24 +01:00
// if s.events is not ready, drop the event
timedOut = true
}
// If stop returns false it already sent something to the
// channel. If we didn't already read it above we must do so now
// or we get a spurious timeout on the next loop.
if !l.timeout.Stop() && !timedOut {
<-l.timeout.C
2014-07-13 21:07:24 +02:00
}
}
}
l.mutex.Unlock()
}
func (l *Logger) Subscribe(mask EventType) *Subscription {
l.mutex.Lock()
dl.Debugln("subscribe", mask)
2015-11-17 12:03:18 +01:00
2014-07-13 21:07:24 +02:00
s := &Subscription{
mask: mask,
events: make(chan Event, BufferSize),
timeout: time.NewTimer(0),
2014-07-13 21:07:24 +02:00
}
2015-11-17 12:03:18 +01:00
// We need to create the timeout timer in the stopped, non-fired state so
// that Subscription.Poll() can safely reset it and select on the timeout
// channel. This ensures the timer is stopped and the channel drained.
if runningTests {
// Make the behavior stable when running tests to avoid randomly
// varying test coverage. This ensures, in practice if not in
// theory, that the timer fires and we take the true branch of the
// next if.
runtime.Gosched()
}
2015-11-17 12:03:18 +01:00
if !s.timeout.Stop() {
<-s.timeout.C
}
l.subs = append(l.subs, s)
l.nextSubscriptionIDs = append(l.nextSubscriptionIDs, 1)
2014-07-13 21:07:24 +02:00
l.mutex.Unlock()
return s
}
func (l *Logger) Unsubscribe(s *Subscription) {
l.mutex.Lock()
dl.Debugln("unsubscribe")
for i, ss := range l.subs {
if s == ss {
last := len(l.subs) - 1
l.subs[i] = l.subs[last]
l.subs[last] = nil
l.subs = l.subs[:last]
l.nextSubscriptionIDs[i] = l.nextSubscriptionIDs[last]
l.nextSubscriptionIDs[last] = 0
l.nextSubscriptionIDs = l.nextSubscriptionIDs[:last]
break
}
}
2014-07-13 21:07:24 +02:00
close(s.events)
l.mutex.Unlock()
}
// Poll returns an event from the subscription or an error if the poll times
// out of the event channel is closed. Poll should not be called concurrently
// from multiple goroutines for a single subscription.
2014-07-13 21:07:24 +02:00
func (s *Subscription) Poll(timeout time.Duration) (Event, error) {
dl.Debugln("poll", timeout)
2014-07-25 14:50:14 +02:00
2015-11-17 12:03:18 +01:00
s.timeout.Reset(timeout)
2015-08-24 09:38:39 +02:00
2014-07-13 21:07:24 +02:00
select {
case e, ok := <-s.events:
if !ok {
return e, ErrClosed
}
if runningTests {
// Make the behavior stable when running tests to avoid randomly
// varying test coverage. This ensures, in practice if not in
// theory, that the timer fires and we take the true branch of
// the next if.
s.timeout.Reset(0)
runtime.Gosched()
}
2015-11-17 12:03:18 +01:00
if !s.timeout.Stop() {
// The timeout must be stopped and possibly drained to be ready
// for reuse in the next call.
<-s.timeout.C
}
2014-07-13 21:07:24 +02:00
return e, nil
case <-s.timeout.C:
2014-07-13 21:07:24 +02:00
return Event{}, ErrTimeout
}
}
// C returns the subscription's event channel for receiving. It is the same
// channel that Poll reads from, and it is closed by Logger.Unsubscribe.
func (s *Subscription) C() <-chan Event {
return s.events
}
type bufferedSubscription struct {
2014-07-13 21:07:24 +02:00
sub *Subscription
buf []Event
next int
cur int // Current SubscriptionID
2014-07-13 21:07:24 +02:00
mut sync.Mutex
cond *sync.TimeoutCond
2014-07-13 21:07:24 +02:00
}
// BufferedSubscription retains recent events so that callers can ask for
// those newer than an ID they have already seen.
type BufferedSubscription interface {
// Since appends to into the retained events whose SubscriptionID is
// greater than id and returns the resulting slice. The implementation
// returned by NewBufferedSubscription waits up to timeout for such an
// event when there is none yet, and returns into unchanged on timeout.
Since(id int, into []Event, timeout time.Duration) []Event
}
func NewBufferedSubscription(s *Subscription, size int) BufferedSubscription {
bs := &bufferedSubscription{
2014-07-13 21:07:24 +02:00
sub: s,
buf: make([]Event, size),
2015-04-22 23:54:31 +01:00
mut: sync.NewMutex(),
2014-07-13 21:07:24 +02:00
}
bs.cond = sync.NewTimeoutCond(bs.mut)
2014-07-13 21:07:24 +02:00
go bs.pollingLoop()
return bs
}
func (s *bufferedSubscription) pollingLoop() {
for ev := range s.sub.C() {
2014-07-13 21:07:24 +02:00
s.mut.Lock()
s.buf[s.next] = ev
s.next = (s.next + 1) % len(s.buf)
s.cur = ev.SubscriptionID
2014-07-13 21:07:24 +02:00
s.cond.Broadcast()
s.mut.Unlock()
}
}
func (s *bufferedSubscription) Since(id int, into []Event, timeout time.Duration) []Event {
2014-07-13 21:07:24 +02:00
s.mut.Lock()
defer s.mut.Unlock()
// Check once first before generating the TimeoutCondWaiter
if id >= s.cur {
waiter := s.cond.SetupWait(timeout)
defer waiter.Stop()
for id >= s.cur {
if eventsAvailable := waiter.Wait(); !eventsAvailable {
// Timed out
return into
}
}
2014-07-13 21:07:24 +02:00
}
for i := s.next; i < len(s.buf); i++ {
if s.buf[i].SubscriptionID > id {
2014-07-13 21:07:24 +02:00
into = append(into, s.buf[i])
}
}
for i := 0; i < s.next; i++ {
if s.buf[i].SubscriptionID > id {
2014-07-13 21:07:24 +02:00
into = append(into, s.buf[i])
}
}
return into
}
// Error converts err into a value suitable for JSON marshalling. A nil error
// becomes a nil pointer, retaining the "null on success" semantics; any
// other error becomes a pointer to its message, so the result is a string
// regardless of the underlying concrete error type.
func Error(err error) *string {
	if err != nil {
		msg := err.Error()
		return &msg
	}
	return nil
}