This repository was archived by the owner on Feb 1, 2021. It is now read-only.
Merged
Changes from 5 commits
189 changes: 140 additions & 49 deletions cluster/engine.go
@@ -26,6 +26,27 @@ const (
minSupportedVersion = version.Version("1.6.0")
)

type engineState int

const (
// pending means an engine added to the cluster has not been validated
statePending engineState = iota
// unhealthy means an engine is unreachable
stateUnhealthy
// healthy means an engine is reachable
stateHealthy
// maintenance means an engine is under maintenance.
Contributor
@dongluochen This is just for convenience, but can we add a TODO and a reference to the issue (#1508) here? Because the comment is there anyway :)
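A minimal sketch of what that TODO could look like (the issue number comes from the comment above; the wording is illustrative, not code from this PR):

```go
// TODO (#1508): there is no action to migrate a node into maintenance state yet.
//stateMaintenance
```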

// There is no action to migrate a node into maintenance state yet.
//stateMaintenance
)

var stateText = map[engineState]string{
statePending: "Pending",
stateUnhealthy: "Unhealthy",
stateHealthy: "Healthy",
//stateMaintenance: "Maintenance",
}

// delayer offers a simple API to generate a random delay within a given time range.
type delayer struct {
rangeMin time.Duration
@@ -82,7 +103,9 @@ type Engine struct {
volumes map[string]*Volume
client dockerclient.Client
eventHandler EventHandler
healthy bool
state engineState
lastError string
updatedAt time.Time
failureCount int
overcommitRatio int64
opts *EngineOpts
@@ -99,7 +122,8 @@ func NewEngine(addr string, overcommitRatio float64, opts *EngineOpts) *Engine {
containers: make(map[string]*Container),
networks: make(map[string]*Network),
volumes: make(map[string]*Volume),
healthy: true,
state: statePending,
updatedAt: time.Now(),
overcommitRatio: int64(overcommitRatio * 100),
opts: opts,
}
@@ -153,9 +177,6 @@ func (e *Engine) ConnectWithClient(client dockerclient.Client) error {
e.RefreshVolumes()
e.RefreshNetworks()

// Start the update loop.
go e.refreshLoop()

e.emitEvent("engine_connect")

return nil
@@ -182,45 +203,114 @@ func (e *Engine) isConnected() bool {

// IsHealthy returns true if the engine is healthy
func (e *Engine) IsHealthy() bool {
return e.healthy
e.RLock()
defer e.RUnlock()
return e.state == stateHealthy
}

// setState sets engine state
func (e *Engine) setState(state engineState) {
e.Lock()
defer e.Unlock()
e.state = state
}

// setHealthy sets engine healthy state
func (e *Engine) setHealthy(state bool) {
// TimeToValidate returns true if a pending node is up for validation
func (e *Engine) TimeToValidate() bool {
e.Lock()
e.healthy = state
// if engine is healthy, clear failureCount
if state {
e.failureCount = 0
defer e.Unlock()
if e.state != statePending {
return false
}
e.Unlock()
sinceLastUpdate := time.Since(e.updatedAt)
// Increase check interval for a pending engine according to failureCount and cap it at 4 hours
if sinceLastUpdate > 4*time.Hour || sinceLastUpdate > time.Duration(e.failureCount)*30*time.Second {
Contributor
Small nit, but I feel like 4*time.Hour and 30*time.Second should be constants here.

Suggestions:

validationLimit -> 4*time.Hour
retryFactor -> 30*time.Second
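A sketch of that suggestion in Go; the names validationLimit and retryFactor come from the comment above and are not defined anywhere in this PR:

```go
package cluster

import "time"

// Hypothetical constants proposed in the review comment; not part of this PR.
const (
	validationLimit = 4 * time.Hour    // cap on how long a pending engine waits
	retryFactor     = 30 * time.Second // back-off growth per recorded failure
)

// The condition above would then read:
// if sinceLastUpdate > validationLimit || sinceLastUpdate > time.Duration(e.failureCount)*retryFactor {
```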

return true
}
return false
}
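To make the back-off in TimeToValidate concrete, here is a small self-contained sketch (not code from this PR) of how long a pending engine waits before its next validation attempt:

```go
package main

import (
	"fmt"
	"time"
)

// nextCheckDelay mirrors the TimeToValidate condition: a pending engine is
// re-validated once failureCount*30s has elapsed since the last update, and
// the wait is capped at 4 hours.
func nextCheckDelay(failureCount int) time.Duration {
	d := time.Duration(failureCount) * 30 * time.Second
	if d > 4*time.Hour {
		return 4 * time.Hour
	}
	return d
}

func main() {
	for _, n := range []int{1, 10, 100, 1000} {
		fmt.Printf("failureCount=%4d -> wait %v\n", n, nextCheckDelay(n))
	}
}
```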

// ValidationComplete transitions engine state from statePending to stateHealthy
func (e *Engine) ValidationComplete() {
e.Lock()
defer e.Unlock()
if e.state != statePending {
return
}
e.state = stateHealthy
e.failureCount = 0
go e.refreshLoop()
}

// setErrMsg sets error message for the engine
func (e *Engine) setErrMsg(errMsg string) {
e.Lock()
defer e.Unlock()
e.lastError = errMsg
e.updatedAt = time.Now()
}

// ErrMsg returns error message for the engine
func (e *Engine) ErrMsg() string {
e.RLock()
defer e.RUnlock()
return e.lastError
}

// HandleIDConflict handles an engine ID that is duplicated by another engine
func (e *Engine) HandleIDConflict(otherAddr string) {
e.setErrMsg(fmt.Sprintf("ID duplicated. %s shared by this node %s and another node %s", e.ID, e.Addr, otherAddr))
}

// Status returns the health status of the Engine: Healthy or Unhealthy
func (e *Engine) Status() string {
if e.healthy {
return "Healthy"
}
return "Unhealthy"
e.RLock()
defer e.RUnlock()
return stateText[e.state]
}

// incFailureCount increases the engine's failure count, and sets the engine as unhealthy if the threshold is crossed
func (e *Engine) incFailureCount() {
e.Lock()
defer e.Unlock()
e.failureCount++
if e.healthy && e.failureCount >= e.opts.FailureRetry {
e.healthy = false
if e.state == stateHealthy && e.failureCount >= e.opts.FailureRetry {
e.state = stateUnhealthy
log.WithFields(log.Fields{"name": e.Name, "id": e.ID}).Errorf("Flagging engine as unhealthy. Connect failed %d times", e.failureCount)
e.emitEvent("engine_disconnect")
}
e.Unlock()
}

// CheckConnectionErr checks error from client response and adjust engine healthy indicators
// UpdatedAt returns the previous updatedAt time
func (e *Engine) UpdatedAt() time.Time {
e.RLock()
defer e.RUnlock()
return e.updatedAt
}

func (e *Engine) resetFailureCount() {
e.Lock()
defer e.Unlock()
e.failureCount = 0
}

// CheckConnectionErr checks the error from a client response and adjusts the engine's health indicators
func (e *Engine) CheckConnectionErr(err error) {
if err == nil {
e.setHealthy(true)
e.setErrMsg("")
e.resetFailureCount()
// If current state is unhealthy, change it to healthy
if e.state == stateUnhealthy {
log.WithFields(log.Fields{"name": e.Name, "id": e.ID}).Infof("Engine came back to life after %d retries. Hooray!", e.failureCount)
e.emitEvent("engine_reconnect")
e.setState(stateHealthy)
}
return
}

// update engine error message
e.setErrMsg(err.Error())

// dockerclient defines the ErrConnectionRefused error, but if the HTTP client comes from swarm,
// it is not using dockerclient, so we need string matching for those cases. Remove the first
// character to deal with case sensitivity
@@ -235,7 +325,7 @@ func (e *Engine) CheckConnectionErr(err error) {
e.incFailureCount()
return
}
// other errors may be ambiguous. let refresh loop decide healthy or not.
// other errors may be ambiguous.
}
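For context on the string matching described in the comment above: dropping the first character of the expected message makes the check insensitive to the initial capital, so both "Connection refused" and "connection refused" match. A standalone sketch of the idea (not the PR's exact code):

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// isConnectionRefused matches on "onnection refused" so that the check works
// whether the error text starts with "C" or "c".
func isConnectionRefused(err error) bool {
	return strings.Contains(err.Error(), "onnection refused")
}

func main() {
	fmt.Println(isConnectionRefused(errors.New("Connection refused"))) // true
	fmt.Println(isConnectionRefused(errors.New("connection refused"))) // true
	fmt.Println(isConnectionRefused(errors.New("i/o timeout")))        // false
}
```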

// Gather engine specs (CPU, memory, constraints, ...).
@@ -264,7 +354,19 @@ func (e *Engine) updateSpecs() error {
return fmt.Errorf("engine %s is running an unsupported version of Docker Engine. Please upgrade to at least %s", e.Addr, minSupportedVersion)
}

e.ID = info.ID
e.Lock()
defer e.Unlock()
// Swarm/docker identifies an engine by ID. Updating the ID without updating the cluster
// index would put the cluster into an inconsistent state. If this happens, the
// engine should be put into the pending state for re-validation.
if e.ID == "" {
e.ID = info.ID
} else if e.ID != info.ID {
e.state = statePending
message := fmt.Sprintf("Engine (ID: %s, Addr: %s) shows up with another ID:%s. Please remove it from the cluster; it can be added back.", e.ID, e.Addr, info.ID)
e.lastError = message
return fmt.Errorf(message)
}
e.Name = info.Name
e.Cpus = info.NCPU
e.Memory = info.MemTotal
@@ -368,9 +470,7 @@ func (e *Engine) RefreshVolumes() error {
// FIXME: unexport this method after mesos scheduler stops using it directly
func (e *Engine) RefreshContainers(full bool) error {
containers, err := e.client.ListContainers(true, false, "")
// e.CheckConnectionErr(err) is not appropriate here because refresh loop uses
// RefreshContainers function to fail/recover an engine. Adding CheckConnectionErr
// here would result in double count
e.CheckConnectionErr(err)
if err != nil {
return err
}
@@ -389,7 +489,6 @@ func (e *Engine) RefreshContainers(full bool) error {
defer e.Unlock()
e.containers = merged

log.WithFields(log.Fields{"id": e.ID, "name": e.Name}).Debugf("Updated engine state")
return nil
}

@@ -469,6 +568,7 @@ func (e *Engine) updateContainer(c dockerclient.Container, containers map[string
return containers, nil
}

// refreshLoop periodically triggers engine refresh.
func (e *Engine) refreshLoop() {

for {
@@ -481,33 +581,24 @@ func (e *Engine) refreshLoop() {
return
}

if !e.IsHealthy() {
if err = e.updateSpecs(); err != nil {
log.WithFields(log.Fields{"name": e.Name, "id": e.ID}).Errorf("Update engine specs failed: %v", err)
continue
}
e.client.StopAllMonitorEvents()
e.client.StartMonitorEvents(e.handler, nil)
}

err = e.RefreshContainers(false)
if err == nil {
// Do not check the error, as older daemons don't support this call
e.RefreshVolumes()
e.RefreshNetworks()
err = e.RefreshImages()
}

if err != nil {
e.failureCount++
if e.failureCount >= e.opts.FailureRetry && e.healthy {
e.emitEvent("engine_disconnect")
e.setHealthy(false)
log.WithFields(log.Fields{"name": e.Name, "id": e.ID}).Errorf("Flagging engine as dead. Updated state failed %d times: %v", e.failureCount, err)
}
e.RefreshImages()
log.WithFields(log.Fields{"id": e.ID, "name": e.Name}).Debugf("Engine update succeeded")
} else {
if !e.healthy {
log.WithFields(log.Fields{"name": e.Name, "id": e.ID}).Infof("Engine came back to life after %d retries. Hooray!", e.failureCount)
if err := e.updateSpecs(); err != nil {
log.WithFields(log.Fields{"name": e.Name, "id": e.ID}).Errorf("Update engine specs failed: %v", err)
continue
}
e.client.StopAllMonitorEvents()
e.client.StartMonitorEvents(e.handler, nil)
e.emitEvent("engine_reconnect")
}
e.setHealthy(true)
log.WithFields(log.Fields{"id": e.ID, "name": e.Name}).Debugf("Engine refresh failed")
}
}
}