Skip to content

Commit 746ce62

Browse files
committed
Vendoring swarmkit for TCP hack
Also Cherry-pick moby/swarmkit#1651 to identify the issue in allocator. Signed-off-by: Madhu Venugopal <madhu@docker.com>
1 parent bc8a8c0 commit 746ce62

File tree

7 files changed

+64
-47
lines changed

7 files changed

+64
-47
lines changed

daemon/cluster/cluster.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -282,7 +282,7 @@ func (c *Cluster) startNewNode(forceNewCluster bool, localAddr, remoteAddr, list
282282
n, err := swarmagent.NewNode(&swarmagent.NodeConfig{
283283
Hostname: c.config.Name,
284284
ForceNewCluster: forceNewCluster,
285-
ListenControlAPI: filepath.Join(c.runtimeRoot, controlSocket),
285+
ListenControlAPI: "127.0.0.1:1234",
286286
ListenRemoteAPI: listenAddr,
287287
AdvertiseRemoteAPI: advertiseAddr,
288288
JoinAddr: joinAddr,

hack/vendor.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ clone git github.com/docker/docker-credential-helpers v0.3.0
143143
clone git github.com/docker/containerd 837e8c5e1cad013ed57f5c2090c8591c10cbbdae
144144

145145
# cluster
146-
clone git github.com/docker/swarmkit 7e63bdefb94e5bea2641e8bdebae2cfa61a0ed44
146+
clone git github.com/docker/swarmkit 5fc74cad8d204ef9818d3ef3b78d8ee962f2f852 https://github.com/mavenugo/swarmkit
147147
clone git github.com/golang/mock bd3c8e81be01eef76d4b503f5e687d2d1354d2d9
148148
clone git github.com/gogo/protobuf v0.3
149149
clone git github.com/cloudflare/cfssl 7fb22c8cba7ecaf98e4082d22d65800cf45e042a

vendor/src/github.com/docker/swarmkit/agent/node.go

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import (
44
"crypto/tls"
55
"encoding/json"
66
"io/ioutil"
7-
"net"
87
"os"
98
"path/filepath"
109
"reflect"
@@ -527,10 +526,6 @@ func (n *Node) initManagerConnection(ctx context.Context, ready chan<- struct{})
527526
// Using listen address instead of advertised address because this is a
528527
// local connection.
529528
addr := n.config.ListenControlAPI
530-
opts = append(opts, grpc.WithDialer(
531-
func(addr string, timeout time.Duration) (net.Conn, error) {
532-
return net.DialTimeout("unix", addr, timeout)
533-
}))
534529
conn, err := grpc.Dial(addr, opts...)
535530
if err != nil {
536531
return err
@@ -595,7 +590,7 @@ func (n *Node) runManager(ctx context.Context, securityConfig *ca.SecurityConfig
595590
ForceNewCluster: n.config.ForceNewCluster,
596591
ProtoAddr: map[string]string{
597592
"tcp": n.config.ListenRemoteAPI,
598-
"unix": n.config.ListenControlAPI,
593+
"tcp2": n.config.ListenControlAPI,
599594
},
600595
AdvertiseAddr: n.config.AdvertiseRemoteAPI,
601596
SecurityConfig: securityConfig,

vendor/src/github.com/docker/swarmkit/manager/allocator/allocator.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,8 @@ func (a *Allocator) Run(ctx context.Context) error {
125125
aaCopy := aa
126126
actor := func() error {
127127
wg.Add(1)
128+
defer wg.Done()
129+
128130
// init might return an allocator specific context
129131
// which is a child of the passed in context to hold
130132
// allocator specific state
@@ -133,10 +135,10 @@ func (a *Allocator) Run(ctx context.Context) error {
133135
// if we are failing in the init of
134136
// this allocator.
135137
aa.cancel()
136-
wg.Done()
137138
return err
138139
}
139140

141+
wg.Add(1)
140142
go func() {
141143
defer wg.Done()
142144
a.run(ctx, aaCopy)

vendor/src/github.com/docker/swarmkit/manager/allocator/network.go

Lines changed: 49 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ type networkContext struct {
6868
unallocatedNetworks map[string]*api.Network
6969
}
7070

71-
func (a *Allocator) doNetworkInit(ctx context.Context) error {
71+
func (a *Allocator) doNetworkInit(ctx context.Context) (err error) {
7272
na, err := networkallocator.New()
7373
if err != nil {
7474
return err
@@ -81,6 +81,13 @@ func (a *Allocator) doNetworkInit(ctx context.Context) error {
8181
unallocatedNetworks: make(map[string]*api.Network),
8282
ingressNetwork: newIngressNetwork(),
8383
}
84+
a.netCtx = nc
85+
defer func() {
86+
// Clear a.netCtx if initialization was unsuccessful.
87+
if err != nil {
88+
a.netCtx = nil
89+
}
90+
}()
8491

8592
// Check if we have the ingress network. If not found create
8693
// it before reading all network objects for allocation.
@@ -125,7 +132,7 @@ func (a *Allocator) doNetworkInit(ctx context.Context) error {
125132
// that the we can get the preferred subnet for ingress
126133
// network.
127134
if !na.IsAllocated(nc.ingressNetwork) {
128-
if err := a.allocateNetwork(ctx, nc, nc.ingressNetwork); err != nil {
135+
if err := a.allocateNetwork(ctx, nc.ingressNetwork); err != nil {
129136
log.G(ctx).WithError(err).Error("failed allocating ingress network during init")
130137
}
131138

@@ -155,7 +162,7 @@ func (a *Allocator) doNetworkInit(ctx context.Context) error {
155162
continue
156163
}
157164

158-
if err := a.allocateNetwork(ctx, nc, n); err != nil {
165+
if err := a.allocateNetwork(ctx, n); err != nil {
159166
log.G(ctx).WithError(err).Errorf("failed allocating network %s during init", n.ID)
160167
}
161168
}
@@ -179,7 +186,7 @@ func (a *Allocator) doNetworkInit(ctx context.Context) error {
179186
}
180187

181188
node.Attachment.Network = nc.ingressNetwork.Copy()
182-
if err := a.allocateNode(ctx, nc, node); err != nil {
189+
if err := a.allocateNode(ctx, node); err != nil {
183190
log.G(ctx).WithError(err).Errorf("Failed to allocate network resources for node %s during init", node.ID)
184191
}
185192
}
@@ -198,7 +205,7 @@ func (a *Allocator) doNetworkInit(ctx context.Context) error {
198205
continue
199206
}
200207

201-
if err := a.allocateService(ctx, nc, s); err != nil {
208+
if err := a.allocateService(ctx, s); err != nil {
202209
log.G(ctx).WithError(err).Errorf("failed allocating service %s during init", s.ID)
203210
}
204211
}
@@ -260,7 +267,7 @@ func (a *Allocator) doNetworkInit(ctx context.Context) error {
260267
}
261268

262269
err := batch.Update(func(tx store.Tx) error {
263-
_, err := a.allocateTask(ctx, nc, tx, t)
270+
_, err := a.allocateTask(ctx, tx, t)
264271
return err
265272
})
266273
if err != nil {
@@ -274,7 +281,6 @@ func (a *Allocator) doNetworkInit(ctx context.Context) error {
274281
return err
275282
}
276283

277-
a.netCtx = nc
278284
return nil
279285
}
280286

@@ -288,7 +294,7 @@ func (a *Allocator) doNetworkAlloc(ctx context.Context, ev events.Event) {
288294
break
289295
}
290296

291-
if err := a.allocateNetwork(ctx, nc, n); err != nil {
297+
if err := a.allocateNetwork(ctx, n); err != nil {
292298
log.G(ctx).WithError(err).Errorf("Failed allocation for network %s", n.ID)
293299
break
294300
}
@@ -309,7 +315,7 @@ func (a *Allocator) doNetworkAlloc(ctx context.Context, ev events.Event) {
309315
break
310316
}
311317

312-
if err := a.allocateService(ctx, nc, s); err != nil {
318+
if err := a.allocateService(ctx, s); err != nil {
313319
log.G(ctx).WithError(err).Errorf("Failed allocation for service %s", s.ID)
314320
break
315321
}
@@ -320,7 +326,7 @@ func (a *Allocator) doNetworkAlloc(ctx context.Context, ev events.Event) {
320326
break
321327
}
322328

323-
if err := a.allocateService(ctx, nc, s); err != nil {
329+
if err := a.allocateService(ctx, s); err != nil {
324330
log.G(ctx).WithError(err).Errorf("Failed allocation during update of service %s", s.ID)
325331
break
326332
}
@@ -335,18 +341,18 @@ func (a *Allocator) doNetworkAlloc(ctx context.Context, ev events.Event) {
335341
// it's still there.
336342
delete(nc.unallocatedServices, s.ID)
337343
case state.EventCreateNode, state.EventUpdateNode, state.EventDeleteNode:
338-
a.doNodeAlloc(ctx, nc, ev)
344+
a.doNodeAlloc(ctx, ev)
339345
case state.EventCreateTask, state.EventUpdateTask, state.EventDeleteTask:
340-
a.doTaskAlloc(ctx, nc, ev)
346+
a.doTaskAlloc(ctx, ev)
341347
case state.EventCommit:
342-
a.procUnallocatedNetworks(ctx, nc)
343-
a.procUnallocatedServices(ctx, nc)
344-
a.procUnallocatedTasksNetwork(ctx, nc)
348+
a.procUnallocatedNetworks(ctx)
349+
a.procUnallocatedServices(ctx)
350+
a.procUnallocatedTasksNetwork(ctx)
345351
return
346352
}
347353
}
348354

349-
func (a *Allocator) doNodeAlloc(ctx context.Context, nc *networkContext, ev events.Event) {
355+
func (a *Allocator) doNodeAlloc(ctx context.Context, ev events.Event) {
350356
var (
351357
isDelete bool
352358
node *api.Node
@@ -362,6 +368,8 @@ func (a *Allocator) doNodeAlloc(ctx context.Context, nc *networkContext, ev even
362368
node = v.Node.Copy()
363369
}
364370

371+
nc := a.netCtx
372+
365373
if isDelete {
366374
if nc.nwkAllocator.IsNodeAllocated(node) {
367375
if err := nc.nwkAllocator.DeallocateNode(node); err != nil {
@@ -377,7 +385,7 @@ func (a *Allocator) doNodeAlloc(ctx context.Context, nc *networkContext, ev even
377385
}
378386

379387
node.Attachment.Network = nc.ingressNetwork.Copy()
380-
if err := a.allocateNode(ctx, nc, node); err != nil {
388+
if err := a.allocateNode(ctx, node); err != nil {
381389
log.G(ctx).WithError(err).Errorf("Failed to allocate network resources for node %s", node.ID)
382390
}
383391
}
@@ -464,7 +472,7 @@ func (a *Allocator) taskCreateNetworkAttachments(t *api.Task, s *api.Service) {
464472
taskUpdateNetworks(t, networks)
465473
}
466474

467-
func (a *Allocator) doTaskAlloc(ctx context.Context, nc *networkContext, ev events.Event) {
475+
func (a *Allocator) doTaskAlloc(ctx context.Context, ev events.Event) {
468476
var (
469477
isDelete bool
470478
t *api.Task
@@ -480,6 +488,8 @@ func (a *Allocator) doTaskAlloc(ctx context.Context, nc *networkContext, ev even
480488
t = v.Task.Copy()
481489
}
482490

491+
nc := a.netCtx
492+
483493
// If the task has stopped running or it's being deleted then
484494
// we should free the network resources associated with the
485495
// task right away.
@@ -530,7 +540,9 @@ func (a *Allocator) doTaskAlloc(ctx context.Context, nc *networkContext, ev even
530540
nc.unallocatedTasks[t.ID] = t
531541
}
532542

533-
func (a *Allocator) allocateNode(ctx context.Context, nc *networkContext, node *api.Node) error {
543+
func (a *Allocator) allocateNode(ctx context.Context, node *api.Node) error {
544+
nc := a.netCtx
545+
534546
if err := nc.nwkAllocator.AllocateNode(node); err != nil {
535547
return err
536548
}
@@ -563,7 +575,9 @@ func (a *Allocator) allocateNode(ctx context.Context, nc *networkContext, node *
563575
return nil
564576
}
565577

566-
func (a *Allocator) allocateService(ctx context.Context, nc *networkContext, s *api.Service) error {
578+
func (a *Allocator) allocateService(ctx context.Context, s *api.Service) error {
579+
nc := a.netCtx
580+
567581
if s.Spec.Endpoint != nil {
568582
// service has user-defined endpoint
569583
if s.Endpoint == nil {
@@ -648,7 +662,9 @@ func (a *Allocator) allocateService(ctx context.Context, nc *networkContext, s *
648662
return nil
649663
}
650664

651-
func (a *Allocator) allocateNetwork(ctx context.Context, nc *networkContext, n *api.Network) error {
665+
func (a *Allocator) allocateNetwork(ctx context.Context, n *api.Network) error {
666+
nc := a.netCtx
667+
652668
if err := nc.nwkAllocator.Allocate(n); err != nil {
653669
nc.unallocatedNetworks[n.ID] = n
654670
return errors.Wrapf(err, "failed during network allocation for network %s", n.ID)
@@ -670,7 +686,7 @@ func (a *Allocator) allocateNetwork(ctx context.Context, nc *networkContext, n *
670686
return nil
671687
}
672688

673-
func (a *Allocator) allocateTask(ctx context.Context, nc *networkContext, tx store.Tx, t *api.Task) (*api.Task, error) {
689+
func (a *Allocator) allocateTask(ctx context.Context, tx store.Tx, t *api.Task) (*api.Task, error) {
674690
taskUpdated := false
675691

676692
// Get the latest task state from the store before updating.
@@ -679,6 +695,8 @@ func (a *Allocator) allocateTask(ctx context.Context, nc *networkContext, tx sto
679695
return nil, fmt.Errorf("could not find task %s while trying to update network allocation", t.ID)
680696
}
681697

698+
nc := a.netCtx
699+
682700
// We might be here even if a task allocation has already
683701
// happened but wasn't successfully committed to store. In such
684702
// cases skip allocation and go straight ahead to updating the
@@ -738,10 +756,11 @@ func (a *Allocator) allocateTask(ctx context.Context, nc *networkContext, tx sto
738756
return storeT, nil
739757
}
740758

741-
func (a *Allocator) procUnallocatedNetworks(ctx context.Context, nc *networkContext) {
759+
func (a *Allocator) procUnallocatedNetworks(ctx context.Context) {
760+
nc := a.netCtx
742761
for _, n := range nc.unallocatedNetworks {
743762
if !nc.nwkAllocator.IsAllocated(n) {
744-
if err := a.allocateNetwork(ctx, nc, n); err != nil {
763+
if err := a.allocateNetwork(ctx, n); err != nil {
745764
log.G(ctx).Debugf("Failed allocation of unallocated network %s: %v", n.ID, err)
746765
continue
747766
}
@@ -751,10 +770,11 @@ func (a *Allocator) procUnallocatedNetworks(ctx context.Context, nc *networkCont
751770
}
752771
}
753772

754-
func (a *Allocator) procUnallocatedServices(ctx context.Context, nc *networkContext) {
773+
func (a *Allocator) procUnallocatedServices(ctx context.Context) {
774+
nc := a.netCtx
755775
for _, s := range nc.unallocatedServices {
756776
if !nc.nwkAllocator.IsServiceAllocated(s) {
757-
if err := a.allocateService(ctx, nc, s); err != nil {
777+
if err := a.allocateService(ctx, s); err != nil {
758778
log.G(ctx).Debugf("Failed allocation of unallocated service %s: %v", s.ID, err)
759779
continue
760780
}
@@ -764,15 +784,16 @@ func (a *Allocator) procUnallocatedServices(ctx context.Context, nc *networkCont
764784
}
765785
}
766786

767-
func (a *Allocator) procUnallocatedTasksNetwork(ctx context.Context, nc *networkContext) {
787+
func (a *Allocator) procUnallocatedTasksNetwork(ctx context.Context) {
788+
nc := a.netCtx
768789
tasks := make([]*api.Task, 0, len(nc.unallocatedTasks))
769790

770791
committed, err := a.store.Batch(func(batch *store.Batch) error {
771792
for _, t := range nc.unallocatedTasks {
772793
var allocatedT *api.Task
773794
err := batch.Update(func(tx store.Tx) error {
774795
var err error
775-
allocatedT, err = a.allocateTask(ctx, nc, tx, t)
796+
allocatedT, err = a.allocateTask(ctx, tx, t)
776797
return err
777798
})
778799

vendor/src/github.com/docker/swarmkit/manager/allocator/networkallocator/networkallocator.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -738,7 +738,7 @@ func (na *NetworkAllocator) allocatePools(n *api.Network) (map[string]string, er
738738
ic.Subnet = poolIP.String()
739739
}
740740

741-
if ic.Gateway == "" {
741+
if ic.Gateway == "" && gwIP != nil {
742742
ic.Gateway = gwIP.IP.String()
743743
}
744744

vendor/src/github.com/docker/swarmkit/manager/manager.go

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -141,12 +141,7 @@ func New(config *Config) (*Manager, error) {
141141
tcpAddr = net.JoinHostPort("0.0.0.0", tcpAddrPort)
142142
}
143143

144-
err := os.MkdirAll(filepath.Dir(config.ProtoAddr["unix"]), 0700)
145-
if err != nil {
146-
return nil, errors.Wrap(err, "failed to create socket directory")
147-
}
148-
149-
err = os.MkdirAll(config.StateDir, 0700)
144+
err := os.MkdirAll(config.StateDir, 0700)
150145
if err != nil {
151146
return nil, errors.Wrap(err, "failed to create state directory")
152147
}
@@ -164,7 +159,11 @@ func New(config *Config) (*Manager, error) {
164159
listeners = make(map[string]net.Listener)
165160

166161
for proto, addr := range config.ProtoAddr {
167-
l, err := net.Listen(proto, addr)
162+
p := proto
163+
if proto == "tcp2" {
164+
p = "tcp"
165+
}
166+
l, err := net.Listen(p, addr)
168167

169168
// A unix socket may fail to bind if the file already
170169
// exists. Try replacing the file.
@@ -549,12 +548,12 @@ func (m *Manager) serveListener(ctx context.Context, errServe chan error, proto
549548
logrus.Fields{
550549
"proto": lis.Addr().Network(),
551550
"addr": lis.Addr().String()}))
552-
if proto == "unix" {
551+
if proto == "tcp2" {
553552
log.G(ctx).Info("Listening for local connections")
554553
// we need to disallow double closes because UnixListener.Close
555554
// can delete unix-socket file of newer listener. grpc calls
556555
// Close twice indeed: in Serve and in Stop.
557-
errServe <- m.localserver.Serve(&closeOnceListener{Listener: lis})
556+
errServe <- m.localserver.Serve(lis)
558557
} else {
559558
log.G(ctx).Info("Listening for connections")
560559
errServe <- m.server.Serve(lis)

0 commit comments

Comments
 (0)