Skip to content

Commit 50f82a9

Browse files
authored
Merge pull request #1748 from LK4D4/raft_transport
raft: transport package
2 parents b1d5abd + 79a5679 commit 50f82a9

File tree

13 files changed

+1467
-542
lines changed

13 files changed

+1467
-542
lines changed

log/context.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,21 @@ func WithLogger(ctx context.Context, logger *logrus.Entry) context.Context {
2929
return context.WithValue(ctx, loggerKey{}, logger)
3030
}
3131

32+
// WithFields returns a new context whose logger has the given fields added.
33+
func WithFields(ctx context.Context, fields logrus.Fields) context.Context {
34+
logger := ctx.Value(loggerKey{})
35+
36+
if logger == nil {
37+
logger = L
38+
}
39+
return WithLogger(ctx, logger.(*logrus.Entry).WithFields(fields))
40+
}
41+
42+
// WithField is a convenience wrapper around WithFields.
43+
func WithField(ctx context.Context, key, value string) context.Context {
44+
return WithFields(ctx, logrus.Fields{key: value})
45+
}
46+
3247
// GetLogger retrieves the current logger from the context. If no logger is
3348
// available, the default logger is returned.
3449
func GetLogger(ctx context.Context) *logrus.Entry {

manager/controlapi/node_test.go

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -373,9 +373,6 @@ func TestListManagerNodes(t *testing.T) {
373373
return nil
374374
}))
375375

