Discover & main tracing

This commit is contained in:
Jakob Borg
2014-03-09 09:15:36 +01:00
parent 52ee7d5724
commit 2553ba0463
5 changed files with 73 additions and 57 deletions

View File

@@ -44,8 +44,6 @@ type Model struct {
delete bool
initmut sync.Mutex // protects rwRunning and delete
trace map[string]bool
sup suppressor
parallelRequests int
@@ -84,7 +82,6 @@ func NewModel(dir string, maxChangeBw int) *Model {
protoConn: make(map[string]Connection),
rawConn: make(map[string]io.Closer),
lastIdxBcast: time.Now(),
trace: make(map[string]bool),
sup: suppressor{threshold: int64(maxChangeBw)},
fq: NewFileQueue(),
dq: make(chan scanner.File),
@@ -109,11 +106,6 @@ func (m *Model) LimitRate(kbps int) {
}()
}
// Trace enables trace logging of the given facility. This is a debugging function; grep for m.trace.
func (m *Model) Trace(t string) {
m.trace[t] = true
}
// StartRW starts read/write processing on the current model. When in
// read/write mode the model will attempt to keep in sync with the cluster by
// pulling needed files from peer nodes.
@@ -286,8 +278,8 @@ func (m *Model) Index(nodeID string, fs []protocol.FileInfo) {
m.imut.Lock()
defer m.imut.Unlock()
if m.trace["net"] {
debugf("NET IDX(in): %s: %d files", nodeID, len(fs))
if debugNet {
dlog.Printf("IDX(in): %s: %d files", nodeID, len(fs))
}
repo := make(map[string]scanner.File)
@@ -314,8 +306,8 @@ func (m *Model) IndexUpdate(nodeID string, fs []protocol.FileInfo) {
m.imut.Lock()
defer m.imut.Unlock()
if m.trace["net"] {
debugf("NET IDXUP(in): %s: %d files", nodeID, len(files))
if debugNet {
dlog.Printf("IDXUP(in): %s: %d files", nodeID, len(files))
}
m.rmut.Lock()
@@ -336,12 +328,12 @@ func (m *Model) IndexUpdate(nodeID string, fs []protocol.FileInfo) {
}
func (m *Model) indexUpdate(repo map[string]scanner.File, f scanner.File) {
if m.trace["idx"] {
if debugIdx {
var flagComment string
if f.Flags&protocol.FlagDeleted != 0 {
flagComment = " (deleted)"
}
debugf("IDX(in): %q m=%d f=%o%s v=%d (%d blocks)", f.Name, f.Modified, f.Flags, flagComment, f.Version, len(f.Blocks))
dlog.Printf("IDX(in): %q m=%d f=%o%s v=%d (%d blocks)", f.Name, f.Modified, f.Flags, flagComment, f.Version, len(f.Blocks))
}
if extraFlags := f.Flags &^ (protocol.FlagInvalid | protocol.FlagDeleted | 0xfff); extraFlags != 0 {
@@ -355,8 +347,8 @@ func (m *Model) indexUpdate(repo map[string]scanner.File, f scanner.File) {
// Close removes the peer from the model and closes the underlying connection if possible.
// Implements the protocol.Model interface.
func (m *Model) Close(node string, err error) {
if m.trace["net"] {
debugf("NET: %s: %v", node, err)
if debugNet {
dlog.Printf("%s: %v", node, err)
}
if err == protocol.ErrClusterHash {
warnf("Connection to %s closed due to mismatched cluster hash. Ensure that the configured cluster members are identical on both nodes.", node)
@@ -405,8 +397,8 @@ func (m *Model) Request(nodeID, repo, name string, offset int64, size int) ([]by
return nil, ErrInvalid
}
if m.trace["net"] && nodeID != "<local>" {
debugf("NET REQ(in): %s: %q o=%d s=%d", nodeID, name, offset, size)
if debugNet && nodeID != "<local>" {
dlog.Printf("REQ(in): %s: %q o=%d s=%d", nodeID, name, offset, size)
}
fn := path.Join(m.dir, name)
fd, err := os.Open(fn) // XXX: Inefficient, should cache fd?
@@ -509,6 +501,9 @@ func (m *Model) AddConnection(rawConn io.Closer, protoConn Connection) {
go func() {
idx := m.ProtocolIndex()
if debugNet {
dlog.Printf("IDX(out/initial): %s: %d files", nodeID, len(idx))
}
protoConn.Index("default", idx)
}()
@@ -522,14 +517,14 @@ func (m *Model) AddConnection(rawConn io.Closer, protoConn Connection) {
for i := 0; i < m.parallelRequests; i++ {
i := i
go func() {
if m.trace["pull"] {
debugln("PULL: Starting", nodeID, i)
if debugPull {
dlog.Println("starting puller:", nodeID, i)
}
for {
m.pmut.RLock()
if _, ok := m.protoConn[nodeID]; !ok {
if m.trace["pull"] {
debugln("PULL: Exiting", nodeID, i)
if debugPull {
dlog.Println("stopping puller:", nodeID, i)
}
m.pmut.RUnlock()
return
@@ -538,8 +533,8 @@ func (m *Model) AddConnection(rawConn io.Closer, protoConn Connection) {
qb, ok := m.fq.Get(nodeID)
if ok {
if m.trace["pull"] {
debugln("PULL: Request", nodeID, i, qb.name, qb.block.Offset)
if debugPull {
dlog.Println("request: out", nodeID, i, qb.name, qb.block.Offset)
}
data, _ := protoConn.Request("default", qb.name, qb.block.Offset, int(qb.block.Size))
m.fq.Done(qb.name, qb.block.Offset, data)
@@ -560,12 +555,12 @@ func (m *Model) ProtocolIndex() []protocol.FileInfo {
for _, f := range m.local {
mf := fileInfoFromFile(f)
if m.trace["idx"] {
if debugIdx {
var flagComment string
if mf.Flags&protocol.FlagDeleted != 0 {
flagComment = " (deleted)"
}
debugf("IDX(out): %q m=%d f=%o%s v=%d (%d blocks)", mf.Name, mf.Modified, mf.Flags, flagComment, mf.Version, len(mf.Blocks))
dlog.Printf("IDX(out): %q m=%d f=%o%s v=%d (%d blocks)", mf.Name, mf.Modified, mf.Flags, flagComment, mf.Version, len(mf.Blocks))
}
index = append(index, mf)
}
@@ -583,8 +578,8 @@ func (m *Model) requestGlobal(nodeID, name string, offset int64, size int, hash
return nil, fmt.Errorf("requestGlobal: no such node: %s", nodeID)
}
if m.trace["net"] {
debugf("NET REQ(out): %s: %q o=%d s=%d h=%x", nodeID, name, offset, size, hash)
if debugNet {
dlog.Printf("REQ(out): %s: %q o=%d s=%d h=%x", nodeID, name, offset, size, hash)
}
return nc.Request("default", name, offset, size)
@@ -611,8 +606,8 @@ func (m *Model) broadcastIndexLoop() {
m.pmut.RLock()
for _, node := range m.protoConn {
node := node
if m.trace["net"] {
debugf("NET IDX(out/loop): %s: %d files", node.ID(), len(idx))
if debugNet {
dlog.Printf("IDX(out/loop): %s: %d files", node.ID(), len(idx))
}
go func() {
node.Index("default", idx)
@@ -823,8 +818,8 @@ func (m *Model) recomputeNeedForFile(gf scanner.File, toAdd []addOrder, toDelete
// Don't have the file, so don't need to delete it
return toAdd, toDelete
}
if m.trace["need"] {
debugf("NEED: lf:%v gf:%v", lf, gf)
if debugNeed {
dlog.Printf("need: lf:%v gf:%v", lf, gf)
}
if gf.Flags&protocol.FlagDeleted != 0 {
@@ -865,8 +860,8 @@ func (m *Model) WhoHas(name string) []string {
func (m *Model) deleteLoop() {
for file := range m.dq {
if m.trace["file"] {
debugln("FILE: Delete", file.Name)
if debugPull {
dlog.Println("delete", file.Name)
}
path := path.Clean(path.Join(m.dir, file.Name))
err := os.Remove(path)