Skip to content

Commit 27848a5

Browse files
author
Aaron Lehmann
committed
raft: Garbage collect WAL files
We currently garbage collect snapshot files (keeping only KeepOldSnapshots outdated snapshots, which defaults to 0). However, we don't garbage collect the WAL files that the snapshots replace. Delete any WALs which are so old that they only contain information that predates the oldest of the snapshots we have retained. This means that by default, we will remove old WALs once they are supplanted by a snapshot. However, if KeepOldSnapshots is set above 0, we will keep whichever WALs are necessary to catch up from the oldest of the retained snapshots. This makes sure that the old snapshots we retain are actually useful, and avoids adding an independent knob for WAL retention that might end up with an inconsistent setting. Also, fix serious brokenness in the deletion of old snapshots (it was deleting the most recent outdated snapshots, instead of the oldest). Signed-off-by: Aaron Lehmann <aaron.lehmann@docker.com>
1 parent e021d14 commit 27848a5

File tree

2 files changed

+277
-15
lines changed

2 files changed

+277
-15
lines changed

manager/state/raft/storage.go

Lines changed: 97 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"sort"
1111
"strings"
1212

13+
"github.com/coreos/etcd/pkg/fileutil"
1314
"github.com/coreos/etcd/raft"
1415
"github.com/coreos/etcd/raft/raftpb"
1516
"github.com/coreos/etcd/snap"
@@ -80,7 +81,7 @@ func (n *Node) createWAL(nodeID string) (raft.Peer, error) {
8081
}
8182
n.wal, err = wal.Create(n.walDir(), metadata)
8283
if err != nil {
83-
return raft.Peer{}, fmt.Errorf("create wal error: %v", err)
84+
return raft.Peer{}, fmt.Errorf("create WAL error: %v", err)
8485
}
8586