376-
// Switch the raft node used by the server
377-
ts.Server.raft = nodes[2].Node
378-
379376
// Stop node 1 (leader)
380377
nodes[1].Server.Stop()
381378
nodes[1].ShutdownRaft()
@@ -390,6 +387,16 @@ func TestListManagerNodes(t *testing.T) {
390387
// Wait for the re-election to occur
391388
raftutils.WaitForCluster(t, clockSource, newCluster)
392389

390+
var leaderNode *raftutils.TestNode
391+
for _, node := range newCluster {
392+
if node.IsLeader() {
393+
leaderNode = node
394+
}
395+
}
396+
397+
// Switch the raft node used by the server
398+
ts.Server.raft = leaderNode.Node
399+
393400
// Node 1 should not be the leader anymore
394401
assert.NoError(t, raftutils.PollFunc(clockSource, func() error {
395402
r, err = ts.Client.ListNodes(context.Background(), &api.ListNodesRequest{})

manager/state/raft/membership/cluster.go

Lines changed: 32 additions & 187 deletions
Original file line numberDiff line numberDiff line change
@@ -2,16 +2,12 @@ package membership
22

33
import (
44
"errors"
5-
"fmt"
65
"sync"
76

8-
"google.golang.org/grpc"
9-
107
"github.com/coreos/etcd/raft/raftpb"
118
"github.com/docker/swarmkit/api"
129
"github.com/docker/swarmkit/watch"
1310
"github.com/gogo/protobuf/proto"
14-
"golang.org/x/net/context"
1511
)
1612

1713
var (
@@ -25,101 +21,39 @@ var (
2521
ErrConfigChangeInvalid = errors.New("membership: ConfChange type should be either AddNode, RemoveNode or UpdateNode")
2622
// ErrCannotUnmarshalConfig is thrown when a node cannot unmarshal a configuration change
2723
ErrCannotUnmarshalConfig = errors.New("membership: cannot unmarshal configuration change")
24+
// ErrMemberRemoved is thrown when a node was removed from the cluster
25+
ErrMemberRemoved = errors.New("raft: member was removed from the cluster")
2826
)
2927

30-
// deferredConn used to store removed members connection for some time.
31-
// We need this in case if removed node is redirector or endpoint of ControlAPI call.
32-
type deferredConn struct {
33-
tick int
34-
conn *grpc.ClientConn
35-
}
36-
3728
// Cluster represents a set of active
3829
// raft Members
3930
type Cluster struct {
40-
mu sync.RWMutex
41-
members map[uint64]*Member
42-
deferedConns map[*deferredConn]struct{}
31+
mu sync.RWMutex
32+
members map[uint64]*Member
4333

4434
// removed contains the list of removed Members,
4535
// those ids cannot be reused
46-
removed map[uint64]bool
47-
heartbeatTicks int
36+
removed map[uint64]bool
4837

4938
PeersBroadcast *watch.Queue
5039
}
5140

5241
// Member represents a raft Cluster Member
5342
type Member struct {
5443
*api.RaftMember
55-
56-
Conn *grpc.ClientConn
57-
tick int
58-
active bool
59-
lastSeenHost string
60-
}
61-
62-
// HealthCheck sends a health check RPC to the member and returns the response.
63-
func (member *Member) HealthCheck(ctx context.Context) error {
64-
healthClient := api.NewHealthClient(member.Conn)
65-
resp, err := healthClient.Check(ctx, &api.HealthCheckRequest{Service: "Raft"})
66-
if err != nil {
67-
return err
68-
}
69-
if resp.Status != api.HealthCheckResponse_SERVING {
70-
return fmt.Errorf("health check returned status %s", resp.Status.String())
71-
}
72-
return nil
7344
}
7445

7546
// NewCluster creates a new Cluster neighbors list for a raft Member.
76-
// Member marked as inactive if there was no call ReportActive for heartbeatInterval.
77-
func NewCluster(heartbeatTicks int) *Cluster {
47+
func NewCluster() *Cluster {
7848
// TODO(abronan): generate Cluster ID for federation
7949

8050
return &Cluster{
8151
members: make(map[uint64]*Member),
8252
removed: make(map[uint64]bool),
83-
deferedConns: make(map[*deferredConn]struct{}),
84-
heartbeatTicks: heartbeatTicks,
8553
PeersBroadcast: watch.NewQueue(),
8654
}
8755
}
8856

89-
func (c *Cluster) handleInactive() {
90-
for _, m := range c.members {
91-
if !m.active {
92-
continue
93-
}
94-
m.tick++
95-
if m.tick > c.heartbeatTicks {
96-
m.active = false
97-
if m.Conn != nil {
98-
m.Conn.Close()
99-
}
100-
}
101-
}
102-
}
103-
104-
func (c *Cluster) handleDeferredConns() {
105-
for dc := range c.deferedConns {
106-
dc.tick++
107-
if dc.tick > c.heartbeatTicks {
108-
dc.conn.Close()
109-
delete(c.deferedConns, dc)
110-
}
111-
}
112-
}
113-
114-
// Tick increases ticks for all members. After heartbeatTicks node marked as
115-
// inactive.
116-
func (c *Cluster) Tick() {
117-
c.mu.Lock()
118-
defer c.mu.Unlock()
119-
c.handleInactive()
120-
c.handleDeferredConns()
121-
}
122-
12357
// Members returns the list of raft Members in the Cluster.
12458
func (c *Cluster) Members() map[uint64]*Member {
12559
members := make(map[uint64]*Member)
@@ -168,8 +102,6 @@ func (c *Cluster) AddMember(member *Member) error {
168102
if c.removed[member.RaftID] {
169103
return ErrIDRemoved
170104
}
171-
member.active = true
172-
member.tick = 0
173105

174106
c.members[member.RaftID] = member
175107

@@ -187,55 +119,47 @@ func (c *Cluster) RemoveMember(id uint64) error {
187119
return c.clearMember(id)
188120
}
189121

190-
// ClearMember removes a node from the Cluster Memberlist, but does NOT add it
191-
// to the removed list.
192-
func (c *Cluster) ClearMember(id uint64) error {
122+
// UpdateMember updates the address of an existing member.
123+
func (c *Cluster) UpdateMember(id uint64, m *api.RaftMember) error {
193124
c.mu.Lock()
194125
defer c.mu.Unlock()
195126

196-
return c.clearMember(id)
197-
}
198-
199-
func (c *Cluster) clearMember(id uint64) error {
200-
m, ok := c.members[id]
201-
if ok {
202-
if m.Conn != nil {
203-
// defer connection close to after heartbeatTicks
204-
dConn := &deferredConn{conn: m.Conn}
205-
c.deferedConns[dConn] = struct{}{}
206-
}
207-
delete(c.members, id)
127+
if c.removed[id] {
128+
return ErrIDRemoved
208129
}
209-
c.broadcastUpdate()
210-
return nil
211-
}
212-
213-
// ReplaceMemberConnection replaces the member's GRPC connection.
214-
func (c *Cluster) ReplaceMemberConnection(id uint64, oldConn *Member, newConn *Member, newAddr string, force bool) error {
215-
c.mu.Lock()
216-
defer c.mu.Unlock()
217130

218131
oldMember, ok := c.members[id]
219132
if !ok {
220133
return ErrIDNotFound
221134
}
222135

223-
if !force && oldConn.Conn != oldMember.Conn {
224-
// The connection was already replaced. Don't do it again.
225-
newConn.Conn.Close()
226-
return nil
136+
if oldMember.NodeID != m.NodeID {
137+
// Should never happen; this is a sanity check
138+
return errors.New("node ID mismatch match on node update")
227139
}
228140

229-
if oldMember.Conn != nil {
230-
oldMember.Conn.Close()
141+
if oldMember.Addr == m.Addr {
142+
// nothing to do
143+
return nil
231144
}
145+
oldMember.RaftMember = m
146+
return nil
147+
}
148+
149+
// ClearMember removes a node from the Cluster Memberlist, but does NOT add it
150+
// to the removed list.
151+
func (c *Cluster) ClearMember(id uint64) error {
152+
c.mu.Lock()
153+
defer c.mu.Unlock()
232154

233-
newMember := *oldMember
234-
newMember.RaftMember = oldMember.RaftMember.Copy()
235-
newMember.RaftMember.Addr = newAddr
236-
newMember.Conn = newConn.Conn
237-
c.members[id] = &newMember
155+
return c.clearMember(id)
156+
}
238157

158+
func (c *Cluster) clearMember(id uint64) error {
159+
if _, ok := c.members[id]; ok {
160+
delete(c.members, id)
161+
c.broadcastUpdate()
162+
}
239163
return nil
240164
}
241165

@@ -249,60 +173,12 @@ func (c *Cluster) IsIDRemoved(id uint64) bool {
249173
// Clear resets the list of active Members and removed Members.
250174
func (c *Cluster) Clear() {
251175
c.mu.Lock()
252-
for _, member := range c.members {
253-
if member.Conn != nil {
254-
member.Conn.Close()
255-
}
256-
}
257-
258-
for dc := range c.deferedConns {
259-
dc.conn.Close()
260-
}
261176

262177
c.members = make(map[uint64]*Member)
263178
c.removed = make(map[uint64]bool)
264-
c.deferedConns = make(map[*deferredConn]struct{})
265179
c.mu.Unlock()
266180
}
267181

268-
// ReportActive reports that member is active (called ProcessRaftMessage),
269-
func (c *Cluster) ReportActive(id uint64, sourceHost string) {
270-
c.mu.Lock()
271-
defer c.mu.Unlock()
272-
m, ok := c.members[id]
273-
if !ok {
274-
return
275-
}
276-
m.tick = 0
277-
m.active = true
278-
if sourceHost != "" {
279-
m.lastSeenHost = sourceHost
280-
}
281-
}
282-
283-
// Active returns true if node is active.
284-
func (c *Cluster) Active(id uint64) bool {
285-
c.mu.RLock()
286-
defer c.mu.RUnlock()
287-
m, ok := c.members[id]
288-
if !ok {
289-
return false
290-
}
291-
return m.active
292-
}
293-
294-
// LastSeenHost returns the last observed source address that the specified
295-
// member connected from.
296-
func (c *Cluster) LastSeenHost(id uint64) string {
297-
c.mu.RLock()
298-
defer c.mu.RUnlock()
299-
m, ok := c.members[id]
300-
if ok {
301-
return m.lastSeenHost
302-
}
303-
return ""
304-
}
305-
306182
// ValidateConfigurationChange takes a proposed ConfChange and
307183
// ensures that it is valid.
308184
func (c *Cluster) ValidateConfigurationChange(cc raftpb.ConfChange) error {
@@ -334,34 +210,3 @@ func (c *Cluster) ValidateConfigurationChange(cc raftpb.ConfChange) error {
334210
}
335211
return nil
336212
}
337-
338-
// CanRemoveMember checks if removing a Member would not result in a loss
339-
// of quorum, this check is needed before submitting a configuration change
340-
// that might block or harm the Cluster on Member recovery
341-
func (c *Cluster) CanRemoveMember(from uint64, id uint64) bool {
342-
members := c.Members()
343-
nreachable := 0 // reachable managers after removal
344-
345-
for _, m := range members {
346-
if m.RaftID == id {
347-
continue
348-
}
349-
350-
// Local node from where the remove is issued
351-
if m.RaftID == from {
352-
nreachable++
353-
continue
354-
}
355-
356-
if c.Active(m.RaftID) {
357-
nreachable++
358-
}
359-
}
360-
361-
nquorum := (len(members)-1)/2 + 1
362-
if nreachable < nquorum {
363-
return false
364-
}
365-
366-
return true
367-
}

manager/state/raft/membership/cluster_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ func newTestMember(id uint64) *membership.Member {
4242
}
4343

4444
func newTestCluster(members []*membership.Member, removed []*membership.Member) *membership.Cluster {
45-
c := membership.NewCluster(3)
45+
c := membership.NewCluster()
4646
for _, m := range members {
4747
c.AddMember(m)
4848
}
@@ -79,7 +79,7 @@ func TestClusterMember(t *testing.T) {
7979
}
8080

8181
func TestMembers(t *testing.T) {
82-
cls := membership.NewCluster(1)
82+
cls := membership.NewCluster()
8383
defer cls.Clear()
8484
cls.AddMember(&membership.Member{RaftMember: &api.RaftMember{RaftID: 1}})
8585
cls.AddMember(&membership.Member{RaftMember: &api.RaftMember{RaftID: 5}})

0 commit comments

Comments
 (0)