Move top level packages to internal.

This commit is contained in:
Jakob Borg
2014-09-22 21:42:11 +02:00
parent fbdbd722b1
commit 14817e31f6
174 changed files with 252 additions and 254 deletions

17
internal/events/debug.go Normal file
View File

@@ -0,0 +1,17 @@
// Copyright (C) 2014 Jakob Borg and Contributors (see the CONTRIBUTORS file).
// All rights reserved. Use of this source code is governed by an MIT-style
// license that can be found in the LICENSE file.
package events
import (
"os"
"strings"
"github.com/syncthing/syncthing/internal/logger"
)
var (
debug = strings.Contains(os.Getenv("STTRACE"), "events") || os.Getenv("STTRACE") == "all"
dl = logger.DefaultLogger
)

237
internal/events/events.go Normal file
View File

@@ -0,0 +1,237 @@
// Copyright (C) 2014 Jakob Borg and Contributors (see the CONTRIBUTORS file).
// All rights reserved. Use of this source code is governed by an MIT-style
// license that can be found in the LICENSE file.
// Package events provides event subscription and polling functionality.
package events
import (
"errors"
"sync"
"time"
)
type EventType uint64
const (
Ping = 1 << iota
Starting
StartupComplete
NodeDiscovered
NodeConnected
NodeDisconnected
NodeRejected
LocalIndexUpdated
RemoteIndexUpdated
ItemStarted
StateChanged
RepoRejected
ConfigSaved
AllEvents = ^EventType(0)
)
func (t EventType) String() string {
switch t {
case Ping:
return "Ping"
case Starting:
return "Starting"
case StartupComplete:
return "StartupComplete"
case NodeDiscovered:
return "NodeDiscovered"
case NodeConnected:
return "NodeConnected"
case NodeDisconnected:
return "NodeDisconnected"
case NodeRejected:
return "NodeRejected"
case LocalIndexUpdated:
return "LocalIndexUpdated"
case RemoteIndexUpdated:
return "RemoteIndexUpdated"
case ItemStarted:
return "ItemStarted"
case StateChanged:
return "StateChanged"
case RepoRejected:
return "RepoRejected"
case ConfigSaved:
return "ConfigSaved"
default:
return "Unknown"
}
}
func (t EventType) MarshalText() ([]byte, error) {
return []byte(t.String()), nil
}
const BufferSize = 64
type Logger struct {
subs map[int]*Subscription
nextId int
mutex sync.Mutex
}
type Event struct {
ID int `json:"id"`
Time time.Time `json:"time"`
Type EventType `json:"type"`
Data interface{} `json:"data"`
}
type Subscription struct {
mask EventType
id int
events chan Event
mutex sync.Mutex
}
var Default = NewLogger()
var (
ErrTimeout = errors.New("timeout")
ErrClosed = errors.New("closed")
)
func NewLogger() *Logger {
return &Logger{
subs: make(map[int]*Subscription),
}
}
func (l *Logger) Log(t EventType, data interface{}) {
l.mutex.Lock()
if debug {
dl.Debugln("log", l.nextId, t.String(), data)
}
e := Event{
ID: l.nextId,
Time: time.Now(),
Type: t,
Data: data,
}
l.nextId++
for _, s := range l.subs {
if s.mask&t != 0 {
select {
case s.events <- e:
default:
//log.Println("Dropping event:", e)
}
}
}
l.mutex.Unlock()
}
func (l *Logger) Subscribe(mask EventType) *Subscription {
l.mutex.Lock()
if debug {
dl.Debugln("subscribe", mask)
}
s := &Subscription{
mask: mask,
id: l.nextId,
events: make(chan Event, BufferSize),
}
l.nextId++
l.subs[s.id] = s
l.mutex.Unlock()
return s
}
func (l *Logger) Unsubscribe(s *Subscription) {
l.mutex.Lock()
if debug {
dl.Debugln("unsubsribe")
}
delete(l.subs, s.id)
close(s.events)
l.mutex.Unlock()
}
func (s *Subscription) Poll(timeout time.Duration) (Event, error) {
s.mutex.Lock()
defer s.mutex.Unlock()
if debug {
dl.Debugln("poll", timeout)
}
to := time.After(timeout)
select {
case e, ok := <-s.events:
if !ok {
return e, ErrClosed
}
return e, nil
case <-to:
return Event{}, ErrTimeout
}
}
type BufferedSubscription struct {
sub *Subscription
buf []Event
next int
cur int
mut sync.Mutex
cond *sync.Cond
}
func NewBufferedSubscription(s *Subscription, size int) *BufferedSubscription {
bs := &BufferedSubscription{
sub: s,
buf: make([]Event, size),
}
bs.cond = sync.NewCond(&bs.mut)
go bs.pollingLoop()
return bs
}
func (s *BufferedSubscription) pollingLoop() {
for {
ev, err := s.sub.Poll(60 * time.Second)
if err == ErrTimeout {
continue
}
if err == ErrClosed {
return
}
if err != nil {
panic("unexpected error: " + err.Error())
}
s.mut.Lock()
s.buf[s.next] = ev
s.next = (s.next + 1) % len(s.buf)
s.cur = ev.ID
s.cond.Broadcast()
s.mut.Unlock()
}
}
func (s *BufferedSubscription) Since(id int, into []Event) []Event {
s.mut.Lock()
defer s.mut.Unlock()
for id >= s.cur {
s.cond.Wait()
}
for i := s.next; i < len(s.buf); i++ {
if s.buf[i].ID > id {
into = append(into, s.buf[i])
}
}
for i := 0; i < s.next; i++ {
if s.buf[i].ID > id {
into = append(into, s.buf[i])
}
}
return into
}

