Skip to content
This repository was archived by the owner on Feb 1, 2021. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion api/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -438,8 +438,14 @@ func postContainersCreate(c *context, w http.ResponseWriter, r *http.Request) {
authConfig = &dockerclient.AuthConfig{}
json.Unmarshal(buf, authConfig)
}
containerConfig := cluster.BuildContainerConfig(config)

container, err := c.cluster.CreateContainer(cluster.BuildContainerConfig(config), name, authConfig)
if err := containerConfig.Validate(); err != nil {
httpError(w, err.Error(), http.StatusInternalServerError)
return
}

container, err := c.cluster.CreateContainer(containerConfig, name, authConfig)
if err != nil {
if strings.HasPrefix(err.Error(), "Conflict") {
httpError(w, err.Error(), http.StatusConflict)
Expand Down
1 change: 1 addition & 0 deletions cli/manage.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,5 +321,6 @@ func manage(c *cli.Context) {
server.SetHandler(api.NewPrimary(cl, tlsConfig, &statusHandler{cl, nil, nil}, c.GlobalBool("debug"), c.Bool("cors")))
}

cluster.NewWatchdog(cl)
log.Fatal(server.ListenAndServe())
}
3 changes: 3 additions & 0 deletions cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,9 @@ type Cluster interface {
// Register an event handler for cluster-wide events.
RegisterEventHandler(h EventHandler) error

// Unregister an event handler.
UnregisterEventHandler(h EventHandler)

// FIXME: remove this method
// Return a random engine
RANDOMENGINE() (*Engine, error)
Expand Down
55 changes: 51 additions & 4 deletions cluster/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package cluster

import (
"encoding/json"
"errors"
"fmt"
"strings"

"github.com/samalba/dockerclient"
Expand Down Expand Up @@ -63,9 +65,10 @@ func consolidateResourceFields(c *dockerclient.ContainerConfig) {
// BuildContainerConfig creates a cluster.ContainerConfig from a dockerclient.ContainerConfig
func BuildContainerConfig(c dockerclient.ContainerConfig) *ContainerConfig {
var (
affinities []string
constraints []string
env []string
affinities []string
constraints []string
reschedulePolicies []string
env []string
)

// only for tests
Expand All @@ -83,12 +86,19 @@ func BuildContainerConfig(c dockerclient.ContainerConfig) *ContainerConfig {
json.Unmarshal([]byte(labels), &constraints)
}

// parse affinities/constraints from env (ex. docker run -e affinity:container==redis -e affinity:image==nginx -e constraint:region==us-east -e constraint:storage==ssd)
// parse reschedule policy from labels (ex. docker run --label 'com.docker.swarm.reschedule-policies=on-node-failure')
if labels, ok := c.Labels[SwarmLabelNamespace+".reschedule-policies"]; ok {
json.Unmarshal([]byte(labels), &reschedulePolicies)
}

// parse affinities/constraints/reschedule policies from env (ex. docker run -e affinity:container==redis -e affinity:image==nginx -e constraint:region==us-east -e constraint:storage==ssd -e reschedule:off)
for _, e := range c.Env {
if ok, key, value := parseEnv(e); ok && key == "affinity" {
affinities = append(affinities, value)
} else if ok && key == "constraint" {
constraints = append(constraints, value)
} else if ok && key == "reschedule" {
reschedulePolicies = append(reschedulePolicies, value)
} else {
env = append(env, e)
}
Expand All @@ -111,6 +121,13 @@ func BuildContainerConfig(c dockerclient.ContainerConfig) *ContainerConfig {
}
}

// store reschedule policies in labels
if len(reschedulePolicies) > 0 {
if labels, err := json.Marshal(reschedulePolicies); err == nil {
c.Labels[SwarmLabelNamespace+".reschedule-policies"] = string(labels)
}
}

consolidateResourceFields(&c)

return &ContainerConfig{c}
Expand Down Expand Up @@ -186,3 +203,33 @@ func (c *ContainerConfig) HaveNodeConstraint() bool {
}
return false
}

// HasReschedulePolicy returns true if the specified policy is part of the config
func (c *ContainerConfig) HasReschedulePolicy(p string) bool {
for _, reschedulePolicy := range c.extractExprs("reschedule-policies") {
if reschedulePolicy == p {
return true
}
}
return false
}

// Validate returns an error if the config isn't valid
func (c *ContainerConfig) Validate() error {
//TODO: add validation for affinities and constraints
reschedulePolicies := c.extractExprs("reschedule-policies")
if len(reschedulePolicies) > 1 {
return errors.New("too many reschedule policies")
} else if len(reschedulePolicies) == 1 {
valid := false
for _, validReschedulePolicy := range []string{"off", "on-node-failure"} {
if reschedulePolicies[0] == validReschedulePolicy {
valid = true
}
}
if !valid {
return fmt.Errorf("invalid reschedule policy: %s", reschedulePolicies[0])
}
}
return nil
}
5 changes: 5 additions & 0 deletions cluster/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@ func (c *Container) Refresh() (*Container, error) {
return c.Engine.refreshContainer(c.Id, true)
}

// Start a container
func (c *Container) Start() error {
return c.Engine.client.StartContainer(c.Id, nil)
}

// Containers represents a list a containers
type Containers []*Container

Expand Down
54 changes: 53 additions & 1 deletion cluster/event.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
package cluster

import "github.com/samalba/dockerclient"
import (
"errors"
"sync"

log "github.com/Sirupsen/logrus"
"github.com/samalba/dockerclient"
)

// Event is exported
type Event struct {
Expand All @@ -12,3 +18,49 @@ type Event struct {
type EventHandler interface {
Handle(*Event) error
}

// EventHandlers is a map of EventHandler
type EventHandlers struct {
sync.RWMutex

eventHandlers map[EventHandler]struct{}
}

// NewEventHandlers returns a EventHandlers
func NewEventHandlers() *EventHandlers {
return &EventHandlers{
eventHandlers: make(map[EventHandler]struct{}),
}
}

// Handle callbacks for the events
func (eh *EventHandlers) Handle(e *Event) {
eh.RLock()
defer eh.RUnlock()

for h := range eh.eventHandlers {
if err := h.Handle(e); err != nil {
log.Error(err)
}
}
}

// RegisterEventHandler registers an event handler.
func (eh *EventHandlers) RegisterEventHandler(h EventHandler) error {
eh.Lock()
defer eh.Unlock()

if _, ok := eh.eventHandlers[h]; ok {
return errors.New("event handler already set")
}
eh.eventHandlers[h] = struct{}{}
return nil
}

// UnregisterEventHandler unregisters a previously registered event handler.
func (eh *EventHandlers) UnregisterEventHandler(h EventHandler) {
eh.Lock()
defer eh.Unlock()

delete(eh.eventHandlers, h)
}
21 changes: 9 additions & 12 deletions cluster/mesos/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ type Cluster struct {

driver *mesosscheduler.MesosSchedulerDriver
dockerEnginePort string
eventHandler cluster.EventHandler
eventHandlers *cluster.EventHandlers
master string
agents map[string]*agent
scheduler *scheduler.Scheduler
Expand Down Expand Up @@ -67,6 +67,7 @@ func NewCluster(scheduler *scheduler.Scheduler, TLSConfig *tls.Config, master st
}
cluster := &Cluster{
dockerEnginePort: defaultDockerEnginePort,
eventHandlers: cluster.NewEventHandlers(),
master: master,
agents: make(map[string]*agent),
scheduler: scheduler,
Expand Down Expand Up @@ -156,22 +157,18 @@ func NewCluster(scheduler *scheduler.Scheduler, TLSConfig *tls.Config, master st

// Handle callbacks for the events
func (c *Cluster) Handle(e *cluster.Event) error {
if c.eventHandler == nil {
return nil
}
if err := c.eventHandler.Handle(e); err != nil {
log.Error(err)
}
c.eventHandlers.Handle(e)
return nil
}

// RegisterEventHandler registers an event handler.
func (c *Cluster) RegisterEventHandler(h cluster.EventHandler) error {
if c.eventHandler != nil {
return errors.New("event handler already set")
}
c.eventHandler = h
return nil
return c.eventHandlers.RegisterEventHandler(h)
}

// UnregisterEventHandler unregisters a previously registered event handler.
func (c *Cluster) UnregisterEventHandler(h cluster.EventHandler) {
c.eventHandlers.UnregisterEventHandler(h)
}

// CreateContainer for container creation in Mesos task
Expand Down
30 changes: 15 additions & 15 deletions cluster/swarm/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func (p *pendingContainer) ToContainer() *cluster.Container {
type Cluster struct {
sync.RWMutex

eventHandler cluster.EventHandler
eventHandlers *cluster.EventHandlers
engines map[string]*cluster.Engine
pendingEngines map[string]*cluster.Engine
scheduler *scheduler.Scheduler
Expand All @@ -67,6 +67,7 @@ func NewCluster(scheduler *scheduler.Scheduler, TLSConfig *tls.Config, discovery
log.WithFields(log.Fields{"name": "swarm"}).Debug("Initializing cluster")

cluster := &Cluster{
eventHandlers: cluster.NewEventHandlers(),
engines: make(map[string]*cluster.Engine),
pendingEngines: make(map[string]*cluster.Engine),
scheduler: scheduler,
Expand All @@ -90,22 +91,18 @@ func NewCluster(scheduler *scheduler.Scheduler, TLSConfig *tls.Config, discovery

// Handle callbacks for the events
func (c *Cluster) Handle(e *cluster.Event) error {
if c.eventHandler == nil {
return nil
}
if err := c.eventHandler.Handle(e); err != nil {
log.Error(err)
}
c.eventHandlers.Handle(e)
return nil
}

// RegisterEventHandler registers an event handler.
func (c *Cluster) RegisterEventHandler(h cluster.EventHandler) error {
if c.eventHandler != nil {
return errors.New("event handler already set")
}
c.eventHandler = h
return nil
return c.eventHandlers.RegisterEventHandler(h)
}

// UnregisterEventHandler unregisters a previously registered event handler.
func (c *Cluster) UnregisterEventHandler(h cluster.EventHandler) {
c.eventHandlers.UnregisterEventHandler(h)
}

// Generate a globally (across the cluster) unique ID.
Expand Down Expand Up @@ -145,9 +142,12 @@ func (c *Cluster) createContainer(config *cluster.ContainerConfig, name string,
return nil, fmt.Errorf("Conflict: The name %s is already assigned. You have to delete (or rename) that container to be able to assign %s to a container again.", name, name)
}

// Associate a Swarm ID to the container we are creating.
swarmID := c.generateUniqueID()
config.SetSwarmID(swarmID)
swarmID := config.SwarmID()
if swarmID == "" {
// Associate a Swarm ID to the container we are creating.
swarmID = c.generateUniqueID()
config.SetSwarmID(swarmID)
}

if withImageAffinity {
config.AddAffinity("image==" + config.Image)
Expand Down
Loading