Skip to content

Commit 9e5b141

Browse files
authored
fix: fix potential deadlock in oras.Copy (#359)
1. Refactored `copyGraph` and internal graph utilities 2. Added a test case that could cause dead locks Fixes #349 Signed-off-by: Lixia (Sylvia) Lei <lixlei@microsoft.com>
1 parent 7ee86b9 commit 9e5b141

6 files changed

Lines changed: 255 additions & 257 deletions

File tree

copy.go

Lines changed: 51 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,10 @@ import (
2626
"oras.land/oras-go/v2/content"
2727
"oras.land/oras-go/v2/errdef"
2828
"oras.land/oras-go/v2/internal/cas"
29-
"oras.land/oras-go/v2/internal/graph"
3029
"oras.land/oras-go/v2/internal/platform"
3130
"oras.land/oras-go/v2/internal/registryutil"
3231
"oras.land/oras-go/v2/internal/status"
32+
"oras.land/oras-go/v2/internal/syncutil"
3333
"oras.land/oras-go/v2/registry"
3434
)
3535

@@ -45,6 +45,9 @@ var (
4545
DefaultCopyGraphOptions CopyGraphOptions
4646
)
4747

48+
// errSkipDesc signals copyNode() to stop processing a descriptor.
49+
var errSkipDesc = errors.New("skip descriptor")
50+
4851
// CopyOptions contains parameters for oras.Copy.
4952
type CopyOptions struct {
5053
CopyGraphOptions
@@ -168,86 +171,88 @@ func CopyGraph(ctx context.Context, src content.ReadOnlyStorage, dst content.Sto
168171
// copyGraph copies a rooted directed acyclic graph (DAG) from the source CAS to
169172
// the destination CAS with specified caching.
170173
func copyGraph(ctx context.Context, src content.ReadOnlyStorage, dst content.Storage, proxy *cas.Proxy, root ocispec.Descriptor, opts CopyGraphOptions) error {
171-
// track content status
172-
tracker := status.NewTracker()
173-
174174
// if FindSuccessors is not provided, use the default one
175175
if opts.FindSuccessors == nil {
176176
opts.FindSuccessors = content.Successors
177177
}
178+
// if Concurrency is not set or invalid, use the default concurrency
179+
if opts.Concurrency <= 0 {
180+
opts.Concurrency = defaultConcurrency
181+
}
182+
limiter := semaphore.NewWeighted(opts.Concurrency)
183+
// track content status
184+
tracker := status.NewTracker()
178185

179-
// prepare pre-handler
180-
preHandler := graph.HandlerFunc(func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {
186+
// traverse the graph
187+
var fn syncutil.GoFunc[ocispec.Descriptor]
188+
fn = func(ctx context.Context, region *syncutil.LimitedRegion, desc ocispec.Descriptor) (err error) {
181189
// skip the descriptor if other go routine is working on it
182190
done, committed := tracker.TryCommit(desc)
183191
if !committed {
184-
return nil, graph.ErrSkipDesc
192+
return nil
185193
}
194+
defer func() {
195+
if err == nil {
196+
// mark the content as done on success
197+
close(done)
198+
}
199+
}()
186200

187201
// skip if a rooted sub-DAG exists
188202
exists, err := dst.Exists(ctx, desc)
189203
if err != nil {
190-
return nil, err
204+
return err
191205
}
192206
if exists {
193-
// mark the content as done
194-
close(done)
195207
if opts.OnCopySkipped != nil {
196208
if err := opts.OnCopySkipped(ctx, desc); err != nil {
197-
return nil, err
209+
return err
198210
}
199211
}
200-
return nil, graph.ErrSkipDesc
212+
return nil
201213
}
202214

203215
// find successors while non-leaf nodes will be fetched and cached
204-
return opts.FindSuccessors(ctx, proxy, desc)
205-
})
206-
207-
// prepare post-handler
208-
postHandler := graph.HandlerFunc(func(ctx context.Context, desc ocispec.Descriptor) (_ []ocispec.Descriptor, err error) {
209-
defer func() {
210-
if err == nil {
211-
// mark the content as done on success
212-
done, _ := tracker.TryCommit(desc)
213-
close(done)
214-
}
215-
}()
216-
217-
// leaf nodes does not exist in the cache.
218-
// copy them directly.
219-
exists, err := proxy.Cache.Exists(ctx, desc)
216+
successors, err := opts.FindSuccessors(ctx, proxy, desc)
220217
if err != nil {
221-
return nil, err
218+
return err
222219
}
223-
if !exists {
224-
return nil, copyNode(ctx, src, dst, desc, opts)
220+
221+
// handle leaf nodes
222+
if len(successors) == 0 {
223+
exists, err = proxy.Cache.Exists(ctx, desc)
224+
if err != nil {
225+
return err
226+
}
227+
if exists {
228+
return copyNode(ctx, proxy.Cache, dst, desc, opts)
229+
}
230+
return copyNode(ctx, src, dst, desc, opts)
225231
}
226232

227-
// for non-leaf nodes, wait for its successors to complete
228-
successors, err := opts.FindSuccessors(ctx, proxy, desc)
229-
if err != nil {
230-
return nil, err
233+
// for non-leaf nodes, process successors and wait for them to complete
234+
region.End()
235+
if err := syncutil.Go(ctx, limiter, fn, successors...); err != nil {
236+
return err
231237
}
232238
for _, node := range successors {
233239
done, committed := tracker.TryCommit(node)
234240
if committed {
235-
return nil, fmt.Errorf("%s: %s: successor not committed", desc.Digest, node.Digest)
241+
return fmt.Errorf("%s: %s: successor not committed", desc.Digest, node.Digest)
236242
}
237243
select {
238244
case <-done:
239245
case <-ctx.Done():
240-
return nil, ctx.Err()
246+
return ctx.Err()
241247
}
242248
}
243-
return nil, copyNode(ctx, proxy.Cache, dst, desc, opts)
244-
})
245-
246-
if opts.Concurrency <= 0 {
247-
opts.Concurrency = defaultConcurrency
249+
if err := region.Start(); err != nil {
250+
return err
251+
}
252+
return copyNode(ctx, proxy.Cache, dst, desc, opts)
248253
}
249-
// traverse the graph
250-
return graph.Dispatch(ctx, preHandler, postHandler, semaphore.NewWeighted(opts.Concurrency), root)
254+
255+
return syncutil.Go(ctx, limiter, fn, root)
251256
}
252257

253258
// doCopyNode copies a single content from the source CAS to the destination CAS.
@@ -269,7 +274,7 @@ func doCopyNode(ctx context.Context, src content.ReadOnlyStorage, dst content.St
269274
func copyNode(ctx context.Context, src content.ReadOnlyStorage, dst content.Storage, desc ocispec.Descriptor, opts CopyGraphOptions) error {
270275
if opts.PreCopy != nil {
271276
if err := opts.PreCopy(ctx, desc); err != nil {
272-
if err == graph.ErrSkipDesc {
277+
if err == errSkipDesc {
273278
return nil
274279
}
275280
return err
@@ -361,7 +366,7 @@ func prepareCopy(ctx context.Context, dst Target, dstRef string, proxy *cas.Prox
361366
}
362367
}
363368
// skip the regular copy workflow
364-
return graph.ErrSkipDesc
369+
return errSkipDesc
365370
}
366371
} else {
367372
postCopy := opts.PostCopy

copy_test.go

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1413,3 +1413,101 @@ func TestCopyGraph_WithOptions(t *testing.T) {
14131413
t.Fatalf("CopyGraph() error = %v, wantErr %v", err, errdef.ErrSizeExceedsLimit)
14141414
}
14151415
}
1416+
1417+
func TestCopyGraph_WithConcurrencyLimit(t *testing.T) {
1418+
src := cas.NewMemory()
1419+
// generate test content
1420+
var blobs [][]byte
1421+
var descs []ocispec.Descriptor
1422+
appendBlob := func(mediaType string, blob []byte) {
1423+
blobs = append(blobs, blob)
1424+
descs = append(descs, ocispec.Descriptor{
1425+
MediaType: mediaType,
1426+
Digest: digest.FromBytes(blob),
1427+
Size: int64(len(blob)),
1428+
})
1429+
}
1430+
generateManifest := func(config ocispec.Descriptor, layers ...ocispec.Descriptor) {
1431+
manifest := ocispec.Manifest{
1432+
MediaType: ocispec.MediaTypeImageManifest,
1433+
Config: config,
1434+
Layers: layers,
1435+
}
1436+
manifestJSON, err := json.Marshal(manifest)
1437+
if err != nil {
1438+
t.Fatal(err)
1439+
}
1440+
appendBlob(manifest.MediaType, manifestJSON)
1441+
}
1442+
generateArtifact := func(subject *ocispec.Descriptor, artifactType string, blobs ...ocispec.Descriptor) {
1443+
manifest := ocispec.Artifact{
1444+
MediaType: ocispec.MediaTypeArtifactManifest,
1445+
Subject: subject,
1446+
Blobs: blobs,
1447+
ArtifactType: artifactType,
1448+
}
1449+
manifestJSON, err := json.Marshal(manifest)
1450+
if err != nil {
1451+
t.Fatal(err)
1452+
}
1453+
appendBlob(manifest.MediaType, manifestJSON)
1454+
}
1455+
generateIndex := func(manifests ...ocispec.Descriptor) {
1456+
index := ocispec.Index{
1457+
MediaType: ocispec.MediaTypeImageIndex,
1458+
Manifests: manifests,
1459+
}
1460+
indexJSON, err := json.Marshal(index)
1461+
if err != nil {
1462+
t.Fatal(err)
1463+
}
1464+
appendBlob(index.MediaType, indexJSON)
1465+
}
1466+
1467+
appendBlob(ocispec.MediaTypeImageConfig, []byte("config")) // Blob 0
1468+
appendBlob(ocispec.MediaTypeImageLayer, []byte("foo")) // Blob 1
1469+
appendBlob(ocispec.MediaTypeImageLayer, []byte("bar")) // Blob 2
1470+
generateManifest(descs[0], descs[1:3]...) // Blob 3
1471+
generateArtifact(&descs[3], "artifact.1") // Blob 4
1472+
generateArtifact(&descs[3], "artifact.2") // Blob 5
1473+
generateArtifact(&descs[3], "artifact.3") // Blob 6
1474+
generateArtifact(&descs[3], "artifact.4") // Blob 7
1475+
generateIndex(descs[3:8]...) // Blob 8
1476+
1477+
ctx := context.Background()
1478+
for i := range blobs {
1479+
err := src.Push(ctx, descs[i], bytes.NewReader(blobs[i]))
1480+
if err != nil {
1481+
t.Fatalf("failed to push test content to src: %d: %v", i, err)
1482+
}
1483+
}
1484+
1485+
// test different concurrency limit
1486+
root := descs[len(descs)-1]
1487+
directSuccessorsNum := 5
1488+
opts := oras.DefaultCopyGraphOptions
1489+
for i := 1; i <= directSuccessorsNum; i++ {
1490+
dst := cas.NewMemory()
1491+
opts.Concurrency = int64(i)
1492+
if err := oras.CopyGraph(ctx, src, dst, root, opts); err != nil {
1493+
t.Fatalf("CopyGraph(concurrency: %d) error = %v, wantErr %v", i, err, false)
1494+
}
1495+
1496+
// verify contents
1497+
contents := dst.Map()
1498+
if got, want := len(contents), len(blobs); got != want {
1499+
t.Errorf("len(dst) = %v, wantErr %v", got, want)
1500+
}
1501+
for i := range blobs {
1502+
got, err := content.FetchAll(ctx, dst, descs[i])
1503+
if err != nil {
1504+
t.Errorf("content[%d] error = %v, wantErr %v", i, err, false)
1505+
continue
1506+
}
1507+
if want := blobs[i]; !bytes.Equal(got, want) {
1508+
t.Errorf("content[%d] = %v, want %v", i, got, want)
1509+
}
1510+
}
1511+
}
1512+
1513+
}

internal/graph/graph.go

Lines changed: 0 additions & 112 deletions
This file was deleted.

0 commit comments

Comments
 (0)