Skip to content

Add read only file plugin #1128

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 12 commits into
base: add-auxiliary-command-server-to-agent-config
Choose a base branch
from
580 changes: 73 additions & 507 deletions internal/file/file_manager_service.go

Large diffs are not rendered by default.

141 changes: 0 additions & 141 deletions internal/file/file_manager_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"fmt"
"os"
"path/filepath"
"sync/atomic"
"testing"

"github.com/nginx/agent/v3/internal/model"
Expand All @@ -28,146 +27,6 @@ import (
"github.com/stretchr/testify/require"
)

func TestFileManagerService_UpdateOverview(t *testing.T) {
ctx := context.Background()

filePath := filepath.Join(t.TempDir(), "nginx.conf")
fileMeta := protos.FileMeta(filePath, "")

fileContent := []byte("location /test {\n return 200 \"Test location\\n\";\n}")
fileHash := files.GenerateHash(fileContent)

fileWriteErr := os.WriteFile(filePath, fileContent, 0o600)
require.NoError(t, fileWriteErr)

overview := protos.FileOverview(filePath, fileHash)

fakeFileServiceClient := &v1fakes.FakeFileServiceClient{}
fakeFileServiceClient.UpdateOverviewReturnsOnCall(0, &mpi.UpdateOverviewResponse{
Overview: overview,
}, nil)

fakeFileServiceClient.UpdateOverviewReturnsOnCall(1, &mpi.UpdateOverviewResponse{}, nil)

fakeFileServiceClient.UpdateFileReturns(&mpi.UpdateFileResponse{}, nil)

fileManagerService := NewFileManagerService(fakeFileServiceClient, types.AgentConfig())
fileManagerService.SetIsConnected(true)

err := fileManagerService.UpdateOverview(ctx, "123", []*mpi.File{
{
FileMeta: fileMeta,
},
}, 0)

require.NoError(t, err)
assert.Equal(t, 2, fakeFileServiceClient.UpdateOverviewCallCount())
}

func TestFileManagerService_UpdateOverview_MaxIterations(t *testing.T) {
ctx := context.Background()

filePath := filepath.Join(t.TempDir(), "nginx.conf")
fileMeta := protos.FileMeta(filePath, "")

fileContent := []byte("location /test {\n return 200 \"Test location\\n\";\n}")
fileHash := files.GenerateHash(fileContent)

fileWriteErr := os.WriteFile(filePath, fileContent, 0o600)
require.NoError(t, fileWriteErr)

overview := protos.FileOverview(filePath, fileHash)

fakeFileServiceClient := &v1fakes.FakeFileServiceClient{}

// do 5 iterations
for i := 0; i <= 5; i++ {
fakeFileServiceClient.UpdateOverviewReturnsOnCall(i, &mpi.UpdateOverviewResponse{
Overview: overview,
}, nil)
}

fakeFileServiceClient.UpdateFileReturns(&mpi.UpdateFileResponse{}, nil)

fileManagerService := NewFileManagerService(fakeFileServiceClient, types.AgentConfig())
fileManagerService.SetIsConnected(true)

err := fileManagerService.UpdateOverview(ctx, "123", []*mpi.File{
{
FileMeta: fileMeta,
},
}, 0)

require.Error(t, err)
assert.Equal(t, "too many UpdateOverview attempts", err.Error())
}

func TestFileManagerService_UpdateFile(t *testing.T) {
tests := []struct {
name string
isCert bool
}{
{
name: "non-cert",
isCert: false,
},
{
name: "cert",
isCert: true,
},
}

tempDir := os.TempDir()

for _, test := range tests {
ctx := context.Background()

testFile := helpers.CreateFileWithErrorCheck(t, tempDir, "nginx.conf")

var fileMeta *mpi.FileMeta
if test.isCert {
fileMeta = protos.CertMeta(testFile.Name(), "")
} else {
fileMeta = protos.FileMeta(testFile.Name(), "")
}

fakeFileServiceClient := &v1fakes.FakeFileServiceClient{}
fileManagerService := NewFileManagerService(fakeFileServiceClient, types.AgentConfig())
fileManagerService.SetIsConnected(true)

err := fileManagerService.UpdateFile(ctx, "123", &mpi.File{FileMeta: fileMeta})

require.NoError(t, err)
assert.Equal(t, 1, fakeFileServiceClient.UpdateFileCallCount())

helpers.RemoveFileWithErrorCheck(t, testFile.Name())
}
}

