Don't deadlock while sending and receiving large indexes
This commit is contained in:
parent
1aef03288a
commit
95f5e5fa9a
@ -215,6 +215,7 @@ func main() {
|
|||||||
if profiler := os.Getenv("STPROFILER"); len(profiler) > 0 {
|
if profiler := os.Getenv("STPROFILER"); len(profiler) > 0 {
|
||||||
go func() {
|
go func() {
|
||||||
l.Debugln("Starting profiler on", profiler)
|
l.Debugln("Starting profiler on", profiler)
|
||||||
|
runtime.SetBlockProfileRate(1)
|
||||||
err := http.ListenAndServe(profiler, nil)
|
err := http.ListenAndServe(profiler, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
l.Fatalln(err)
|
l.Fatalln(err)
|
||||||
|
|||||||
@ -47,10 +47,12 @@ type Model struct {
|
|||||||
repoFiles map[string]*files.Set // repo -> files
|
repoFiles map[string]*files.Set // repo -> files
|
||||||
repoNodes map[string][]string // repo -> nodeIDs
|
repoNodes map[string][]string // repo -> nodeIDs
|
||||||
nodeRepos map[string][]string // nodeID -> repos
|
nodeRepos map[string][]string // nodeID -> repos
|
||||||
repoState map[string]repoState // repo -> state
|
|
||||||
suppressor map[string]*suppressor // repo -> suppressor
|
suppressor map[string]*suppressor // repo -> suppressor
|
||||||
rmut sync.RWMutex // protects the above
|
rmut sync.RWMutex // protects the above
|
||||||
|
|
||||||
|
repoState map[string]repoState // repo -> state
|
||||||
|
smut sync.RWMutex
|
||||||
|
|
||||||
cm *cid.Map
|
cm *cid.Map
|
||||||
|
|
||||||
protoConn map[string]protocol.Connection
|
protoConn map[string]protocol.Connection
|
||||||
@ -537,7 +539,10 @@ func (m *Model) broadcastIndexLoop() {
|
|||||||
m.pmut.RLock()
|
m.pmut.RLock()
|
||||||
m.rmut.RLock()
|
m.rmut.RLock()
|
||||||
|
|
||||||
|
var indexWg sync.WaitGroup
|
||||||
for repo, fs := range m.repoFiles {
|
for repo, fs := range m.repoFiles {
|
||||||
|
repo := repo
|
||||||
|
|
||||||
c := fs.Changes(cid.LocalID)
|
c := fs.Changes(cid.LocalID)
|
||||||
if c == lastChange[repo] {
|
if c == lastChange[repo] {
|
||||||
continue
|
continue
|
||||||
@ -545,10 +550,14 @@ func (m *Model) broadcastIndexLoop() {
|
|||||||
lastChange[repo] = c
|
lastChange[repo] = c
|
||||||
|
|
||||||
idx := m.protocolIndex(repo)
|
idx := m.protocolIndex(repo)
|
||||||
m.saveIndex(repo, m.indexDir, idx)
|
indexWg.Add(1)
|
||||||
|
go func() {
|
||||||
|
m.saveIndex(repo, m.indexDir, idx)
|
||||||
|
indexWg.Done()
|
||||||
|
}()
|
||||||
|
|
||||||
var indexWg sync.WaitGroup
|
|
||||||
for _, nodeID := range m.repoNodes[repo] {
|
for _, nodeID := range m.repoNodes[repo] {
|
||||||
|
nodeID := nodeID
|
||||||
if conn, ok := m.protoConn[nodeID]; ok {
|
if conn, ok := m.protoConn[nodeID]; ok {
|
||||||
indexWg.Add(1)
|
indexWg.Add(1)
|
||||||
if debug {
|
if debug {
|
||||||
@ -560,12 +569,12 @@ func (m *Model) broadcastIndexLoop() {
|
|||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
indexWg.Wait()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
m.rmut.RUnlock()
|
m.rmut.RUnlock()
|
||||||
m.pmut.RUnlock()
|
m.pmut.RUnlock()
|
||||||
|
|
||||||
|
indexWg.Wait()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -749,15 +758,15 @@ func (m *Model) clusterConfig(node string) protocol.ClusterConfigMessage {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (m *Model) setState(repo string, state repoState) {
|
func (m *Model) setState(repo string, state repoState) {
|
||||||
m.rmut.Lock()
|
m.smut.Lock()
|
||||||
m.repoState[repo] = state
|
m.repoState[repo] = state
|
||||||
m.rmut.Unlock()
|
m.smut.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *Model) State(repo string) string {
|
func (m *Model) State(repo string) string {
|
||||||
m.rmut.RLock()
|
m.smut.RLock()
|
||||||
state := m.repoState[repo]
|
state := m.repoState[repo]
|
||||||
m.rmut.RUnlock()
|
m.smut.RUnlock()
|
||||||
switch state {
|
switch state {
|
||||||
case RepoIdle:
|
case RepoIdle:
|
||||||
return "idle"
|
return "idle"
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user