Skip to content

Commit 24ed09e

Browse files
author
Aaron Lehmann
committed
raft: Garbage collect WAL files
We currently garbage collect snapshot files (keeping only KeepOldSnapshots outdated snapshots, which defaults to 0). However, we don't garbage collect the WAL files that the snapshots replace. Delete any WALs which are so old that they only contain information that predates the oldest of the snapshots we have retained. This means that by default, we will remove old WALs once they are supplanted by a snapshot. However, if KeepOldSnapshots is set above 0, we will keep whichever WALs are necessary to catch up from the oldest of the retained snapshots. This makes sure that the old snapshots we retain are actually useful, and avoids adding an independent knob for WAL retention that might end up with an inconsistent setting. Also, fix serious brokenness in the deletion of old snapshots (it was deleting the most recent outdated snapshots, instead of the oldest). Signed-off-by: Aaron Lehmann <aaron.lehmann@docker.com>
1 parent e021d14 commit 24ed09e

File tree

2 files changed

+285
-17
lines changed

2 files changed

+285
-17
lines changed

manager/state/raft/storage.go

Lines changed: 105 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"sort"
1111
"strings"
1212

13+
"github.com/coreos/etcd/pkg/fileutil"
1314
"github.com/coreos/etcd/raft"
1415
"github.com/coreos/etcd/raft/raftpb"
1516
"github.com/coreos/etcd/snap"
@@ -80,7 +81,7 @@ func (n *Node) createWAL(nodeID string) (raft.Peer, error) {
8081
}
8182
n.wal, err = wal.Create(n.walDir(), metadata)
8283
if err != nil {
83-
return raft.Peer{}, fmt.Errorf("create wal error: %v", err)
84+
return raft.Peer{}, fmt.Errorf("create WAL error: %v", err)
8485
}
8586

