diff --git a/.dockerignore b/.dockerignore index ea986d5983..666562beed 100644 --- a/.dockerignore +++ b/.dockerignore @@ -14,6 +14,7 @@ private/buf/cmd/buf/command/alpha/protoc/internal/protoc-gen-insertion-point-rec private/buf/cmd/buf/command/alpha/protoc/internal/protoc-gen-insertion-point-writer/protoc-gen-insertion-point-writer private/buf/cmd/buf/command/alpha/protoc/test.txt private/buf/cmd/buf/command/generate/internal/protoc-gen-top-level-type-names-yaml/protoc-gen-top-level-type-names-yaml +private/buf/cmd/buf/testdata/imports/*/v3/modulelocks/ private/bufpkg/bufmodule/bufmoduleapi/cmd/buf-legacyfederation-go-data/buf-legacyfederation-go-data private/bufpkg/bufmodule/bufmoduletesting/cmd/buf-digest/buf-digest private/bufpkg/bufmodule/bufmoduletesting/cmd/buf-new-commit-id/buf-new-commit-id diff --git a/.gitignore b/.gitignore index cdf037a4be..7fdf13c275 100644 --- a/.gitignore +++ b/.gitignore @@ -14,6 +14,7 @@ /private/buf/cmd/buf/command/alpha/protoc/internal/protoc-gen-insertion-point-writer/protoc-gen-insertion-point-writer /private/buf/cmd/buf/command/alpha/protoc/test.txt /private/buf/cmd/buf/command/generate/internal/protoc-gen-top-level-type-names-yaml/protoc-gen-top-level-type-names-yaml +/private/buf/cmd/buf/testdata/imports/*/v3/modulelocks/ /private/bufpkg/bufmodule/bufmoduleapi/cmd/buf-legacyfederation-go-data/buf-legacyfederation-go-data /private/bufpkg/bufmodule/bufmoduletesting/cmd/buf-digest/buf-digest /private/bufpkg/bufmodule/bufmoduletesting/cmd/buf-new-commit-id/buf-new-commit-id diff --git a/make/buf/all.mk b/make/buf/all.mk index 7cc70fbc6a..9536f3d32b 100644 --- a/make/buf/all.mk +++ b/make/buf/all.mk @@ -31,7 +31,8 @@ FILE_IGNORES := $(FILE_IGNORES) \ private/buf/cmd/buf/command/alpha/protoc/test.txt \ private/bufpkg/buftesting/cache/ \ private/buf/buftesting/cache/ \ - private/pkg/storage/storageos/tmp/ + private/pkg/storage/storageos/tmp/ \ + private/buf/cmd/buf/testdata/imports/*/v3/modulelocks/ LICENSE_HEADER_LICENSE_TYPE := apache LICENSE_HEADER_COPYRIGHT_HOLDER := Buf Technologies, Inc. LICENSE_HEADER_YEAR_RANGE := 2020-2024 diff --git a/private/buf/bufcli/cache.go b/private/buf/bufcli/cache.go index 3214c5cc17..bd9a11f876 100644 --- a/private/buf/bufcli/cache.go +++ b/private/buf/bufcli/cache.go @@ -29,6 +29,7 @@ import ( "github.com/bufbuild/buf/private/bufpkg/bufmodule/bufmodulestore" "github.com/bufbuild/buf/private/pkg/app/appext" "github.com/bufbuild/buf/private/pkg/command" + "github.com/bufbuild/buf/private/pkg/filelock" "github.com/bufbuild/buf/private/pkg/normalpath" "github.com/bufbuild/buf/private/pkg/storage/storageos" ) @@ -50,6 +51,7 @@ var ( v3CacheModuleRelDirPath, v3CacheCommitsRelDirPath, v3CacheWKTRelDirPath, + v3CacheModuleLockRelDirPath, } // v1CacheModuleDataRelDirPath is the relative path to the cache directory where module data @@ -96,6 +98,11 @@ var ( // // Normalized. v3CacheWKTRelDirPath = normalpath.Join("v3", "wellknowntypes") + // v3CacheModuleLockRelDirPath is the relative path to the lock files directory for module data. + // This directory is used to store lock files for synchronizing reading and writing module data from the cache. + // + // Normalized. + v3CacheModuleLockRelDirPath = normalpath.Join("v3", "modulelocks") ) // NewModuleDataProvider returns a new ModuleDataProvider while creating the @@ -166,12 +173,20 @@ func newModuleDataProvider( if err != nil { return nil, err } + if err := createCacheDir(container.CacheDirPath(), v3CacheModuleLockRelDirPath); err != nil { + return nil, err + } + filelocker, err := filelock.NewLocker(normalpath.Join(container.CacheDirPath(), v3CacheModuleLockRelDirPath)) + if err != nil { + return nil, err + } return bufmodulecache.NewModuleDataProvider( container.Logger(), delegateModuleDataProvider, bufmodulestore.NewModuleDataStore( container.Logger(), cacheBucket, + filelocker, ), ), nil } diff --git a/private/bufpkg/bufmodule/bufmodulecache/bufmodulecache_test.go b/private/bufpkg/bufmodule/bufmodulecache/bufmodulecache_test.go index 4ad4968a80..1933d39b56 100644 --- a/private/bufpkg/bufmodule/bufmodulecache/bufmodulecache_test.go +++ b/private/bufpkg/bufmodule/bufmodulecache/bufmodulecache_test.go @@ -16,15 +16,23 @@ package bufmodulecache import ( "context" + "fmt" + "os" + "path/filepath" "testing" + "time" "github.com/bufbuild/buf/private/bufpkg/bufmodule" "github.com/bufbuild/buf/private/bufpkg/bufmodule/bufmodulestore" "github.com/bufbuild/buf/private/bufpkg/bufmodule/bufmoduletesting" + "github.com/bufbuild/buf/private/pkg/filelock" "github.com/bufbuild/buf/private/pkg/slicesext" "github.com/bufbuild/buf/private/pkg/storage/storagemem" + "github.com/bufbuild/buf/private/pkg/storage/storageos" + "github.com/bufbuild/buf/private/pkg/thread" "github.com/stretchr/testify/require" "go.uber.org/zap" + "go.uber.org/zap/zaptest" ) func TestCommitProviderForModuleKeyBasic(t *testing.T) { @@ -165,6 +173,7 @@ func TestModuleDataProviderBasic(t *testing.T) { bufmodulestore.NewModuleDataStore( zap.NewNop(), storagemem.NewReadWriteBucket(), + filelock.NewNopLocker(), ), ) @@ -214,6 +223,64 @@ func TestModuleDataProviderBasic(t *testing.T) { ) } +func TestConcurrentCacheReadWrite(t *testing.T) { + t.Parallel() + + bsrProvider, moduleKeys := testGetBSRProviderAndModuleKeys(t, context.Background()) + tempDir := t.TempDir() + cacheDir := filepath.Join(tempDir, "cache") + logger := zaptest.NewLogger(t, zaptest.Level(zap.InfoLevel)) + + for i := 0; i < 20; i++ { + require.NoError(t, os.MkdirAll(cacheDir, 0755)) + jobs, err := slicesext.MapError( + []int{0, 1, 2, 3, 4}, + func(i int) (func(ctx context.Context) error, error) { + logger := logger.Named(fmt.Sprintf("job-%d", i)) + bucket, err := storageos.NewProvider().NewReadWriteBucket(cacheDir) + if err != nil { + return nil, err + } + filelocker, err := filelock.NewLocker( + cacheDir, + filelock.LockerWithLockRetryDelay(10*time.Millisecond), // Drops test time from ~16s to ~1s + ) + if err != nil { + return nil, err + } + cacheProvider := newModuleDataProvider( + logger, + bsrProvider, + bufmodulestore.NewModuleDataStore( + logger, + bucket, + filelocker, + ), + ) + return func(ctx context.Context) error { + moduleDatas, err := cacheProvider.GetModuleDatasForModuleKeys( + ctx, + moduleKeys, + ) + if err != nil { + return err + } + for _, moduleData := range moduleDatas { + // Calling moduleData.Bucket() checks the digest + if _, err := moduleData.Bucket(); err != nil { + return err + } + } + return nil + }, nil + }, + ) + require.NoError(t, err) + require.NoError(t, thread.Parallelize(context.Background(), jobs)) + require.NoError(t, os.RemoveAll(cacheDir)) + } +} + func testGetBSRProviderAndModuleKeys(t *testing.T, ctx context.Context) (bufmoduletesting.OmniProvider, []bufmodule.ModuleKey) { bsrProvider, err := bufmoduletesting.NewOmniProvider( bufmoduletesting.ModuleData{ @@ -235,7 +302,10 @@ func testGetBSRProviderAndModuleKeys(t *testing.T, ctx context.Context) (bufmodu bufmoduletesting.ModuleData{ Name: "buf.build/foo/mod3", PathToData: map[string][]byte{ - "mod3.proto": []byte( + "mod3a.proto": []byte( + `syntax = proto3; package mod3;`, + ), + "mod3b.proto": []byte( `syntax = proto3; package mod3;`, ), }, diff --git a/private/bufpkg/bufmodule/bufmodulestore/module_data_store.go b/private/bufpkg/bufmodule/bufmodulestore/module_data_store.go index 0f6db97ab9..4350480e3e 100644 --- a/private/bufpkg/bufmodule/bufmodulestore/module_data_store.go +++ b/private/bufpkg/bufmodule/bufmodulestore/module_data_store.go @@ -22,6 +22,7 @@ import ( "github.com/bufbuild/buf/private/bufpkg/bufmodule" "github.com/bufbuild/buf/private/pkg/encoding" + "github.com/bufbuild/buf/private/pkg/filelock" "github.com/bufbuild/buf/private/pkg/normalpath" "github.com/bufbuild/buf/private/pkg/slicesext" "github.com/bufbuild/buf/private/pkg/storage" @@ -38,6 +39,7 @@ var ( externalModuleDataFilesDir = "files" externalModuleDataV1BufYAMLDir = "v1_buf_yaml" externalModuleDataV1BufLockDir = "v1_buf_lock" + externalModuleDataLockFileExt = ".lock" ) // ModuleDatasResult is a result for a get of ModuleDatas. @@ -78,9 +80,10 @@ type ModuleDataStore interface { func NewModuleDataStore( logger *zap.Logger, bucket storage.ReadWriteBucket, + locker filelock.Locker, options ...ModuleDataStoreOption, ) ModuleDataStore { - return newModuleDataStore(logger, bucket, options...) + return newModuleDataStore(logger, bucket, locker, options...) } // ModuleDataStoreOption is an option for a new ModuleDataStore. @@ -101,6 +104,7 @@ func ModuleDataStoreWithTar() ModuleDataStoreOption { type moduleDataStore struct { logger *zap.Logger bucket storage.ReadWriteBucket + locker filelock.Locker tar bool } @@ -108,11 +112,13 @@ type moduleDataStore struct { func newModuleDataStore( logger *zap.Logger, bucket storage.ReadWriteBucket, + locker filelock.Locker, options ...ModuleDataStoreOption, ) *moduleDataStore { moduleDataStore := &moduleDataStore{ logger: logger, bucket: bucket, + locker: locker, } for _, option := range options { option(moduleDataStore) @@ -129,9 +135,9 @@ func (p *moduleDataStore) GetModuleDatasForModuleKeys( for _, moduleKey := range moduleKeys { moduleData, err := p.getModuleDataForModuleKey(ctx, moduleKey) if err != nil { - if !errors.Is(err, fs.ErrNotExist) { - return nil, nil, err - } + // Any error returned from getModuleDataForModuleKey means that no module data is read + // from the cache, and is treated as a cache miss so we can fetch new module data and + // repopulate the cache. notFoundModuleKeys = append(notFoundModuleKeys, moduleKey) } else { foundModuleDatas = append(foundModuleDatas, moduleData) @@ -152,6 +158,31 @@ func (p *moduleDataStore) PutModuleDatas( return nil } +// getModuleDataForModuleKey reads the module data for the module key from the cache. +// +// If moduleDataStore is configured to store the module data as tarballs, then we read a +// single tar for all module data stored under the module key. +// +// If moduleDataStore is configured to store module data as individual files, then it +// takes the following steps to read the module data: +// +// 1. Acquire a shared lock on the module data lock file for the module key. +// 2. Attempt to read the module.yaml file for the module key. +// 3. If no valid module.yaml is present, then return an error and no data is read from +// the cache. The module.yaml is always written to the cache last, so we consider valid +// module data to be present if module.yaml is present. +// 4. If a valid module.yaml is found, then attempt to read the module data files from the +// cache. +// 5. If an error occurs while reading the files and/or an invalid config file is found, +// then no module data is read from the cache. +// 6. Once all files have been read from the cache, release the shared lock on the module +// data lock file. +// +// It is important to note that when we read from the cache, we use the presence and contents +// of module.yaml to determine if module data exists in the cache. If there is manual intervention +// that corrupts the contents of the cache, but leaves module.yaml in-tact, then we read +// the files as valid module data at this layer, and it fails tamper-proofing digest checks +// when the module data is accessed. func (p *moduleDataStore) getModuleDataForModuleKey( ctx context.Context, moduleKey bufmodule.ModuleKey, @@ -162,22 +193,50 @@ func (p *moduleDataStore) getModuleDataForModuleKey( moduleCacheBucket, err = p.getReadBucketForTar(ctx, moduleKey) if err != nil { if !errors.Is(err, fs.ErrNotExist) { - return nil, p.deleteInvalidModuleData(ctx, moduleKey, err) + // If there is an error fetching the tar bucket that is not because the path does + // not exist, we assume this is corrupted and delete the tar. + tarPath, err := getModuleDataStoreTarPath(moduleKey) + if err != nil { + return nil, err + } + if err := p.bucket.Delete(ctx, tarPath); err != nil { + return nil, err + } + // Return a path error indicating the module data was not found + return nil, &fs.PathError{Op: "read", Path: tarPath, Err: fs.ErrNotExist} } return nil, err } } else { - moduleCacheBucket, err = p.getReadWriteBucketForDir(moduleKey) - // Not checking for fs.ErrNotExist. Function only returns error on actual system error. + dirPath, err := getModuleDataStoreDirPath(moduleKey) if err != nil { return nil, err } - } - defer func() { - if retErr != nil { - retErr = p.deleteInvalidModuleData(ctx, moduleKey, retErr) + p.logDebugModuleKey( + moduleKey, + "module data store dir read write bucket", + zap.String("dirPath", dirPath), + ) + moduleCacheBucket = storage.MapReadWriteBucket(p.bucket, storage.MapOnPrefix(dirPath)) + moduleDataStoreDirLockPath, err := getModuleDataStoreDirLockPath(moduleKey) + if err != nil { + return nil, err } - }() + // Acquire a shared lock for module data lock file for reading module data from the cache. + unlocker, err := p.locker.RLock(ctx, moduleDataStoreDirLockPath) + if err != nil { + return nil, err + } + defer func() { + // Release lock on the module data lock file. + if err := unlocker.Unlock(); err != nil { + retErr = multierr.Append(retErr, err) + } + }() + } + // Attempt to read module.yaml from cache. The module.yaml file is always written last, + // so if a valid module.yaml file is present, then we proceed to read the rest of the + // the module data. data, err := storage.ReadPath(ctx, moduleCacheBucket, externalModuleDataFileName) p.logDebugModuleKey( moduleKey, @@ -195,6 +254,8 @@ func (p *moduleDataStore) getModuleDataForModuleKey( if !externalModuleData.isValid() { return nil, fmt.Errorf("invalid %s from cache for %s: %+v", externalModuleDataFileName, moduleKey.String(), externalModuleData) } + // A valid module.yaml was found, we proceed to reading module data. + // We don't want to do this lazily (or anything else in this function) as we want to // make sure everything we have is valid before returning so we can auto-correct // the cache if necessary. @@ -263,49 +324,32 @@ func (p *moduleDataStore) getModuleDataForModuleKey( ), nil } -func (p *moduleDataStore) deleteInvalidModuleData( - ctx context.Context, - moduleKey bufmodule.ModuleKey, - invalidErr error, -) (retErr error) { - p.logDebugModuleKey( - moduleKey, - "module data store invalid module data", - zap.Error(invalidErr), - ) - defer func() { - if retErr != nil { - // Do not return error, just log. We always returns a file not found error. - p.logDebugModuleKey( - moduleKey, - "module data store could not delete module data", - zap.Error(retErr), - ) - } - // This will act as if the file is not found. - retErr = &fs.PathError{Op: "read", Path: moduleKey.String(), Err: fs.ErrNotExist} - }() - - if p.tar { - tarPath, err := getModuleDataStoreTarPath(moduleKey) - if err != nil { - return err - } - return p.bucket.Delete(ctx, tarPath) - } - dirPath, err := getModuleDataStoreDirPath(moduleKey) - if err != nil { - return err - } - return p.bucket.DeleteAll(ctx, dirPath) -} - +// putModuleData puts the module data into the module cache. +// +// If moduleDataStore is configured to store the module data as tarballs, then a single tar +// for all module data is stored under the module key. +// +// If moduleDataStore is configured to store individual files, then it takes the following steps: +// +// 1. Acquire a shared lock on the module lock file for the module key. +// 2. Attempt to read the module.yaml file to ensure that there is no valid module already +// stored in the cache. The module.yaml file is always written last in the cache, so if +// it is present, then valid module data is present, and no new module data is written. +// 3. If no module.yaml is present, then we release the shared lock and acquire an exclusive +// lock on the module lock file for writing module data. +// 4. Once the exclusive lock is acquired, we do another check to ensure that there is no +// module.yaml present. This is because the shared lock is not upgraded to an exclusive +// lock, we released the shared lock before acquiring the exclusive lock, so to ensure +// there are absolutely no race conditions, we do another check. +// 5. Once we determine there is no module.yaml present, we proceed to writing the module +// data to the cache. +// 6. We write the module.yaml after we've written all other module data files. func (p *moduleDataStore) putModuleData( ctx context.Context, moduleData bufmodule.ModuleData, ) (retErr error) { moduleKey := moduleData.ModuleKey() - var moduleCacheBucket storage.WriteBucket + var moduleCacheBucket storage.ReadWriteBucket var err error if p.tar { var callback func(ctx context.Context) error @@ -317,11 +361,100 @@ func (p *moduleDataStore) putModuleData( } }() } else { - moduleCacheBucket, err = p.getReadWriteBucketForDir(moduleKey) + dirPath, err := getModuleDataStoreDirPath(moduleKey) if err != nil { return err } + p.logDebugModuleKey( + moduleKey, + "module data store dir read write bucket", + zap.String("dirPath", dirPath), + ) + moduleCacheBucket = storage.MapReadWriteBucket(p.bucket, storage.MapOnPrefix(dirPath)) + moduleDataStoreDirLockPath, err := getModuleDataStoreDirLockPath(moduleKey) + if err != nil { + return err + } + // Acquire shared lock to check for a valid module.yaml before writing to the module cache. + readUnlocker, err := p.locker.RLock(ctx, moduleDataStoreDirLockPath) + if err != nil { + return err + } + defer func() { + if readUnlocker != nil { + if err := readUnlocker.Unlock(); err != nil { + retErr = multierr.Append(retErr, err) + } + } + }() + data, err := storage.ReadPath(ctx, moduleCacheBucket, externalModuleDataFileName) + p.logDebugModuleKey( + moduleKey, + fmt.Sprintf("module data store put read check %s", externalModuleDataFileName), + zap.Bool("found", err == nil), + zap.Error(err), + ) + if err != nil && !errors.Is(err, fs.ErrNotExist) { + return err + } + if err == nil { + var externalModuleData externalModuleData + if err := encoding.UnmarshalYAMLNonStrict(data, &externalModuleData); err != nil { + return err + } + // If a valid module.yaml is present, since module.yaml is always written last, we + // assume that there is valid module data, and we do not attempt to write new data here. + if externalModuleData.isValid() { + return nil + } + } + // Otherwise, release shared lock and set readUnlocker to nil in order to acquire an + // exclusive lock for writing module data to the cache. filelock does not allow us to + // upgrade the shared lock to an exclusive lock, so we need to release the shared lock + // before acquiring an exclusive lock. + if readUnlocker != nil { + err := readUnlocker.Unlock() + readUnlocker = nil // unset the readUnlocker since we are upgrading the lock + if err != nil { + return err + } + } + // Acquire exclusive lock on module lock file for writing module data to the cache. + unlocker, err := p.locker.Lock(ctx, moduleDataStoreDirLockPath) + if err != nil { + return err + } + defer func() { + if err := unlocker.Unlock(); err != nil { + retErr = multierr.Append(retErr, err) + } + }() + // Before we start writing module data to the cache, we first check to see if module.yaml + // is present again after acquiring the exclusive lock. + // This is because the shared lock was released before acquiring the exclusive lock, + // and we need to make sure no valid module data was written in the interim. + data, err = storage.ReadPath(ctx, moduleCacheBucket, externalModuleDataFileName) + p.logDebugModuleKey( + moduleKey, + fmt.Sprintf("module data store put check %s", externalModuleDataFileName), + zap.Bool("found", err == nil), + zap.Error(err), + ) + if err != nil && !errors.Is(err, fs.ErrNotExist) { + return err + } + if err == nil { + var externalModuleData externalModuleData + if err := encoding.UnmarshalYAMLNonStrict(data, &externalModuleData); err != nil { + return err + } + // If a valid module.yaml is present, then we do not overwrite with new data. + if externalModuleData.isValid() { + return nil + } + } } + // Proceed to writing module data. depModuleKeys, err := moduleData.DeclaredDepModuleKeys() if err != nil { return err @@ -351,7 +484,6 @@ func (p *moduleDataStore) putModuleData( ctx, filesBucket, storage.MapWriteBucket(moduleCacheBucket, storage.MapOnPrefix(externalModuleDataFilesDir)), - storage.CopyWithAtomic(), ); err != nil { return err } @@ -388,23 +520,13 @@ func (p *moduleDataStore) putModuleData( // Put the module.yaml last, so that we only have a module.yaml if the cache is finished writing. // We can use the existence of the module.yaml file to say whether or not the cache contains a // given ModuleKey, otherwise we overwrite any contents in the cache. - return storage.PutPath(ctx, moduleCacheBucket, externalModuleDataFileName, data) -} - -// Only returns error on actual system error. -func (p *moduleDataStore) getReadWriteBucketForDir( - moduleKey bufmodule.ModuleKey, -) (storage.ReadWriteBucket, error) { - dirPath, err := getModuleDataStoreDirPath(moduleKey) - if err != nil { - return nil, err - } - p.logDebugModuleKey( - moduleKey, - "module data store dir read write bucket", - zap.String("dirPath", dirPath), + return storage.PutPath( + ctx, + moduleCacheBucket, + externalModuleDataFileName, + data, + storage.PutWithAtomic(), ) - return storage.MapReadWriteBucket(p.bucket, storage.MapOnPrefix(dirPath)), nil } // May return fs.ErrNotExist error if tar not found. @@ -445,7 +567,7 @@ func (p *moduleDataStore) getReadBucketForTar( func (p *moduleDataStore) getWriteBucketAndCallbackForTar( moduleKey bufmodule.ModuleKey, -) (storage.WriteBucket, func(context.Context) error) { +) (storage.ReadWriteBucket, func(context.Context) error) { readWriteBucket := storagemem.NewReadWriteBucket() return readWriteBucket, func(ctx context.Context) (retErr error) { tarPath, err := getModuleDataStoreTarPath(moduleKey) @@ -554,6 +676,14 @@ func getDeclaredDepModuleKeyForExternalModuleDataDep(dep externalModuleDataDep) ) } +func getModuleDataStoreDirLockPath(moduleKey bufmodule.ModuleKey) (string, error) { + moduleDataStoreDirPath, err := getModuleDataStoreDirPath(moduleKey) + if err != nil { + return "", err + } + return moduleDataStoreDirPath + externalModuleDataLockFileExt, nil +} + // externalModuleData is the store representation of a ModuleData. // // We could use a protobuf Message for this. diff --git a/private/bufpkg/bufmodule/bufmodulestore/module_data_store_test.go b/private/bufpkg/bufmodule/bufmodulestore/module_data_store_test.go index 7c4f72b148..6baba477ca 100644 --- a/private/bufpkg/bufmodule/bufmodulestore/module_data_store_test.go +++ b/private/bufpkg/bufmodule/bufmodulestore/module_data_store_test.go @@ -21,10 +21,12 @@ import ( "github.com/bufbuild/buf/private/bufpkg/bufmodule" "github.com/bufbuild/buf/private/bufpkg/bufmodule/bufmoduletesting" + "github.com/bufbuild/buf/private/pkg/filelock" "github.com/bufbuild/buf/private/pkg/normalpath" "github.com/bufbuild/buf/private/pkg/slicesext" "github.com/bufbuild/buf/private/pkg/storage" "github.com/bufbuild/buf/private/pkg/storage/storagemem" + "github.com/bufbuild/buf/private/pkg/storage/storageos" "github.com/bufbuild/buf/private/pkg/zaputil" "github.com/stretchr/testify/require" "go.uber.org/zap/zapcore" @@ -40,17 +42,41 @@ func TestModuleDataStoreBasicTar(t *testing.T) { testModuleDataStoreBasic(t, true) } +func TestModuleDataStoreOS(t *testing.T) { + t.Parallel() + testModuleDataStoreOS(t) +} + func testModuleDataStoreBasic(t *testing.T, tar bool) { - ctx := context.Background() bucket := storagemem.NewReadWriteBucket() + filelocker := filelock.NewNopLocker() var moduleDataStoreOptions []ModuleDataStoreOption if tar { moduleDataStoreOptions = append(moduleDataStoreOptions, ModuleDataStoreWithTar()) } + testModuleDataStore(t, bucket, filelocker, moduleDataStoreOptions, tar) +} + +func testModuleDataStoreOS(t *testing.T) { + tempDir := t.TempDir() + bucket, err := storageos.NewProvider().NewReadWriteBucket(tempDir) + require.NoError(t, err) + filelocker, err := filelock.NewLocker(tempDir) + require.NoError(t, err) + testModuleDataStore(t, bucket, filelocker, nil, false) +} + +func testModuleDataStore( + t *testing.T, + bucket storage.ReadWriteBucket, + filelocker filelock.Locker, + moduleDataStoreOptions []ModuleDataStoreOption, + tar bool, +) { + ctx := context.Background() logger := zaputil.NewLogger(os.Stderr, zapcore.DebugLevel, zaputil.NewTextEncoder()) - moduleDataStore := NewModuleDataStore(logger, bucket, moduleDataStoreOptions...) + moduleDataStore := NewModuleDataStore(logger, bucket, filelocker, moduleDataStoreOptions...) moduleKeys, moduleDatas := testGetModuleKeysAndModuleDatas(t, ctx) - foundModuleDatas, notFoundModuleKeys, err := moduleDataStore.GetModuleDatasForModuleKeys( ctx, moduleKeys, diff --git a/private/pkg/filelock/filelock.go b/private/pkg/filelock/filelock.go index c15cbabe00..6ac1ece7c0 100644 --- a/private/pkg/filelock/filelock.go +++ b/private/pkg/filelock/filelock.go @@ -63,8 +63,32 @@ type Locker interface { // // The root directory path should generally be a data directory path. // The root directory must exist. -func NewLocker(rootDirPath string) (Locker, error) { - return newLocker(rootDirPath) +func NewLocker(rootDirPath string, options ...LockerOption) (Locker, error) { + return newLocker(rootDirPath, options...) +} + +// LockerOption is an option for a new Locker. +type LockerOption func(*lockerOptions) + +// LockerWithLockTimeout sets the default lock timeout for the Locker. +// +// If Lock/RLock is called with LockWithTimeout, that will override this default timeout. +// If this is not set, the default lock timeout is set to DefaultLockTimeout. +func LockerWithLockTimeout(lockTimeout time.Duration) LockerOption { + return func(lockerOptions *lockerOptions) { + lockerOptions.lockTimeout = lockTimeout + } +} + +// LockerWithLockRetryDelay sets the default lock retry delay for the Locker. +// +// If Lock/RLock is called with LockWithRetryDelay, that will override the default lock +// retry delay. +// If this is not set, the default lock retry delay is set to DefaultLockRetryDelay. +func LockerWithLockRetryDelay(lockRetryDelay time.Duration) LockerOption { + return func(lockerOptions *lockerOptions) { + lockerOptions.lockRetryDelay = lockRetryDelay + } } // LockOption is an option for lock. diff --git a/private/pkg/filelock/locker.go b/private/pkg/filelock/locker.go index 190288d013..9395e43d92 100644 --- a/private/pkg/filelock/locker.go +++ b/private/pkg/filelock/locker.go @@ -18,15 +18,19 @@ import ( "context" "fmt" "os" + "time" "github.com/bufbuild/buf/private/pkg/normalpath" + "github.com/bufbuild/buf/private/pkg/slicesext" ) type locker struct { - rootDirPath string + rootDirPath string + lockTimeout time.Duration + lockRetryDelay time.Duration } -func newLocker(rootDirPath string) (*locker, error) { +func newLocker(rootDirPath string, options ...LockerOption) (*locker, error) { // allow symlinks fileInfo, err := os.Stat(normalpath.Unnormalize(rootDirPath)) if err != nil { @@ -35,9 +39,15 @@ func newLocker(rootDirPath string) (*locker, error) { if !fileInfo.IsDir() { return nil, fmt.Errorf("%q is not a directory", rootDirPath) } + lockerOptions := newLockerOptions() + for _, option := range options { + option(lockerOptions) + } return &locker{ // do not validate - allow anything including absolute paths and jumping context - rootDirPath: normalpath.Normalize(rootDirPath), + rootDirPath: normalpath.Normalize(rootDirPath), + lockTimeout: lockerOptions.lockTimeout, + lockRetryDelay: lockerOptions.lockRetryDelay, }, nil } @@ -45,6 +55,13 @@ func (l *locker) Lock(ctx context.Context, path string, options ...LockOption) ( if err := validatePath(path); err != nil { return nil, err } + options = slicesext.Concat( + []LockOption{ + LockWithTimeout(l.lockTimeout), + LockWithRetryDelay(l.lockRetryDelay), + }, + options, // Any additional options set will be applied last + ) return lock( ctx, normalpath.Unnormalize(normalpath.Join(l.rootDirPath, path)), @@ -56,6 +73,13 @@ func (l *locker) RLock(ctx context.Context, path string, options ...LockOption) if err := validatePath(path); err != nil { return nil, err } + options = slicesext.Concat( + []LockOption{ + LockWithTimeout(l.lockTimeout), + LockWithRetryDelay(l.lockRetryDelay), + }, + options, // Any additional options set will be applied last + ) return rlock( ctx, normalpath.Unnormalize(normalpath.Join(l.rootDirPath, path)), @@ -74,3 +98,15 @@ func validatePath(path string) error { } return nil } + +type lockerOptions struct { + lockTimeout time.Duration + lockRetryDelay time.Duration +} + +func newLockerOptions() *lockerOptions { + return &lockerOptions{ + lockTimeout: DefaultLockTimeout, + lockRetryDelay: DefaultLockRetryDelay, + } +}