diff --git a/protocol/protocol.go b/protocol/protocol.go index e82fa756..4136b9e7 100644 --- a/protocol/protocol.go +++ b/protocol/protocol.go @@ -97,6 +97,15 @@ type rawConnection struct { outbox chan []encodable closed chan struct{} once sync.Once + + incomingIndexes chan incomingIndex +} + +type incomingIndex struct { + update bool + id string + repo string + files []FileInfo } type asyncResult struct { @@ -121,21 +130,22 @@ func NewConnection(nodeID string, reader io.Reader, writer io.Writer, receiver M wb := bufio.NewWriter(flwr) c := rawConnection{ - id: nodeID, - receiver: nativeModel{receiver}, - state: stateInitial, - reader: flrd, - cr: cr, - xr: xdr.NewReader(flrd), - writer: flwr, - cw: cw, - wb: wb, - xw: xdr.NewWriter(wb), - awaiting: make([]chan asyncResult, 0x1000), - idxSent: make(map[string]map[string]uint64), - outbox: make(chan []encodable), - nextID: make(chan int), - closed: make(chan struct{}), + id: nodeID, + receiver: nativeModel{receiver}, + state: stateInitial, + reader: flrd, + cr: cr, + xr: xdr.NewReader(flrd), + writer: flwr, + cw: cw, + wb: wb, + xw: xdr.NewWriter(wb), + awaiting: make([]chan asyncResult, 0x1000), + idxSent: make(map[string]map[string]uint64), + outbox: make(chan []encodable), + nextID: make(chan int), + closed: make(chan struct{}), + incomingIndexes: make(chan incomingIndex, 100), // should be enough for anyone, right? } go c.indexSerializerLoop() @@ -316,15 +326,6 @@ func (c *rawConnection) readerLoop() (err error) { } } -type incomingIndex struct { - update bool - id string - repo string - files []FileInfo -} - -var incomingIndexes = make(chan incomingIndex, 100) // should be enough for anyone, right? - func (c *rawConnection) indexSerializerLoop() { // We must avoid blocking the reader loop when processing large indexes. // There is otherwise a potential deadlock where both sides has the model @@ -334,7 +335,7 @@ func (c *rawConnection) indexSerializerLoop() { // routine and buffered channel. for { select { - case ii := <-incomingIndexes: + case ii := <-c.incomingIndexes: if ii.update { c.receiver.IndexUpdate(ii.id, ii.repo, ii.files) } else { @@ -360,7 +361,7 @@ func (c *rawConnection) handleIndex() error { // update and can't receive the large index update from the // other side. - incomingIndexes <- incomingIndex{false, c.id, im.Repository, im.Files} + c.incomingIndexes <- incomingIndex{false, c.id, im.Repository, im.Files} } return nil } @@ -371,7 +372,7 @@ func (c *rawConnection) handleIndexUpdate() error { if err := c.xr.Error(); err != nil { return err } else { - incomingIndexes <- incomingIndex{true, c.id, im.Repository, im.Files} + c.incomingIndexes <- incomingIndex{true, c.id, im.Repository, im.Files} } return nil }