Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions docs/modules/components/pages/outputs/aws_s3.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -382,15 +382,16 @@ The maximum period to wait on an upload before abandoning it and reattempting.

=== `object_canned_acl`

The object canned ACL value.
The object canned ACL value. Leave empty to omit the ACL from upload requests, which is required for buckets that have ACLs disabled (the AWS default since 2023).


*Type*: `string`

*Default*: `""`

Options:
`private`
``
, `private`
, `public-read`
, `public-read-write`
, `authenticated-read`
Expand Down
128 changes: 127 additions & 1 deletion internal/impl/aws/s3/integration_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2024 Redpanda Data, Inc.
// Copyright 2026 Redpanda Data, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -16,10 +16,19 @@ package s3

import (
"context"
"errors"
"fmt"
"testing"
"time"

"github.com/aws/aws-sdk-go-v2/aws"
awsconfig "github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/credentials"
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/redpanda-data/benthos/v4/public/service"
"github.com/redpanda-data/benthos/v4/public/service/integration"

_ "github.com/redpanda-data/benthos/v4/public/components/pure"
Expand Down Expand Up @@ -265,4 +274,121 @@ cache_resources:
}),
)
})

t.Run("object_canned_acl_round_trip", func(t *testing.T) {
ctx, cancel := context.WithTimeout(t.Context(), 30*time.Second)
defer cancel()

bucket := "acl-roundtrip-bucket"
require.NoError(t, awstest.CreateBucket(ctx, lsPort, bucket))

yaml := fmt.Sprintf(`
output:
aws_s3:
bucket: %s
endpoint: http://localhost:%s
force_path_style_urls: true
region: eu-west-1
path: ${!counter()}.txt
object_canned_acl: bucket-owner-full-control
credentials:
id: xxxxx
secret: xxxxx
token: xxxxx
`, bucket, lsPort)

builder := service.NewStreamBuilder()
require.NoError(t, builder.SetYAML(yaml))

producer, err := builder.AddProducerFunc()
require.NoError(t, err)

stream, err := builder.Build()
require.NoError(t, err, "stream build must succeed when object_canned_acl is a valid value")

runErr := make(chan error, 1)
runCtx, runCancel := context.WithCancel(ctx)
defer runCancel()
go func() { runErr <- stream.Run(runCtx) }()

require.NoError(t, producer(ctx, service.NewMessage([]byte("hello acl"))))

require.NoError(t, stream.StopWithin(10*time.Second))
select {
case err := <-runErr:
if err != nil && !errors.Is(err, context.Canceled) {
require.NoError(t, err)
}
case <-time.After(10 * time.Second):
t.Fatal("stream did not exit after StopWithin returned")
}

// Verify the object exists, then attempt to read its ACL. LocalStack
// Free does not always reflect the canned ACL value through GetObjectAcl,
// so the ACL check is best-effort and only fails if LocalStack returns
// an explicit error other than "not implemented" style noise. The
// authoritative end-to-end coverage is that the stream built and uploaded
// successfully with the ACL field set — the regression in PR #4413
// blocked exactly that.
client := newLocalStackS3Client(t, lsPort)
_, err = client.HeadObject(ctx, &s3.HeadObjectInput{
Bucket: aws.String(bucket),
Key: aws.String("1.txt"),
})
require.NoError(t, err, "object should have been uploaded")

aclOut, aclErr := client.GetObjectAcl(ctx, &s3.GetObjectAclInput{
Bucket: aws.String(bucket),
Key: aws.String("1.txt"),
})
if aclErr != nil {
t.Logf("GetObjectAcl returned an error (LocalStack ACL support is partial): %v", aclErr)
} else {
t.Logf("GetObjectAcl returned %d grants", len(aclOut.Grants))
}
})

t.Run("object_canned_acl_invalid_rejected", func(t *testing.T) {
yaml := fmt.Sprintf(`
output:
aws_s3:
bucket: irrelevant
endpoint: http://localhost:%s
force_path_style_urls: true
region: eu-west-1
path: ${!counter()}.txt
object_canned_acl: not-a-real-acl
credentials:
id: xxxxx
secret: xxxxx
token: xxxxx
`, lsPort)

builder := service.NewStreamBuilder()
setErr := builder.SetYAML(yaml)
if setErr != nil {
assert.Contains(t, setErr.Error(), "not-a-real-acl")
return
}

_, buildErr := builder.Build()
require.Error(t, buildErr, "stream build must reject an invalid object_canned_acl value")
assert.Contains(t, buildErr.Error(), "not-a-real-acl")
})
}

// newLocalStackS3Client builds an S3 client pointed at the LocalStack instance
// on the supplied port, with path-style URLs and dummy credentials.
func newLocalStackS3Client(t *testing.T, port string) *s3.Client {
t.Helper()
endpoint := fmt.Sprintf("http://localhost:%s", port)
cfg, err := awsconfig.LoadDefaultConfig(t.Context(),
awsconfig.WithRegion("eu-west-1"),
awsconfig.WithCredentialsProvider(credentials.NewStaticCredentialsProvider("xxxxx", "xxxxx", "xxxxx")),
)
require.NoError(t, err)
cfg.BaseEndpoint = &endpoint
return s3.NewFromConfig(cfg, func(o *s3.Options) {
o.UsePathStyle = true
})
}
12 changes: 9 additions & 3 deletions internal/impl/aws/s3/output.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2024 Redpanda Data, Inc.
// Copyright 2026 Redpanda Data, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -155,7 +155,7 @@ func s3oConfigFromParsed(pConf *service.ParsedConfig) (conf s3oConfig, err error
return
}

