Enforce ClusterConfiguration at start, then no ordering
This commit is contained in:
45
protocol.go
45
protocol.go
@@ -31,8 +31,7 @@ const (
|
||||
|
||||
const (
|
||||
stateInitial = iota
|
||||
stateCCRcvd
|
||||
stateIdxRcvd
|
||||
stateReady
|
||||
)
|
||||
|
||||
// FileInfo flags
|
||||
@@ -103,7 +102,6 @@ type rawConnection struct {
|
||||
id DeviceID
|
||||
name string
|
||||
receiver Model
|
||||
state int
|
||||
|
||||
cr *countingReader
|
||||
cw *countingWriter
|
||||
@@ -155,7 +153,6 @@ func NewConnection(deviceID DeviceID, reader io.Reader, writer io.Writer, receiv
|
||||
id: deviceID,
|
||||
name: name,
|
||||
receiver: nativeModel{receiver},
|
||||
state: stateInitial,
|
||||
cr: cr,
|
||||
cw: cw,
|
||||
outbox: make(chan hdrMsg),
|
||||
@@ -285,6 +282,7 @@ func (c *rawConnection) readerLoop() (err error) {
|
||||
c.close(err)
|
||||
}()
|
||||
|
||||
state := stateInitial
|
||||
for {
|
||||
select {
|
||||
case <-c.closed:
|
||||
@@ -298,47 +296,54 @@ func (c *rawConnection) readerLoop() (err error) {
|
||||
}
|
||||
|
||||
switch msg := msg.(type) {
|
||||
case ClusterConfigMessage:
|
||||
if state != stateInitial {
|
||||
return fmt.Errorf("protocol error: cluster config message in state %d", state)
|
||||
}
|
||||
go c.receiver.ClusterConfig(c.id, msg)
|
||||
state = stateReady
|
||||
|
||||
case IndexMessage:
|
||||
switch hdr.msgType {
|
||||
case messageTypeIndex:
|
||||
if c.state < stateCCRcvd {
|
||||
return fmt.Errorf("protocol error: index message in state %d", c.state)
|
||||
if state != stateReady {
|
||||
return fmt.Errorf("protocol error: index message in state %d", state)
|
||||
}
|
||||
c.handleIndex(msg)
|
||||
c.state = stateIdxRcvd
|
||||
state = stateReady
|
||||
|
||||
case messageTypeIndexUpdate:
|
||||
if c.state < stateIdxRcvd {
|
||||
return fmt.Errorf("protocol error: index update message in state %d", c.state)
|
||||
if state != stateReady {
|
||||
return fmt.Errorf("protocol error: index update message in state %d", state)
|
||||
}
|
||||
c.handleIndexUpdate(msg)
|
||||
state = stateReady
|
||||
}
|
||||
|
||||
case RequestMessage:
|
||||
if c.state < stateIdxRcvd {
|
||||
return fmt.Errorf("protocol error: request message in state %d", c.state)
|
||||
if state != stateReady {
|
||||
return fmt.Errorf("protocol error: request message in state %d", state)
|
||||
}
|
||||
// Requests are handled asynchronously
|
||||
go c.handleRequest(hdr.msgID, msg)
|
||||
|
||||
case ResponseMessage:
|
||||
if c.state < stateIdxRcvd {
|
||||
return fmt.Errorf("protocol error: response message in state %d", c.state)
|
||||
if state != stateReady {
|
||||
return fmt.Errorf("protocol error: response message in state %d", state)
|
||||
}
|
||||
c.handleResponse(hdr.msgID, msg)
|
||||
|
||||
case pingMessage:
|
||||
if state != stateReady {
|
||||
return fmt.Errorf("protocol error: ping message in state %d", state)
|
||||
}
|
||||
c.send(hdr.msgID, messageTypePong, pongMessage{})
|
||||
|
||||
case pongMessage:
|
||||
c.handlePong(hdr.msgID)
|
||||
|
||||
case ClusterConfigMessage:
|
||||
if c.state != stateInitial {
|
||||
return fmt.Errorf("protocol error: cluster config message in state %d", c.state)
|
||||
if state != stateReady {
|
||||
return fmt.Errorf("protocol error: pong message in state %d", state)
|
||||
}
|
||||
go c.receiver.ClusterConfig(c.id, msg)
|
||||
c.state = stateCCRcvd
|
||||
c.handlePong(hdr.msgID)
|
||||
|
||||
case CloseMessage:
|
||||
return errors.New(msg.Reason)
|
||||
|
||||
Reference in New Issue
Block a user