diff --git a/model.go b/model.go index 0daacb35..ddfcf859 100644 --- a/model.go +++ b/model.go @@ -88,9 +88,7 @@ func (m *Model) printStatsLoop() { func (m *Model) printConnectionStats() { for node, conn := range m.nodes { stats := conn.Statistics() - if (stats.InBytesPerSec > 0 || stats.OutBytesPerSec > 0) && stats.Latency > 0 { - infof("%s: %sB/s in, %sB/s out, %0.02f ms", node, toSI(stats.InBytesPerSec), toSI(stats.OutBytesPerSec), stats.Latency.Seconds()*1000) - } else if stats.InBytesPerSec > 0 || stats.OutBytesPerSec > 0 { + if stats.InBytesPerSec > 0 || stats.OutBytesPerSec > 0 { infof("%s: %sB/s in, %sB/s out", node, toSI(stats.InBytesPerSec), toSI(stats.OutBytesPerSec)) } } diff --git a/protocol/protocol.go b/protocol/protocol.go index 041f055b..485fd022 100644 --- a/protocol/protocol.go +++ b/protocol/protocol.go @@ -46,17 +46,16 @@ type Model interface { type Connection struct { sync.RWMutex - ID string - receiver Model - reader io.Reader - mreader *marshalReader - writer io.Writer - mwriter *marshalWriter - closed bool - awaiting map[int]chan asyncResult - nextId int - peerLatency time.Duration - indexSent map[string]int64 + ID string + receiver Model + reader io.Reader + mreader *marshalReader + writer io.Writer + mwriter *marshalWriter + closed bool + awaiting map[int]chan asyncResult + nextId int + indexSent map[string]int64 lastStatistics Statistics statisticsLock sync.Mutex @@ -132,7 +131,7 @@ func (c *Connection) Index(idx []FileInfo) { c.nextId = (c.nextId + 1) & 0xfff c.Unlock() if err != nil || c.mwriter.err != nil { - c.close() + c.Close() return } } @@ -150,13 +149,13 @@ func (c *Connection) Request(name string, offset uint64, size uint32, hash []byt c.mwriter.writeRequest(request{name, offset, size, hash}) if c.mwriter.err != nil { c.Unlock() - c.close() + c.Close() return nil, c.mwriter.err } err := c.flush() if err != nil { c.Unlock() - c.close() + c.Close() return nil, err } c.nextId = (c.nextId + 1) & 0xfff @@ -169,27 +168,26 @@ func (c *Connection) Request(name string, offset uint64, size uint32, hash []byt return res.val, res.err } -func (c *Connection) Ping() (time.Duration, bool) { +func (c *Connection) Ping() bool { c.Lock() if c.closed { c.Unlock() - return 0, false + return false } rc := make(chan asyncResult, 1) c.awaiting[c.nextId] = rc - t0 := time.Now() c.mwriter.writeHeader(header{0, c.nextId, messageTypePing}) err := c.flush() if err != nil || c.mwriter.err != nil { c.Unlock() - c.close() - return 0, false + c.Close() + return false } c.nextId = (c.nextId + 1) & 0xfff c.Unlock() - _, ok := <-rc - return time.Since(t0), ok + res, ok := <-rc + return ok && res.err == nil } func (c *Connection) Stop() { @@ -206,7 +204,7 @@ func (c *Connection) flush() error { return nil } -func (c *Connection) close() { +func (c *Connection) Close() { c.Lock() if c.closed { c.Unlock() @@ -232,12 +230,12 @@ func (c *Connection) readerLoop() { for !c.isClosed() { hdr := c.mreader.readHeader() if c.mreader.err != nil { - c.close() + c.Close() break } if hdr.version != 0 { log.Printf("Protocol error: %s: unknown message version %#x", c.ID, hdr.version) - c.close() + c.Close() break } @@ -249,7 +247,7 @@ func (c *Connection) readerLoop() { case messageTypeIndex: files := c.mreader.readIndex() if c.mreader.err != nil { - c.close() + c.Close() } else { c.receiver.Index(c.ID, files) } @@ -257,7 +255,7 @@ func (c *Connection) readerLoop() { case messageTypeIndexUpdate: files := c.mreader.readIndex() if c.mreader.err != nil { - c.close() + c.Close() } else { c.receiver.IndexUpdate(c.ID, files) } @@ -269,7 +267,7 @@ func (c *Connection) readerLoop() { data := c.mreader.readResponse() if c.mreader.err != nil { - c.close() + c.Close() } else { c.Lock() rc, ok := c.awaiting[hdr.msgID] @@ -288,7 +286,7 @@ func (c *Connection) readerLoop() { err := c.flush() c.Unlock() if err != nil || c.mwriter.err != nil { - c.close() + c.Close() } case messageTypePong: @@ -307,7 +305,7 @@ func (c *Connection) readerLoop() { default: log.Printf("Protocol error: %s: unknown message type %#x", c.ID, hdr.msgType) - c.close() + c.Close() } } } @@ -315,7 +313,7 @@ func (c *Connection) readerLoop() { func (c *Connection) processRequest(msgID int) { req := c.mreader.readRequest() if c.mreader.err != nil { - c.close() + c.Close() } else { go func() { data, _ := c.receiver.Request(c.ID, req.name, req.offset, req.size, req.hash) @@ -326,36 +324,27 @@ func (c *Connection) processRequest(msgID int) { c.Unlock() buffers.Put(data) if c.mwriter.err != nil || err != nil { - c.close() + c.Close() } }() } } func (c *Connection) pingerLoop() { - var rc = make(chan time.Duration, 1) + var rc = make(chan bool, 1) for !c.isClosed() { - c.lastReceiveLock.RLock() - lr := c.lastReceive - c.lastReceiveLock.RUnlock() - - if time.Since(lr) > pingIdleTime { - go func() { - t, ok := c.Ping() - if ok { - rc <- t - } - }() - select { - case lat := <-rc: - c.Lock() - c.peerLatency = (c.peerLatency + lat) / 2 - c.Unlock() - case <-time.After(pingTimeout): - c.close() + time.Sleep(pingIdleTime / 2) + go func() { + rc <- c.Ping() + }() + select { + case ok := <-rc: + if !ok { + c.Close() } + case <-time.After(pingTimeout): + c.Close() } - time.Sleep(time.Second) } } @@ -365,7 +354,6 @@ type Statistics struct { InBytesPerSec int OutBytesTotal int OutBytesPerSec int - Latency time.Duration } func (c *Connection) Statistics() Statistics { @@ -381,7 +369,6 @@ func (c *Connection) Statistics() Statistics { InBytesPerSec: int(float64(rt-c.lastStatistics.InBytesTotal) / secs), OutBytesTotal: wt, OutBytesPerSec: int(float64(wt-c.lastStatistics.OutBytesTotal) / secs), - Latency: c.peerLatency, } c.lastStatistics = stats return stats diff --git a/protocol/protocol_test.go b/protocol/protocol_test.go index 9c1fbf1d..62e75b45 100644 --- a/protocol/protocol_test.go +++ b/protocol/protocol_test.go @@ -46,10 +46,10 @@ func TestPing(t *testing.T) { c0 := NewConnection("c0", ar, bw, nil) c1 := NewConnection("c1", br, aw, nil) - if _, ok := c0.Ping(); !ok { + if ok := c0.Ping(); !ok { t.Error("c0 ping failed") } - if _, ok := c1.Ping(); !ok { + if ok := c1.Ping(); !ok { t.Error("c1 ping failed") } } @@ -70,7 +70,7 @@ func TestPingErr(t *testing.T) { c0 := NewConnection("c0", ar, ebw, m0) NewConnection("c1", br, eaw, m1) - _, res := c0.Ping() + res := c0.Ping() if (i < 4 || j < 4) && res { t.Errorf("Unexpected ping success; i=%d, j=%d", i, j) } else if (i >= 8 && j >= 8) && !res { @@ -190,7 +190,7 @@ func TestClose(t *testing.T) { c0 := NewConnection("c0", ar, bw, m0) NewConnection("c1", br, aw, m1) - c0.close() + c0.Close() ok := c0.isClosed() if !ok { @@ -199,7 +199,7 @@ func TestClose(t *testing.T) { // None of these should panic, some should return an error - _, ok = c0.Ping() + ok = c0.Ping() if ok { t.Error("Ping should not return true") }