This commit is contained in:
Jakob Borg
2014-02-24 13:29:30 +01:00
parent e7bf3ac108
commit 434a0ccf2a
9 changed files with 35 additions and 37 deletions

View File

@@ -31,7 +31,8 @@ const (
)
var (
ErrClusterHash = fmt.Errorf("Configuration error: mismatched cluster hash")
ErrClusterHash = fmt.Errorf("configuration error: mismatched cluster hash")
ErrClosed = errors.New("connection closed")
)
type Model interface {
@@ -56,7 +57,7 @@ type Connection struct {
xw *xdr.Writer
closed bool
awaiting map[int]chan asyncResult
nextId int
nextID int
indexSent map[string]map[string][2]int64
peerOptions map[string]string
myOptions map[string]string
@@ -68,8 +69,6 @@ type Connection struct {
statisticsLock sync.Mutex
}
var ErrClosed = errors.New("Connection closed")
type asyncResult struct {
val []byte
err error
@@ -105,7 +104,7 @@ func NewConnection(nodeID string, reader io.Reader, writer io.Writer, receiver M
c.myOptions = options
go func() {
c.Lock()
header{0, c.nextId, messageTypeOptions}.encodeXDR(c.xw)
header{0, c.nextID, messageTypeOptions}.encodeXDR(c.xw)
var om OptionsMessage
for k, v := range options {
om.Options = append(om.Options, Option{k, v})
@@ -118,7 +117,7 @@ func NewConnection(nodeID string, reader io.Reader, writer io.Writer, receiver M
if err != nil {
log.Println("Warning: Write error during initial handshake:", err)
}
c.nextId++
c.nextID++
c.Unlock()
}()
}
@@ -155,12 +154,12 @@ func (c *Connection) Index(repo string, idx []FileInfo) {
idx = diff
}
header{0, c.nextId, msgType}.encodeXDR(c.xw)
header{0, c.nextID, msgType}.encodeXDR(c.xw)
_, err := IndexMessage{repo, idx}.encodeXDR(c.xw)
if err == nil {
err = c.flush()
}
c.nextId = (c.nextId + 1) & 0xfff
c.nextID = (c.nextID + 1) & 0xfff
c.hasSentIndex = true
c.Unlock()
@@ -178,8 +177,8 @@ func (c *Connection) Request(repo string, name string, offset int64, size int) (
return nil, ErrClosed
}
rc := make(chan asyncResult)
c.awaiting[c.nextId] = rc
header{0, c.nextId, messageTypeRequest}.encodeXDR(c.xw)
c.awaiting[c.nextID] = rc
header{0, c.nextID, messageTypeRequest}.encodeXDR(c.xw)
_, err := RequestMessage{repo, name, uint64(offset), uint32(size)}.encodeXDR(c.xw)
if err == nil {
err = c.flush()
@@ -189,7 +188,7 @@ func (c *Connection) Request(repo string, name string, offset int64, size int) (
c.close(err)
return nil, err
}
c.nextId = (c.nextId + 1) & 0xfff
c.nextID = (c.nextID + 1) & 0xfff
c.Unlock()
res, ok := <-rc
@@ -206,8 +205,8 @@ func (c *Connection) ping() bool {
return false
}
rc := make(chan asyncResult, 1)
c.awaiting[c.nextId] = rc
header{0, c.nextId, messageTypePing}.encodeXDR(c.xw)
c.awaiting[c.nextID] = rc
header{0, c.nextID, messageTypePing}.encodeXDR(c.xw)
err := c.flush()
if err != nil {
c.Unlock()
@@ -218,7 +217,7 @@ func (c *Connection) ping() bool {
c.close(c.xw.Error())
return false
}
c.nextId = (c.nextId + 1) & 0xfff
c.nextID = (c.nextID + 1) & 0xfff
c.Unlock()
res, ok := <-rc
@@ -268,7 +267,7 @@ loop:
break loop
}
if hdr.version != 0 {
c.close(fmt.Errorf("Protocol error: %s: unknown message version %#x", c.id, hdr.version))
c.close(fmt.Errorf("protocol error: %s: unknown message version %#x", c.id, hdr.version))
break loop
}
@@ -371,7 +370,7 @@ loop:
}
default:
c.close(fmt.Errorf("Protocol error: %s: unknown message type %#x", c.id, hdr.msgType))
c.close(fmt.Errorf("protocol error: %s: unknown message type %#x", c.id, hdr.msgType))
break loop
}
}
@@ -410,10 +409,10 @@ func (c *Connection) pingerLoop() {
select {
case ok := <-rc:
if !ok {
c.close(fmt.Errorf("Ping failure"))
c.close(fmt.Errorf("ping failure"))
}
case <-time.After(pingTimeout):
c.close(fmt.Errorf("Ping timeout"))
c.close(fmt.Errorf("ping timeout"))
}
}
}