Skip to content

Commit 15a3e1e

Browse files
committed
Merge pull request docker-archive#1578 from aluzzardi/rescheduling
[experimental] Simple container rescheduling on node failure
2 parents 8f41ddd + 3978f9c commit 15a3e1e

File tree

13 files changed

+490
-35
lines changed

13 files changed

+490
-35
lines changed

api/handlers.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -438,8 +438,14 @@ func postContainersCreate(c *context, w http.ResponseWriter, r *http.Request) {
438438
authConfig = &dockerclient.AuthConfig{}
439439
json.Unmarshal(buf, authConfig)
440440
}
441+
containerConfig := cluster.BuildContainerConfig(config)
441442

442-
container, err := c.cluster.CreateContainer(cluster.BuildContainerConfig(config), name, authConfig)
443+
if err := containerConfig.Validate(); err != nil {
444+
httpError(w, err.Error(), http.StatusInternalServerError)
445+
return
446+
}
447+
448+
container, err := c.cluster.CreateContainer(containerConfig, name, authConfig)
443449
if err != nil {
444450
if strings.HasPrefix(err.Error(), "Conflict") {
445451
httpError(w, err.Error(), http.StatusConflict)

cli/manage.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -321,5 +321,6 @@ func manage(c *cli.Context) {
321321
server.SetHandler(api.NewPrimary(cl, tlsConfig, &statusHandler{cl, nil, nil}, c.GlobalBool("debug"), c.Bool("cors")))
322322
}
323323

324+
cluster.NewWatchdog(cl)
324325
log.Fatal(server.ListenAndServe())
325326
}

cluster/cluster.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,9 @@ type Cluster interface {
8383
// Register an event handler for cluster-wide events.
8484
RegisterEventHandler(h EventHandler) error
8585

86+
// Unregister an event handler.
87+
UnregisterEventHandler(h EventHandler)
88+
8689
// FIXME: remove this method
8790
// Return a random engine
8891
RANDOMENGINE() (*Engine, error)

cluster/config.go

Lines changed: 51 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ package cluster
22

33
import (
44
"encoding/json"
5+
"errors"
6+
"fmt"
57
"strings"
68

79
"github.com/samalba/dockerclient"
@@ -63,9 +65,10 @@ func consolidateResourceFields(c *dockerclient.ContainerConfig) {
6365
// BuildContainerConfig creates a cluster.ContainerConfig from a dockerclient.ContainerConfig
6466
func BuildContainerConfig(c dockerclient.ContainerConfig) *ContainerConfig {
6567
var (
66-
affinities []string
67-
constraints []string
68-
env []string
68+
affinities []string
69+
constraints []string
70+
reschedulePolicies []string
71+
env []string
6972
)
7073

7174
// only for tests
@@ -83,12 +86,19 @@ func BuildContainerConfig(c dockerclient.ContainerConfig) *ContainerConfig {
8386
json.Unmarshal([]byte(labels), &constraints)
8487
}
8588

86-
// parse affinities/constraints from env (ex. docker run -e affinity:container==redis -e affinity:image==nginx -e constraint:region==us-east -e constraint:storage==ssd)
89+
// parse reschedule policy from labels (ex. docker run --label 'com.docker.swarm.reschedule-policies=on-node-failure')
90+
if labels, ok := c.Labels[SwarmLabelNamespace+".reschedule-policies"]; ok {
91+
json.Unmarshal([]byte(labels), &reschedulePolicies)
92+
}
93+
94+
// parse affinities/constraints/reschedule policies from env (ex. docker run -e affinity:container==redis -e affinity:image==nginx -e constraint:region==us-east -e constraint:storage==ssd -e reschedule:off)
8795
for _, e := range c.Env {
8896
if ok, key, value := parseEnv(e); ok && key == "affinity" {
8997
affinities = append(affinities, value)
9098
} else if ok && key == "constraint" {
9199
constraints = append(constraints, value)
100+
} else if ok && key == "reschedule" {
101+
reschedulePolicies = append(reschedulePolicies, value)
92102
} else {
93103
env = append(env, e)
94104
}
@@ -111,6 +121,13 @@ func BuildContainerConfig(c dockerclient.ContainerConfig) *ContainerConfig {
111121
}
112122
}
113123

124+
// store reschedule policies in labels
125+
if len(reschedulePolicies) > 0 {
126+
if labels, err := json.Marshal(reschedulePolicies); err == nil {
127+
c.Labels[SwarmLabelNamespace+".reschedule-policies"] = string(labels)
128+
}
129+
}
130+
114131
consolidateResourceFields(&c)
115132

116133
return &ContainerConfig{c}
@@ -186,3 +203,33 @@ func (c *ContainerConfig) HaveNodeConstraint() bool {
186203
}
187204
return false
188205
}
206+
207+
// HasReschedulePolicy returns true if the specified policy is part of the config
208+
func (c *ContainerConfig) HasReschedulePolicy(p string) bool {
209+
for _, reschedulePolicy := range c.extractExprs("reschedule-policies") {
210+
if reschedulePolicy == p {
211+
return true
212+
}
213+
}
214+
return false
215+
}
216+
217+
// Validate returns an error if the config isn't valid
218+
func (c *ContainerConfig) Validate() error {
219+
//TODO: add validation for affinities and constraints
220+
reschedulePolicies := c.extractExprs("reschedule-policies")
221+
if len(reschedulePolicies) > 1 {
222+
return errors.New("too many reschedule policies")
223+
} else if len(reschedulePolicies) == 1 {
224+
valid := false
225+
for _, validReschedulePolicy := range []string{"off", "on-node-failure"} {
226+
if reschedulePolicies[0] == validReschedulePolicy {
227+
valid = true
228+
}
229+
}
230+
if !valid {
231+
return fmt.Errorf("invalid reschedule policy: %s", reschedulePolicies[0])
232+
}
233+
}
234+
return nil
235+
}

cluster/container.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,11 @@ func (c *Container) Refresh() (*Container, error) {
2121
return c.Engine.refreshContainer(c.Id, true)
2222
}
2323

24+
// Start a container
25+
func (c *Container) Start() error {
26+
return c.Engine.client.StartContainer(c.Id, nil)
27+
}
28+
2429
// Containers represents a list of containers
2530
type Containers []*Container
2631

cluster/event.go

Lines changed: 53 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,12 @@
11
package cluster
22

3-
import "github.com/samalba/dockerclient"
3+
import (
4+
"errors"
5+
"sync"
6+
7+
log "github.com/Sirupsen/logrus"
8+
"github.com/samalba/dockerclient"
9+
)
410

511
// Event is exported
612
type Event struct {
@@ -12,3 +18,49 @@ type Event struct {
1218
type EventHandler interface {
1319
Handle(*Event) error
1420
}
21+
22+
// EventHandlers is a map of EventHandler
23+
type EventHandlers struct {
24+
sync.RWMutex
25+
26+
eventHandlers map[EventHandler]struct{}
27+
}
28+
29+
// NewEventHandlers returns an EventHandlers
30+
func NewEventHandlers() *EventHandlers {
31+
return &EventHandlers{
32+
eventHandlers: make(map[EventHandler]struct{}),
33+
}
34+
}
35+
36+
// Handle callbacks for the events
37+
func (eh *EventHandlers) Handle(e *Event) {
38+
eh.RLock()
39+
defer eh.RUnlock()
40+
41+
for h := range eh.eventHandlers {
42+
if err := h.Handle(e); err != nil {
43+
log.Error(err)
44+
}
45+
}
46+
}
47+
48+
// RegisterEventHandler registers an event handler.
49+
func (eh *EventHandlers) RegisterEventHandler(h EventHandler) error {
50+
eh.Lock()
51+
defer eh.Unlock()
52+
53+
if _, ok := eh.eventHandlers[h]; ok {
54+
return errors.New("event handler already set")
55+
}
56+
eh.eventHandlers[h] = struct{}{}
57+
return nil
58+
}
59+
60+
// UnregisterEventHandler unregisters a previously registered event handler.
61+
func (eh *EventHandlers) UnregisterEventHandler(h EventHandler) {
62+
eh.Lock()
63+
defer eh.Unlock()
64+
65+
delete(eh.eventHandlers, h)
66+
}

cluster/mesos/cluster.go

Lines changed: 9 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ type Cluster struct {
2929

3030
driver *mesosscheduler.MesosSchedulerDriver
3131
dockerEnginePort string
32-
eventHandler cluster.EventHandler
32+
eventHandlers *cluster.EventHandlers
3333
master string
3434
agents map[string]*agent
3535
scheduler *scheduler.Scheduler
@@ -67,6 +67,7 @@ func NewCluster(scheduler *scheduler.Scheduler, TLSConfig *tls.Config, master st
6767
}
6868
cluster := &Cluster{
6969
dockerEnginePort: defaultDockerEnginePort,
70+
eventHandlers: cluster.NewEventHandlers(),
7071
master: master,
7172
agents: make(map[string]*agent),
7273
scheduler: scheduler,
@@ -156,22 +157,18 @@ func NewCluster(scheduler *scheduler.Scheduler, TLSConfig *tls.Config, master st
156157

157158
// Handle callbacks for the events
158159
func (c *Cluster) Handle(e *cluster.Event) error {
159-
if c.eventHandler == nil {
160-
return nil
161-
}
162-
if err := c.eventHandler.Handle(e); err != nil {
163-
log.Error(err)
164-
}
160+
c.eventHandlers.Handle(e)
165161
return nil
166162
}
167163

168164
// RegisterEventHandler registers an event handler.
169165
func (c *Cluster) RegisterEventHandler(h cluster.EventHandler) error {
170-
if c.eventHandler != nil {
171-
return errors.New("event handler already set")
172-
}
173-
c.eventHandler = h
174-
return nil
166+
return c.eventHandlers.RegisterEventHandler(h)
167+
}
168+
169+
// UnregisterEventHandler unregisters a previously registered event handler.
170+
func (c *Cluster) UnregisterEventHandler(h cluster.EventHandler) {
171+
c.eventHandlers.UnregisterEventHandler(h)
175172
}
176173

177174
// CreateContainer for container creation in Mesos task

cluster/swarm/cluster.go

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ func (p *pendingContainer) ToContainer() *cluster.Container {
5050
type Cluster struct {
5151
sync.RWMutex
5252

53-
eventHandler cluster.EventHandler
53+
eventHandlers *cluster.EventHandlers
5454
engines map[string]*cluster.Engine
5555
pendingEngines map[string]*cluster.Engine
5656
scheduler *scheduler.Scheduler
@@ -67,6 +67,7 @@ func NewCluster(scheduler *scheduler.Scheduler, TLSConfig *tls.Config, discovery
6767
log.WithFields(log.Fields{"name": "swarm"}).Debug("Initializing cluster")
6868

6969
cluster := &Cluster{
70+
eventHandlers: cluster.NewEventHandlers(),
7071
engines: make(map[string]*cluster.Engine),
7172
pendingEngines: make(map[string]*cluster.Engine),
7273
scheduler: scheduler,
@@ -90,22 +91,18 @@ func NewCluster(scheduler *scheduler.Scheduler, TLSConfig *tls.Config, discovery
9091

9192
// Handle callbacks for the events
9293
func (c *Cluster) Handle(e *cluster.Event) error {
93-
if c.eventHandler == nil {
94-
return nil
95-
}
96-
if err := c.eventHandler.Handle(e); err != nil {
97-
log.Error(err)
98-
}
94+
c.eventHandlers.Handle(e)
9995
return nil
10096
}
10197

10298
// RegisterEventHandler registers an event handler.
10399
func (c *Cluster) RegisterEventHandler(h cluster.EventHandler) error {
104-
if c.eventHandler != nil {
105-
return errors.New("event handler already set")
106-
}
107-
c.eventHandler = h
108-
return nil
100+
return c.eventHandlers.RegisterEventHandler(h)
101+
}
102+
103+
// UnregisterEventHandler unregisters a previously registered event handler.
104+
func (c *Cluster) UnregisterEventHandler(h cluster.EventHandler) {
105+
c.eventHandlers.UnregisterEventHandler(h)
109106
}
110107

111108
// Generate a globally (across the cluster) unique ID.
@@ -145,9 +142,12 @@ func (c *Cluster) createContainer(config *cluster.ContainerConfig, name string,
145142
return nil, fmt.Errorf("Conflict: The name %s is already assigned. You have to delete (or rename) that container to be able to assign %s to a container again.", name, name)
146143
}
147144

148-
// Associate a Swarm ID to the container we are creating.
149-
swarmID := c.generateUniqueID()
150-
config.SetSwarmID(swarmID)
145+
swarmID := config.SwarmID()
146+
if swarmID == "" {
147+
// Associate a Swarm ID to the container we are creating.
148+
swarmID = c.generateUniqueID()
149+
config.SetSwarmID(swarmID)
150+
}
151151

152152
if withImageAffinity {
153153
config.AddAffinity("image==" + config.Image)

0 commit comments

Comments
 (0)