8687
n.cluster.AddMember(&membership.Member{RaftMember: raftNode})
@@ -127,15 +128,15 @@ func (n *Node) readWAL(ctx context.Context, snapshot *raftpb.Snapshot, forceNewC
127128
repaired := false
128129
for {
129130
if n.wal, err = wal.Open(n.walDir(), walsnap); err != nil {
130-
return fmt.Errorf("open wal error: %v", err)
131+
return fmt.Errorf("open WAL error: %v", err)
131132
}
132133
if metadata, st, ents, err = n.wal.ReadAll(); err != nil {
133134
if err := n.wal.Close(); err != nil {
134135
return err
135136
}
136137
// we can only repair ErrUnexpectedEOF and we never repair twice.
137138
if repaired || err != io.ErrUnexpectedEOF {
138-
return fmt.Errorf("read wal error (%v) and cannot be repaired", err)
139+
return fmt.Errorf("read WAL error (%v) and cannot be repaired", err)
139140
}
140141
if !wal.Repair(n.walDir()) {
141142
return fmt.Errorf("WAL error (%v) cannot be repaired", err)
@@ -157,7 +158,7 @@ func (n *Node) readWAL(ctx context.Context, snapshot *raftpb.Snapshot, forceNewC
157158

158159
var raftNode api.RaftMember
159160
if err := raftNode.Unmarshal(metadata); err != nil {
160-
return fmt.Errorf("error unmarshalling wal metadata: %v", err)
161+
return fmt.Errorf("error unmarshalling WAL metadata: %v", err)
161162
}
162163
n.Config.ID = raftNode.RaftID
163164

@@ -274,25 +275,106 @@ func (n *Node) saveSnapshot(snapshot raftpb.Snapshot, keepOldSnapshots uint64) e
274275
// This means that if the current snapshot doesn't appear in the
275276
// directory for some strange reason, we won't delete anything, which
276277
// is the safe behavior.
278+
curSnapshotIdx := -1
277279
var (
278-
afterCurSnapshot bool
279-
removeErr error
280+
removeErr error
281+
remainingSnapshots []string
280282
)
283+
281284
for i, snapFile := range snapshots {
282-
if afterCurSnapshot {
283-
if uint64(len(snapshots)-i) <= keepOldSnapshots {
284-
return removeErr
285-
}
286-
err := os.Remove(filepath.Join(n.snapDir(), snapFile))
287-
if err != nil && removeErr == nil {
288-
removeErr = err
285+
if curSnapshotIdx >= 0 && i > curSnapshotIdx {
286+
if uint64(i-curSnapshotIdx) > keepOldSnapshots {
287+
err := os.Remove(filepath.Join(n.snapDir(), snapFile))
288+
if err != nil && removeErr == nil {
289+
removeErr = err
290+
}
291+
continue
289292
}
290293
} else if snapFile == curSnapshot {
291-
afterCurSnapshot = true
294+
curSnapshotIdx = i
295+
}
296+
remainingSnapshots = append(remainingSnapshots, snapFile)
297+
}
298+
299+
if removeErr != nil {
300+
return removeErr
301+
}
302+
303+
// Remove any WAL files that only contain data from before the oldest
304+
// remaining snapshot.
305+
306+
if len(remainingSnapshots) == 0 {
307+
return nil
308+
}
309+
310+
// Sort snapshot filenames in lexical order
311+
sort.Sort(sort.StringSlice(remainingSnapshots))
312+
313+
// Parse index out of snapshot filename
314+
var snapTerm, snapIndex uint64
315+
_, err = fmt.Sscanf(remainingSnapshots[0], "%016x-%016x.snap", &snapTerm, &snapIndex)
316+
if err != nil {
317+
return fmt.Errorf("malformed snapshot filename %s: %v", remainingSnapshots[0], err)
318+
}
319+
320+
// List the WALs
321+
dirents, err = ioutil.ReadDir(n.walDir())
322+
if err != nil {
323+
return err
324+
}
325+
326+
var wals []string
327+
for _, dirent := range dirents {
328+
if strings.HasSuffix(dirent.Name(), ".wal") {
329+
wals = append(wals, dirent.Name())
292330
}
293331
}
294332

295-
return removeErr
333+
// Sort WAL filenames in lexical order
334+
sort.Sort(sort.StringSlice(wals))
335+
336+
found := false
337+
deleteUntil := -1
338+
339+
for i, walName := range wals {
340+
var walSeq, walIndex uint64
341+
_, err = fmt.Sscanf(walName, "%016x-%016x.wal", &walSeq, &walIndex)
342+
if err != nil {
343+
return fmt.Errorf("could not parse WAL name %s: %v", walName, err)
344+
}
345+
346+
if walIndex >= snapIndex {
347+
deleteUntil = i - 1
348+
found = true
349+
break
350+
}
351+
}
352+
353+
// If all WAL files started with indices below the oldest snapshot's
354+
// index, we can delete all but the newest WAL file.
355+
if !found && len(wals) != 0 {
356+
deleteUntil = len(wals) - 1
357+
}
358+
359+
for i := 0; i < deleteUntil; i++ {
360+
walPath := filepath.Join(n.walDir(), wals[i])
361+
l, err := fileutil.NewLock(walPath)
362+
if err != nil {
363+
continue
364+
}
365+
err = l.TryLock()
366+
if err != nil {
367+
return fmt.Errorf("could not lock old WAL file %s for removal: %v", wals[i], err)
368+
}
369+
err = os.Remove(walPath)
370+
l.Unlock()
371+
l.Destroy()
372+
if err != nil {
373+
return fmt.Errorf("error removing old WAL file %s: %v", wals[i], err)
374+
}
375+
}
376+
377+
return nil
296378
}
297379

298380
func (n *Node) doSnapshot(raftConfig *api.RaftConfig) {

manager/state/raft/storage_test.go

Lines changed: 180 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,14 @@ import (
55
"io/ioutil"
66
"path/filepath"
77
"testing"
8+
"time"
89

910
"github.com/docker/swarmkit/api"
1011
raftutils "github.com/docker/swarmkit/manager/state/raft/testutils"
12+
"github.com/docker/swarmkit/manager/state/store"
1113
"github.com/stretchr/testify/assert"
1214
"github.com/stretchr/testify/require"
15+
"golang.org/x/net/context"
1316
)
1417

1518
func TestRaftSnapshot(t *testing.T) {
@@ -243,3 +246,180 @@ func TestRaftSnapshotRestart(t *testing.T) {
243246
require.NoError(t, err)
244247
raftutils.CheckValuesOnNodes(t, clockSource, nodes, nodeIDs, values)
245248
}
249+
250+
func TestGCWAL(t *testing.T) {
251+
if testing.Short() {
252+
t.Skip("TestGCWAL skipped with -short because it's resource-intensive")
253+
}
254+
t.Parallel()
255+
256+
// Additional log entries from cluster setup, leader election
257+
extraLogEntries := 5
258+
// Number of large entries to propose
259+
proposals := 47
260+
261+
// Bring up a 3 node cluster
262+
nodes, clockSource := raftutils.NewRaftCluster(t, tc, &api.RaftConfig{SnapshotInterval: uint64(proposals + extraLogEntries), LogEntriesForSlowFollowers: 0})
263+
264+
for i := 0; i != proposals; i++ {
265+
_, err := proposeHugeValue(t, nodes[1], DefaultProposalTime, fmt.Sprintf("id%d", i))
266+
assert.NoError(t, err, "failed to propose value")
267+
}
268+
269+
time.Sleep(250 * time.Millisecond)
270+
271+
// Snapshot should have been triggered just before the WAL rotated, so
272+
// both WAL files should be preserved
273+
assert.NoError(t, raftutils.PollFunc(clockSource, func() error {
274+
dirents, err := ioutil.ReadDir(filepath.Join(nodes[1].StateDir, "snap"))
275+
if err != nil {
276+
return err
277+
}
278+
if len(dirents) != 1 {
279+
return fmt.Errorf("expected 1 snapshot, found %d", len(dirents))
280+
}
281+
282+
dirents, err = ioutil.ReadDir(filepath.Join(nodes[1].StateDir, "wal"))
283+
if err != nil {
284+
return err
285+
}
286+
if len(dirents) != 2 {
287+
return fmt.Errorf("expected 2 WAL files, found %d", len(dirents))
288+
}
289+
return nil
290+
}))
291+
292+
raftutils.TeardownCluster(t, nodes)
293+
294+
// Repeat this test, but trigger the snapshot after the WAL has rotated
295+
proposals++
296+
nodes, clockSource = raftutils.NewRaftCluster(t, tc, &api.RaftConfig{SnapshotInterval: uint64(proposals + extraLogEntries), LogEntriesForSlowFollowers: 0})
297+
defer raftutils.TeardownCluster(t, nodes)
298+
299+
for i := 0; i != proposals; i++ {
300+
_, err := proposeHugeValue(t, nodes[1], DefaultProposalTime, fmt.Sprintf("id%d", i))
301+
assert.NoError(t, err, "failed to propose value")
302+
}
303+
304+
time.Sleep(250 * time.Millisecond)
305+
306+
// This time only one WAL file should be saved.
307+
assert.NoError(t, raftutils.PollFunc(clockSource, func() error {
308+
dirents, err := ioutil.ReadDir(filepath.Join(nodes[1].StateDir, "snap"))
309+
if err != nil {
310+
return err
311+
}
312+
if len(dirents) != 1 {
313+
return fmt.Errorf("expected 1 snapshot, found %d", len(dirents))
314+
}
315+
316+
dirents, err = ioutil.ReadDir(filepath.Join(nodes[1].StateDir, "wal"))
317+
if err != nil {
318+
return err
319+
}
320+
if len(dirents) != 1 {
321+
return fmt.Errorf("expected 1 WAL file, found %d", len(dirents))
322+
}
323+
return nil
324+
}))
325+
326+
// Restart the whole cluster
327+
for _, node := range nodes {
328+
node.Server.Stop()
329+
node.Shutdown()
330+
}
331+
332+
raftutils.AdvanceTicks(clockSource, 5)
333+
334+
i := 0
335+
for k, node := range nodes {
336+
nodes[k] = raftutils.RestartNode(t, clockSource, node, false)
337+
i++
338+
}
339+
raftutils.WaitForCluster(t, clockSource, nodes)
340+
341+
// Is the data intact after restart?
342+
for _, node := range nodes {
343+
assert.NoError(t, raftutils.PollFunc(clockSource, func() error {
344+
var err error
345+
node.MemoryStore().View(func(tx store.ReadTx) {
346+
var allNodes []*api.Node
347+
allNodes, err = store.FindNodes(tx, store.All)
348+
if err != nil {
349+
return
350+
}
351+
if len(allNodes) != proposals {
352+
err = fmt.Errorf("expected %d nodes, got %d", proposals, len(allNodes))
353+
return
354+
}
355+
})
356+
return err
357+
}))
358+
}
359+
360+
// It should still be possible to propose values
361+
_, err := raftutils.ProposeValue(t, raftutils.Leader(nodes), DefaultProposalTime, "newnode")
362+
assert.NoError(t, err, "failed to propose value")
363+
364+
for _, node := range nodes {
365+
assert.NoError(t, raftutils.PollFunc(clockSource, func() error {
366+
var err error
367+
node.MemoryStore().View(func(tx store.ReadTx) {
368+
var allNodes []*api.Node
369+
allNodes, err = store.FindNodes(tx, store.All)
370+
if err != nil {
371+
return
372+
}
373+
if len(allNodes) != proposals+1 {
374+
err = fmt.Errorf("expected %d nodes, got %d", proposals, len(allNodes))
375+
return
376+
}
377+
})
378+
return err
379+
}))
380+
}
381+
}
382+
383+
// proposeHugeValue proposes a 1.4MB value to a raft test cluster
384+
func proposeHugeValue(t *testing.T, raftNode *raftutils.TestNode, time time.Duration, nodeID ...string) (*api.Node, error) {
385+
nodeIDStr := "id1"
386+
if len(nodeID) != 0 {
387+
nodeIDStr = nodeID[0]
388+
}
389+
a := make([]byte, 1400000)
390+
for i := 0; i != len(a); i++ {
391+
a[i] = 'a'
392+
}
393+
node := &api.Node{
394+
ID: nodeIDStr,
395+
Spec: api.NodeSpec{
396+
Annotations: api.Annotations{
397+
Name: nodeIDStr,
398+
Labels: map[string]string{
399+
"largestring": string(a),
400+
},
401+
},
402+
},
403+
}
404+
405+
storeActions := []*api.StoreAction{
406+
{
407+
Action: api.StoreActionKindCreate,
408+
Target: &api.StoreAction_Node{
409+
Node: node,
410+
},
411+
},
412+
}
413+
414+
ctx, _ := context.WithTimeout(context.Background(), time)
415+
416+
err := raftNode.ProposeValue(ctx, storeActions, func() {
417+
err := raftNode.MemoryStore().ApplyStoreActions(storeActions)
418+
assert.NoError(t, err, "error applying actions")
419+
})
420+
if err != nil {
421+
return nil, err
422+
}
423+
424+
return node, nil
425+
}

0 commit comments

Comments
 (0)