Basic support for synchronizing multiple repositories (fixes #35)

This commit is contained in:
Jakob Borg
2014-03-29 18:53:48 +01:00
parent 2a5c0646c0
commit 5eb5a056bf
18 changed files with 492 additions and 302 deletions

View File

@@ -1,6 +1,7 @@
package protocol
import (
"bufio"
"compress/flate"
"errors"
"fmt"
@@ -37,19 +38,19 @@ var (
type Model interface {
// An index was received from the peer node
Index(nodeID string, files []FileInfo)
Index(nodeID string, repo string, files []FileInfo)
// An index update was received from the peer node
IndexUpdate(nodeID string, files []FileInfo)
IndexUpdate(nodeID string, repo string, files []FileInfo)
// A request was made by the peer node
Request(nodeID, repo string, name string, offset int64, size int) ([]byte, error)
Request(nodeID string, repo string, name string, offset int64, size int) ([]byte, error)
// The peer node closed the connection
Close(nodeID string, err error)
}
type Connection interface {
ID() string
Index(string, []FileInfo)
Request(repo, name string, offset int64, size int) ([]byte, error)
Index(repo string, files []FileInfo)
Request(repo string, name string, offset int64, size int) ([]byte, error)
Statistics() Statistics
Option(key string) string
}
@@ -62,6 +63,7 @@ type rawConnection struct {
reader io.ReadCloser
xr *xdr.Reader
writer io.WriteCloser
wb *bufio.Writer
xw *xdr.Writer
closed chan struct{}
awaiting map[int]chan asyncResult
@@ -73,8 +75,6 @@ type rawConnection struct {
hasSentIndex bool
hasRecvdIndex bool
statisticsLock sync.Mutex
}
type asyncResult struct {
@@ -93,6 +93,7 @@ func NewConnection(nodeID string, reader io.Reader, writer io.Writer, receiver M
if err != nil {
panic(err)
}
wb := bufio.NewWriter(flwr)
c := rawConnection{
id: nodeID,
@@ -100,7 +101,8 @@ func NewConnection(nodeID string, reader io.Reader, writer io.Writer, receiver M
reader: flrd,
xr: xdr.NewReader(flrd),
writer: flwr,
xw: xdr.NewWriter(flwr),
wb: wb,
xw: xdr.NewWriter(wb),
closed: make(chan struct{}),
awaiting: make(map[int]chan asyncResult),
indexSent: make(map[string]map[string][2]int64),
@@ -245,6 +247,7 @@ type flusher interface {
}
func (c *rawConnection) flush() error {
c.wb.Flush()
if f, ok := c.writer.(flusher); ok {
return f.Flush()
}
@@ -302,7 +305,15 @@ loop:
c.close(c.xr.Error())
break loop
} else {
c.receiver.Index(c.id, im.Files)
// We run this (and the corresponding one for update, below)
// in a separate goroutine to avoid blocking the read loop.
// There is otherwise a potential deadlock where both sides
// has the model locked because it's sending a large index
// update and can't receive the large index update from the
// other side.
go c.receiver.Index(c.id, im.Repository, im.Files)
}
c.Lock()
c.hasRecvdIndex = true
@@ -315,7 +326,7 @@ loop:
c.close(c.xr.Error())
break loop
} else {
c.receiver.IndexUpdate(c.id, im.Files)
go c.receiver.IndexUpdate(c.id, im.Repository, im.Files)
}
case messageTypeRequest:
@@ -454,16 +465,11 @@ type Statistics struct {
}
func (c *rawConnection) Statistics() Statistics {
c.statisticsLock.Lock()
defer c.statisticsLock.Unlock()
stats := Statistics{
return Statistics{
At: time.Now(),
InBytesTotal: int(c.xr.Tot()),
OutBytesTotal: int(c.xw.Tot()),
}
return stats
}
func (c *rawConnection) Option(key string) string {