Publish more event details

This commit is contained in:
Jakob Borg
2014-07-17 13:38:36 +02:00
parent ec212f73eb
commit f88a7a8e6a
5 changed files with 101 additions and 46 deletions

View File

@@ -33,6 +33,21 @@ const (
RepoCleaning
)
func (s repoState) String() string {
switch s {
case RepoIdle:
return "idle"
case RepoScanning:
return "scanning"
case RepoCleaning:
return "cleaning"
case RepoSyncing:
return "syncing"
default:
return "unknown"
}
}
// Somewhat arbitrary amount of bytes that we choose to let represent the size
// of an unsynchronized directory entry or a deleted file. We need it to be
// larger than zero so that it's visible that there is some amount of bytes to
@@ -57,8 +72,9 @@ type Model struct {
suppressor map[string]*suppressor // repo -> suppressor
rmut sync.RWMutex // protects the above
repoState map[string]repoState // repo -> state
smut sync.RWMutex
repoState map[string]repoState // repo -> state
repoStateChanged map[string]time.Time // repo -> time when state changed
smut sync.RWMutex
protoConn map[protocol.NodeID]protocol.Connection
rawConn map[protocol.NodeID]io.Closer
@@ -84,22 +100,23 @@ var (
// for file data without altering the local repository in any way.
func NewModel(indexDir string, cfg *config.Configuration, clientName, clientVersion string, db *leveldb.DB) *Model {
m := &Model{
indexDir: indexDir,
cfg: cfg,
db: db,
clientName: clientName,
clientVersion: clientVersion,
repoCfgs: make(map[string]config.RepositoryConfiguration),
repoFiles: make(map[string]*files.Set),
repoNodes: make(map[string][]protocol.NodeID),
nodeRepos: make(map[protocol.NodeID][]string),
repoState: make(map[string]repoState),
suppressor: make(map[string]*suppressor),
protoConn: make(map[protocol.NodeID]protocol.Connection),
rawConn: make(map[protocol.NodeID]io.Closer),
nodeVer: make(map[protocol.NodeID]string),
sentLocalVer: make(map[protocol.NodeID]map[string]uint64),
sup: suppressor{threshold: int64(cfg.Options.MaxChangeKbps)},
indexDir: indexDir,
cfg: cfg,
db: db,
clientName: clientName,
clientVersion: clientVersion,
repoCfgs: make(map[string]config.RepositoryConfiguration),
repoFiles: make(map[string]*files.Set),
repoNodes: make(map[string][]protocol.NodeID),
nodeRepos: make(map[protocol.NodeID][]string),
repoState: make(map[string]repoState),
repoStateChanged: make(map[string]time.Time),
suppressor: make(map[string]*suppressor),
protoConn: make(map[protocol.NodeID]protocol.Connection),
rawConn: make(map[protocol.NodeID]io.Closer),
nodeVer: make(map[protocol.NodeID]string),
sentLocalVer: make(map[protocol.NodeID]map[string]uint64),
sup: suppressor{threshold: int64(cfg.Options.MaxChangeKbps)},
}
var timeout = 20 * 60 // seconds
@@ -322,16 +339,20 @@ func (m *Model) Index(nodeID protocol.NodeID, repo string, fs []protocol.FileInf
}
m.rmut.RLock()
if r, ok := m.repoFiles[repo]; ok {
r, ok := m.repoFiles[repo]
m.rmut.RUnlock()
if ok {
r.Replace(nodeID, fs)
} else {
l.Fatalf("Index for nonexistant repo %q", repo)
}
m.rmut.RUnlock()
events.Default.Log(events.RemoteIndexUpdated, map[string]string{
"node": nodeID.String(),
"repo": repo,
events.Default.Log(events.RemoteIndexUpdated, map[string]interface{}{
"node": nodeID.String(),
"repo": repo,
"items": len(fs),
"version": r.LocalVersion(nodeID),
})
}
@@ -348,16 +369,21 @@ func (m *Model) IndexUpdate(nodeID protocol.NodeID, repo string, fs []protocol.F
}
m.rmut.RLock()
if r, ok := m.repoFiles[repo]; ok {
r, ok := m.repoFiles[repo]
m.rmut.RUnlock()
m.rmut.RLock()
if ok {
r.Update(nodeID, fs)
} else {
l.Fatalf("IndexUpdate for nonexistant repo %q", repo)
}
m.rmut.RUnlock()
events.Default.Log(events.RemoteIndexUpdated, map[string]string{
"node": nodeID.String(),
"repo": repo,
events.Default.Log(events.RemoteIndexUpdated, map[string]interface{}{
"node": nodeID.String(),
"repo": repo,
"items": len(fs),
"version": r.LocalVersion(nodeID),
})
}
@@ -620,7 +646,13 @@ func (m *Model) updateLocal(repo string, f protocol.FileInfo) {
m.rmut.RLock()
m.repoFiles[repo].Update(protocol.LocalNodeID, []protocol.FileInfo{f})
m.rmut.RUnlock()
events.Default.Log(events.LocalIndexUpdated, map[string]string{"repo": repo})
events.Default.Log(events.LocalIndexUpdated, map[string]interface{}{
"repo": repo,
"name": f.Name,
"modified": time.Unix(f.Modified, 0),
"flags": fmt.Sprintf("0%o", f.Flags),
"size": f.Size(),
})
}
func (m *Model) requestGlobal(nodeID protocol.NodeID, repo, name string, offset int64, size int, hash []byte) ([]byte, error) {
@@ -797,26 +829,30 @@ func (m *Model) clusterConfig(node protocol.NodeID) protocol.ClusterConfigMessag
func (m *Model) setState(repo string, state repoState) {
m.smut.Lock()
m.repoState[repo] = state
oldState := m.repoState[repo]
changed, ok := m.repoStateChanged[repo]
if state != oldState {
m.repoState[repo] = state
m.repoStateChanged[repo] = time.Now()
eventData := map[string]interface{}{
"repo": repo,
"to": state.String(),
}
if ok {
eventData["duration"] = time.Since(changed).Seconds()
eventData["from"] = oldState.String()
}
events.Default.Log(events.StateChanged, eventData)
}
m.smut.Unlock()
}
func (m *Model) State(repo string) string {
func (m *Model) State(repo string) (string, time.Time) {
m.smut.RLock()
state := m.repoState[repo]
changed := m.repoStateChanged[repo]
m.smut.RUnlock()
switch state {
case RepoIdle:
return "idle"
case RepoScanning:
return "scanning"
case RepoCleaning:
return "cleaning"
case RepoSyncing:
return "syncing"
default:
return "unknown"
}
return state.String(), changed
}
func (m *Model) Override(repo string) {