func TestFileManagerService_UpdateFile_LargeFile(t *testing.T) {
ctx := context.Background()
tempDir := os.TempDir()

testFile := helpers.CreateFileWithErrorCheck(t, tempDir, "nginx.conf")
writeFileError := os.WriteFile(testFile.Name(), []byte("#test content"), 0o600)
require.NoError(t, writeFileError)
fileMeta := protos.FileMetaLargeFile(testFile.Name(), "")

fakeFileServiceClient := &v1fakes.FakeFileServiceClient{}
fakeClientStreamingClient := &FakeClientStreamingClient{sendCount: atomic.Int32{}}
fakeFileServiceClient.UpdateFileStreamReturns(fakeClientStreamingClient, nil)
fileManagerService := NewFileManagerService(fakeFileServiceClient, types.AgentConfig())
fileManagerService.SetIsConnected(true)

err := fileManagerService.UpdateFile(ctx, "123", &mpi.File{FileMeta: fileMeta})

require.NoError(t, err)
assert.Equal(t, 0, fakeFileServiceClient.UpdateFileCallCount())
assert.Equal(t, 14, int(fakeClientStreamingClient.sendCount.Load()))

helpers.RemoveFileWithErrorCheck(t, testFile.Name())
}

func TestFileManagerService_ConfigApply_Add(t *testing.T) {
ctx := context.Background()
tempDir := t.TempDir()
Expand Down
35 changes: 35 additions & 0 deletions internal/file/file_operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,15 @@ package file
import (
"bufio"
"context"
"encoding/json"
"fmt"
"io"
"log/slog"
"os"
"path"

"github.com/nginx/agent/v3/internal/model"

"google.golang.org/grpc"

"github.com/nginx/agent/v3/pkg/files"
Expand Down Expand Up @@ -140,3 +143,35 @@ func (fo *FileOperator) ReadChunk(

return chunk, err
}

func (fo *FileOperator) WriteManifestFile(updatedFiles map[string]*model.ManifestFile, manifestDir,
manifestPath string,
) (writeError error) {
manifestJSON, err := json.MarshalIndent(updatedFiles, "", " ")
if err != nil {
return fmt.Errorf("unable to marshal manifest file json: %w", err)
}

// 0755 allows read/execute for all, write for owner
if err = os.MkdirAll(manifestDir, dirPerm); err != nil {
return fmt.Errorf("unable to create directory %s: %w", manifestDir, err)
}

// 0600 ensures only root can read/write
newFile, err := os.OpenFile(manifestPath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, filePerm)
if err != nil {
return fmt.Errorf("failed to read manifest file: %w", err)
}
defer func() {
if closeErr := newFile.Close(); closeErr != nil {
writeError = closeErr
}
}()

_, err = newFile.Write(manifestJSON)
if err != nil {
return fmt.Errorf("failed to write manifest file: %w", err)
}

return writeError
}
115 changes: 42 additions & 73 deletions internal/file/file_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ package file

import (
"context"
"fmt"
"log/slog"

"github.com/nginx/agent/v3/pkg/files"
Expand All @@ -32,12 +31,16 @@ type FilePlugin struct {
config *config.Config
conn grpc.GrpcConnectionInterface
fileManagerService fileManagerServiceInterface
serverType model.ServerType
}

func NewFilePlugin(agentConfig *config.Config, grpcConnection grpc.GrpcConnectionInterface) *FilePlugin {
func NewFilePlugin(agentConfig *config.Config, grpcConnection grpc.GrpcConnectionInterface,
serverType model.ServerType,
) *FilePlugin {
return &FilePlugin{
config: agentConfig,
conn: grpcConnection,
config: agentConfig,
conn: grpcConnection,
serverType: serverType,
}
}

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the Init & Close functions can you add the serverType to the context?

Expand All @@ -61,31 +64,43 @@ func (fp *FilePlugin) Info() *bus.Info {
}
}

// nolint: cyclop, revive
func (fp *FilePlugin) Process(ctx context.Context, msg *bus.Message) {
switch msg.Topic {
case bus.ConnectionResetTopic:
fp.handleConnectionReset(ctx, msg)
case bus.ConnectionCreatedTopic:
slog.DebugContext(ctx, "File plugin received connection created message")
fp.fileManagerService.SetIsConnected(true)
case bus.NginxConfigUpdateTopic:
fp.handleNginxConfigUpdate(ctx, msg)
case bus.ConfigUploadRequestTopic:
fp.handleConfigUploadRequest(ctx, msg)
case bus.ConfigApplyRequestTopic:
fp.handleConfigApplyRequest(ctx, msg)
case bus.ConfigApplyCompleteTopic:
fp.handleConfigApplyComplete(ctx, msg)
case bus.ConfigApplySuccessfulTopic:
fp.handleConfigApplySuccess(ctx, msg)
case bus.ConfigApplyFailedTopic:
fp.handleConfigApplyFailedRequest(ctx, msg)
default:
slog.DebugContext(ctx, "File plugin received unknown topic", "topic", msg.Topic)
if logger.ServerType(ctx) == fp.serverType.String() || logger.ServerType(ctx) == "" {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If logger.ServerType(ctx) == "" then we should create a new context with the serverType.

switch msg.Topic {
case bus.ConnectionResetTopic:
fp.handleConnectionReset(ctx, msg)
case bus.ConnectionCreatedTopic:
slog.DebugContext(ctx, "File plugin received connection created message")
fp.fileManagerService.SetIsConnected(true)
case bus.NginxConfigUpdateTopic:
fp.handleNginxConfigUpdate(ctx, msg)
case bus.ConfigUploadRequestTopic:
fp.handleConfigUploadRequest(ctx, msg)
case bus.ConfigApplyRequestTopic:
fp.handleConfigApplyRequest(ctx, msg)
case bus.ConfigApplyCompleteTopic:
fp.handleConfigApplyComplete(ctx, msg)
case bus.ConfigApplySuccessfulTopic:
fp.handleConfigApplySuccess(ctx, msg)
case bus.ConfigApplyFailedTopic:
fp.handleConfigApplyFailedRequest(ctx, msg)
default:
slog.DebugContext(ctx, "File plugin received unknown topic", "topic", msg.Topic)
}
}
}

func (fp *FilePlugin) Subscriptions() []string {
if fp.serverType == model.Auxiliary {
return []string{
bus.ConnectionResetTopic,
bus.ConnectionCreatedTopic,
bus.NginxConfigUpdateTopic,
bus.ConfigUploadRequestTopic,
}
}

return []string{
bus.ConnectionResetTopic,
bus.ConnectionCreatedTopic,
Expand Down Expand Up @@ -319,27 +334,10 @@ func (fp *FilePlugin) handleNginxConfigUpdate(ctx context.Context, msg *bus.Mess
return
}

updateError := fp.fileManagerService.UpdateCurrentFilesOnDisk(
ctx,
files.ConvertToMapOfFiles(nginxConfigContext.Files),
true,
)
if updateError != nil {
slog.ErrorContext(ctx, "Unable to update current files on disk", "error", updateError)
}

slog.InfoContext(ctx, "Updating overview after nginx config update")
err := fp.fileManagerService.UpdateOverview(ctx, nginxConfigContext.InstanceID, nginxConfigContext.Files, 0)
if err != nil {
slog.ErrorContext(
ctx,
"Failed to update file overview",
"instance_id", nginxConfigContext.InstanceID,
"error", err,
)
}
fp.fileManagerService.ConfigUpdate(ctx, nginxConfigContext)
}

// nolint: dupl
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To remove duplicate code between the file plugin and the read only file plugin, you could use embedding to take all the functionally from the read only file plugin and override whatever functions you need to change.

This can be done by updateing the FilePlugin struct to the following:

type FilePlugin struct {
	*ReadFilePlugin
}

func (fp *FilePlugin) handleConfigUploadRequest(ctx context.Context, msg *bus.Message) {
slog.DebugContext(ctx, "File plugin received config upload request message")
managementPlaneRequest, ok := msg.Data.(*mpi.ManagementPlaneRequest)
Expand All @@ -357,36 +355,7 @@ func (fp *FilePlugin) handleConfigUploadRequest(ctx context.Context, msg *bus.Me

correlationID := logger.CorrelationID(ctx)

var updatingFilesError error

for _, file := range configUploadRequest.GetOverview().GetFiles() {
err := fp.fileManagerService.UpdateFile(
ctx,
configUploadRequest.GetOverview().GetConfigVersion().GetInstanceId(),
file,
)
if err != nil {
slog.ErrorContext(
ctx,
"Failed to update file",
"instance_id", configUploadRequest.GetOverview().GetConfigVersion().GetInstanceId(),
"file_name", file.GetFileMeta().GetName(),
"error", err,
)

response := fp.createDataPlaneResponse(correlationID, mpi.CommandResponse_COMMAND_STATUS_ERROR,
fmt.Sprintf("Failed to update file %s", file.GetFileMeta().GetName()),
configUploadRequest.GetOverview().GetConfigVersion().GetInstanceId(),
err.Error(),
)

updatingFilesError = err

fp.messagePipe.Process(ctx, &bus.Message{Topic: bus.DataPlaneResponseTopic, Data: response})

break
}
}
updatingFilesError := fp.fileManagerService.ConfigUpload(ctx, configUploadRequest)

response := &mpi.DataPlaneResponse{
MessageMeta: &mpi.MessageMeta{
Expand Down
Loading