The Great Rewrite (fixes #36, #61, #94, #101)

Rewrite of the file model and pulling mechanism. Needs lots of cleanup
and bugfixes, now...
This commit is contained in:
Jakob Borg
2014-03-28 14:36:57 +01:00
parent 3700eb1e61
commit f87b1520e8
47 changed files with 2137 additions and 1902 deletions

View File

@@ -46,16 +46,24 @@ type Model interface {
Close(nodeID string, err error)
}
type Connection struct {
type Connection interface {
ID() string
Index(string, []FileInfo)
Request(repo, name string, offset int64, size int) ([]byte, error)
Statistics() Statistics
Option(key string) string
}
type rawConnection struct {
sync.RWMutex
id string
receiver Model
reader io.Reader
reader io.ReadCloser
xr *xdr.Reader
writer io.Writer
writer io.WriteCloser
xw *xdr.Writer
closed bool
closed chan struct{}
awaiting map[int]chan asyncResult
nextID int
indexSent map[string]map[string][2]int64
@@ -79,20 +87,21 @@ const (
pingIdleTime = 5 * time.Minute
)
func NewConnection(nodeID string, reader io.Reader, writer io.Writer, receiver Model, options map[string]string) *Connection {
func NewConnection(nodeID string, reader io.Reader, writer io.Writer, receiver Model, options map[string]string) Connection {
flrd := flate.NewReader(reader)
flwr, err := flate.NewWriter(writer, flate.BestSpeed)
if err != nil {
panic(err)
}
c := Connection{
c := rawConnection{
id: nodeID,
receiver: receiver,
receiver: nativeModel{receiver},
reader: flrd,
xr: xdr.NewReader(flrd),
writer: flwr,
xw: xdr.NewWriter(flwr),
closed: make(chan struct{}),
awaiting: make(map[int]chan asyncResult),
indexSent: make(map[string]map[string][2]int64),
}
@@ -122,16 +131,20 @@ func NewConnection(nodeID string, reader io.Reader, writer io.Writer, receiver M
}()
}
return &c
return wireFormatConnection{&c}
}
func (c *Connection) ID() string {
func (c *rawConnection) ID() string {
return c.id
}
// Index writes the list of file information to the connected peer node
func (c *Connection) Index(repo string, idx []FileInfo) {
func (c *rawConnection) Index(repo string, idx []FileInfo) {
c.Lock()
if c.isClosed() {
c.Unlock()
return
}
var msgType int
if c.indexSent[repo] == nil {
// This is the first time we send an index.
@@ -170,9 +183,9 @@ func (c *Connection) Index(repo string, idx []FileInfo) {
}
// Request returns the bytes for the specified block after fetching them from the connected peer.
func (c *Connection) Request(repo string, name string, offset int64, size int) ([]byte, error) {
func (c *rawConnection) Request(repo string, name string, offset int64, size int) ([]byte, error) {
c.Lock()
if c.closed {
if c.isClosed() {
c.Unlock()
return nil, ErrClosed
}
@@ -201,9 +214,9 @@ func (c *Connection) Request(repo string, name string, offset int64, size int) (
return res.val, res.err
}
func (c *Connection) ping() bool {
func (c *rawConnection) ping() bool {
c.Lock()
if c.closed {
if c.isClosed() {
c.Unlock()
return false
}
@@ -231,38 +244,45 @@ type flusher interface {
Flush() error
}
func (c *Connection) flush() error {
func (c *rawConnection) flush() error {
if f, ok := c.writer.(flusher); ok {
return f.Flush()
}
return nil
}
func (c *Connection) close(err error) {
func (c *rawConnection) close(err error) {
c.Lock()
if c.closed {
select {
case <-c.closed:
c.Unlock()
return
default:
}
c.closed = true
close(c.closed)
for _, ch := range c.awaiting {
close(ch)
}
c.awaiting = nil
c.writer.Close()
c.reader.Close()
c.Unlock()
c.receiver.Close(c.id, err)
}
func (c *Connection) isClosed() bool {
c.RLock()
defer c.RUnlock()
return c.closed
func (c *rawConnection) isClosed() bool {
select {
case <-c.closed:
return true
default:
return false
}
}
func (c *Connection) readerLoop() {
func (c *rawConnection) readerLoop() {
loop:
for {
for !c.isClosed() {
var hdr header
hdr.decodeXDR(c.xr)
if c.xr.Error() != nil {
@@ -381,7 +401,7 @@ loop:
}
}
func (c *Connection) processRequest(msgID int, req RequestMessage) {
func (c *rawConnection) processRequest(msgID int, req RequestMessage) {
data, _ := c.receiver.Request(c.id, req.Repository, req.Name, int64(req.Offset), int(req.Size))
c.Lock()
@@ -398,27 +418,31 @@ func (c *Connection) processRequest(msgID int, req RequestMessage) {
}
}
func (c *Connection) pingerLoop() {
func (c *rawConnection) pingerLoop() {
var rc = make(chan bool, 1)
ticker := time.Tick(pingIdleTime / 2)
for {
time.Sleep(pingIdleTime / 2)
select {
case <-ticker:
c.RLock()
ready := c.hasRecvdIndex && c.hasSentIndex
c.RUnlock()
c.RLock()
ready := c.hasRecvdIndex && c.hasSentIndex
c.RUnlock()
if ready {
go func() {
rc <- c.ping()
}()
select {
case ok := <-rc:
if !ok {
c.close(fmt.Errorf("ping failure"))
if ready {
go func() {
rc <- c.ping()
}()
select {
case ok := <-rc:
if !ok {
c.close(fmt.Errorf("ping failure"))
}
case <-time.After(pingTimeout):
c.close(fmt.Errorf("ping timeout"))
}
case <-time.After(pingTimeout):
c.close(fmt.Errorf("ping timeout"))
}
case <-c.closed:
return
}
}
}
@@ -429,7 +453,7 @@ type Statistics struct {
OutBytesTotal int
}
func (c *Connection) Statistics() Statistics {
func (c *rawConnection) Statistics() Statistics {
c.statisticsLock.Lock()
defer c.statisticsLock.Unlock()
@@ -442,7 +466,7 @@ func (c *Connection) Statistics() Statistics {
return stats
}
func (c *Connection) Option(key string) string {
func (c *rawConnection) Option(key string) string {
c.optionsLock.Lock()
defer c.optionsLock.Unlock()
return c.peerOptions[key]