8687
n.cluster.AddMember(&membership.Member{RaftMember: raftNode})
@@ -127,15 +128,15 @@ func (n *Node) readWAL(ctx context.Context, snapshot *raftpb.Snapshot, forceNewC
127128
repaired := false
128129
for {
129130
if n.wal, err = wal.Open(n.walDir(), walsnap); err != nil {
130-
return fmt.Errorf("open wal error: %v", err)
131+
return fmt.Errorf("open WAL error: %v", err)
131132
}
132133
if metadata, st, ents, err = n.wal.ReadAll(); err != nil {
133134
if err := n.wal.Close(); err != nil {
134135
return err
135136
}
136137
// we can only repair ErrUnexpectedEOF and we never repair twice.
137138
if repaired || err != io.ErrUnexpectedEOF {
138-
return fmt.Errorf("read wal error (%v) and cannot be repaired", err)
139+
return fmt.Errorf("read WAL error (%v) and cannot be repaired", err)
139140
}
140141
if !wal.Repair(n.walDir()) {
141142
return fmt.Errorf("WAL error (%v) cannot be repaired", err)
@@ -157,7 +158,7 @@ func (n *Node) readWAL(ctx context.Context, snapshot *raftpb.Snapshot, forceNewC
157158

158159
var raftNode api.RaftMember
159160
if err := raftNode.Unmarshal(metadata); err != nil {
160-
return fmt.Errorf("error unmarshalling wal metadata: %v", err)
161+
return fmt.Errorf("error unmarshalling WAL metadata: %v", err)
161162
}
162163
n.Config.ID = raftNode.RaftID
163164

@@ -274,25 +275,112 @@ func (n *Node) saveSnapshot(snapshot raftpb.Snapshot, keepOldSnapshots uint64) e
274275
// This means that if the current snapshot doesn't appear in the
275276
// directory for some strange reason, we won't delete anything, which
276277
// is the safe behavior.
277-
var (
278-
afterCurSnapshot bool
279-
removeErr error
280-
)
278+
curSnapshotIdx := -1
279+
var removeErr error
280+
281281
for i, snapFile := range snapshots {
282-
if afterCurSnapshot {
283-
if uint64(len(snapshots)-i) <= keepOldSnapshots {
284-
return removeErr
285-
}
286-
err := os.Remove(filepath.Join(n.snapDir(), snapFile))
287-
if err != nil && removeErr == nil {
288-
removeErr = err
282+
if curSnapshotIdx >= 0 && i > curSnapshotIdx {
283+
if uint64(i-curSnapshotIdx) > keepOldSnapshots {
284+
err := os.Remove(filepath.Join(n.snapDir(), snapFile))
285+
if err != nil && removeErr == nil {
286+
removeErr = err
287+
}
289288
}
290289
} else if snapFile == curSnapshot {
291-
afterCurSnapshot = true
290+
curSnapshotIdx = i
291+
}
292+
}
293+
294+
if removeErr != nil {
295+
return removeErr
296+
}
297+
298+
// Remove any WAL files that only contain data from before the oldest
299+
// remaining snapshot.
300+
dirents, err = ioutil.ReadDir(n.snapDir())
301+
if err != nil {
302+
return err
303+
}
304+
305+
var remainingSnapshots []string
306+
for _, dirent := range dirents {
307+
if strings.HasSuffix(dirent.Name(), ".snap") {
308+
remainingSnapshots = append(remainingSnapshots, dirent.Name())
309+
}
310+
}
311+
312+
if len(remainingSnapshots) == 0 {
313+
return nil
314+
}
315+
316+
// Sort snapshot filenames in lexical order
317+
sort.Sort(sort.StringSlice(remainingSnapshots))
318+
319+
// Parse index out of snapshot filename
320+
var snapTerm, snapIndex uint64
321+
_, err = fmt.Sscanf(remainingSnapshots[0], "%016x-%016x.snap", &snapTerm, &snapIndex)
322+
if err != nil {
323+
return fmt.Errorf("malformed snapshot filename %s: %v", remainingSnapshots[0], err)
324+
}
325+
326+
// List the WALs
327+
dirents, err = ioutil.ReadDir(n.walDir())
328+
if err != nil {
329+
return err
330+
}
331+
332+
var wals []string
333+
for _, dirent := range dirents {
334+
if strings.HasSuffix(dirent.Name(), ".wal") {
335+
wals = append(wals, dirent.Name())
336+
}
337+
}
338+
339+
// Sort WAL filenames in lexical order
340+
sort.Sort(sort.StringSlice(wals))
341+
342+
found := false
343+
deleteUntil := -1
344+
345+
for i, walName := range wals {
346+
var walSeq, walIndex uint64
347+
_, err = fmt.Sscanf(walName, "%016x-%016x.wal", &walSeq, &walIndex)
348+
if err != nil {
349+
return fmt.Errorf("could not parse WAL name %s: %v", walName, err)
350+
}
351+
352+
if walIndex >= snapIndex {
353+
deleteUntil = i - 1
354+
found = true
355+
break
292356
}
293357
}
294358

295-
return removeErr
359+
// If all WAL files started with indices below the oldest snapshot's
360+
// index, we can delete all but the newest WAL file.
361+
if !found && len(wals) != 0 {
362+
deleteUntil = len(wals) - 1
363+
}
364+
365+
for i := 0; i < deleteUntil; i++ {
366+
walPath := filepath.Join(n.walDir(), wals[i])
367+
l, err := fileutil.NewLock(walPath)
368+
if err != nil {
369+
continue
370+
}
371+
err = l.TryLock()
372+
if err != nil {
373+
return fmt.Errorf("could not lock old WAL file %s for removal: %v", wals[i], err)
374+
}
375+
err = os.Remove(walPath)
376+
l.Unlock()
377+
l.Destroy()
378+
if err != nil {
379+
return fmt.Errorf("error removing old WAL file %s: %v", wals[i], err)
380+
}
381+
}
382+
383+
return nil
296384
}
297385

298386
func (n *Node) doSnapshot(raftConfig *api.RaftConfig) {

manager/state/raft/storage_test.go

Lines changed: 180 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,14 @@ import (
55
"io/ioutil"
66
"path/filepath"
77
"testing"
8+
"time"
89

910
"github.com/docker/swarmkit/api"
1011
raftutils "github.com/docker/swarmkit/manager/state/raft/testutils"
12+
"github.com/docker/swarmkit/manager/state/store"
1113
"github.com/stretchr/testify/assert"
1214
"github.com/stretchr/testify/require"
15+
"golang.org/x/net/context"
1316
)
1417

1518
func TestRaftSnapshot(t *testing.T) {
@@ -243,3 +246,180 @@ func TestRaftSnapshotRestart(t *testing.T) {
243246
require.NoError(t, err)
244247
raftutils.CheckValuesOnNodes(t, clockSource, nodes, nodeIDs, values)
245248
}
249+
250+
func TestGCWAL(t *testing.T) {
251+
if testing.Short() {
252+
t.Skip("TestGCWAL skipped with -short because it's resource-intensive")
253+
}
254+
t.Parallel()
255+
256+
// Additional log entries from cluster setup, leader election
257+
extraLogEntries := 5
258+
// Number of large entries to propose
259+
proposals := 47
260+
261+
// Bring up a 3 node cluster
262+
nodes, clockSource := raftutils.NewRaftCluster(t, tc, &api.RaftConfig{SnapshotInterval: uint64(proposals + extraLogEntries), LogEntriesForSlowFollowers: 0})
263+
264+
for i := 0; i != proposals; i++ {
265+
_, err := proposeHugeValue(t, nodes[1], DefaultProposalTime, fmt.Sprintf("id%d", i))
266+
assert.NoError(t, err, "failed to propose value")
267+
}
268+
269+
time.Sleep(250 * time.Millisecond)
270+
271+
// Snapshot should have been triggered just before the WAL rotated, so
272+
// both WAL files should be preserved
273+
assert.NoError(t, raftutils.PollFunc(clockSource, func() error {
274+
dirents, err := ioutil.ReadDir(filepath.Join(nodes[1].StateDir, "snap"))
275+
if err != nil {
276+
return err
277+
}
278+
if len(dirents) != 1 {
279+
return fmt.Errorf("expected 1 snapshot, found %d", len(dirents))
280+
}
281+
282+
dirents, err = ioutil.ReadDir(filepath.Join(nodes[1].StateDir, "wal"))
283+
if err != nil {
284+
return err
285+
}
286+
if len(dirents) != 2 {
287+
return fmt.Errorf("expected 2 WAL files, found %d", len(dirents))
288+
}
289+
return nil
290+
}))
291+
292+
raftutils.TeardownCluster(t, nodes)
293+
294+
// Repeat this test, but trigger the snapshot after the WAL has rotated
295+
proposals++
296+
nodes, clockSource = raftutils.NewRaftCluster(t, tc, &api.RaftConfig{SnapshotInterval: uint64(proposals + extraLogEntries), LogEntriesForSlowFollowers: 0})
297+
defer raftutils.TeardownCluster(t, nodes)
298+
299+
for i := 0; i != proposals; i++ {
300+
_, err := proposeHugeValue(t, nodes[1], DefaultProposalTime, fmt.Sprintf("id%d", i))
301+
assert.NoError(t, err, "failed to propose value")
302+
}
303+
304+
time.Sleep(250 * time.Millisecond)
305+
306+
// This time only one WAL file should be saved.
307+
assert.NoError(t, raftutils.PollFunc(clockSource, func() error {
308+
dirents, err := ioutil.ReadDir(filepath.Join(nodes[1].StateDir, "snap"))
309+
if err != nil {
310+
return err
311+
}
312+
if len(dirents) != 1 {
313+
return fmt.Errorf("expected 1 snapshot, found %d", len(dirents))
314+
}
315+
316+
dirents, err = ioutil.ReadDir(filepath.Join(nodes[1].StateDir, "wal"))
317+
if err != nil {
318+
return err
319+
}
320+
if len(dirents) != 1 {
321+
return fmt.Errorf("expected 1 WAL file, found %d", len(dirents))
322+
}
323+
return nil
324+
}))
325+
326+
// Restart the whole cluster
327+
for _, node := range nodes {
328+
node.Server.Stop()
329+
node.Shutdown()
330+
}
331+
332+
raftutils.AdvanceTicks(clockSource, 5)
333+
334+
i := 0
335+
for k, node := range nodes {
336+
nodes[k] = raftutils.RestartNode(t, clockSource, node, false)
337+
i++
338+
}
339+
raftutils.WaitForCluster(t, clockSource, nodes)
340+
341+
// Is the data intact after restart?
342+
for _, node := range nodes {
343+
assert.NoError(t, raftutils.PollFunc(clockSource, func() error {
344+
var err error
345+
node.MemoryStore().View(func(tx store.ReadTx) {
346+
var allNodes []*api.Node
347+
allNodes, err = store.FindNodes(tx, store.All)
348+
if err != nil {
349+
return
350+
}
351+
if len(allNodes) != proposals {
352+
err = fmt.Errorf("expected %d nodes, got %d", proposals, len(allNodes))
353+
return
354+
}
355+
})
356+
return err
357+
}))
358+
}
359+
360+
// It should still be possible to propose values
361+
_, err := raftutils.ProposeValue(t, raftutils.Leader(nodes), DefaultProposalTime, "newnode")
362+
assert.NoError(t, err, "failed to propose value")
363+
364+
for _, node := range nodes {
365+
assert.NoError(t, raftutils.PollFunc(clockSource, func() error {
366+
var err error
367+
node.MemoryStore().View(func(tx store.ReadTx) {
368+
var allNodes []*api.Node
369+
allNodes, err = store.FindNodes(tx, store.All)
370+
if err != nil {
371+
return
372+
}
373+
if len(allNodes) != proposals+1 {
374+
err = fmt.Errorf("expected %d nodes, got %d", proposals, len(allNodes))
375+
return
376+
}
377+
})
378+
return err
379+
}))
380+
}
381+
}
382+
383+
// proposeHugeValue proposes a 1.4MB value to a raft test cluster
384+
func proposeHugeValue(t *testing.T, raftNode *raftutils.TestNode, time time.Duration, nodeID ...string) (*api.Node, error) {
385+
nodeIDStr := "id1"
386+
if len(nodeID) != 0 {
387+
nodeIDStr = nodeID[0]
388+
}
389+
a := make([]byte, 1400000)
390+
for i := 0; i != len(a); i++ {
391+
a[i] = 'a'
392+
}
393+
node := &api.Node{
394+
ID: nodeIDStr,
395+
Spec: api.NodeSpec{
396+
Annotations: api.Annotations{
397+
Name: nodeIDStr,
398+
Labels: map[string]string{
399+
"largestring": string(a),
400+
},
401+
},
402+
},
403+
}
404+
405+
storeActions := []*api.StoreAction{
406+
{
407+
Action: api.StoreActionKindCreate,
408+
Target: &api.StoreAction_Node{
409+
Node: node,
410+
},
411+
},
412+
}
413+
414+
ctx, _ := context.WithTimeout(context.Background(), time)
415+
416+
err := raftNode.ProposeValue(ctx, storeActions, func() {
417+
err := raftNode.MemoryStore().ApplyStoreActions(storeActions)
418+
assert.NoError(t, err, "error applying actions")
419+
})
420+
if err != nil {
421+
return nil, err
422+
}
423+
424+
return node, nil
425+
}

0 commit comments

Comments
 (0)