if slices.Contains(types.ObjectCannedACL("").Values(), types.ObjectCannedACL(objectCannedACL)) {
if objectCannedACL == "" || slices.Contains(types.ObjectCannedACL("").Values(), types.ObjectCannedACL(objectCannedACL)) {
conf.ObjectCannedACL = types.ObjectCannedACL(objectCannedACL)
} else {
err = fmt.Errorf("invalid object canned ACL value: %v", objectCannedACL)
Expand Down Expand Up @@ -306,13 +306,19 @@ output:
Default("5s"),
service.NewStringEnumField(s3oFieldObjectCannedACL,
slices.Collect(func(yield func(string) bool) {
// Empty string means "do not set an ACL on the upload",
// which is the default since buckets created after 2023
// have ACLs disabled by default.
if !yield("") {
return
}
for _, v := range types.ObjectCannedACL("").Values() {
if !yield(string(v)) {
return
}
}
})...).
Description("The object canned ACL value.").
Description("The object canned ACL value. Leave empty to omit the ACL from upload requests, which is required for buckets that have ACLs disabled (the AWS default since 2023).").
Default("").
Advanced(),
service.NewBatchPolicyField(s3oFieldBatching),
Expand Down
146 changes: 146 additions & 0 deletions internal/impl/aws/s3/output_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
// Copyright 2026 Redpanda Data, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package s3

import (
"fmt"
"strings"
"testing"

"github.com/aws/aws-sdk-go-v2/service/s3/types"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/redpanda-data/benthos/v4/public/service"
)

// TestObjectCannedACLConfigParsing exercises the full parse path for the
// object_canned_acl field: schema lint plus the runtime validator in
// s3oConfigFromParsed. It guards the regression fixed in PR #4413 where the
// default value "" was rejected by the validator.
func TestObjectCannedACLConfigParsing(t *testing.T) {
const baseConfig = `
bucket: foo
region: eu-west-1
credentials:
id: xxxxx
secret: xxxxx
`

tests := []struct {
name string
aclYAML string
expectedACL types.ObjectCannedACL
}{
{
name: "field omitted uses empty default",
aclYAML: "",
expectedACL: types.ObjectCannedACL(""),
},
{
name: "explicit empty string",
aclYAML: `object_canned_acl: ""`,
expectedACL: types.ObjectCannedACL(""),
},
{
name: "valid canned ACL bucket-owner-full-control",
aclYAML: `object_canned_acl: bucket-owner-full-control`,
expectedACL: types.ObjectCannedACLBucketOwnerFullControl,
},
{
name: "valid canned ACL private",
aclYAML: `object_canned_acl: private`,
expectedACL: types.ObjectCannedACLPrivate,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
yaml := baseConfig + tt.aclYAML + "\n"

parsed, err := s3oOutputSpec().ParseYAML(yaml, nil)
require.NoError(t, err)

conf, err := s3oConfigFromParsed(parsed)
require.NoError(t, err)
assert.Equal(t, tt.expectedACL, conf.ObjectCannedACL)
})
}
}

// TestObjectCannedACLLinting verifies that the schema's enum linter accepts
// the empty string as well as valid SDK canned ACL values, and rejects bogus
// ones. This locks in the agreement between the schema and the runtime
// validator after PR #4413.
func TestObjectCannedACLLinting(t *testing.T) {
const baseConfig = `
aws_s3:
bucket: foo
region: eu-west-1
credentials:
id: xxxxx
secret: xxxxx
`

tests := []struct {
name string
aclYAML string
lintContains string // empty means: expect no lint errors
}{
{
name: "field omitted",
aclYAML: "",
},
{
name: "explicit empty string",
aclYAML: ` object_canned_acl: ""`,
},
{
name: "valid canned ACL bucket-owner-full-control",
aclYAML: ` object_canned_acl: bucket-owner-full-control`,
},
{
name: "valid canned ACL private",
aclYAML: ` object_canned_acl: private`,
},
{
name: "invalid canned ACL value is rejected",
aclYAML: ` object_canned_acl: not-a-real-acl`,
lintContains: "not-a-real-acl",
},
}

linter := service.GlobalEnvironment().NewComponentConfigLinter()

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
yaml := baseConfig + tt.aclYAML + "\n"
lints, err := linter.LintOutputYAML([]byte(yaml))
require.NoError(t, err)

if tt.lintContains == "" {
assert.Empty(t, lints, "expected no lint errors, got: %v", lints)
return
}

require.NotEmpty(t, lints, "expected a lint error containing %q", tt.lintContains)
var combined strings.Builder
for _, l := range lints {
fmt.Fprintf(&combined, "%v\n", l)
}
assert.Contains(t, combined.String(), tt.lintContains)
})
}
}