diff --git a/model/filemonitor.go b/model/filemonitor.go index b1b7c5a1..76ac2cab 100644 --- a/model/filemonitor.go +++ b/model/filemonitor.go @@ -4,6 +4,7 @@ import ( "bytes" "errors" "fmt" + "log" "os" "path" "sync" @@ -24,6 +25,10 @@ type fileMonitor struct { } func (m *fileMonitor) FileBegins(cc <-chan content) error { + if m.model.trace["file"] { + log.Printf("FILE: FileBegins: " + m.name) + } + tmp := tempName(m.path, m.global.Modified) dir := path.Dir(tmp) @@ -104,6 +109,10 @@ func (m *fileMonitor) copyRemoteBlocks(cc <-chan content, outFile *os.File, writ } func (m *fileMonitor) FileDone() error { + if m.model.trace["file"] { + log.Printf("FILE: FileDone: " + m.name) + } + m.writeDone.Wait() tmp := tempName(m.path, m.global.Modified) @@ -118,7 +127,7 @@ func (m *fileMonitor) FileDone() error { err := hashCheck(tmp, m.global.Blocks) if err != nil { - return fmt.Errorf("%s: %s (tmp) (deleting)", path.Base(m.name), err.Error()) + return err } err = os.Chtimes(tmp, time.Unix(m.global.Modified, 0), time.Unix(m.global.Modified, 0)) @@ -136,7 +145,7 @@ func (m *fileMonitor) FileDone() error { return err } - go m.model.updateLocalLocked(m.global) + m.model.updateLocal(m.global) return nil } diff --git a/model/filequeue.go b/model/filequeue.go index ff7620f0..8086ee6f 100644 --- a/model/filequeue.go +++ b/model/filequeue.go @@ -14,9 +14,10 @@ type Monitor interface { type FileQueue struct { files queuedFileList - lock sync.Mutex sorted bool + fmut sync.Mutex // protects files and sorted availability map[string][]string + amut sync.Mutex // protects availability } type queuedFile struct { @@ -57,8 +58,14 @@ type queuedBlock struct { } func (q *FileQueue) Add(name string, blocks []Block, monitor Monitor) { - q.lock.Lock() - defer q.lock.Unlock() + q.fmut.Lock() + defer q.fmut.Unlock() + + for _, f := range q.files { + if f.name == name { + panic("re-adding added file " + f.name) + } + } q.files = append(q.files, queuedFile{ name: name, @@ -72,15 +79,15 @@ func (q *FileQueue) Add(name string, blocks []Block, monitor Monitor) { } func (q *FileQueue) Len() int { - q.lock.Lock() - defer q.lock.Unlock() + q.fmut.Lock() + defer q.fmut.Unlock() return len(q.files) } func (q *FileQueue) Get(nodeID string) (queuedBlock, bool) { - q.lock.Lock() - defer q.lock.Unlock() + q.fmut.Lock() + defer q.fmut.Unlock() if !q.sorted { sort.Sort(q.files) @@ -90,7 +97,11 @@ func (q *FileQueue) Get(nodeID string) (queuedBlock, bool) { for i := range q.files { qf := &q.files[i] - if len(q.availability[qf.name]) == 0 { + q.amut.Lock() + av := q.availability[qf.name] + q.amut.Unlock() + + if len(av) == 0 { // Noone has the file we want; abort. if qf.remaining != len(qf.blocks) { // We have already started on this file; close it down @@ -103,7 +114,7 @@ func (q *FileQueue) Get(nodeID string) (queuedBlock, bool) { return queuedBlock{}, false } - for _, ni := range q.availability[qf.name] { + for _, ni := range av { // Find and return the next block in the queue if ni == nodeID { for j, b := range qf.blocks { @@ -127,8 +138,8 @@ func (q *FileQueue) Get(nodeID string) (queuedBlock, bool) { } func (q *FileQueue) Done(file string, offset int64, data []byte) { - q.lock.Lock() - defer q.lock.Unlock() + q.fmut.Lock() + defer q.fmut.Unlock() c := content{ offset: offset, @@ -167,8 +178,8 @@ func (q *FileQueue) Done(file string, offset int64, data []byte) { } func (q *FileQueue) Queued(file string) bool { - q.lock.Lock() - defer q.lock.Unlock() + q.fmut.Lock() + defer q.fmut.Unlock() for _, qf := range q.files { if qf.name == file { @@ -179,8 +190,8 @@ func (q *FileQueue) Queued(file string) bool { } func (q *FileQueue) QueuedFiles() (files []string) { - q.lock.Lock() - defer q.lock.Unlock() + q.fmut.Lock() + defer q.fmut.Unlock() for _, qf := range q.files { files = append(files, qf.name) @@ -202,8 +213,9 @@ func (q *FileQueue) deleteFile(n string) { } func (q *FileQueue) SetAvailable(file, node string) { - q.lock.Lock() - defer q.lock.Unlock() + q.amut.Lock() + defer q.amut.Unlock() + if q.availability == nil { q.availability = make(map[string][]string) } @@ -211,8 +223,9 @@ func (q *FileQueue) SetAvailable(file, node string) { } func (q *FileQueue) AddAvailable(file, node string) { - q.lock.Lock() - defer q.lock.Unlock() + q.amut.Lock() + defer q.amut.Unlock() + if q.availability == nil { q.availability = make(map[string][]string) } @@ -220,8 +233,9 @@ func (q *FileQueue) AddAvailable(file, node string) { } func (q *FileQueue) RemoveAvailable(toRemove string) { - q.lock.Lock() - defer q.lock.Unlock() + q.amut.Lock() + defer q.amut.Unlock() + for file, nodes := range q.availability { for i, node := range nodes { if node == toRemove { diff --git a/model/model.go b/model/model.go index 188d2c3f..ea83e6b5 100644 --- a/model/model.go +++ b/model/model.go @@ -28,33 +28,41 @@ import ( ) type Model struct { - sync.RWMutex dir string global map[string]File // the latest version of each file as it exists in the cluster + gmut sync.RWMutex // protects global local map[string]File // the files we currently have locally on disk + lmut sync.RWMutex // protects local remote map[string]map[string]File + rmut sync.RWMutex // protects remote protoConn map[string]Connection rawConn map[string]io.Closer - fq FileQueue // queue for files to fetch - dq chan File // queue for files to delete + pmut sync.RWMutex // protects protoConn and rawConn - updatedLocal int64 // timestamp of last update to local - updateGlobal int64 // timestamp of last update to remote + fq FileQueue // queue for files to fetch + dq chan File // queue for files to delete + updatedLocal int64 // timestamp of last update to local + updateGlobal int64 // timestamp of last update to remote lastIdxBcast time.Time lastIdxBcastRequest time.Time + umut sync.RWMutex // provides updated* and lastIdx* rwRunning bool delete bool + initmut sync.Mutex // protects rwRunning and delete trace map[string]bool fileLastChanged map[string]time.Time fileWasSuppressed map[string]int + fmut sync.Mutex // protects fileLastChanged and fileWasSuppressed parallellRequests int limitRequestRate chan struct{} + + imut sync.Mutex // protects Index } type Connection interface { @@ -116,8 +124,6 @@ func (m *Model) LimitRate(kbps int) { // Trace enables trace logging of the given facility. This is a debugging function; grep for m.trace. func (m *Model) Trace(t string) { - m.Lock() - defer m.Unlock() m.trace[t] = true } @@ -125,8 +131,8 @@ func (m *Model) Trace(t string) { // read/write mode the model will attempt to keep in sync with the cluster by // pulling needed files from peer nodes. func (m *Model) StartRW(del bool, threads int) { - m.Lock() - defer m.Unlock() + m.initmut.Lock() + defer m.initmut.Unlock() if m.rwRunning { panic("starting started model") @@ -145,8 +151,8 @@ func (m *Model) StartRW(del bool, threads int) { // Generation returns an opaque integer that is guaranteed to increment on // every change to the local repository or global model. func (m *Model) Generation() int64 { - m.RLock() - defer m.RUnlock() + m.umut.RLock() + defer m.umut.RUnlock() return m.updatedLocal + m.updateGlobal } @@ -162,8 +168,7 @@ func (m *Model) ConnectionStats() map[string]ConnectionInfo { RemoteAddr() net.Addr } - m.RLock() - defer m.RUnlock() + m.pmut.RLock() var res = make(map[string]ConnectionInfo) for node, conn := range m.protoConn { @@ -175,14 +180,15 @@ func (m *Model) ConnectionStats() map[string]ConnectionInfo { } res[node] = ci } + + m.pmut.RUnlock() return res } // LocalSize returns the number of files, deleted files and total bytes for all // files in the global model. func (m *Model) GlobalSize() (files, deleted, bytes int) { - m.RLock() - defer m.RUnlock() + m.gmut.RLock() for _, f := range m.global { if f.Flags&protocol.FlagDeleted == 0 { @@ -192,14 +198,15 @@ func (m *Model) GlobalSize() (files, deleted, bytes int) { deleted++ } } + + m.gmut.RUnlock() return } // LocalSize returns the number of files, deleted files and total bytes for all // files in the local repository. func (m *Model) LocalSize() (files, deleted, bytes int) { - m.RLock() - defer m.RUnlock() + m.lmut.RLock() for _, f := range m.local { if f.Flags&protocol.FlagDeleted == 0 { @@ -209,14 +216,16 @@ func (m *Model) LocalSize() (files, deleted, bytes int) { deleted++ } } + + m.lmut.RUnlock() return } // InSyncSize returns the number and total byte size of the local files that // are in sync with the global model. func (m *Model) InSyncSize() (files, bytes int) { - m.RLock() - defer m.RUnlock() + m.gmut.RLock() + m.lmut.RLock() for n, f := range m.local { if gf, ok := m.global[n]; ok && f.Equals(gf) { @@ -224,27 +233,31 @@ func (m *Model) InSyncSize() (files, bytes int) { bytes += f.Size() } } + + m.lmut.RUnlock() + m.gmut.RUnlock() return } // NeedFiles returns the list of currently needed files and the total size. func (m *Model) NeedFiles() (files []File, bytes int) { - m.RLock() - defer m.RUnlock() + m.gmut.RLock() for _, n := range m.fq.QueuedFiles() { f := m.global[n] files = append(files, f) bytes += f.Size() } + + m.gmut.RUnlock() return } // Index is called when a new node is connected and we receive their full index. // Implements the protocol.Model interface. func (m *Model) Index(nodeID string, fs []protocol.FileInfo) { - m.Lock() - defer m.Unlock() + m.imut.Lock() + defer m.imut.Unlock() if m.trace["net"] { log.Printf("NET IDX(in): %s: %d files", nodeID, len(fs)) @@ -254,7 +267,10 @@ func (m *Model) Index(nodeID string, fs []protocol.FileInfo) { for _, f := range fs { m.indexUpdate(repo, f) } + + m.rmut.Lock() m.remote[nodeID] = repo + m.rmut.Unlock() m.recomputeGlobal() m.recomputeNeed() @@ -263,22 +279,25 @@ func (m *Model) Index(nodeID string, fs []protocol.FileInfo) { // IndexUpdate is called for incremental updates to connected nodes' indexes. // Implements the protocol.Model interface. func (m *Model) IndexUpdate(nodeID string, fs []protocol.FileInfo) { - m.Lock() - defer m.Unlock() + m.imut.Lock() + defer m.imut.Unlock() if m.trace["net"] { log.Printf("NET IDXUP(in): %s: %d files", nodeID, len(fs)) } + m.rmut.Lock() repo, ok := m.remote[nodeID] if !ok { log.Printf("WARNING: Index update from node %s that does not have an index", nodeID) + m.rmut.Unlock() return } for _, f := range fs { m.indexUpdate(repo, f) } + m.rmut.Unlock() m.recomputeGlobal() m.recomputeNeed() @@ -304,8 +323,8 @@ func (m *Model) indexUpdate(repo map[string]File, f protocol.FileInfo) { // Close removes the peer from the model and closes the underlyign connection if possible. // Implements the protocol.Model interface. func (m *Model) Close(node string, err error) { - m.Lock() - defer m.Unlock() + m.pmut.Lock() + m.rmut.Lock() conn, ok := m.rawConn[node] if ok { @@ -317,6 +336,9 @@ func (m *Model) Close(node string, err error) { delete(m.rawConn, node) m.fq.RemoveAvailable(node) + m.rmut.Unlock() + m.pmut.Unlock() + m.recomputeGlobal() m.recomputeNeed() } @@ -325,10 +347,14 @@ func (m *Model) Close(node string, err error) { // Implements the protocol.Model interface. func (m *Model) Request(nodeID, name string, offset int64, size uint32, hash []byte) ([]byte, error) { // Verify that the requested file exists in the local and global model. - m.RLock() + m.lmut.RLock() lf, localOk := m.local[name] + m.lmut.RUnlock() + + m.gmut.RLock() _, globalOk := m.global[name] - m.RUnlock() + m.gmut.RUnlock() + if !localOk || !globalOk { log.Printf("SECURITY (nonexistent file) REQ(in): %s: %q o=%d s=%d h=%x", nodeID, name, offset, size, hash) return nil, ErrNoSuchFile @@ -365,33 +391,40 @@ func (m *Model) Request(nodeID, name string, offset int64, size uint32, hash []b // ReplaceLocal replaces the local repository index with the given list of files. // Change suppression is applied to files changing too often. func (m *Model) ReplaceLocal(fs []File) { - m.Lock() - defer m.Unlock() - var updated bool var newLocal = make(map[string]File) + m.lmut.RLock() for _, f := range fs { newLocal[f.Name] = f if ef := m.local[f.Name]; !ef.Equals(f) { updated = true } } + m.lmut.RUnlock() if m.markDeletedLocals(newLocal) { updated = true } + m.lmut.RLock() if len(newLocal) != len(m.local) { updated = true } + m.lmut.RUnlock() if updated { + m.lmut.Lock() m.local = newLocal + m.lmut.Unlock() + m.recomputeGlobal() m.recomputeNeed() + + m.umut.Lock() m.updatedLocal = time.Now().Unix() m.lastIdxBcastRequest = time.Now() + m.umut.Unlock() } } @@ -399,13 +432,12 @@ func (m *Model) ReplaceLocal(fs []File) { // in protocol data types. Does not track deletes, should only be used to seed // the local index from a cache file at startup. func (m *Model) SeedLocal(fs []protocol.FileInfo) { - m.Lock() - defer m.Unlock() - + m.lmut.Lock() m.local = make(map[string]File) for _, f := range fs { m.local[f.Name] = fileFromFileInfo(f) } + m.lmut.Unlock() m.recomputeGlobal() m.recomputeNeed() @@ -413,19 +445,12 @@ func (m *Model) SeedLocal(fs []protocol.FileInfo) { // ConnectedTo returns true if we are connected to the named node. func (m *Model) ConnectedTo(nodeID string) bool { - m.RLock() - defer m.RUnlock() + m.pmut.RLock() _, ok := m.protoConn[nodeID] + m.pmut.RUnlock() return ok } -// ProtocolIndex returns the current local index in protocol data types. -func (m *Model) ProtocolIndex() []protocol.FileInfo { - m.RLock() - defer m.RUnlock() - return m.protocolIndex() -} - // RepoID returns a unique ID representing the current repository location. func (m *Model) RepoID() string { return fmt.Sprintf("%x", sha1.Sum([]byte(m.dir))) @@ -436,16 +461,13 @@ func (m *Model) RepoID() string { // repository changes. func (m *Model) AddConnection(rawConn io.Closer, protoConn Connection) { nodeID := protoConn.ID() - m.Lock() + m.pmut.Lock() m.protoConn[nodeID] = protoConn m.rawConn[nodeID] = rawConn - m.Unlock() - - m.RLock() - idx := m.protocolIndex() - m.RUnlock() + m.pmut.Unlock() go func() { + idx := m.ProtocolIndex() protoConn.Index(idx) }() @@ -457,15 +479,15 @@ func (m *Model) AddConnection(rawConn io.Closer, protoConn Connection) { log.Println("PULL: Starting", nodeID, i) } for { - m.RLock() + m.pmut.RLock() if _, ok := m.protoConn[nodeID]; !ok { if m.trace["pull"] { log.Println("PULL: Exiting", nodeID, i) } - m.RUnlock() + m.pmut.RUnlock() return } - m.RUnlock() + m.pmut.RUnlock() qb, ok := m.fq.Get(nodeID) if ok { @@ -484,6 +506,7 @@ func (m *Model) AddConnection(rawConn io.Closer, protoConn Connection) { } func (m *Model) shouldSuppressChange(name string) bool { + m.fmut.Lock() sup := shouldSuppressChange(m.fileLastChanged[name], m.fileWasSuppressed[name]) if sup { m.fileWasSuppressed[name]++ @@ -491,6 +514,7 @@ func (m *Model) shouldSuppressChange(name string) bool { m.fileWasSuppressed[name] = 0 m.fileLastChanged[name] = time.Now() } + m.fmut.Unlock() return sup } @@ -505,10 +529,13 @@ func shouldSuppressChange(lastChange time.Time, numChanges int) bool { return false } -// protocolIndex returns the current local index in protocol data types. +// ProtocolIndex returns the current local index in protocol data types. // Must be called with the read lock held. -func (m *Model) protocolIndex() []protocol.FileInfo { +func (m *Model) ProtocolIndex() []protocol.FileInfo { var index []protocol.FileInfo + + m.lmut.RLock() + for _, f := range m.local { mf := fileInfoFromFile(f) if m.trace["idx"] { @@ -520,13 +547,16 @@ func (m *Model) protocolIndex() []protocol.FileInfo { } index = append(index, mf) } + + m.lmut.RUnlock() return index } func (m *Model) requestGlobal(nodeID, name string, offset int64, size uint32, hash []byte) ([]byte, error) { - m.RLock() + m.pmut.RLock() nc, ok := m.protoConn[nodeID] - m.RUnlock() + m.pmut.RUnlock() + if !ok { return nil, fmt.Errorf("requestGlobal: no such node: %s", nodeID) } @@ -540,18 +570,23 @@ func (m *Model) requestGlobal(nodeID, name string, offset int64, size uint32, ha func (m *Model) broadcastIndexLoop() { for { - m.RLock() + m.umut.RLock() bcastRequested := m.lastIdxBcastRequest.After(m.lastIdxBcast) holdtimeExceeded := time.Since(m.lastIdxBcastRequest) > idxBcastHoldtime - m.RUnlock() + m.umut.RUnlock() maxDelayExceeded := time.Since(m.lastIdxBcast) > idxBcastMaxDelay if bcastRequested && (holdtimeExceeded || maxDelayExceeded) { - m.Lock() + idx := m.ProtocolIndex() + var indexWg sync.WaitGroup indexWg.Add(len(m.protoConn)) - idx := m.protocolIndex() + + m.umut.Lock() m.lastIdxBcast = time.Now() + m.umut.Unlock() + + m.pmut.RLock() for _, node := range m.protoConn { node := node if m.trace["net"] { @@ -562,7 +597,8 @@ func (m *Model) broadcastIndexLoop() { indexWg.Done() }() } - m.Unlock() + m.pmut.RUnlock() + indexWg.Wait() } time.Sleep(idxBcastHoldtime) @@ -577,6 +613,10 @@ func (m *Model) markDeletedLocals(newLocal map[string]File) bool { // had the newest version available according to the global table and if so // note the file as having been deleted. var updated bool + + m.gmut.RLock() + m.lmut.RLock() + for n, f := range m.local { if _, ok := newLocal[n]; !ok { if gf := m.global[n]; !gf.NewerThan(f) { @@ -590,22 +630,34 @@ func (m *Model) markDeletedLocals(newLocal map[string]File) bool { } } } + + m.lmut.RUnlock() + m.gmut.RUnlock() + return updated } -func (m *Model) updateLocalLocked(f File) { - m.Lock() - m.updateLocal(f) - m.Unlock() -} - func (m *Model) updateLocal(f File) { + var updated bool + + m.lmut.Lock() if ef, ok := m.local[f.Name]; !ok || !ef.Equals(f) { m.local[f.Name] = f + updated = true + } + m.lmut.Unlock() + + if updated { m.recomputeGlobal() - m.recomputeNeed() + // We don't recomputeNeed here for two reasons: + // - a need shouldn't have arisen due to having a newer local file + // - recomputeNeed might call into fq.Add but we might have been called by + // fq which would be a deadlock on fq + + m.umut.Lock() m.updatedLocal = time.Now().Unix() m.lastIdxBcastRequest = time.Now() + m.umut.Unlock() } } @@ -613,10 +665,13 @@ func (m *Model) updateLocal(f File) { func (m *Model) recomputeGlobal() { var newGlobal = make(map[string]File) + m.lmut.RLock() for n, f := range m.local { newGlobal[n] = f } + m.lmut.RUnlock() + m.rmut.RLock() var highestMod int64 for nodeID, fs := range m.remote { for n, nf := range fs { @@ -631,9 +686,11 @@ func (m *Model) recomputeGlobal() { } } } + m.rmut.RUnlock() // Figure out if anything actually changed + m.gmut.RLock() var updated bool if highestMod > m.updateGlobal || len(newGlobal) != len(m.global) { updated = true @@ -645,22 +702,39 @@ func (m *Model) recomputeGlobal() { } } } + m.gmut.RUnlock() if updated { - m.updateGlobal = time.Now().Unix() + m.gmut.Lock() + m.umut.Lock() m.global = newGlobal + m.updateGlobal = time.Now().Unix() + m.umut.Unlock() + m.gmut.Unlock() } } -// Must be called with the write lock held. func (m *Model) recomputeNeed() { + type addOrder struct { + n string + remote []Block + fm *fileMonitor + } + var toDelete []File + var toAdd []addOrder + + m.gmut.RLock() for n, gf := range m.global { if m.fq.Queued(n) { continue } + + m.lmut.RLock() lf, ok := m.local[n] + m.lmut.RUnlock() + if !ok || gf.NewerThan(lf) { if gf.Flags&protocol.FlagInvalid != 0 { // Never attempt to sync invalid files @@ -689,29 +763,27 @@ func (m *Model) recomputeNeed() { model: m, localBlocks: local, } - m.fq.Add(n, remote, &fm) + toAdd = append(toAdd, addOrder{n, remote, &fm}) } } } - go func() { - for _, gf := range toDelete { - // The receive side needs the lock, which we are holding - m.dq <- gf - } - }() + m.gmut.RUnlock() + + for _, ao := range toAdd { + m.fq.Add(ao.n, ao.remote, ao.fm) + } + for _, gf := range toDelete { + m.dq <- gf + } } func (m *Model) WhoHas(name string) []string { - m.RLock() - defer m.RUnlock() - return m.whoHas(name) -} - -// Must be called with the read lock held. -func (m *Model) whoHas(name string) []string { var remote []string + m.gmut.RLock() + m.rmut.RLock() + gf := m.global[name] for node, files := range m.remote { if file, ok := files[name]; ok && file.Equals(gf) { @@ -719,6 +791,8 @@ func (m *Model) whoHas(name string) []string { } } + m.rmut.RUnlock() + m.gmut.RUnlock() return remote } @@ -732,7 +806,8 @@ func (m *Model) deleteLoop() { if err != nil { log.Printf("WARNING: %s: %v", file.Name, err) } - m.updateLocalLocked(file) + + m.updateLocal(file) } } diff --git a/model/walk.go b/model/walk.go index 0dd99fe7..91dfa8c6 100644 --- a/model/walk.go +++ b/model/walk.go @@ -116,9 +116,9 @@ func (m *Model) walkAndHashFiles(res *[]File, ign map[string][]string) filepath. } modified := fi.ModTime().Unix() - m.RLock() + m.lmut.RLock() hf, ok := m.local[rn] - m.RUnlock() + m.lmut.RUnlock() if ok && hf.Modified == modified { if nf := uint32(info.Mode()); nf != hf.Flags { @@ -127,7 +127,6 @@ func (m *Model) walkAndHashFiles(res *[]File, ign map[string][]string) filepath. } *res = append(*res, hf) } else { - m.Lock() if m.shouldSuppressChange(rn) { if m.trace["file"] { log.Println("FILE: SUPPRESS:", rn, m.fileWasSuppressed[rn], time.Since(m.fileLastChanged[rn])) @@ -138,10 +137,8 @@ func (m *Model) walkAndHashFiles(res *[]File, ign map[string][]string) filepath. hf.Version++ *res = append(*res, hf) } - m.Unlock() return nil } - m.Unlock() if m.trace["file"] { log.Printf("FILE: Hash %q", p) diff --git a/protocol/protocol.go b/protocol/protocol.go index 8eb9d067..993df877 100644 --- a/protocol/protocol.go +++ b/protocol/protocol.go @@ -341,14 +341,15 @@ func (c *Connection) processRequest(msgID int, req request) { c.Lock() c.mwriter.writeUint32(encodeHeader(header{0, msgID, messageTypeResponse})) c.mwriter.writeResponse(data) - err := c.flush() + err := c.mwriter.err + if err == nil { + err = c.flush() + } c.Unlock() buffers.Put(data) if err != nil { c.close(err) - } else if c.mwriter.err != nil { - c.close(c.mwriter.err) } }