
Commit ecd52e4

Aaron Lehmann authored and committed
raft: Garbage collect WAL files
We currently garbage collect snapshot files (keeping only KeepOldSnapshots outdated snapshots, which defaults to 0). However, we don't garbage collect the WAL files that the snapshots replace.

Delete any WALs which are so old that they only contain information that predates the oldest of the snapshots we have retained. This means that by default, we will remove old WALs once they are supplanted by a snapshot. However, if KeepOldSnapshots is set above 0, we will keep whichever WALs are necessary to catch up from the oldest of the retained snapshots. This makes sure that the old snapshots we retain are actually useful, and avoids adding an independent knob for WAL retention that might end up with an inconsistent setting.

Also, fix serious brokenness in the deletion of old snapshots (it was deleting the most recent outdated snapshots, instead of the oldest).

Signed-off-by: Aaron Lehmann <aaron.lehmann@docker.com>
1 parent e021d14 commit ecd52e4
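
The rule the commit message describes boils down to: a WAL file can be deleted only if the WAL file after it still starts below the index of the oldest retained snapshot, so the retained snapshot plus the remaining WAL files always cover the full log. Below is a minimal, self-contained Go sketch of that rule; it is not the code from this commit, and the helper name walsToDelete, its arguments, and the example filenames are hypothetical.

// Minimal sketch (not this commit's code): decide which WAL files are safe to
// delete given the raft index of the oldest snapshot being retained. etcd WAL
// files are named <seq>-<index>.wal, where <index> is roughly the first raft
// index the file can contain; lexical order of these zero-padded names matches
// chronological order.
package main

import (
	"fmt"
	"sort"
)

// walsToDelete (hypothetical helper) returns the WAL file names that only hold
// entries already covered by the oldest retained snapshot.
func walsToDelete(oldestSnapIndex uint64, walNames []string) ([]string, error) {
	sort.Strings(walNames)

	// Everything before the last WAL that starts below the snapshot index is
	// redundant; if no WAL starts at or above the snapshot index, keep only
	// the newest one.
	deleteUntil := len(walNames) - 1
	for i, name := range walNames {
		var seq, index uint64
		if _, err := fmt.Sscanf(name, "%016x-%016x.wal", &seq, &index); err != nil {
			return nil, fmt.Errorf("could not parse WAL name %s: %v", name, err)
		}
		if index >= oldestSnapIndex {
			deleteUntil = i - 1
			break
		}
	}
	if deleteUntil <= 0 {
		return nil, nil
	}
	return walNames[:deleteUntil], nil
}

func main() {
	wals := []string{
		"0000000000000000-0000000000000000.wal",
		"0000000000000001-0000000000000030.wal",
		"0000000000000002-0000000000000060.wal",
	}
	// Oldest retained snapshot is at index 0x40: only the first WAL is fully
	// covered by it (the second is still needed to replay entries 0x40..0x5f).
	toDelete, err := walsToDelete(0x40, wals)
	fmt.Println(toDelete, err) // [0000000000000000-0000000000000000.wal] <nil>
}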

File tree

2 files changed: +280 -17 lines changed


manager/state/raft/storage.go

Lines changed: 105 additions & 17 deletions
@@ -10,6 +10,7 @@ import (
 	"sort"
 	"strings"

+	"github.com/coreos/etcd/pkg/fileutil"
 	"github.com/coreos/etcd/raft"
 	"github.com/coreos/etcd/raft/raftpb"
 	"github.com/coreos/etcd/snap"
@@ -80,7 +81,7 @@ func (n *Node) createWAL(nodeID string) (raft.Peer, error) {
 	}
 	n.wal, err = wal.Create(n.walDir(), metadata)
 	if err != nil {
-		return raft.Peer{}, fmt.Errorf("create wal error: %v", err)
+		return raft.Peer{}, fmt.Errorf("create WAL error: %v", err)
 	}

 	n.cluster.AddMember(&membership.Member{RaftMember: raftNode})
@@ -127,15 +128,15 @@ func (n *Node) readWAL(ctx context.Context, snapshot *raftpb.Snapshot, forceNewC
 	repaired := false
 	for {
 		if n.wal, err = wal.Open(n.walDir(), walsnap); err != nil {
-			return fmt.Errorf("open wal error: %v", err)
+			return fmt.Errorf("open WAL error: %v", err)
 		}
 		if metadata, st, ents, err = n.wal.ReadAll(); err != nil {
 			if err := n.wal.Close(); err != nil {
 				return err
 			}
 			// we can only repair ErrUnexpectedEOF and we never repair twice.
 			if repaired || err != io.ErrUnexpectedEOF {
-				return fmt.Errorf("read wal error (%v) and cannot be repaired", err)
+				return fmt.Errorf("read WAL error (%v) and cannot be repaired", err)
 			}
 			if !wal.Repair(n.walDir()) {
 				return fmt.Errorf("WAL error (%v) cannot be repaired", err)
@@ -157,7 +158,7 @@ func (n *Node) readWAL(ctx context.Context, snapshot *raftpb.Snapshot, forceNewC

 	var raftNode api.RaftMember
 	if err := raftNode.Unmarshal(metadata); err != nil {
-		return fmt.Errorf("error unmarshalling wal metadata: %v", err)
+		return fmt.Errorf("error unmarshalling WAL metadata: %v", err)
 	}
 	n.Config.ID = raftNode.RaftID

@@ -274,25 +275,112 @@ func (n *Node) saveSnapshot(snapshot raftpb.Snapshot, keepOldSnapshots uint64) e
 	// This means that if the current snapshot doesn't appear in the
 	// directory for some strange reason, we won't delete anything, which
 	// is the safe behavior.
-	var (
-		afterCurSnapshot bool
-		removeErr        error
-	)
+	curSnapshotIdx := -1
+	var removeErr error
+
 	for i, snapFile := range snapshots {
-		if afterCurSnapshot {
-			if uint64(len(snapshots)-i) <= keepOldSnapshots {
-				return removeErr
-			}
-			err := os.Remove(filepath.Join(n.snapDir(), snapFile))
-			if err != nil && removeErr == nil {
-				removeErr = err
+		if curSnapshotIdx >= 0 && i > curSnapshotIdx {
+			if uint64(i-curSnapshotIdx) > keepOldSnapshots {
+				err := os.Remove(filepath.Join(n.snapDir(), snapFile))
+				if err != nil && removeErr == nil {
+					removeErr = err
+				}
 			}
 		} else if snapFile == curSnapshot {
-			afterCurSnapshot = true
+			curSnapshotIdx = i
+		}
+	}
+
+	if removeErr != nil {
+		return removeErr
+	}
+
+	// Remove any WAL files that only contain data from before the oldest
+	// remaining snapshot.
+	dirents, err = ioutil.ReadDir(n.snapDir())
+	if err != nil {
+		return err
+	}
+
+	var remainingSnapshots []string
+	for _, dirent := range dirents {
+		if strings.HasSuffix(dirent.Name(), ".snap") {
+			remainingSnapshots = append(remainingSnapshots, dirent.Name())
+		}
+	}
+
+	if len(remainingSnapshots) == 0 {
+		return nil
+	}
+
+	// Sort snapshot filenames in lexical order
+	sort.Sort(sort.StringSlice(remainingSnapshots))
+
+	// Parse index out of snapshot filename
+	var snapTerm, snapIndex uint64
+	_, err = fmt.Sscanf(remainingSnapshots[0], "%016x-%016x.snap", &snapTerm, &snapIndex)
+	if err != nil {
+		return fmt.Errorf("malformed snapshot filename %s: %v", remainingSnapshots[0], err)
+	}
+
+	// List the WALs
+	dirents, err = ioutil.ReadDir(n.walDir())
+	if err != nil {
+		return err
+	}
+
+	var wals []string
+	for _, dirent := range dirents {
+		if strings.HasSuffix(dirent.Name(), ".wal") {
+			wals = append(wals, dirent.Name())
+		}
+	}
+
+	// Sort WAL filenames in lexical order
+	sort.Sort(sort.StringSlice(wals))
+
+	found := false
+	deleteUntil := -1
+
+	for i, walName := range wals {
+		var walSeq, walIndex uint64
+		_, err = fmt.Sscanf(walName, "%016x-%016x.wal", &walSeq, &walIndex)
+		if err != nil {
+			return fmt.Errorf("could not parse WAL name %s: %v", walName, err)
+		}
+
+		if walIndex >= snapIndex {
+			deleteUntil = i - 1
+			found = true
+			break
 		}
 	}

-	return removeErr
+	// If all WAL files started with indices below the oldest snapshot's
+	// index, we can delete all but the newest WAL file.
+	if !found && len(wals) != 0 {
+		deleteUntil = len(wals) - 1
+	}
+
+	for i := 0; i < deleteUntil; i++ {
+		walPath := filepath.Join(n.walDir(), wals[i])
+		l, err := fileutil.NewLock(walPath)
+		if err != nil {
+			continue
+		}
+		err = l.TryLock()
+		if err != nil {
+			return fmt.Errorf("could not lock old WAL file %s for removal: %v", wals[i], err)
+		}
+		err = os.Remove(walPath)
+		l.Unlock()
+		l.Destroy()
+		if err != nil {
+			return fmt.Errorf("error removing old WAL file %s: %v", wals[i], err)
+		}
+	}
+
+	return nil
 }

 func (n *Node) doSnapshot(raftConfig *api.RaftConfig) {
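
The snapshot-retention part of the fix (the first loop in the new saveSnapshot hunk above) keeps the current snapshot plus at most keepOldSnapshots of the entries that follow it in the sorted listing, instead of bailing out early and deleting the wrong end of the list. A standalone sketch of that selection follows; oldSnapshotsToRemove is a hypothetical helper, and it assumes, as the surrounding code suggests, that the listing is sorted newest-first so the entries after the current snapshot are the outdated ones.

// Minimal sketch (not this commit's code): given a directory listing assumed
// to be sorted newest-first, the name of the snapshot just written, and how
// many outdated snapshots to keep, report which files are removable.
package main

import "fmt"

func oldSnapshotsToRemove(snapshots []string, curSnapshot string, keepOldSnapshots uint64) []string {
	var toRemove []string
	curSnapshotIdx := -1
	for i, snapFile := range snapshots {
		if curSnapshotIdx >= 0 && i > curSnapshotIdx {
			// Keep the first keepOldSnapshots entries after the current
			// snapshot; anything beyond that is removable.
			if uint64(i-curSnapshotIdx) > keepOldSnapshots {
				toRemove = append(toRemove, snapFile)
			}
		} else if snapFile == curSnapshot {
			curSnapshotIdx = i
		}
	}
	return toRemove
}

func main() {
	// Hypothetical names, newest first.
	snapshots := []string{"0004.snap", "0003.snap", "0002.snap", "0001.snap"}
	// With keepOldSnapshots == 1, the current snapshot (0004) and the most
	// recent outdated one (0003) are kept; 0002 and 0001 are removed.
	fmt.Println(oldSnapshotsToRemove(snapshots, "0004.snap", 1)) // [0002.snap 0001.snap]
}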

manager/state/raft/storage_test.go

Lines changed: 175 additions & 0 deletions
@@ -5,11 +5,14 @@ import (
 	"io/ioutil"
 	"path/filepath"
 	"testing"
+	"time"

 	"github.com/docker/swarmkit/api"
 	raftutils "github.com/docker/swarmkit/manager/state/raft/testutils"
+	"github.com/docker/swarmkit/manager/state/store"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
+	"golang.org/x/net/context"
 )

 func TestRaftSnapshot(t *testing.T) {
@@ -243,3 +246,175 @@ func TestRaftSnapshotRestart(t *testing.T) {
 	require.NoError(t, err)
 	raftutils.CheckValuesOnNodes(t, clockSource, nodes, nodeIDs, values)
 }
+
+func TestGCWAL(t *testing.T) {
+	// Additional log entries from cluster setup, leader election
+	extraLogEntries := 5
+	// Number of large entries to propose
+	proposals := 47
+
+	// Bring up a 3 node cluster
+	nodes, clockSource := raftutils.NewRaftCluster(t, tc, &api.RaftConfig{SnapshotInterval: uint64(proposals + extraLogEntries), LogEntriesForSlowFollowers: 0})
+
+	for i := 0; i != proposals; i++ {
+		_, err := proposeHugeValue(t, nodes[1], DefaultProposalTime, fmt.Sprintf("id%d", i))
+		assert.NoError(t, err, "failed to propose value")
+	}
+
+	time.Sleep(250 * time.Millisecond)
+
+	// Snapshot should have been triggered just before the WAL rotated, so
+	// both WAL files should be preserved
+	assert.NoError(t, raftutils.PollFunc(clockSource, func() error {
+		dirents, err := ioutil.ReadDir(filepath.Join(nodes[1].StateDir, "snap"))
+		if err != nil {
+			return err
+		}
+		if len(dirents) != 1 {
+			return fmt.Errorf("expected 1 snapshot, found %d", len(dirents))
+		}
+
+		dirents, err = ioutil.ReadDir(filepath.Join(nodes[1].StateDir, "wal"))
+		if err != nil {
+			return err
+		}
+		if len(dirents) != 2 {
+			return fmt.Errorf("expected 2 WAL files, found %d", len(dirents))
+		}
+		return nil
+	}))
+
+	raftutils.TeardownCluster(t, nodes)
+
+	// Repeat this test, but trigger the snapshot after the WAL has rotated
+	proposals++
+	nodes, clockSource = raftutils.NewRaftCluster(t, tc, &api.RaftConfig{SnapshotInterval: uint64(proposals + extraLogEntries), LogEntriesForSlowFollowers: 0})
+	defer raftutils.TeardownCluster(t, nodes)
+
+	for i := 0; i != proposals; i++ {
+		_, err := proposeHugeValue(t, nodes[1], DefaultProposalTime, fmt.Sprintf("id%d", i))
+		assert.NoError(t, err, "failed to propose value")
+	}
+
+	time.Sleep(250 * time.Millisecond)
+
+	// This time only one WAL file should be saved.
+	assert.NoError(t, raftutils.PollFunc(clockSource, func() error {
+		dirents, err := ioutil.ReadDir(filepath.Join(nodes[1].StateDir, "snap"))
+		if err != nil {
+			return err
+		}
+		if len(dirents) != 1 {
+			return fmt.Errorf("expected 1 snapshot, found %d", len(dirents))
+		}
+
+		dirents, err = ioutil.ReadDir(filepath.Join(nodes[1].StateDir, "wal"))
+		if err != nil {
+			return err
+		}
+		if len(dirents) != 1 {
+			return fmt.Errorf("expected 1 WAL file, found %d", len(dirents))
+		}
+		return nil
+	}))
+
+	// Restart the whole cluster
+	for _, node := range nodes {
+		node.Server.Stop()
+		node.Shutdown()
+	}
+
+	raftutils.AdvanceTicks(clockSource, 5)
+
+	i := 0
+	for k, node := range nodes {
+		nodes[k] = raftutils.RestartNode(t, clockSource, node, false)
+		i++
+	}
+	raftutils.WaitForCluster(t, clockSource, nodes)
+
+	// Is the data intact after restart?
+	for _, node := range nodes {
+		assert.NoError(t, raftutils.PollFunc(clockSource, func() error {
+			var err error
+			node.MemoryStore().View(func(tx store.ReadTx) {
+				var allNodes []*api.Node
+				allNodes, err = store.FindNodes(tx, store.All)
+				if err != nil {
+					return
+				}
+				if len(allNodes) != proposals {
+					err = fmt.Errorf("expected %d nodes, got %d", proposals, len(allNodes))
+					return
+				}
+			})
+			return err
+		}))
+	}
+
+	// It should still be possible to propose values
+	_, err := raftutils.ProposeValue(t, raftutils.Leader(nodes), DefaultProposalTime, "newnode")
+	assert.NoError(t, err, "failed to propose value")
+
+	for _, node := range nodes {
+		assert.NoError(t, raftutils.PollFunc(clockSource, func() error {
+			var err error
+			node.MemoryStore().View(func(tx store.ReadTx) {
+				var allNodes []*api.Node
+				allNodes, err = store.FindNodes(tx, store.All)
+				if err != nil {
+					return
+				}
+				if len(allNodes) != proposals+1 {
+					err = fmt.Errorf("expected %d nodes, got %d", proposals, len(allNodes))
+					return
+				}
+			})
+			return err
+		}))
+	}
+}
+
+// proposeHugeValue proposes a 1.4MB value to a raft test cluster
+func proposeHugeValue(t *testing.T, raftNode *raftutils.TestNode, time time.Duration, nodeID ...string) (*api.Node, error) {
+	nodeIDStr := "id1"
+	if len(nodeID) != 0 {
+		nodeIDStr = nodeID[0]
+	}
+	a := make([]byte, 1400000)
+	for i := 0; i != len(a); i++ {
+		a[i] = 'a'
+	}
+	node := &api.Node{
+		ID: nodeIDStr,
+		Spec: api.NodeSpec{
+			Annotations: api.Annotations{
+				Name: nodeIDStr,
+				Labels: map[string]string{
+					"largestring": string(a),
+				},
+			},
+		},
+	}

+	storeActions := []*api.StoreAction{
+		{
+			Action: api.StoreActionKindCreate,
+			Target: &api.StoreAction_Node{
+				Node: node,
+			},
+		},
+	}
+
+	ctx, _ := context.WithTimeout(context.Background(), time)
+
+	err := raftNode.ProposeValue(ctx, storeActions, func() {
+		err := raftNode.MemoryStore().ApplyStoreActions(storeActions)
+		assert.NoError(t, err, "error applying actions")
+	})
+	if err != nil {
+		return nil, err
+	}
+
+	return node, nil
+}
