|
|
|
@ -398,12 +398,11 @@ type dialer interface { |
|
|
|
|
func (srv *Server) run(dialstate dialer) { |
|
|
|
|
defer srv.loopWG.Done() |
|
|
|
|
var ( |
|
|
|
|
peers = make(map[discover.NodeID]*Peer) |
|
|
|
|
trusted = make(map[discover.NodeID]bool, len(srv.TrustedNodes)) |
|
|
|
|
|
|
|
|
|
tasks []task |
|
|
|
|
pendingTasks []task |
|
|
|
|
peers = make(map[discover.NodeID]*Peer) |
|
|
|
|
trusted = make(map[discover.NodeID]bool, len(srv.TrustedNodes)) |
|
|
|
|
taskdone = make(chan task, maxActiveDialTasks) |
|
|
|
|
runningTasks []task |
|
|
|
|
queuedTasks []task // tasks that can't run yet
|
|
|
|
|
) |
|
|
|
|
// Put trusted nodes into a map to speed up checks.
|
|
|
|
|
// Trusted peers are loaded on startup and cannot be
|
|
|
|
@ -412,39 +411,39 @@ func (srv *Server) run(dialstate dialer) { |
|
|
|
|
trusted[n.ID] = true |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Some task list helpers.
|
|
|
|
|
// removes t from runningTasks
|
|
|
|
|
delTask := func(t task) { |
|
|
|
|
for i := range tasks { |
|
|
|
|
if tasks[i] == t { |
|
|
|
|
tasks = append(tasks[:i], tasks[i+1:]...) |
|
|
|
|
for i := range runningTasks { |
|
|
|
|
if runningTasks[i] == t { |
|
|
|
|
runningTasks = append(runningTasks[:i], runningTasks[i+1:]...) |
|
|
|
|
break |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
scheduleTasks := func(new []task) { |
|
|
|
|
pt := append(pendingTasks, new...) |
|
|
|
|
start := maxActiveDialTasks - len(tasks) |
|
|
|
|
if len(pt) < start { |
|
|
|
|
start = len(pt) |
|
|
|
|
// starts until max number of active tasks is satisfied
|
|
|
|
|
startTasks := func(ts []task) (rest []task) { |
|
|
|
|
i := 0 |
|
|
|
|
for ; len(runningTasks) < maxActiveDialTasks && i < len(ts); i++ { |
|
|
|
|
t := ts[i] |
|
|
|
|
glog.V(logger.Detail).Infoln("new task:", t) |
|
|
|
|
go func() { t.Do(srv); taskdone <- t }() |
|
|
|
|
runningTasks = append(runningTasks, t) |
|
|
|
|
} |
|
|
|
|
if start > 0 { |
|
|
|
|
tasks = append(tasks, pt[:start]...) |
|
|
|
|
for _, t := range pt[:start] { |
|
|
|
|
t := t |
|
|
|
|
glog.V(logger.Detail).Infoln("new task:", t) |
|
|
|
|
go func() { t.Do(srv); taskdone <- t }() |
|
|
|
|
} |
|
|
|
|
copy(pt, pt[start:]) |
|
|
|
|
pendingTasks = pt[:len(pt)-start] |
|
|
|
|
return ts[i:] |
|
|
|
|
} |
|
|
|
|
scheduleTasks := func() { |
|
|
|
|
// Start from queue first.
|
|
|
|
|
queuedTasks = append(queuedTasks[:0], startTasks(queuedTasks)...) |
|
|
|
|
// Query dialer for new tasks and start as many as possible now.
|
|
|
|
|
if len(runningTasks) < maxActiveDialTasks { |
|
|
|
|
nt := dialstate.newTasks(len(runningTasks)+len(queuedTasks), peers, time.Now()) |
|
|
|
|
queuedTasks = append(queuedTasks, startTasks(nt)...) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
running: |
|
|
|
|
for { |
|
|
|
|
// Query the dialer for new tasks and launch them.
|
|
|
|
|
now := time.Now() |
|
|
|
|
nt := dialstate.newTasks(len(pendingTasks)+len(tasks), peers, now) |
|
|
|
|
scheduleTasks(nt) |
|
|
|
|
scheduleTasks() |
|
|
|
|
|
|
|
|
|
select { |
|
|
|
|
case <-srv.quit: |
|
|
|
@ -466,7 +465,7 @@ running: |
|
|
|
|
// can update its state and remove it from the active
|
|
|
|
|
// tasks list.
|
|
|
|
|
glog.V(logger.Detail).Infoln("<-taskdone:", t) |
|
|
|
|
dialstate.taskDone(t, now) |
|
|
|
|
dialstate.taskDone(t, time.Now()) |
|
|
|
|
delTask(t) |
|
|
|
|
case c := <-srv.posthandshake: |
|
|
|
|
// A connection has passed the encryption handshake so
|
|
|
|
@ -513,7 +512,7 @@ running: |
|
|
|
|
// Wait for peers to shut down. Pending connections and tasks are
|
|
|
|
|
// not handled here and will terminate soon-ish because srv.quit
|
|
|
|
|
// is closed.
|
|
|
|
|
glog.V(logger.Detail).Infof("ignoring %d pending tasks at spindown", len(tasks)) |
|
|
|
|
glog.V(logger.Detail).Infof("ignoring %d pending tasks at spindown", len(runningTasks)) |
|
|
|
|
for len(peers) > 0 { |
|
|
|
|
p := <-srv.delpeer |
|
|
|
|
glog.V(logger.Detail).Infoln("<-delpeer (spindown):", p) |
|
|
|
|