diff --git a/protocol/protocol.go b/protocol/protocol.go index 3d011fbd..5dbc20cb 100644 --- a/protocol/protocol.go +++ b/protocol/protocol.go @@ -11,7 +11,6 @@ import ( "io" "sync" "time" - "github.com/calmh/syncthing/xdr" ) @@ -96,6 +95,15 @@ type rawConnection struct { outbox chan []encodable closed chan struct{} once sync.Once + + incomingIndexes chan incomingIndex +} + +type incomingIndex struct { + update bool + id NodeID + repo string + files []FileInfo } type asyncResult struct { @@ -116,19 +124,20 @@ func NewConnection(nodeID NodeID, reader io.Reader, writer io.Writer, receiver M wb := bufio.NewWriterSize(cw, 65536) c := rawConnection{ - id: nodeID, - name: name, - receiver: nativeModel{receiver}, - state: stateInitial, - cr: cr, - xr: xdr.NewReader(rb), - cw: cw, - wb: wb, - xw: xdr.NewWriter(wb), - awaiting: make([]chan asyncResult, 0x1000), - outbox: make(chan []encodable), - nextID: make(chan int), - closed: make(chan struct{}), + id: nodeID, + name: name, + receiver: nativeModel{receiver}, + state: stateInitial, + cr: cr, + xr: xdr.NewReader(rb), + cw: cw, + wb: wb, + xw: xdr.NewWriter(wb), + awaiting: make([]chan asyncResult, 0x1000), + outbox: make(chan []encodable), + nextID: make(chan int), + closed: make(chan struct{}), + incomingIndexes: make(chan incomingIndex, 100), // should be enough for anyone, right? } go c.indexSerializerLoop() @@ -307,15 +316,6 @@ func (c *rawConnection) readerLoop() (err error) { } } -type incomingIndex struct { - update bool - id NodeID - repo string - files []FileInfo -} - -var incomingIndexes = make(chan incomingIndex, 100) // should be enough for anyone, right? - func (c *rawConnection) indexSerializerLoop() { // We must avoid blocking the reader loop when processing large indexes. // There is otherwise a potential deadlock where both sides has the model @@ -325,7 +325,7 @@ func (c *rawConnection) indexSerializerLoop() { // routine and buffered channel. for { select { - case ii := <-incomingIndexes: + case ii := <-c.incomingIndexes: if ii.update { if debug { l.Debugf("calling IndexUpdate(%v, %v, %d files)", ii.id, ii.repo, len(ii.files)) @@ -360,7 +360,7 @@ func (c *rawConnection) handleIndex() error { if debug { l.Debugf("queueing Index(%v, %v, %d files)", c.id, im.Repository, len(im.Files)) } - incomingIndexes <- incomingIndex{false, c.id, im.Repository, im.Files} + c.incomingIndexes <- incomingIndex{false, c.id, im.Repository, im.Files} } return nil } @@ -374,7 +374,7 @@ func (c *rawConnection) handleIndexUpdate() error { if debug { l.Debugf("queueing IndexUpdate(%v, %v, %d files)", c.id, im.Repository, len(im.Files)) } - incomingIndexes <- incomingIndex{true, c.id, im.Repository, im.Files} + c.incomingIndexes <- incomingIndex{true, c.id, im.Repository, im.Files} } return nil }