View File

@@ -0,0 +1,178 @@
// Copyright (C) 2014 Jakob Borg and Contributors (see the CONTRIBUTORS file).
// All rights reserved. Use of this source code is governed by an MIT-style
// license that can be found in the LICENSE file.
package events_test
import (
"fmt"
"testing"
"time"
"github.com/syncthing/syncthing/internal/events"
)
var timeout = 100 * time.Millisecond
func TestNewLogger(t *testing.T) {
l := events.NewLogger()
if l == nil {
t.Fatal("Unexpected nil Logger")
}
}
func TestSubscriber(t *testing.T) {
l := events.NewLogger()
s := l.Subscribe(0)
if s == nil {
t.Fatal("Unexpected nil Subscription")
}
}
func TestTimeout(t *testing.T) {
l := events.NewLogger()
s := l.Subscribe(0)
_, err := s.Poll(timeout)
if err != events.ErrTimeout {
t.Fatal("Unexpected non-Timeout error:", err)
}
}
func TestEventBeforeSubscribe(t *testing.T) {
l := events.NewLogger()
l.Log(events.NodeConnected, "foo")
s := l.Subscribe(0)
_, err := s.Poll(timeout)
if err != events.ErrTimeout {
t.Fatal("Unexpected non-Timeout error:", err)
}
}
func TestEventAfterSubscribe(t *testing.T) {
l := events.NewLogger()
s := l.Subscribe(events.AllEvents)
l.Log(events.NodeConnected, "foo")
ev, err := s.Poll(timeout)
if err != nil {
t.Fatal("Unexpected error:", err)
}
if ev.Type != events.NodeConnected {
t.Error("Incorrect event type", ev.Type)
}
switch v := ev.Data.(type) {
case string:
if v != "foo" {
t.Error("Incorrect Data string", v)
}
default:
t.Errorf("Incorrect Data type %#v", v)
}
}
func TestEventAfterSubscribeIgnoreMask(t *testing.T) {
l := events.NewLogger()
s := l.Subscribe(events.NodeDisconnected)
l.Log(events.NodeConnected, "foo")
_, err := s.Poll(timeout)
if err != events.ErrTimeout {
t.Fatal("Unexpected non-Timeout error:", err)
}
}
func TestBufferOverflow(t *testing.T) {
l := events.NewLogger()
_ = l.Subscribe(events.AllEvents)
t0 := time.Now()
for i := 0; i < events.BufferSize*2; i++ {
l.Log(events.NodeConnected, "foo")
}
if time.Since(t0) > timeout {
t.Fatalf("Logging took too long")
}
}
func TestUnsubscribe(t *testing.T) {
l := events.NewLogger()
s := l.Subscribe(events.AllEvents)
l.Log(events.NodeConnected, "foo")
_, err := s.Poll(timeout)
if err != nil {
t.Fatal("Unexpected error:", err)
}
l.Unsubscribe(s)
l.Log(events.NodeConnected, "foo")
_, err = s.Poll(timeout)
if err != events.ErrClosed {
t.Fatal("Unexpected non-Closed error:", err)
}
}
func TestIDs(t *testing.T) {
l := events.NewLogger()
s := l.Subscribe(events.AllEvents)
l.Log(events.NodeConnected, "foo")
l.Log(events.NodeConnected, "bar")
ev, err := s.Poll(timeout)
if err != nil {
t.Fatal("Unexpected error:", err)
}
if ev.Data.(string) != "foo" {
t.Fatal("Incorrect event:", ev)
}
id := ev.ID
ev, err = s.Poll(timeout)
if err != nil {
t.Fatal("Unexpected error:", err)
}
if ev.Data.(string) != "bar" {
t.Fatal("Incorrect event:", ev)
}
if !(ev.ID > id) {
t.Fatalf("ID not incremented (%d !> %d)", ev.ID, id)
}
}
func TestBufferedSub(t *testing.T) {
l := events.NewLogger()
s := l.Subscribe(events.AllEvents)
bs := events.NewBufferedSubscription(s, 10*events.BufferSize)
go func() {
for i := 0; i < 10*events.BufferSize; i++ {
l.Log(events.NodeConnected, fmt.Sprintf("event-%d", i))
if i%30 == 0 {
// Give the buffer routine time to pick up the events
time.Sleep(20 * time.Millisecond)
}
}
}()
recv := 0
for recv < 10*events.BufferSize {
evs := bs.Since(recv, nil)
for _, ev := range evs {
if ev.ID != recv+1 {
t.Fatalf("Incorrect ID; %d != %d", ev.ID, recv+1)
}
recv = ev.ID
}
}
}