diff --git a/CHANGELOG.md b/CHANGELOG.md index 8e4ed5f2c4a..b5ba348d225 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -28,7 +28,7 @@ storage: * [CHANGE] **Breaking Change** Convert metrics generator from deployment to a statefulset in jsonnet. Refer to the PR for seamless migration instructions. [#2533](https://github.com/grafana/tempo/pull/2533) [#2467](https://github.com/grafana/tempo/pull/2647) (@zalegrala) * [FEATURE] New experimental API to derive on-demand RED metrics grouped by any attribute, and new metrics generator processor [#2368](https://github.com/grafana/tempo/pull/2368) [#2418](https://github.com/grafana/tempo/pull/2418) [#2424](https://github.com/grafana/tempo/pull/2424) [#2442](https://github.com/grafana/tempo/pull/2442) [#2480](https://github.com/grafana/tempo/pull/2480) [#2481](https://github.com/grafana/tempo/pull/2481) [#2501](https://github.com/grafana/tempo/pull/2501) [#2579](https://github.com/grafana/tempo/pull/2579) [#2582](https://github.com/grafana/tempo/pull/2582) (@mdisibio @zalegrala) * [FEATURE] New TraceQL structural operators descendant (>>), child (>), and sibling (~) [#2625](https://github.com/grafana/tempo/pull/2625) [#2660](https://github.com/grafana/tempo/pull/2660) (@mdisibio) -* [FEATURE] Add user-configurable overrides module [#2543](https://github.com/grafana/tempo/pull/2543) [#2682](https://github.com/grafana/tempo/pull/2682) (@electron0zero @kvrhdn) +* [FEATURE] Add user-configurable overrides module [#2543](https://github.com/grafana/tempo/pull/2543) [#2682](https://github.com/grafana/tempo/pull/2682) [#2681](https://github.com/grafana/tempo/pull/2681) (@electron0zero @kvrhdn) * [FEATURE] Add support for `q` query param in `/api/v2/search//values` to filter results based on a TraceQL query [#2253](https://github.com/grafana/tempo/pull/2253) (@mapno) To make use of filtering, configure `autocomplete_filtering_enabled`. * [FEATURE] Add support for `by()` and `coalesce()` to TraceQL. [#2490](https://github.com/grafana/tempo/pull/2490) diff --git a/cmd/tempo/app/modules.go b/cmd/tempo/app/modules.go index ab945e12f14..c2a744774db 100644 --- a/cmd/tempo/app/modules.go +++ b/cmd/tempo/app/modules.go @@ -216,6 +216,7 @@ func (t *App) initOverridesAPI() (services.Service, error) { t.Server.HTTP.Path(overridesPath).Methods(http.MethodGet).Handler(wrapHandler(userConfigOverridesAPI.GetOverridesHandler)) t.Server.HTTP.Path(overridesPath).Methods(http.MethodPost).Handler(wrapHandler(userConfigOverridesAPI.PostOverridesHandler)) + t.Server.HTTP.Path(overridesPath).Methods(http.MethodPatch).Handler(wrapHandler(userConfigOverridesAPI.PatchOverridesHandler)) t.Server.HTTP.Path(overridesPath).Methods(http.MethodDelete).Handler(wrapHandler(userConfigOverridesAPI.DeleteOverridesHandler)) return userConfigOverridesAPI, nil diff --git a/go.mod b/go.mod index bfd0a6c253e..d20d42298a0 100644 --- a/go.mod +++ b/go.mod @@ -100,6 +100,7 @@ require ( require ( github.com/Azure/go-autorest/autorest v0.11.28 + github.com/evanphx/json-patch v4.12.0+incompatible github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da github.com/googleapis/gax-go/v2 v2.7.0 github.com/grafana/gomemcache v0.0.0-20230316202710-a081dae0aba9 diff --git a/go.sum b/go.sum index bea97374aea..147f115b5cb 100644 --- a/go.sum +++ b/go.sum @@ -587,6 +587,8 @@ github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7 github.com/envoyproxy/protoc-gen-validate v0.6.7/go.mod h1:dyJXwwfPK2VSqiB9Klm1J6romD608Ba7Hij42vrOBCo= github.com/envoyproxy/protoc-gen-validate v0.9.1 h1:PS7VIOgmSVhWUEeZwTe7z7zouA22Cr590PzXKbZHOVY= github.com/envoyproxy/protoc-gen-validate v0.9.1/go.mod h1:OKNgG7TCp5pF4d6XftA0++PMirau2/yoOwVac3AbF2w= +github.com/evanphx/json-patch v4.12.0+incompatible h1:4onqiflcdA9EOZ4RxV643DvftH5pOlLGNtQ5lPWQu84= +github.com/evanphx/json-patch v4.12.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk= github.com/facette/natsort v0.0.0-20181210072756-2cd4dd1e2dcb h1:IT4JYU7k4ikYg1SCxNI1/Tieq/NFvh6dzLdgi7eu0tM= github.com/facette/natsort v0.0.0-20181210072756-2cd4dd1e2dcb/go.mod h1:bH6Xx7IW64qjjJq8M2u4dxNaBiDfKK+z/3eGDpXEQhc= github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= diff --git a/integration/e2e/config-all-in-one-gcs.yaml b/integration/e2e/config-all-in-one-gcs.yaml index 927361b9219..0fe96d4f731 100644 --- a/integration/e2e/config-all-in-one-gcs.yaml +++ b/integration/e2e/config-all-in-one-gcs.yaml @@ -45,8 +45,9 @@ overrides: poll_interval: 10s client: backend: gcs + # fsouza/fake-gcs-server does not support versioning + confirm_versioning: false gcs: - # TODO use separate bucket? bucket_name: tempo endpoint: https://tempo_e2e-gcs:4443/storage/v1/ insecure: true diff --git a/integration/e2e/overrides_test.go b/integration/e2e/overrides_test.go index e281f5f0653..da191042df6 100644 --- a/integration/e2e/overrides_test.go +++ b/integration/e2e/overrides_test.go @@ -63,12 +63,12 @@ func TestOverrides(t *testing.T) { // Modify overrides fmt.Println("* Setting overrides.forwarders") - err = apiClient.SetOverrides(&userconfigurableapi.UserConfigurableLimits{ + _, err = apiClient.SetOverrides(&userconfigurableapi.UserConfigurableLimits{ Forwarders: &[]string{}, - }) + }, "0") require.NoError(t, err) - limits, err := apiClient.GetOverrides() + limits, version, err := apiClient.GetOverrides() printLimits(limits) require.NoError(t, err) @@ -81,10 +81,10 @@ func TestOverrides(t *testing.T) { // Clear overrides fmt.Println("* Deleting overrides") - err = apiClient.DeleteOverrides() + err = apiClient.DeleteOverrides(version) require.NoError(t, err) - _, err = apiClient.GetOverrides() + _, _, err = apiClient.GetOverrides() require.ErrorIs(t, err, httpclient.ErrNotFound) }) } diff --git a/modules/overrides/user_configurable_overrides.go b/modules/overrides/user_configurable_overrides.go index 21be818cc0d..97d152ceb44 100644 --- a/modules/overrides/user_configurable_overrides.go +++ b/modules/overrides/user_configurable_overrides.go @@ -21,6 +21,7 @@ import ( api "github.com/grafana/tempo/modules/overrides/userconfigurableapi" tempo_log "github.com/grafana/tempo/pkg/util/log" + "github.com/grafana/tempo/tempodb/backend" ) type UserConfigurableOverridesConfig struct { @@ -142,7 +143,11 @@ func (o *userConfigurableOverridesManager) reloadAllTenantLimits(ctx context.Con // For every tenant with user-configurable overrides, download and cache them for _, tenant := range tenants { - limits, err := o.client.Get(ctx, tenant) + limits, _, err := o.client.Get(ctx, tenant) + if err == backend.ErrDoesNotExist { + o.setTenantLimit(tenant, nil) + continue + } if err != nil { return errors.Wrapf(err, "failed to load tenant limits for tenant %v", tenant) } diff --git a/modules/overrides/user_configurable_overrides_test.go b/modules/overrides/user_configurable_overrides_test.go index eba7231e467..364d09baf07 100644 --- a/modules/overrides/user_configurable_overrides_test.go +++ b/modules/overrides/user_configurable_overrides_test.go @@ -38,7 +38,7 @@ func TestUserConfigOverridesManager(t *testing.T) { userConfigurableLimits := &api.UserConfigurableLimits{ Forwarders: &[]string{"my-other-forwarder"}, } - err := mgr.client.Set(context.Background(), tenant2, userConfigurableLimits) + _, err := mgr.client.Set(context.Background(), tenant2, userConfigurableLimits, backend.VersionNew) assert.NoError(t, err) assert.NoError(t, mgr.reloadAllTenantLimits(context.Background())) @@ -50,12 +50,12 @@ func TestUserConfigOverridesManager(t *testing.T) { assert.Equal(t, []string{"my-other-forwarder"}, mgr.Forwarders(tenant2)) // Delete limits for tenant-1 - err = mgr.client.Delete(context.Background(), tenant2) + err = mgr.client.Delete(context.Background(), tenant2, backend.VersionNew) assert.NoError(t, err) assert.NoError(t, mgr.reloadAllTenantLimits(context.Background())) - // Verify default limits are returned + // Verify default limits are returned again assert.Equal(t, 1024, mgr.MaxBytesPerTrace(tenant1)) assert.Equal(t, []string{"my-forwarder"}, mgr.Forwarders(tenant1)) assert.Equal(t, 1024, mgr.MaxBytesPerTrace(tenant2)) @@ -92,7 +92,7 @@ func TestUserConfigOverridesManager_deletedFromBackend(t *testing.T) { limits := &api.UserConfigurableLimits{ Forwarders: &[]string{"my-other-forwarder"}, } - err := mgr.client.Set(context.Background(), tenant1, limits) + _, err := mgr.client.Set(context.Background(), tenant1, limits, backend.VersionNew) assert.NoError(t, err) assert.NoError(t, mgr.reloadAllTenantLimits(context.Background())) @@ -118,13 +118,13 @@ func TestUserConfigOverridesManager_backendUnavailable(t *testing.T) { limits := &api.UserConfigurableLimits{ Forwarders: &[]string{"my-other-forwarder"}, } - err := mgr.client.Set(context.Background(), tenant1, limits) + _, err := mgr.client.Set(context.Background(), tenant1, limits, backend.VersionNew) assert.NoError(t, err) assert.NoError(t, mgr.reloadAllTenantLimits(context.Background())) // replace reader by this uncooperative fella - mgr.client = badClient{} + mgr.client = &badClient{} // reloading fails assert.Error(t, mgr.reloadAllTenantLimits(context.Background())) @@ -197,7 +197,7 @@ func writeUserConfigurableOverridesToDisk(t *testing.T, dir string, tenant strin }) assert.NoError(t, err) - err = client.Set(context.Background(), tenant, limits) + _, err = client.Set(context.Background(), tenant, limits, backend.VersionNew) assert.NoError(t, err) } @@ -208,25 +208,27 @@ func deleteUserConfigurableOverridesFromDisk(t *testing.T, dir string, tenant st }) assert.NoError(t, err) - err = client.Delete(context.Background(), tenant) + err = client.Delete(context.Background(), tenant, backend.VersionNew) assert.NoError(t, err) } type badClient struct{} -func (b badClient) List(context.Context) ([]string, error) { +var _ api.Client = (*badClient)(nil) + +func (b *badClient) List(context.Context) ([]string, error) { return nil, errors.New("no") } -func (b badClient) Get(context.Context, string) (*api.UserConfigurableLimits, error) { - return nil, errors.New("no") +func (b *badClient) Get(context.Context, string) (*api.UserConfigurableLimits, backend.Version, error) { + return nil, "", errors.New("no") } -func (b badClient) Set(context.Context, string, *api.UserConfigurableLimits) error { - return errors.New("no") +func (b *badClient) Set(context.Context, string, *api.UserConfigurableLimits, backend.Version) (backend.Version, error) { + return "", errors.New("no") } -func (b badClient) Delete(context.Context, string) error { +func (b *badClient) Delete(context.Context, string, backend.Version) error { return errors.New("no") } diff --git a/modules/overrides/userconfigurableapi/client.go b/modules/overrides/userconfigurableapi/client.go index 3714133eebf..d4e351928ad 100644 --- a/modules/overrides/userconfigurableapi/client.go +++ b/modules/overrides/userconfigurableapi/client.go @@ -9,11 +9,13 @@ import ( "os" "path" + "github.com/go-kit/log/level" jsoniter "github.com/json-iterator/go" "github.com/opentracing/opentracing-go" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/grafana/tempo/pkg/util/log" "github.com/grafana/tempo/tempodb/backend" "github.com/grafana/tempo/tempodb/backend/azure" "github.com/grafana/tempo/tempodb/backend/gcs" @@ -27,6 +29,11 @@ const ( ) var ( + metricList = promauto.NewCounter(prometheus.CounterOpts{ + Namespace: "tempo", + Name: "overrides_user_configurable_overrides_list_total", + Help: "How often the user-configurable overrides was listed", + }) metricFetch = promauto.NewCounterVec(prometheus.CounterOpts{ Namespace: "tempo", Name: "overrides_user_configurable_overrides_fetch_total", @@ -42,6 +49,10 @@ var ( type UserConfigurableOverridesClientConfig struct { Backend string `yaml:"backend"` + // ConfirmVersioning is enabled when creating the backend client. If versioning is disabled no + // checks against concurrent writes will be performed. + ConfirmVersioning bool `yaml:"confirm_versioning"` + Local *local.Config `yaml:"local"` GCS *gcs.Config `yaml:"gcs"` S3 *s3.Config `yaml:"s3"` @@ -49,72 +60,93 @@ type UserConfigurableOverridesClientConfig struct { } func (c *UserConfigurableOverridesClientConfig) RegisterFlagsAndApplyDefaults(*flag.FlagSet) { + c.ConfirmVersioning = true + c.Local = &local.Config{} c.GCS = &gcs.Config{} c.S3 = &s3.Config{} c.Azure = &azure.Config{} } +// Client is a collection of methods to manage overrides on a backend. type Client interface { + // List tenants that have user-configurable overrides. List(ctx context.Context) ([]string, error) - Get(context.Context, string) (*UserConfigurableLimits, error) - Set(context.Context, string, *UserConfigurableLimits) error - Delete(context.Context, string) error + // Get the user-configurable overrides. Returns backend.ErrDoesNotExist if no limits are set. + Get(context.Context, string) (*UserConfigurableLimits, backend.Version, error) + // Set the user-configurable overrides. Returns backend.ErrVersionDoesNotMatch if the backend + // has a newer version. + Set(context.Context, string, *UserConfigurableLimits, backend.Version) (backend.Version, error) + // Delete the user-configurable overrides. + Delete(context.Context, string, backend.Version) error + // Shutdown the client. Shutdown() } type userConfigOverridesClient struct { - r backend.RawReader - w backend.RawWriter + rw backend.VersionedReaderWriter } var _ Client = (*userConfigOverridesClient)(nil) func NewUserConfigOverridesClient(cfg *UserConfigurableOverridesClientConfig) (Client, error) { - r, w, err := initBackend(cfg) + rw, err := initBackend(cfg) if err != nil { return nil, err } - return &userConfigOverridesClient{r, w}, nil + return &userConfigOverridesClient{rw}, nil } func (o *userConfigOverridesClient) Shutdown() { - o.r.Shutdown() + o.rw.Shutdown() } -func initBackend(cfg *UserConfigurableOverridesClientConfig) (reader backend.RawReader, writer backend.RawWriter, err error) { +func initBackend(cfg *UserConfigurableOverridesClientConfig) (rw backend.VersionedReaderWriter, err error) { switch cfg.Backend { case backend.Local: - reader, writer, _, err = local.New(cfg.Local) + r, w, _, err := local.New(cfg.Local) if err != nil { - return + return nil, err } // Create overrides directory with necessary permissions err = os.MkdirAll(path.Join(cfg.Local.Path, OverridesKeyPath), os.ModePerm) + if err != nil { + return nil, err + } + rw = backend.NewFakeVersionedReaderWriter(r, w) case backend.GCS: - reader, writer, _, err = gcs.New(cfg.GCS) + rw, err = gcs.NewVersionedReaderWriter(cfg.GCS, cfg.ConfirmVersioning) case backend.S3: - reader, writer, _, err = s3.New(cfg.S3) + rw, err = s3.NewVersionedReaderWriter(cfg.S3) case backend.Azure: - reader, writer, _, err = azure.New(cfg.Azure) + rw, err = azure.NewVersionedReaderWriter(cfg.Azure) default: err = fmt.Errorf("unknown backend %s", cfg.Backend) } - return + if err != nil { + return nil, err + } + if cfg.Backend == backend.Local || cfg.Backend == backend.S3 || cfg.Backend == backend.Azure { + level.Warn(log.Logger).Log( + "msg", "versioned backend requests are best-effort for the configured backend, concurrent requests modifying user-configurable overrides might cause data races", + "backend", cfg.Backend, + ) + } + return rw, nil } func (o *userConfigOverridesClient) List(ctx context.Context) ([]string, error) { span, ctx := opentracing.StartSpanFromContext(ctx, "userConfigOverridesClient.List") defer span.Finish() - return o.r.List(ctx, []string{OverridesKeyPath}) + metricList.Inc() + + return o.rw.List(ctx, []string{OverridesKeyPath}) } -// Get downloads the tenant limits from the backend. Returns nil, nil if no limits are set. -func (o *userConfigOverridesClient) Get(ctx context.Context, userID string) (tenantLimits *UserConfigurableLimits, err error) { - span, ctx := opentracing.StartSpanFromContext(ctx, "userConfigOverridesClient.Get") +func (o *userConfigOverridesClient) Get(ctx context.Context, userID string) (tenantLimits *UserConfigurableLimits, version backend.Version, err error) { + span, ctx := opentracing.StartSpanFromContext(ctx, "userConfigOverridesClient.Get", opentracing.Tag{Key: "tenant", Value: userID}) defer span.Finish() - span.SetTag("tenant", userID) metricFetch.WithLabelValues(userID).Inc() defer func() { @@ -123,12 +155,9 @@ func (o *userConfigOverridesClient) Get(ctx context.Context, userID string) (ten } }() - reader, _, err := o.r.Read(ctx, OverridesFileName, []string{OverridesKeyPath, userID}, false) - if err == backend.ErrDoesNotExist { - return nil, nil - } + reader, version, err := o.rw.ReadVersioned(ctx, OverridesFileName, []string{OverridesKeyPath, userID}) if err != nil { - return + return nil, "", err } defer reader.Close() @@ -137,26 +166,21 @@ func (o *userConfigOverridesClient) Get(ctx context.Context, userID string) (ten return } -// Set stores the user-configurable limits on the backend. -func (o *userConfigOverridesClient) Set(ctx context.Context, userID string, limits *UserConfigurableLimits) error { - span, ctx := opentracing.StartSpanFromContext(ctx, "userConfigOverridesClient.Set") +func (o *userConfigOverridesClient) Set(ctx context.Context, userID string, limits *UserConfigurableLimits, version backend.Version) (backend.Version, error) { + span, ctx := opentracing.StartSpanFromContext(ctx, "userConfigOverridesClient.Set", opentracing.Tag{Key: "tenant", Value: userID}) defer span.Finish() - span.SetTag("tenant", userID) data, err := jsoniter.Marshal(limits) if err != nil { - return err + return "", err } - // Store on the bucket - return o.w.Write(ctx, OverridesFileName, []string{OverridesKeyPath, userID}, bytes.NewReader(data), -1, false) + return o.rw.WriteVersioned(ctx, OverridesFileName, []string{OverridesKeyPath, userID}, bytes.NewReader(data), version) } -// Delete will clear all user-configurable limits for the given tenant. -func (o *userConfigOverridesClient) Delete(ctx context.Context, userID string) error { - span, ctx := opentracing.StartSpanFromContext(ctx, "userConfigOverridesClient.Delete") +func (o *userConfigOverridesClient) Delete(ctx context.Context, userID string, version backend.Version) error { + span, ctx := opentracing.StartSpanFromContext(ctx, "userConfigOverridesClient.Delete", opentracing.Tag{Key: "tenant", Value: userID}) defer span.Finish() - span.SetTag("tenant", userID) - return o.w.Delete(ctx, OverridesFileName, []string{OverridesKeyPath, userID}, false) + return o.rw.DeleteVersioned(ctx, OverridesFileName, []string{OverridesKeyPath, userID}, version) } diff --git a/modules/overrides/userconfigurableapi/client_test.go b/modules/overrides/userconfigurableapi/client_test.go index 2b92cc1d637..1df40f1a095 100644 --- a/modules/overrides/userconfigurableapi/client_test.go +++ b/modules/overrides/userconfigurableapi/client_test.go @@ -37,10 +37,11 @@ func TestUserConfigOverridesClient(t *testing.T) { limits := &UserConfigurableLimits{ Forwarders: &[]string{"my-forwarder"}, } - assert.NoError(t, client.Set(ctx, tenant, limits)) + _, err = client.Set(ctx, tenant, limits, "") + assert.NoError(t, err) // Get - retrievedLimits, err := client.Get(ctx, tenant) + retrievedLimits, _, err := client.Get(ctx, tenant) assert.NoError(t, err) assert.Equal(t, limits, retrievedLimits) @@ -50,10 +51,9 @@ func TestUserConfigOverridesClient(t *testing.T) { assert.Equal(t, []string{tenant}, list) // Delete - assert.NoError(t, client.Delete(ctx, tenant)) + assert.NoError(t, client.Delete(ctx, tenant, "")) // Get - does not exist - retrievedLimits, err = client.Get(ctx, tenant) - assert.NoError(t, err) - assert.Nil(t, retrievedLimits) + _, _, err = client.Get(ctx, tenant) + assert.ErrorIs(t, err, backend.ErrDoesNotExist) } diff --git a/modules/overrides/userconfigurableapi/http.go b/modules/overrides/userconfigurableapi/http.go index a9488aac821..aef228aed77 100644 --- a/modules/overrides/userconfigurableapi/http.go +++ b/modules/overrides/userconfigurableapi/http.go @@ -1,9 +1,13 @@ package userconfigurableapi import ( + "bytes" "context" + "encoding/json" + "io" "net/http" + jsonpatch "github.com/evanphx/json-patch" "github.com/go-kit/log/level" "github.com/grafana/dskit/services" jsoniter "github.com/json-iterator/go" @@ -15,6 +19,12 @@ import ( "github.com/grafana/tempo/pkg/api" "github.com/grafana/tempo/pkg/util/log" + "github.com/grafana/tempo/tempodb/backend" +) + +const ( + headerEtag = "ETag" + headerIfMatch = "If-Match" ) type Validator interface { @@ -67,14 +77,13 @@ func (a *UserConfigOverridesAPI) GetOverridesHandler(w http.ResponseWriter, r *h } logRequest(userID, r) - limits, err := a.client.Get(ctx, userID) - if err != nil { - handleError(span, userID, r, w, http.StatusInternalServerError, err) + limits, version, err := a.client.Get(ctx, userID) + if err == backend.ErrDoesNotExist { + w.WriteHeader(http.StatusNotFound) return } - - if limits == nil { - w.WriteHeader(http.StatusNotFound) + if err != nil { + handleError(span, userID, r, w, http.StatusInternalServerError, err) return } @@ -84,8 +93,8 @@ func (a *UserConfigOverridesAPI) GetOverridesHandler(w http.ResponseWriter, r *h return } + w.Header().Set(headerEtag, string(version)) w.Header().Set(api.HeaderContentType, api.HeaderAcceptJSON) - w.WriteHeader(http.StatusOK) _, _ = w.Write(data) } @@ -102,6 +111,12 @@ func (a *UserConfigOverridesAPI) PostOverridesHandler(w http.ResponseWriter, r * } logRequest(userID, r) + ifMatchVersion := r.Header.Get(headerIfMatch) + if ifMatchVersion == "" { + handleError(span, userID, r, w, http.StatusPreconditionRequired, errors.New("must specify If-Match header")) + return + } + d := jsoniter.NewDecoder(r.Body) // error in case of unwanted fields d.DisallowUnknownFields() @@ -110,7 +125,6 @@ func (a *UserConfigOverridesAPI) PostOverridesHandler(w http.ResponseWriter, r * err = d.Decode(&limits) if err != nil { - // bad JSON or unrecognized json field handleError(span, userID, r, w, http.StatusBadRequest, err) return } @@ -121,12 +135,90 @@ func (a *UserConfigOverridesAPI) PostOverridesHandler(w http.ResponseWriter, r * return } - err = a.client.Set(ctx, userID, limits) + version, err := a.client.Set(ctx, userID, limits, backend.Version(ifMatchVersion)) + if err == backend.ErrVersionDoesNotMatch { + handleError(span, userID, r, w, http.StatusPreconditionFailed, err) + return + } if err != nil { handleError(span, userID, r, w, http.StatusInternalServerError, errors.Wrap(err, "failed to store user-configurable limits")) + return } - w.WriteHeader(http.StatusOK) + w.Header().Set(headerEtag, string(version)) +} + +func (a *UserConfigOverridesAPI) PatchOverridesHandler(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + span, ctx := opentracing.StartSpanFromContext(ctx, "UserConfigOverridesAPI.PatchOverridesHandler") + defer span.Finish() + + userID, err := user.ExtractOrgID(ctx) + if err != nil { + handleError(span, userID, r, w, http.StatusBadRequest, err) + return + } + logRequest(userID, r) + + patch, err := io.ReadAll(r.Body) + if err != nil { + handleError(span, userID, r, w, http.StatusInternalServerError, err) + return + } + + currLimits, currVersion, err := a.client.Get(ctx, userID) + if err != nil && err != backend.ErrDoesNotExist { + handleError(span, userID, r, w, http.StatusInternalServerError, err) + return + } + + patchedBytes := patch + if err != backend.ErrDoesNotExist { + currBytes, err := json.Marshal(currLimits) + if err != nil { + handleError(span, userID, r, w, http.StatusInternalServerError, err) + return + } + + patchedBytes, err = jsonpatch.MergePatch(currBytes, patch) + if err != nil { + handleError(span, userID, r, w, http.StatusBadRequest, errors.Wrap(err, "applying patch failed")) + return + } + } else { + currVersion = backend.VersionNew + } + + var patchedLimits UserConfigurableLimits + d := jsoniter.NewDecoder(bytes.NewReader(patchedBytes)) + // error in case of unwanted fields + d.DisallowUnknownFields() + + err = d.Decode(&patchedLimits) + if err != nil { + handleError(span, userID, r, w, http.StatusBadRequest, err) + return + } + + err = a.validator.Validate(&patchedLimits) + if err != nil { + handleError(span, userID, r, w, http.StatusBadRequest, err) + return + } + + updatedVersion, err := a.client.Set(ctx, userID, &patchedLimits, currVersion) + if err == backend.ErrVersionDoesNotMatch { + handleError(span, userID, r, w, http.StatusInternalServerError, errors.New("overrides have been modified during request processing, try again")) + return + } + if err != nil { + handleError(span, userID, r, w, http.StatusInternalServerError, err) + return + } + + w.Header().Set(headerEtag, string(updatedVersion)) + w.Header().Set(api.HeaderContentType, api.HeaderAcceptJSON) + _, _ = w.Write(patchedBytes) } func (a *UserConfigOverridesAPI) DeleteOverridesHandler(w http.ResponseWriter, r *http.Request) { @@ -141,9 +233,15 @@ func (a *UserConfigOverridesAPI) DeleteOverridesHandler(w http.ResponseWriter, r } logRequest(userID, r) - err = a.client.Delete(ctx, userID) + ifMatchVersion := r.Header.Get(headerIfMatch) + if ifMatchVersion == "" { + handleError(span, userID, r, w, http.StatusPreconditionRequired, errors.New("must specify If-Match header")) + return + } + + err = a.client.Delete(ctx, userID, backend.Version(ifMatchVersion)) if err != nil { - handleError(span, userID, nil, w, http.StatusInternalServerError, errors.Wrap(err, "failed to delete user-configurable limits")) + handleError(span, userID, r, w, http.StatusInternalServerError, errors.Wrap(err, "failed to delete user-configurable limits")) } w.WriteHeader(http.StatusOK) diff --git a/modules/overrides/userconfigurableapi/http_test.go b/modules/overrides/userconfigurableapi/http_test.go index cec875a6abe..0df173cdf03 100644 --- a/modules/overrides/userconfigurableapi/http_test.go +++ b/modules/overrides/userconfigurableapi/http_test.go @@ -2,6 +2,7 @@ package userconfigurableapi import ( "bytes" + "context" "errors" "net/http" "net/http/httptest" @@ -11,7 +12,6 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/weaveworks/common/user" - "golang.org/x/net/context" "github.com/grafana/tempo/pkg/api" "github.com/grafana/tempo/tempodb/backend" @@ -30,9 +30,10 @@ func Test_UserConfigOverridesAPI_overridesHandlers(t *testing.T) { require.NoError(t, err) // Provision some data - require.NoError(t, overridesAPI.client.Set(context.Background(), tenant, &UserConfigurableLimits{ + _, err = overridesAPI.client.Set(context.Background(), tenant, &UserConfigurableLimits{ Forwarders: &[]string{"my-other-forwarder"}, - })) + }, backend.VersionNew) + require.NoError(t, err) postJSON, err := jsoniter.Marshal(&UserConfigurableLimits{ Forwarders: &[]string{"my-updated-forwarder"}, @@ -115,7 +116,7 @@ func Test_UserConfigOverridesAPI_overridesHandlers(t *testing.T) { assert.Equal(t, tc.expStatusCode, res.StatusCode) if tc.req.Method == http.MethodPost { - limits, err := overridesAPI.client.Get(context.Background(), tenant) + limits, _, err := overridesAPI.client.Get(context.Background(), tenant) assert.NoError(t, err) assert.NotNil(t, limits.Forwarders) assert.Equal(t, *limits.Forwarders, []string{"my-updated-forwarder"}) @@ -124,13 +125,199 @@ func Test_UserConfigOverridesAPI_overridesHandlers(t *testing.T) { } } +func Test_UserConfigOverridesAPI_patchOverridesHandlers(t *testing.T) { + tenant := "my-tenant" + + tests := []struct { + name string + patch string + current string + expResp string + expContentType string + expStatusCode int + }{ + { + name: "PATCH - no values stored yet", + patch: `{"forwarders":["my-other-forwarder"]}`, + current: ``, + expResp: `{"forwarders":["my-other-forwarder"]}`, + expContentType: api.HeaderAcceptJSON, + expStatusCode: 200, + }, + { + name: "PATCH - empty overrides are merged", + patch: `{"forwarders":["my-other-forwarder"]}`, + current: `{}`, + expResp: `{"forwarders":["my-other-forwarder"]}`, + expContentType: api.HeaderAcceptJSON, + expStatusCode: 200, + }, + { + name: "PATCH - overwrite", + patch: `{"forwarders":["my-other-forwarder"]}`, + current: `{"forwarders":["previous-forwarder"]}`, + expResp: `{"forwarders":["my-other-forwarder"]}`, + expContentType: api.HeaderAcceptJSON, + expStatusCode: 200, + }, + { + name: "PATCH - invalid patch", + patch: `{"newField":true}`, + current: `{"forwarders":["prior-forwarder"]}`, + expResp: "userconfigurableapi.UserConfigurableLimits.ReadObject: found unknown field: newField, error found in #10 byte of ...|\"newField\":true}|..., bigger context ...|{\"forwarders\":[\"prior-forwarder\"],\"newField\":true}|...\n", + expStatusCode: 400, + }, + } + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + overridesAPI, err := NewUserConfigOverridesAPI(&UserConfigurableOverridesClientConfig{ + Backend: backend.Local, + Local: &local.Config{Path: t.TempDir()}, + }, &mockValidator{}) + require.NoError(t, err) + + if tc.current != "" { + _, err := overridesAPI.client.Set(context.Background(), tenant, parseJSON(t, tc.current), backend.VersionNew) + assert.NoError(t, err) + } + + w := httptest.NewRecorder() + + r := httptest.NewRequest(http.MethodPatch, "/", bytes.NewReader([]byte(tc.patch))) + ctx := user.InjectOrgID(r.Context(), tenant) + r = r.WithContext(ctx) + + overridesAPI.PatchOverridesHandler(w, r) + + data := w.Body.String() + require.Equal(t, tc.expResp, data) + + res := w.Result() + if tc.expContentType != "" { + require.Equal(t, tc.expContentType, w.Header().Get(api.HeaderContentType)) + } + require.Equal(t, tc.expStatusCode, res.StatusCode) + }) + } +} + +func TestUserConfigOverridesAPI_patchOverridesHandler_noVersionConflict(t *testing.T) { + overridesAPI, err := NewUserConfigOverridesAPI(&UserConfigurableOverridesClientConfig{ + Backend: backend.Local, + Local: &local.Config{Path: t.TempDir()}, + }, &mockValidator{}) + require.NoError(t, err) + + // inject our client + testClient := &testClient{} + overridesAPI.client = testClient + + testClient.get = func(ctx context.Context, userID string) (*UserConfigurableLimits, backend.Version, error) { + return &UserConfigurableLimits{}, "1", nil + } + testClient.set = func(ctx context.Context, userID string, limits *UserConfigurableLimits, version backend.Version) (backend.Version, error) { + // Must pass in version from get + assert.Equal(t, backend.Version("1"), version) + assert.NotNil(t, limits) + assert.Equal(t, UserConfigurableLimits{Forwarders: &[]string{"f"}}, *limits) + return "2", nil + } + + w := httptest.NewRecorder() + + r := httptest.NewRequest(http.MethodPatch, "/", bytes.NewReader([]byte(`{"forwarders":["f"]}`))) + ctx := user.InjectOrgID(r.Context(), "foo") + r = r.WithContext(ctx) + + overridesAPI.PatchOverridesHandler(w, r) + + data := w.Body.String() + assert.Equal(t, `{"forwarders":["f"]}`, data) + + res := w.Result() + assert.Equal(t, "2", res.Header.Get(headerEtag)) + assert.Equal(t, 200, res.StatusCode) +} + +func TestUserConfigOverridesAPI_patchOverridesHandler_versionConflict(t *testing.T) { + overridesAPI, err := NewUserConfigOverridesAPI(&UserConfigurableOverridesClientConfig{ + Backend: backend.Local, + Local: &local.Config{Path: t.TempDir()}, + }, &mockValidator{}) + require.NoError(t, err) + + // inject our client + testClient := &testClient{} + overridesAPI.client = testClient + + testClient.get = func(ctx context.Context, userID string) (*UserConfigurableLimits, backend.Version, error) { + return &UserConfigurableLimits{}, "1", nil + } + testClient.set = func(ctx context.Context, userID string, limits *UserConfigurableLimits, version backend.Version) (backend.Version, error) { + // Someone else changed the file! + return "", backend.ErrVersionDoesNotMatch + } + + w := httptest.NewRecorder() + + r := httptest.NewRequest(http.MethodPatch, "/", bytes.NewReader([]byte(`{"forwarders":["f"]}`))) + ctx := user.InjectOrgID(r.Context(), "foo") + r = r.WithContext(ctx) + + overridesAPI.PatchOverridesHandler(w, r) + + res := w.Result() + assert.Equal(t, 500, res.StatusCode) + + data := w.Body.String() + assert.Equal(t, "overrides have been modified during request processing, try again\n", data) +} + func prepareRequest(tenant, method string, payload []byte) *http.Request { r := httptest.NewRequest(method, "/", bytes.NewReader(payload)) ctx := user.InjectOrgID(r.Context(), tenant) r = r.WithContext(ctx) + + if method == "POST" || method == "DELETE" { + r.Header.Set(headerIfMatch, string(backend.VersionNew)) + } + return r } +func parseJSON(t *testing.T, s string) *UserConfigurableLimits { + var limits UserConfigurableLimits + err := jsoniter.Unmarshal([]byte(s), &limits) + require.NoError(t, err) + return &limits +} + +type testClient struct { + get func(context.Context, string) (*UserConfigurableLimits, backend.Version, error) + set func(context.Context, string, *UserConfigurableLimits, backend.Version) (backend.Version, error) +} + +var _ Client = (*testClient)(nil) + +func (t *testClient) List(_ context.Context) ([]string, error) { + panic("implement me") +} + +func (t *testClient) Get(ctx context.Context, userID string) (*UserConfigurableLimits, backend.Version, error) { + return t.get(ctx, userID) +} + +func (t *testClient) Set(ctx context.Context, userID string, limits *UserConfigurableLimits, version backend.Version) (backend.Version, error) { + return t.set(ctx, userID, limits, version) +} + +func (t *testClient) Delete(_ context.Context, _ string, _ backend.Version) error { + panic("implement me") +} + +func (t *testClient) Shutdown() { +} + type mockValidator struct { err error } diff --git a/modules/querier/external/client_test.go b/modules/querier/external/client_test.go index bf9a35a6bcf..e63851d7898 100644 --- a/modules/querier/external/client_test.go +++ b/modules/querier/external/client_test.go @@ -8,11 +8,12 @@ import ( "testing" "time" - "github.com/grafana/tempo/pkg/tempopb" "github.com/stretchr/testify/require" "github.com/weaveworks/common/user" "go.uber.org/atomic" "golang.org/x/oauth2" + + "github.com/grafana/tempo/pkg/tempopb" ) func TestAuthHeader(t *testing.T) { diff --git a/pkg/httpclient/client.go b/pkg/httpclient/client.go index 16d11e6e4aa..6b62b69866a 100644 --- a/pkg/httpclient/client.go +++ b/pkg/httpclient/client.go @@ -215,48 +215,50 @@ func (c *Client) buildQueryURL(queryType string, query string, start int64, end return fmt.Sprint(joinURL) } -func (c *Client) GetOverrides() (*api.UserConfigurableLimits, error) { +func (c *Client) GetOverrides() (*api.UserConfigurableLimits, string, error) { req, err := http.NewRequest("GET", c.BaseURL+tempo_api.PathOverrides, nil) if err != nil { - return nil, err + return nil, "", err } req.Header.Set(acceptHeader, applicationJSON) resp, body, err := c.doRequest(req) if err != nil { if resp != nil && resp.StatusCode == http.StatusNotFound { - return nil, ErrNotFound + return nil, "", ErrNotFound } - return nil, err + return nil, "", err } limits := &api.UserConfigurableLimits{} if err = json.Unmarshal(body, limits); err != nil { - return nil, fmt.Errorf("error decoding overrides, err: %v body: %s", err, string(body)) + return nil, "", fmt.Errorf("error decoding overrides, err: %v body: %s", err, string(body)) } - return limits, err + return limits, resp.Header.Get("Etag"), err } -func (c *Client) SetOverrides(limits *api.UserConfigurableLimits) error { +func (c *Client) SetOverrides(limits *api.UserConfigurableLimits, version string) (string, error) { b, err := json.Marshal(limits) if err != nil { - return err + return "", err } req, err := http.NewRequest("POST", c.BaseURL+tempo_api.PathOverrides, bytes.NewBuffer(b)) if err != nil { - return err + return "", err } + req.Header.Set("If-Match", version) - _, _, err = c.doRequest(req) - return err + resp, _, err := c.doRequest(req) + return resp.Header.Get("Etag"), err } -func (c *Client) DeleteOverrides() error { +func (c *Client) DeleteOverrides(version string) error { req, err := http.NewRequest("DELETE", c.BaseURL+tempo_api.PathOverrides, nil) if err != nil { return err } + req.Header.Set("If-Match", version) _, _, err = c.doRequest(req) return err diff --git a/tempodb/backend/azure/azure.go b/tempodb/backend/azure/azure.go index 1d89e9550e0..9f4fce3eab3 100644 --- a/tempodb/backend/azure/azure.go +++ b/tempodb/backend/azure/azure.go @@ -31,38 +31,54 @@ type readerWriter struct { hedgedContainerURL blob.ContainerURL } +var ( + _ backend.RawReader = (*readerWriter)(nil) + _ backend.RawWriter = (*readerWriter)(nil) + _ backend.Compactor = (*readerWriter)(nil) + _ backend.VersionedReaderWriter = (*readerWriter)(nil) +) + type appendTracker struct { Name string } // NewNoConfirm gets the Azure blob container without testing it func NewNoConfirm(cfg *Config) (backend.RawReader, backend.RawWriter, backend.Compactor, error) { - return internalNew(cfg, false) + rw, err := internalNew(cfg, false) + return rw, rw, rw, err } // New gets the Azure blob container func New(cfg *Config) (backend.RawReader, backend.RawWriter, backend.Compactor, error) { + rw, err := internalNew(cfg, true) + return rw, rw, rw, err +} + +// NewVersionedReaderWriter creates a client to perform versioned requests. Note that write requests are +// best-effort for now. We need to update the SDK to make use of the precondition headers. +// https://github.com/grafana/tempo/issues/2705 +func NewVersionedReaderWriter(cfg *Config) (backend.VersionedReaderWriter, error) { return internalNew(cfg, true) } -func internalNew(cfg *Config, confirm bool) (backend.RawReader, backend.RawWriter, backend.Compactor, error) { +func internalNew(cfg *Config, confirm bool) (*readerWriter, error) { ctx := context.Background() container, err := GetContainer(ctx, cfg, false) if err != nil { - return nil, nil, nil, errors.Wrap(err, "getting storage container") + return nil, errors.Wrap(err, "getting storage container") } hedgedContainer, err := GetContainer(ctx, cfg, true) if err != nil { - return nil, nil, nil, errors.Wrap(err, "getting hedged storage container") + return nil, errors.Wrap(err, "getting hedged storage container") } if confirm { // Getting container properties to check if container exists _, err = container.GetProperties(ctx, blob.LeaseAccessConditions{}) if err != nil { - return nil, nil, nil, fmt.Errorf("failed to GetProperties: %w", err) + return nil, fmt.Errorf("failed to GetProperties: %w", err) } } @@ -72,7 +88,7 @@ func internalNew(cfg *Config, confirm bool) (backend.RawReader, backend.RawWrite hedgedContainerURL: hedgedContainer, } - return rw, rw, rw, nil + return rw, nil } // Write implements backend.Writer @@ -198,6 +214,44 @@ func (rw *readerWriter) ReadRange(ctx context.Context, name string, keypath back func (rw *readerWriter) Shutdown() { } +func (rw *readerWriter) WriteVersioned(ctx context.Context, name string, keypath backend.KeyPath, data io.Reader, version backend.Version) (backend.Version, error) { + // TODO use conditional if-match API + _, currentVersion, err := rw.ReadVersioned(ctx, name, keypath) + if err != nil && err != backend.ErrDoesNotExist { + return "", err + } + if err != backend.ErrDoesNotExist && currentVersion != version { + return "", backend.ErrVersionDoesNotMatch + } + + err = rw.Write(ctx, name, keypath, data, -1, false) + if err != nil { + return "", err + } + + _, currentVersion, err = rw.ReadVersioned(ctx, name, keypath) + return currentVersion, err +} + +func (rw *readerWriter) DeleteVersioned(ctx context.Context, name string, keypath backend.KeyPath, version backend.Version) error { + // TODO use conditional if-match API + _, currentVersion, err := rw.ReadVersioned(ctx, name, keypath) + if err != nil && err != backend.ErrDoesNotExist { + return err + } + if err != backend.ErrDoesNotExist && currentVersion != version { + return backend.ErrVersionDoesNotMatch + } + + return rw.Delete(ctx, name, keypath, false) +} + +func (rw *readerWriter) ReadVersioned(ctx context.Context, name string, keypath backend.KeyPath) (io.ReadCloser, backend.Version, error) { + // TODO properly extract version from object + readCloser, _, err := rw.Read(ctx, name, keypath, false) + return readCloser, backend.VersionNew, err +} + func (rw *readerWriter) writeAll(ctx context.Context, name string, b []byte) error { err := rw.writer(ctx, bytes.NewReader(b), name) if err != nil { diff --git a/tempodb/backend/gcs/compactor.go b/tempodb/backend/gcs/compactor.go index 4c58f362bd9..cfea93b8205 100644 --- a/tempodb/backend/gcs/compactor.go +++ b/tempodb/backend/gcs/compactor.go @@ -8,8 +8,9 @@ import ( "cloud.google.com/go/storage" "github.com/google/uuid" "github.com/googleapis/gax-go/v2" - "github.com/grafana/tempo/tempodb/backend" "google.golang.org/api/iterator" + + "github.com/grafana/tempo/tempodb/backend" ) func (rw *readerWriter) MarkBlockCompacted(blockID uuid.UUID, tenantID string) error { @@ -69,7 +70,7 @@ func (rw *readerWriter) ClearBlock(blockID uuid.UUID, tenantID string) error { func (rw *readerWriter) CompactedBlockMeta(blockID uuid.UUID, tenantID string) (*backend.CompactedBlockMeta, error) { name := backend.CompactedMetaFileName(blockID, tenantID, rw.cfg.Prefix) - bytes, modTime, err := rw.readAllWithModTime(context.Background(), name) + bytes, attrs, err := rw.readAll(context.Background(), name) if err != nil { return nil, readError(err) } @@ -79,7 +80,7 @@ func (rw *readerWriter) CompactedBlockMeta(blockID uuid.UUID, tenantID string) ( if err != nil { return nil, err } - out.CompactedTime = modTime + out.CompactedTime = attrs.LastModified return out, nil } diff --git a/tempodb/backend/gcs/gcs.go b/tempodb/backend/gcs/gcs.go index 4e440f8d991..9c3b09fbc8c 100644 --- a/tempodb/backend/gcs/gcs.go +++ b/tempodb/backend/gcs/gcs.go @@ -4,11 +4,12 @@ import ( "bytes" "context" "crypto/tls" + "fmt" "io" "net/http" "path" + "strconv" "strings" - "time" "github.com/grafana/tempo/tempodb/backend/instrumentation" @@ -30,33 +31,62 @@ type readerWriter struct { hedgedBucket *storage.BucketHandle } +var ( + _ backend.RawReader = (*readerWriter)(nil) + _ backend.RawWriter = (*readerWriter)(nil) + _ backend.Compactor = (*readerWriter)(nil) + _ backend.VersionedReaderWriter = (*readerWriter)(nil) +) + // NewNoConfirm gets the GCS backend without testing it func NewNoConfirm(cfg *Config) (backend.RawReader, backend.RawWriter, backend.Compactor, error) { - return internalNew(cfg, false) + rw, err := internalNew(cfg, false) + return rw, rw, rw, err } // New gets the GCS backend func New(cfg *Config) (backend.RawReader, backend.RawWriter, backend.Compactor, error) { - return internalNew(cfg, true) + rw, err := internalNew(cfg, true) + return rw, rw, rw, err +} + +func NewVersionedReaderWriter(cfg *Config, confirmVersioning bool) (backend.VersionedReaderWriter, error) { + rw, err := internalNew(cfg, true) + if err != nil { + return nil, err + } + + if confirmVersioning { + bucketAttrs, err := rw.bucket.Attrs(context.Background()) + if err != nil { + return nil, errors.Wrap(err, "getting bucket attrs") + } + + if !bucketAttrs.VersioningEnabled { + return nil, errors.New("versioning is not enabled on bucket") + } + } + + return rw, nil } -func internalNew(cfg *Config, confirm bool) (backend.RawReader, backend.RawWriter, backend.Compactor, error) { +func internalNew(cfg *Config, confirm bool) (*readerWriter, error) { ctx := context.Background() bucket, err := createBucket(ctx, cfg, false) if err != nil { - return nil, nil, nil, errors.Wrap(err, "creating bucket") + return nil, errors.Wrap(err, "creating bucket") } hedgedBucket, err := createBucket(ctx, cfg, true) if err != nil { - return nil, nil, nil, errors.Wrap(err, "creating hedged bucket") + return nil, errors.Wrap(err, "creating hedged bucket") } // Check bucket exists by getting attrs if confirm { if _, err = bucket.Attrs(ctx); err != nil { - return nil, nil, nil, errors.Wrap(err, "getting bucket attrs") + return nil, errors.Wrap(err, "getting bucket attrs") } } @@ -66,7 +96,7 @@ func internalNew(cfg *Config, confirm bool) (backend.RawReader, backend.RawWrite hedgedBucket: hedgedBucket, } - return rw, rw, rw, nil + return rw, nil } // Write implements backend.Writer @@ -77,7 +107,7 @@ func (rw *readerWriter) Write(ctx context.Context, name string, keypath backend. span.SetTag("object", name) - w := rw.writer(derivedCtx, backend.ObjectFileName(keypath, name)) + w := rw.writer(derivedCtx, backend.ObjectFileName(keypath, name), nil) _, err := io.Copy(w, data) if err != nil { @@ -99,7 +129,7 @@ func (rw *readerWriter) Append(ctx context.Context, name string, keypath backend var w *storage.Writer if tracker == nil { - w = rw.writer(ctx, backend.ObjectFileName(keypath, name)) + w = rw.writer(ctx, backend.ObjectFileName(keypath, name), nil) } else { w = tracker.(*storage.Writer) } @@ -123,8 +153,8 @@ func (rw *readerWriter) CloseAppend(_ context.Context, tracker backend.AppendTra } func (rw *readerWriter) Delete(ctx context.Context, name string, keypath backend.KeyPath, _ bool) error { - handle := rw.bucket.Object(backend.ObjectFileName(keypath, name)) - return handle.Delete(ctx) + return rw.bucket.Object(backend.ObjectFileName(keypath, name)). + Delete(ctx) } // List implements backend.Reader @@ -165,7 +195,7 @@ func (rw *readerWriter) Read(ctx context.Context, name string, keypath backend.K span.SetTag("object", name) - b, err := rw.readAll(derivedCtx, backend.ObjectFileName(keypath, name)) + b, _, err := rw.readAll(derivedCtx, backend.ObjectFileName(keypath, name)) if err != nil { span.SetTag("error", true) } @@ -192,8 +222,80 @@ func (rw *readerWriter) ReadRange(ctx context.Context, name string, keypath back func (rw *readerWriter) Shutdown() { } -func (rw *readerWriter) writer(ctx context.Context, name string) *storage.Writer { +func (rw *readerWriter) WriteVersioned(ctx context.Context, name string, keypath backend.KeyPath, data io.Reader, version backend.Version) (backend.Version, error) { + keypath = backend.KeyPathWithPrefix(keypath, rw.cfg.Prefix) + span, derivedCtx := opentracing.StartSpanFromContext(ctx, "gcs.WriteVersioned", opentracing.Tags{ + "object": name, + }) + defer span.Finish() + + generation, err := strconv.ParseInt(string(version), 10, 64) + if err != nil { + return "", errors.New("invalid version number") + } + + preconditions := &storage.Conditions{ + GenerationMatch: generation, + } + w := rw.writer(derivedCtx, backend.ObjectFileName(keypath, name), preconditions) + + _, err = io.Copy(w, data) + if err != nil { + w.Close() + span.SetTag("error", true) + return "", errors.Wrap(err, "failed to write") + } + + err = w.Close() + if err != nil { + return "", err + } + + return toVersion(w.Attrs().Generation), nil +} + +func (rw *readerWriter) DeleteVersioned(ctx context.Context, name string, keypath backend.KeyPath, version backend.Version) error { + object := rw.bucket.Object(backend.ObjectFileName(keypath, name)) + + if version != backend.VersionNew { + generation, err := strconv.ParseInt(string(version), 10, 64) + if err != nil { + return errors.New("invalid version number") + } + + preconditions := storage.Conditions{ + GenerationMatch: generation, + } + object.If(preconditions) + } + + return object.Delete(ctx) +} + +func (rw *readerWriter) ReadVersioned(ctx context.Context, name string, keypath backend.KeyPath) (io.ReadCloser, backend.Version, error) { + keypath = backend.KeyPathWithPrefix(keypath, rw.cfg.Prefix) + span, derivedCtx := opentracing.StartSpanFromContext(ctx, "gcs.ReadVersioned", opentracing.Tags{ + "object": name, + }) + defer span.Finish() + + b, attrs, err := rw.readAll(derivedCtx, backend.ObjectFileName(keypath, name)) + if err != nil { + span.SetTag("error", true) + return nil, "", readError(err) + } + return io.NopCloser(bytes.NewReader(b)), toVersion(attrs.Generation), nil +} + +func toVersion(generation int64) backend.Version { + return backend.Version(fmt.Sprint(generation)) +} + +func (rw *readerWriter) writer(ctx context.Context, name string, conditions *storage.Conditions) *storage.Writer { o := rw.bucket.Object(name) + if (conditions != nil && *conditions != storage.Conditions{}) { + o = o.If(*conditions) + } w := o.NewWriter(ctx) w.ChunkSize = rw.cfg.ChunkBufferSize @@ -208,29 +310,19 @@ func (rw *readerWriter) writer(ctx context.Context, name string) *storage.Writer return w } -func (rw *readerWriter) readAll(ctx context.Context, name string) ([]byte, error) { - r, err := rw.hedgedBucket.Object(name).NewReader(ctx) - if err != nil { - return nil, err - } - defer r.Close() - - return tempo_io.ReadAllWithEstimate(r, r.Attrs.Size) -} - -func (rw *readerWriter) readAllWithModTime(ctx context.Context, name string) ([]byte, time.Time, error) { +func (rw *readerWriter) readAll(ctx context.Context, name string) ([]byte, *storage.ReaderObjectAttrs, error) { r, err := rw.hedgedBucket.Object(name).NewReader(ctx) if err != nil { - return nil, time.Time{}, err + return nil, nil, err } defer r.Close() buf, err := tempo_io.ReadAllWithEstimate(r, r.Attrs.Size) if err != nil { - return nil, time.Time{}, err + return nil, nil, err } - return buf, r.Attrs.LastModified, nil + return buf, &r.Attrs, nil } func (rw *readerWriter) readRange(ctx context.Context, name string, offset int64, buffer []byte) error { diff --git a/tempodb/backend/s3/s3.go b/tempodb/backend/s3/s3.go index 4e1143de71d..90117163bc2 100644 --- a/tempodb/backend/s3/s3.go +++ b/tempodb/backend/s3/s3.go @@ -33,6 +33,13 @@ type readerWriter struct { hedgedCore *minio.Core } +var ( + _ backend.RawReader = (*readerWriter)(nil) + _ backend.RawWriter = (*readerWriter)(nil) + _ backend.Compactor = (*readerWriter)(nil) + _ backend.VersionedReaderWriter = (*readerWriter)(nil) +) + // appendTracker is a struct used to track multipart uploads type appendTracker struct { uploadID string @@ -65,36 +72,44 @@ func (s *overrideSignatureVersion) IsExpired() bool { // NewNoConfirm gets the S3 backend without testing it func NewNoConfirm(cfg *Config) (backend.RawReader, backend.RawWriter, backend.Compactor, error) { - return internalNew(cfg, false) + rw, err := internalNew(cfg, false) + return rw, rw, rw, err } // New gets the S3 backend func New(cfg *Config) (backend.RawReader, backend.RawWriter, backend.Compactor, error) { + rw, err := internalNew(cfg, true) + return rw, rw, rw, err +} + +// NewVersionedReaderWriter creates a client to perform versioned requests. Note that write requests are +// best-effort since the S3 API does not support precondition headers. +func NewVersionedReaderWriter(cfg *Config) (backend.VersionedReaderWriter, error) { return internalNew(cfg, true) } -func internalNew(cfg *Config, confirm bool) (backend.RawReader, backend.RawWriter, backend.Compactor, error) { +func internalNew(cfg *Config, confirm bool) (*readerWriter, error) { if cfg == nil { - return nil, nil, nil, fmt.Errorf("config is nil") + return nil, fmt.Errorf("config is nil") } l := log.Logger core, err := createCore(cfg, false) if err != nil { - return nil, nil, nil, fmt.Errorf("unexpected error creating core: %w", err) + return nil, fmt.Errorf("unexpected error creating core: %w", err) } hedgedCore, err := createCore(cfg, true) if err != nil { - return nil, nil, nil, fmt.Errorf("unexpected error creating hedgedCore: %w", err) + return nil, fmt.Errorf("unexpected error creating hedgedCore: %w", err) } // try listing objects if confirm { _, err = core.ListObjects(cfg.Bucket, cfg.Prefix, "", "/", 0) if err != nil { - return nil, nil, nil, fmt.Errorf("unexpected error from ListObjects on %s: %w", cfg.Bucket, err) + return nil, fmt.Errorf("unexpected error from ListObjects on %s: %w", cfg.Bucket, err) } } @@ -104,7 +119,7 @@ func internalNew(cfg *Config, confirm bool) (backend.RawReader, backend.RawWrite core: core, hedgedCore: hedgedCore, } - return rw, rw, rw, nil + return rw, nil } func getPutObjectOptions(rw *readerWriter) minio.PutObjectOptions { @@ -291,6 +306,55 @@ func (rw *readerWriter) ReadRange(ctx context.Context, name string, keypath back func (rw *readerWriter) Shutdown() { } +func (rw *readerWriter) WriteVersioned(ctx context.Context, name string, keypath backend.KeyPath, data io.Reader, version backend.Version) (backend.Version, error) { + // Note there is a potential data race here because S3 does not support conditional headers. If + // another process writes to the same object in between ReadVersioned and Write its changes will + // be overwritten. + _, currentVersion, err := rw.ReadVersioned(ctx, name, keypath) + if err != nil && err != backend.ErrDoesNotExist { + return "", err + } + if err != backend.ErrDoesNotExist && currentVersion != version { + return "", backend.ErrVersionDoesNotMatch + } + + err = rw.Write(ctx, name, keypath, data, -1, false) + if err != nil { + return "", err + } + + _, currentVersion, err = rw.ReadVersioned(ctx, name, keypath) + return currentVersion, err +} + +func (rw *readerWriter) DeleteVersioned(ctx context.Context, name string, keypath backend.KeyPath, version backend.Version) error { + // Note there is a potential data race here because S3 does not support conditional headers. If + // another process writes to the same object in between ReadVersioned and Delete its changes will + // be overwritten. + _, currentVersion, err := rw.ReadVersioned(ctx, name, keypath) + if err != nil && err != backend.ErrDoesNotExist { + return err + } + if err != backend.ErrDoesNotExist && currentVersion != version { + return backend.ErrVersionDoesNotMatch + } + + return rw.Delete(ctx, name, keypath, false) +} + +func (rw *readerWriter) ReadVersioned(ctx context.Context, name string, keypath backend.KeyPath) (io.ReadCloser, backend.Version, error) { + span, derivedCtx := opentracing.StartSpanFromContext(ctx, "s3.ReadVersioned") + defer span.Finish() + + keypath = backend.KeyPathWithPrefix(keypath, rw.cfg.Prefix) + b, objectInfo, err := rw.readAllWithObjInfo(derivedCtx, backend.ObjectFileName(keypath, name)) + if err != nil { + return nil, "", readError(err) + } + + return io.NopCloser(bytes.NewReader(b)), backend.Version(objectInfo.ETag), nil +} + func (rw *readerWriter) readAll(ctx context.Context, name string) ([]byte, error) { reader, info, _, err := rw.hedgedCore.GetObject(ctx, rw.cfg.Bucket, name, minio.GetObjectOptions{}) if err != nil { diff --git a/tempodb/backend/versioned.go b/tempodb/backend/versioned.go new file mode 100644 index 00000000000..80bd25b23f5 --- /dev/null +++ b/tempodb/backend/versioned.go @@ -0,0 +1,61 @@ +package backend + +import ( + "context" + "io" + + "github.com/pkg/errors" +) + +type UpdateFn func(current io.ReadCloser) ([]byte, error) + +type Version string + +const ( + // VersionNew is a placeholder version for a new file + VersionNew Version = "0" +) + +var ErrVersionDoesNotMatch = errors.New("version does not match") + +// VersionedReaderWriter is a collection of methods to read and write data from tempodb backends with +// versioning enabled. +type VersionedReaderWriter interface { + RawReader + + // WriteVersioned data to an object, if the version does not match the request will fail with + // ErrVersionDoesNotMatch. If the operation will create a new file, pass VersionNew. + WriteVersioned(ctx context.Context, name string, keypath KeyPath, data io.Reader, version Version) (Version, error) + + // DeleteVersioned an object, if the version does not match the request will fail with + // ErrVersionDoesNotMatch. + DeleteVersioned(ctx context.Context, name string, keypath KeyPath, version Version) error + + // ReadVersioned data from an object and returns the current version. + ReadVersioned(ctx context.Context, name string, keypath KeyPath) (io.ReadCloser, Version, error) +} + +type FakeVersionedReaderWriter struct { + RawReader + RawWriter +} + +var _ VersionedReaderWriter = (*FakeVersionedReaderWriter)(nil) + +func NewFakeVersionedReaderWriter(r RawReader, w RawWriter) *FakeVersionedReaderWriter { + return &FakeVersionedReaderWriter{r, w} +} + +func (f *FakeVersionedReaderWriter) WriteVersioned(ctx context.Context, name string, keypath KeyPath, data io.Reader, _ Version) (Version, error) { + err := f.Write(ctx, name, keypath, data, -1, false) + return VersionNew, err +} + +func (f *FakeVersionedReaderWriter) ReadVersioned(ctx context.Context, name string, keypath KeyPath) (io.ReadCloser, Version, error) { + readCloser, _, err := f.Read(ctx, name, keypath, false) + return readCloser, VersionNew, err +} + +func (f *FakeVersionedReaderWriter) DeleteVersioned(ctx context.Context, name string, keypath KeyPath, _ Version) error { + return f.Delete(ctx, name, keypath, false) +} diff --git a/vendor/github.com/evanphx/json-patch/.gitignore b/vendor/github.com/evanphx/json-patch/.gitignore new file mode 100644 index 00000000000..b7ed7f956df --- /dev/null +++ b/vendor/github.com/evanphx/json-patch/.gitignore @@ -0,0 +1,6 @@ +# editor and IDE paraphernalia +.idea +.vscode + +# macOS paraphernalia +.DS_Store diff --git a/vendor/github.com/evanphx/json-patch/LICENSE b/vendor/github.com/evanphx/json-patch/LICENSE new file mode 100644 index 00000000000..df76d7d7716 --- /dev/null +++ b/vendor/github.com/evanphx/json-patch/LICENSE @@ -0,0 +1,25 @@ +Copyright (c) 2014, Evan Phoenix +All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are met: + +* Redistributions of source code must retain the above copyright notice, this + list of conditions and the following disclaimer. +* Redistributions in binary form must reproduce the above copyright notice, + this list of conditions and the following disclaimer in the documentation + and/or other materials provided with the distribution. +* Neither the name of the Evan Phoenix nor the names of its contributors + may be used to endorse or promote products derived from this software + without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE +FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/vendor/github.com/evanphx/json-patch/README.md b/vendor/github.com/evanphx/json-patch/README.md new file mode 100644 index 00000000000..28e35169375 --- /dev/null +++ b/vendor/github.com/evanphx/json-patch/README.md @@ -0,0 +1,317 @@ +# JSON-Patch +`jsonpatch` is a library which provides functionality for both applying +[RFC6902 JSON patches](http://tools.ietf.org/html/rfc6902) against documents, as +well as for calculating & applying [RFC7396 JSON merge patches](https://tools.ietf.org/html/rfc7396). + +[![GoDoc](https://godoc.org/github.com/evanphx/json-patch?status.svg)](http://godoc.org/github.com/evanphx/json-patch) +[![Build Status](https://travis-ci.org/evanphx/json-patch.svg?branch=master)](https://travis-ci.org/evanphx/json-patch) +[![Report Card](https://goreportcard.com/badge/github.com/evanphx/json-patch)](https://goreportcard.com/report/github.com/evanphx/json-patch) + +# Get It! + +**Latest and greatest**: +```bash +go get -u github.com/evanphx/json-patch/v5 +``` + +**Stable Versions**: +* Version 5: `go get -u gopkg.in/evanphx/json-patch.v5` +* Version 4: `go get -u gopkg.in/evanphx/json-patch.v4` + +(previous versions below `v3` are unavailable) + +# Use It! +* [Create and apply a merge patch](#create-and-apply-a-merge-patch) +* [Create and apply a JSON Patch](#create-and-apply-a-json-patch) +* [Comparing JSON documents](#comparing-json-documents) +* [Combine merge patches](#combine-merge-patches) + + +# Configuration + +* There is a global configuration variable `jsonpatch.SupportNegativeIndices`. + This defaults to `true` and enables the non-standard practice of allowing + negative indices to mean indices starting at the end of an array. This + functionality can be disabled by setting `jsonpatch.SupportNegativeIndices = + false`. + +* There is a global configuration variable `jsonpatch.AccumulatedCopySizeLimit`, + which limits the total size increase in bytes caused by "copy" operations in a + patch. It defaults to 0, which means there is no limit. + +These global variables control the behavior of `jsonpatch.Apply`. + +An alternative to `jsonpatch.Apply` is `jsonpatch.ApplyWithOptions` whose behavior +is controlled by an `options` parameter of type `*jsonpatch.ApplyOptions`. + +Structure `jsonpatch.ApplyOptions` includes the configuration options above +and adds two new options: `AllowMissingPathOnRemove` and `EnsurePathExistsOnAdd`. + +When `AllowMissingPathOnRemove` is set to `true`, `jsonpatch.ApplyWithOptions` will ignore +`remove` operations whose `path` points to a non-existent location in the JSON document. +`AllowMissingPathOnRemove` defaults to `false` which will lead to `jsonpatch.ApplyWithOptions` +returning an error when hitting a missing `path` on `remove`. + +When `EnsurePathExistsOnAdd` is set to `true`, `jsonpatch.ApplyWithOptions` will make sure +that `add` operations produce all the `path` elements that are missing from the target object. + +Use `jsonpatch.NewApplyOptions` to create an instance of `jsonpatch.ApplyOptions` +whose values are populated from the global configuration variables. + +## Create and apply a merge patch +Given both an original JSON document and a modified JSON document, you can create +a [Merge Patch](https://tools.ietf.org/html/rfc7396) document. + +It can describe the changes needed to convert from the original to the +modified JSON document. + +Once you have a merge patch, you can apply it to other JSON documents using the +`jsonpatch.MergePatch(document, patch)` function. + +```go +package main + +import ( + "fmt" + + jsonpatch "github.com/evanphx/json-patch" +) + +func main() { + // Let's create a merge patch from these two documents... + original := []byte(`{"name": "John", "age": 24, "height": 3.21}`) + target := []byte(`{"name": "Jane", "age": 24}`) + + patch, err := jsonpatch.CreateMergePatch(original, target) + if err != nil { + panic(err) + } + + // Now lets apply the patch against a different JSON document... + + alternative := []byte(`{"name": "Tina", "age": 28, "height": 3.75}`) + modifiedAlternative, err := jsonpatch.MergePatch(alternative, patch) + + fmt.Printf("patch document: %s\n", patch) + fmt.Printf("updated alternative doc: %s\n", modifiedAlternative) +} +``` + +When ran, you get the following output: + +```bash +$ go run main.go +patch document: {"height":null,"name":"Jane"} +updated alternative doc: {"age":28,"name":"Jane"} +``` + +## Create and apply a JSON Patch +You can create patch objects using `DecodePatch([]byte)`, which can then +be applied against JSON documents. + +The following is an example of creating a patch from two operations, and +applying it against a JSON document. + +```go +package main + +import ( + "fmt" + + jsonpatch "github.com/evanphx/json-patch" +) + +func main() { + original := []byte(`{"name": "John", "age": 24, "height": 3.21}`) + patchJSON := []byte(`[ + {"op": "replace", "path": "/name", "value": "Jane"}, + {"op": "remove", "path": "/height"} + ]`) + + patch, err := jsonpatch.DecodePatch(patchJSON) + if err != nil { + panic(err) + } + + modified, err := patch.Apply(original) + if err != nil { + panic(err) + } + + fmt.Printf("Original document: %s\n", original) + fmt.Printf("Modified document: %s\n", modified) +} +``` + +When ran, you get the following output: + +```bash +$ go run main.go +Original document: {"name": "John", "age": 24, "height": 3.21} +Modified document: {"age":24,"name":"Jane"} +``` + +## Comparing JSON documents +Due to potential whitespace and ordering differences, one cannot simply compare +JSON strings or byte-arrays directly. + +As such, you can instead use `jsonpatch.Equal(document1, document2)` to +determine if two JSON documents are _structurally_ equal. This ignores +whitespace differences, and key-value ordering. + +```go +package main + +import ( + "fmt" + + jsonpatch "github.com/evanphx/json-patch" +) + +func main() { + original := []byte(`{"name": "John", "age": 24, "height": 3.21}`) + similar := []byte(` + { + "age": 24, + "height": 3.21, + "name": "John" + } + `) + different := []byte(`{"name": "Jane", "age": 20, "height": 3.37}`) + + if jsonpatch.Equal(original, similar) { + fmt.Println(`"original" is structurally equal to "similar"`) + } + + if !jsonpatch.Equal(original, different) { + fmt.Println(`"original" is _not_ structurally equal to "different"`) + } +} +``` + +When ran, you get the following output: +```bash +$ go run main.go +"original" is structurally equal to "similar" +"original" is _not_ structurally equal to "different" +``` + +## Combine merge patches +Given two JSON merge patch documents, it is possible to combine them into a +single merge patch which can describe both set of changes. + +The resulting merge patch can be used such that applying it results in a +document structurally similar as merging each merge patch to the document +in succession. + +```go +package main + +import ( + "fmt" + + jsonpatch "github.com/evanphx/json-patch" +) + +func main() { + original := []byte(`{"name": "John", "age": 24, "height": 3.21}`) + + nameAndHeight := []byte(`{"height":null,"name":"Jane"}`) + ageAndEyes := []byte(`{"age":4.23,"eyes":"blue"}`) + + // Let's combine these merge patch documents... + combinedPatch, err := jsonpatch.MergeMergePatches(nameAndHeight, ageAndEyes) + if err != nil { + panic(err) + } + + // Apply each patch individual against the original document + withoutCombinedPatch, err := jsonpatch.MergePatch(original, nameAndHeight) + if err != nil { + panic(err) + } + + withoutCombinedPatch, err = jsonpatch.MergePatch(withoutCombinedPatch, ageAndEyes) + if err != nil { + panic(err) + } + + // Apply the combined patch against the original document + + withCombinedPatch, err := jsonpatch.MergePatch(original, combinedPatch) + if err != nil { + panic(err) + } + + // Do both result in the same thing? They should! + if jsonpatch.Equal(withCombinedPatch, withoutCombinedPatch) { + fmt.Println("Both JSON documents are structurally the same!") + } + + fmt.Printf("combined merge patch: %s", combinedPatch) +} +``` + +When ran, you get the following output: +```bash +$ go run main.go +Both JSON documents are structurally the same! +combined merge patch: {"age":4.23,"eyes":"blue","height":null,"name":"Jane"} +``` + +# CLI for comparing JSON documents +You can install the commandline program `json-patch`. + +This program can take multiple JSON patch documents as arguments, +and fed a JSON document from `stdin`. It will apply the patch(es) against +the document and output the modified doc. + +**patch.1.json** +```json +[ + {"op": "replace", "path": "/name", "value": "Jane"}, + {"op": "remove", "path": "/height"} +] +``` + +**patch.2.json** +```json +[ + {"op": "add", "path": "/address", "value": "123 Main St"}, + {"op": "replace", "path": "/age", "value": "21"} +] +``` + +**document.json** +```json +{ + "name": "John", + "age": 24, + "height": 3.21 +} +``` + +You can then run: + +```bash +$ go install github.com/evanphx/json-patch/cmd/json-patch +$ cat document.json | json-patch -p patch.1.json -p patch.2.json +{"address":"123 Main St","age":"21","name":"Jane"} +``` + +# Help It! +Contributions are welcomed! Leave [an issue](https://github.com/evanphx/json-patch/issues) +or [create a PR](https://github.com/evanphx/json-patch/compare). + + +Before creating a pull request, we'd ask that you make sure tests are passing +and that you have added new tests when applicable. + +Contributors can run tests using: + +```bash +go test -cover ./... +``` + +Builds for pull requests are tested automatically +using [TravisCI](https://travis-ci.org/evanphx/json-patch). diff --git a/vendor/github.com/evanphx/json-patch/errors.go b/vendor/github.com/evanphx/json-patch/errors.go new file mode 100644 index 00000000000..75304b4437c --- /dev/null +++ b/vendor/github.com/evanphx/json-patch/errors.go @@ -0,0 +1,38 @@ +package jsonpatch + +import "fmt" + +// AccumulatedCopySizeError is an error type returned when the accumulated size +// increase caused by copy operations in a patch operation has exceeded the +// limit. +type AccumulatedCopySizeError struct { + limit int64 + accumulated int64 +} + +// NewAccumulatedCopySizeError returns an AccumulatedCopySizeError. +func NewAccumulatedCopySizeError(l, a int64) *AccumulatedCopySizeError { + return &AccumulatedCopySizeError{limit: l, accumulated: a} +} + +// Error implements the error interface. +func (a *AccumulatedCopySizeError) Error() string { + return fmt.Sprintf("Unable to complete the copy, the accumulated size increase of copy is %d, exceeding the limit %d", a.accumulated, a.limit) +} + +// ArraySizeError is an error type returned when the array size has exceeded +// the limit. +type ArraySizeError struct { + limit int + size int +} + +// NewArraySizeError returns an ArraySizeError. +func NewArraySizeError(l, s int) *ArraySizeError { + return &ArraySizeError{limit: l, size: s} +} + +// Error implements the error interface. +func (a *ArraySizeError) Error() string { + return fmt.Sprintf("Unable to create array of size %d, limit is %d", a.size, a.limit) +} diff --git a/vendor/github.com/evanphx/json-patch/merge.go b/vendor/github.com/evanphx/json-patch/merge.go new file mode 100644 index 00000000000..ad88d40181c --- /dev/null +++ b/vendor/github.com/evanphx/json-patch/merge.go @@ -0,0 +1,389 @@ +package jsonpatch + +import ( + "bytes" + "encoding/json" + "fmt" + "reflect" +) + +func merge(cur, patch *lazyNode, mergeMerge bool) *lazyNode { + curDoc, err := cur.intoDoc() + + if err != nil { + pruneNulls(patch) + return patch + } + + patchDoc, err := patch.intoDoc() + + if err != nil { + return patch + } + + mergeDocs(curDoc, patchDoc, mergeMerge) + + return cur +} + +func mergeDocs(doc, patch *partialDoc, mergeMerge bool) { + for k, v := range *patch { + if v == nil { + if mergeMerge { + (*doc)[k] = nil + } else { + delete(*doc, k) + } + } else { + cur, ok := (*doc)[k] + + if !ok || cur == nil { + if !mergeMerge { + pruneNulls(v) + } + + (*doc)[k] = v + } else { + (*doc)[k] = merge(cur, v, mergeMerge) + } + } + } +} + +func pruneNulls(n *lazyNode) { + sub, err := n.intoDoc() + + if err == nil { + pruneDocNulls(sub) + } else { + ary, err := n.intoAry() + + if err == nil { + pruneAryNulls(ary) + } + } +} + +func pruneDocNulls(doc *partialDoc) *partialDoc { + for k, v := range *doc { + if v == nil { + delete(*doc, k) + } else { + pruneNulls(v) + } + } + + return doc +} + +func pruneAryNulls(ary *partialArray) *partialArray { + newAry := []*lazyNode{} + + for _, v := range *ary { + if v != nil { + pruneNulls(v) + } + newAry = append(newAry, v) + } + + *ary = newAry + + return ary +} + +var ErrBadJSONDoc = fmt.Errorf("Invalid JSON Document") +var ErrBadJSONPatch = fmt.Errorf("Invalid JSON Patch") +var errBadMergeTypes = fmt.Errorf("Mismatched JSON Documents") + +// MergeMergePatches merges two merge patches together, such that +// applying this resulting merged merge patch to a document yields the same +// as merging each merge patch to the document in succession. +func MergeMergePatches(patch1Data, patch2Data []byte) ([]byte, error) { + return doMergePatch(patch1Data, patch2Data, true) +} + +// MergePatch merges the patchData into the docData. +func MergePatch(docData, patchData []byte) ([]byte, error) { + return doMergePatch(docData, patchData, false) +} + +func doMergePatch(docData, patchData []byte, mergeMerge bool) ([]byte, error) { + doc := &partialDoc{} + + docErr := json.Unmarshal(docData, doc) + + patch := &partialDoc{} + + patchErr := json.Unmarshal(patchData, patch) + + if _, ok := docErr.(*json.SyntaxError); ok { + return nil, ErrBadJSONDoc + } + + if _, ok := patchErr.(*json.SyntaxError); ok { + return nil, ErrBadJSONPatch + } + + if docErr == nil && *doc == nil { + return nil, ErrBadJSONDoc + } + + if patchErr == nil && *patch == nil { + return nil, ErrBadJSONPatch + } + + if docErr != nil || patchErr != nil { + // Not an error, just not a doc, so we turn straight into the patch + if patchErr == nil { + if mergeMerge { + doc = patch + } else { + doc = pruneDocNulls(patch) + } + } else { + patchAry := &partialArray{} + patchErr = json.Unmarshal(patchData, patchAry) + + if patchErr != nil { + return nil, ErrBadJSONPatch + } + + pruneAryNulls(patchAry) + + out, patchErr := json.Marshal(patchAry) + + if patchErr != nil { + return nil, ErrBadJSONPatch + } + + return out, nil + } + } else { + mergeDocs(doc, patch, mergeMerge) + } + + return json.Marshal(doc) +} + +// resemblesJSONArray indicates whether the byte-slice "appears" to be +// a JSON array or not. +// False-positives are possible, as this function does not check the internal +// structure of the array. It only checks that the outer syntax is present and +// correct. +func resemblesJSONArray(input []byte) bool { + input = bytes.TrimSpace(input) + + hasPrefix := bytes.HasPrefix(input, []byte("[")) + hasSuffix := bytes.HasSuffix(input, []byte("]")) + + return hasPrefix && hasSuffix +} + +// CreateMergePatch will return a merge patch document capable of converting +// the original document(s) to the modified document(s). +// The parameters can be bytes of either two JSON Documents, or two arrays of +// JSON documents. +// The merge patch returned follows the specification defined at http://tools.ietf.org/html/draft-ietf-appsawg-json-merge-patch-07 +func CreateMergePatch(originalJSON, modifiedJSON []byte) ([]byte, error) { + originalResemblesArray := resemblesJSONArray(originalJSON) + modifiedResemblesArray := resemblesJSONArray(modifiedJSON) + + // Do both byte-slices seem like JSON arrays? + if originalResemblesArray && modifiedResemblesArray { + return createArrayMergePatch(originalJSON, modifiedJSON) + } + + // Are both byte-slices are not arrays? Then they are likely JSON objects... + if !originalResemblesArray && !modifiedResemblesArray { + return createObjectMergePatch(originalJSON, modifiedJSON) + } + + // None of the above? Then return an error because of mismatched types. + return nil, errBadMergeTypes +} + +// createObjectMergePatch will return a merge-patch document capable of +// converting the original document to the modified document. +func createObjectMergePatch(originalJSON, modifiedJSON []byte) ([]byte, error) { + originalDoc := map[string]interface{}{} + modifiedDoc := map[string]interface{}{} + + err := json.Unmarshal(originalJSON, &originalDoc) + if err != nil { + return nil, ErrBadJSONDoc + } + + err = json.Unmarshal(modifiedJSON, &modifiedDoc) + if err != nil { + return nil, ErrBadJSONDoc + } + + dest, err := getDiff(originalDoc, modifiedDoc) + if err != nil { + return nil, err + } + + return json.Marshal(dest) +} + +// createArrayMergePatch will return an array of merge-patch documents capable +// of converting the original document to the modified document for each +// pair of JSON documents provided in the arrays. +// Arrays of mismatched sizes will result in an error. +func createArrayMergePatch(originalJSON, modifiedJSON []byte) ([]byte, error) { + originalDocs := []json.RawMessage{} + modifiedDocs := []json.RawMessage{} + + err := json.Unmarshal(originalJSON, &originalDocs) + if err != nil { + return nil, ErrBadJSONDoc + } + + err = json.Unmarshal(modifiedJSON, &modifiedDocs) + if err != nil { + return nil, ErrBadJSONDoc + } + + total := len(originalDocs) + if len(modifiedDocs) != total { + return nil, ErrBadJSONDoc + } + + result := []json.RawMessage{} + for i := 0; i < len(originalDocs); i++ { + original := originalDocs[i] + modified := modifiedDocs[i] + + patch, err := createObjectMergePatch(original, modified) + if err != nil { + return nil, err + } + + result = append(result, json.RawMessage(patch)) + } + + return json.Marshal(result) +} + +// Returns true if the array matches (must be json types). +// As is idiomatic for go, an empty array is not the same as a nil array. +func matchesArray(a, b []interface{}) bool { + if len(a) != len(b) { + return false + } + if (a == nil && b != nil) || (a != nil && b == nil) { + return false + } + for i := range a { + if !matchesValue(a[i], b[i]) { + return false + } + } + return true +} + +// Returns true if the values matches (must be json types) +// The types of the values must match, otherwise it will always return false +// If two map[string]interface{} are given, all elements must match. +func matchesValue(av, bv interface{}) bool { + if reflect.TypeOf(av) != reflect.TypeOf(bv) { + return false + } + switch at := av.(type) { + case string: + bt := bv.(string) + if bt == at { + return true + } + case float64: + bt := bv.(float64) + if bt == at { + return true + } + case bool: + bt := bv.(bool) + if bt == at { + return true + } + case nil: + // Both nil, fine. + return true + case map[string]interface{}: + bt := bv.(map[string]interface{}) + if len(bt) != len(at) { + return false + } + for key := range bt { + av, aOK := at[key] + bv, bOK := bt[key] + if aOK != bOK { + return false + } + if !matchesValue(av, bv) { + return false + } + } + return true + case []interface{}: + bt := bv.([]interface{}) + return matchesArray(at, bt) + } + return false +} + +// getDiff returns the (recursive) difference between a and b as a map[string]interface{}. +func getDiff(a, b map[string]interface{}) (map[string]interface{}, error) { + into := map[string]interface{}{} + for key, bv := range b { + av, ok := a[key] + // value was added + if !ok { + into[key] = bv + continue + } + // If types have changed, replace completely + if reflect.TypeOf(av) != reflect.TypeOf(bv) { + into[key] = bv + continue + } + // Types are the same, compare values + switch at := av.(type) { + case map[string]interface{}: + bt := bv.(map[string]interface{}) + dst := make(map[string]interface{}, len(bt)) + dst, err := getDiff(at, bt) + if err != nil { + return nil, err + } + if len(dst) > 0 { + into[key] = dst + } + case string, float64, bool: + if !matchesValue(av, bv) { + into[key] = bv + } + case []interface{}: + bt := bv.([]interface{}) + if !matchesArray(at, bt) { + into[key] = bv + } + case nil: + switch bv.(type) { + case nil: + // Both nil, fine. + default: + into[key] = bv + } + default: + panic(fmt.Sprintf("Unknown type:%T in key %s", av, key)) + } + } + // Now add all deleted values as nil + for key := range a { + _, found := b[key] + if !found { + into[key] = nil + } + } + return into, nil +} diff --git a/vendor/github.com/evanphx/json-patch/patch.go b/vendor/github.com/evanphx/json-patch/patch.go new file mode 100644 index 00000000000..dc2b7e51e60 --- /dev/null +++ b/vendor/github.com/evanphx/json-patch/patch.go @@ -0,0 +1,851 @@ +package jsonpatch + +import ( + "bytes" + "encoding/json" + "fmt" + "strconv" + "strings" + + "github.com/pkg/errors" +) + +const ( + eRaw = iota + eDoc + eAry +) + +var ( + // SupportNegativeIndices decides whether to support non-standard practice of + // allowing negative indices to mean indices starting at the end of an array. + // Default to true. + SupportNegativeIndices bool = true + // AccumulatedCopySizeLimit limits the total size increase in bytes caused by + // "copy" operations in a patch. + AccumulatedCopySizeLimit int64 = 0 +) + +var ( + ErrTestFailed = errors.New("test failed") + ErrMissing = errors.New("missing value") + ErrUnknownType = errors.New("unknown object type") + ErrInvalid = errors.New("invalid state detected") + ErrInvalidIndex = errors.New("invalid index referenced") +) + +type lazyNode struct { + raw *json.RawMessage + doc partialDoc + ary partialArray + which int +} + +// Operation is a single JSON-Patch step, such as a single 'add' operation. +type Operation map[string]*json.RawMessage + +// Patch is an ordered collection of Operations. +type Patch []Operation + +type partialDoc map[string]*lazyNode +type partialArray []*lazyNode + +type container interface { + get(key string) (*lazyNode, error) + set(key string, val *lazyNode) error + add(key string, val *lazyNode) error + remove(key string) error +} + +func newLazyNode(raw *json.RawMessage) *lazyNode { + return &lazyNode{raw: raw, doc: nil, ary: nil, which: eRaw} +} + +func (n *lazyNode) MarshalJSON() ([]byte, error) { + switch n.which { + case eRaw: + return json.Marshal(n.raw) + case eDoc: + return json.Marshal(n.doc) + case eAry: + return json.Marshal(n.ary) + default: + return nil, ErrUnknownType + } +} + +func (n *lazyNode) UnmarshalJSON(data []byte) error { + dest := make(json.RawMessage, len(data)) + copy(dest, data) + n.raw = &dest + n.which = eRaw + return nil +} + +func deepCopy(src *lazyNode) (*lazyNode, int, error) { + if src == nil { + return nil, 0, nil + } + a, err := src.MarshalJSON() + if err != nil { + return nil, 0, err + } + sz := len(a) + ra := make(json.RawMessage, sz) + copy(ra, a) + return newLazyNode(&ra), sz, nil +} + +func (n *lazyNode) intoDoc() (*partialDoc, error) { + if n.which == eDoc { + return &n.doc, nil + } + + if n.raw == nil { + return nil, ErrInvalid + } + + err := json.Unmarshal(*n.raw, &n.doc) + + if err != nil { + return nil, err + } + + n.which = eDoc + return &n.doc, nil +} + +func (n *lazyNode) intoAry() (*partialArray, error) { + if n.which == eAry { + return &n.ary, nil + } + + if n.raw == nil { + return nil, ErrInvalid + } + + err := json.Unmarshal(*n.raw, &n.ary) + + if err != nil { + return nil, err + } + + n.which = eAry + return &n.ary, nil +} + +func (n *lazyNode) compact() []byte { + buf := &bytes.Buffer{} + + if n.raw == nil { + return nil + } + + err := json.Compact(buf, *n.raw) + + if err != nil { + return *n.raw + } + + return buf.Bytes() +} + +func (n *lazyNode) tryDoc() bool { + if n.raw == nil { + return false + } + + err := json.Unmarshal(*n.raw, &n.doc) + + if err != nil { + return false + } + + n.which = eDoc + return true +} + +func (n *lazyNode) tryAry() bool { + if n.raw == nil { + return false + } + + err := json.Unmarshal(*n.raw, &n.ary) + + if err != nil { + return false + } + + n.which = eAry + return true +} + +func (n *lazyNode) equal(o *lazyNode) bool { + if n.which == eRaw { + if !n.tryDoc() && !n.tryAry() { + if o.which != eRaw { + return false + } + + return bytes.Equal(n.compact(), o.compact()) + } + } + + if n.which == eDoc { + if o.which == eRaw { + if !o.tryDoc() { + return false + } + } + + if o.which != eDoc { + return false + } + + if len(n.doc) != len(o.doc) { + return false + } + + for k, v := range n.doc { + ov, ok := o.doc[k] + + if !ok { + return false + } + + if (v == nil) != (ov == nil) { + return false + } + + if v == nil && ov == nil { + continue + } + + if !v.equal(ov) { + return false + } + } + + return true + } + + if o.which != eAry && !o.tryAry() { + return false + } + + if len(n.ary) != len(o.ary) { + return false + } + + for idx, val := range n.ary { + if !val.equal(o.ary[idx]) { + return false + } + } + + return true +} + +// Kind reads the "op" field of the Operation. +func (o Operation) Kind() string { + if obj, ok := o["op"]; ok && obj != nil { + var op string + + err := json.Unmarshal(*obj, &op) + + if err != nil { + return "unknown" + } + + return op + } + + return "unknown" +} + +// Path reads the "path" field of the Operation. +func (o Operation) Path() (string, error) { + if obj, ok := o["path"]; ok && obj != nil { + var op string + + err := json.Unmarshal(*obj, &op) + + if err != nil { + return "unknown", err + } + + return op, nil + } + + return "unknown", errors.Wrapf(ErrMissing, "operation missing path field") +} + +// From reads the "from" field of the Operation. +func (o Operation) From() (string, error) { + if obj, ok := o["from"]; ok && obj != nil { + var op string + + err := json.Unmarshal(*obj, &op) + + if err != nil { + return "unknown", err + } + + return op, nil + } + + return "unknown", errors.Wrapf(ErrMissing, "operation, missing from field") +} + +func (o Operation) value() *lazyNode { + if obj, ok := o["value"]; ok { + return newLazyNode(obj) + } + + return nil +} + +// ValueInterface decodes the operation value into an interface. +func (o Operation) ValueInterface() (interface{}, error) { + if obj, ok := o["value"]; ok && obj != nil { + var v interface{} + + err := json.Unmarshal(*obj, &v) + + if err != nil { + return nil, err + } + + return v, nil + } + + return nil, errors.Wrapf(ErrMissing, "operation, missing value field") +} + +func isArray(buf []byte) bool { +Loop: + for _, c := range buf { + switch c { + case ' ': + case '\n': + case '\t': + continue + case '[': + return true + default: + break Loop + } + } + + return false +} + +func findObject(pd *container, path string) (container, string) { + doc := *pd + + split := strings.Split(path, "/") + + if len(split) < 2 { + return nil, "" + } + + parts := split[1 : len(split)-1] + + key := split[len(split)-1] + + var err error + + for _, part := range parts { + + next, ok := doc.get(decodePatchKey(part)) + + if next == nil || ok != nil { + return nil, "" + } + + if isArray(*next.raw) { + doc, err = next.intoAry() + + if err != nil { + return nil, "" + } + } else { + doc, err = next.intoDoc() + + if err != nil { + return nil, "" + } + } + } + + return doc, decodePatchKey(key) +} + +func (d *partialDoc) set(key string, val *lazyNode) error { + (*d)[key] = val + return nil +} + +func (d *partialDoc) add(key string, val *lazyNode) error { + (*d)[key] = val + return nil +} + +func (d *partialDoc) get(key string) (*lazyNode, error) { + return (*d)[key], nil +} + +func (d *partialDoc) remove(key string) error { + _, ok := (*d)[key] + if !ok { + return errors.Wrapf(ErrMissing, "Unable to remove nonexistent key: %s", key) + } + + delete(*d, key) + return nil +} + +// set should only be used to implement the "replace" operation, so "key" must +// be an already existing index in "d". +func (d *partialArray) set(key string, val *lazyNode) error { + idx, err := strconv.Atoi(key) + if err != nil { + return err + } + + if idx < 0 { + if !SupportNegativeIndices { + return errors.Wrapf(ErrInvalidIndex, "Unable to access invalid index: %d", idx) + } + if idx < -len(*d) { + return errors.Wrapf(ErrInvalidIndex, "Unable to access invalid index: %d", idx) + } + idx += len(*d) + } + + (*d)[idx] = val + return nil +} + +func (d *partialArray) add(key string, val *lazyNode) error { + if key == "-" { + *d = append(*d, val) + return nil + } + + idx, err := strconv.Atoi(key) + if err != nil { + return errors.Wrapf(err, "value was not a proper array index: '%s'", key) + } + + sz := len(*d) + 1 + + ary := make([]*lazyNode, sz) + + cur := *d + + if idx >= len(ary) { + return errors.Wrapf(ErrInvalidIndex, "Unable to access invalid index: %d", idx) + } + + if idx < 0 { + if !SupportNegativeIndices { + return errors.Wrapf(ErrInvalidIndex, "Unable to access invalid index: %d", idx) + } + if idx < -len(ary) { + return errors.Wrapf(ErrInvalidIndex, "Unable to access invalid index: %d", idx) + } + idx += len(ary) + } + + copy(ary[0:idx], cur[0:idx]) + ary[idx] = val + copy(ary[idx+1:], cur[idx:]) + + *d = ary + return nil +} + +func (d *partialArray) get(key string) (*lazyNode, error) { + idx, err := strconv.Atoi(key) + + if err != nil { + return nil, err + } + + if idx < 0 { + if !SupportNegativeIndices { + return nil, errors.Wrapf(ErrInvalidIndex, "Unable to access invalid index: %d", idx) + } + if idx < -len(*d) { + return nil, errors.Wrapf(ErrInvalidIndex, "Unable to access invalid index: %d", idx) + } + idx += len(*d) + } + + if idx >= len(*d) { + return nil, errors.Wrapf(ErrInvalidIndex, "Unable to access invalid index: %d", idx) + } + + return (*d)[idx], nil +} + +func (d *partialArray) remove(key string) error { + idx, err := strconv.Atoi(key) + if err != nil { + return err + } + + cur := *d + + if idx >= len(cur) { + return errors.Wrapf(ErrInvalidIndex, "Unable to access invalid index: %d", idx) + } + + if idx < 0 { + if !SupportNegativeIndices { + return errors.Wrapf(ErrInvalidIndex, "Unable to access invalid index: %d", idx) + } + if idx < -len(cur) { + return errors.Wrapf(ErrInvalidIndex, "Unable to access invalid index: %d", idx) + } + idx += len(cur) + } + + ary := make([]*lazyNode, len(cur)-1) + + copy(ary[0:idx], cur[0:idx]) + copy(ary[idx:], cur[idx+1:]) + + *d = ary + return nil + +} + +func (p Patch) add(doc *container, op Operation) error { + path, err := op.Path() + if err != nil { + return errors.Wrapf(ErrMissing, "add operation failed to decode path") + } + + con, key := findObject(doc, path) + + if con == nil { + return errors.Wrapf(ErrMissing, "add operation does not apply: doc is missing path: \"%s\"", path) + } + + err = con.add(key, op.value()) + if err != nil { + return errors.Wrapf(err, "error in add for path: '%s'", path) + } + + return nil +} + +func (p Patch) remove(doc *container, op Operation) error { + path, err := op.Path() + if err != nil { + return errors.Wrapf(ErrMissing, "remove operation failed to decode path") + } + + con, key := findObject(doc, path) + + if con == nil { + return errors.Wrapf(ErrMissing, "remove operation does not apply: doc is missing path: \"%s\"", path) + } + + err = con.remove(key) + if err != nil { + return errors.Wrapf(err, "error in remove for path: '%s'", path) + } + + return nil +} + +func (p Patch) replace(doc *container, op Operation) error { + path, err := op.Path() + if err != nil { + return errors.Wrapf(err, "replace operation failed to decode path") + } + + if path == "" { + val := op.value() + + if val.which == eRaw { + if !val.tryDoc() { + if !val.tryAry() { + return errors.Wrapf(err, "replace operation value must be object or array") + } + } + } + + switch val.which { + case eAry: + *doc = &val.ary + case eDoc: + *doc = &val.doc + case eRaw: + return errors.Wrapf(err, "replace operation hit impossible case") + } + + return nil + } + + con, key := findObject(doc, path) + + if con == nil { + return errors.Wrapf(ErrMissing, "replace operation does not apply: doc is missing path: %s", path) + } + + _, ok := con.get(key) + if ok != nil { + return errors.Wrapf(ErrMissing, "replace operation does not apply: doc is missing key: %s", path) + } + + err = con.set(key, op.value()) + if err != nil { + return errors.Wrapf(err, "error in remove for path: '%s'", path) + } + + return nil +} + +func (p Patch) move(doc *container, op Operation) error { + from, err := op.From() + if err != nil { + return errors.Wrapf(err, "move operation failed to decode from") + } + + con, key := findObject(doc, from) + + if con == nil { + return errors.Wrapf(ErrMissing, "move operation does not apply: doc is missing from path: %s", from) + } + + val, err := con.get(key) + if err != nil { + return errors.Wrapf(err, "error in move for path: '%s'", key) + } + + err = con.remove(key) + if err != nil { + return errors.Wrapf(err, "error in move for path: '%s'", key) + } + + path, err := op.Path() + if err != nil { + return errors.Wrapf(err, "move operation failed to decode path") + } + + con, key = findObject(doc, path) + + if con == nil { + return errors.Wrapf(ErrMissing, "move operation does not apply: doc is missing destination path: %s", path) + } + + err = con.add(key, val) + if err != nil { + return errors.Wrapf(err, "error in move for path: '%s'", path) + } + + return nil +} + +func (p Patch) test(doc *container, op Operation) error { + path, err := op.Path() + if err != nil { + return errors.Wrapf(err, "test operation failed to decode path") + } + + if path == "" { + var self lazyNode + + switch sv := (*doc).(type) { + case *partialDoc: + self.doc = *sv + self.which = eDoc + case *partialArray: + self.ary = *sv + self.which = eAry + } + + if self.equal(op.value()) { + return nil + } + + return errors.Wrapf(ErrTestFailed, "testing value %s failed", path) + } + + con, key := findObject(doc, path) + + if con == nil { + return errors.Wrapf(ErrMissing, "test operation does not apply: is missing path: %s", path) + } + + val, err := con.get(key) + if err != nil { + return errors.Wrapf(err, "error in test for path: '%s'", path) + } + + if val == nil { + if op.value().raw == nil { + return nil + } + return errors.Wrapf(ErrTestFailed, "testing value %s failed", path) + } else if op.value() == nil { + return errors.Wrapf(ErrTestFailed, "testing value %s failed", path) + } + + if val.equal(op.value()) { + return nil + } + + return errors.Wrapf(ErrTestFailed, "testing value %s failed", path) +} + +func (p Patch) copy(doc *container, op Operation, accumulatedCopySize *int64) error { + from, err := op.From() + if err != nil { + return errors.Wrapf(err, "copy operation failed to decode from") + } + + con, key := findObject(doc, from) + + if con == nil { + return errors.Wrapf(ErrMissing, "copy operation does not apply: doc is missing from path: %s", from) + } + + val, err := con.get(key) + if err != nil { + return errors.Wrapf(err, "error in copy for from: '%s'", from) + } + + path, err := op.Path() + if err != nil { + return errors.Wrapf(ErrMissing, "copy operation failed to decode path") + } + + con, key = findObject(doc, path) + + if con == nil { + return errors.Wrapf(ErrMissing, "copy operation does not apply: doc is missing destination path: %s", path) + } + + valCopy, sz, err := deepCopy(val) + if err != nil { + return errors.Wrapf(err, "error while performing deep copy") + } + + (*accumulatedCopySize) += int64(sz) + if AccumulatedCopySizeLimit > 0 && *accumulatedCopySize > AccumulatedCopySizeLimit { + return NewAccumulatedCopySizeError(AccumulatedCopySizeLimit, *accumulatedCopySize) + } + + err = con.add(key, valCopy) + if err != nil { + return errors.Wrapf(err, "error while adding value during copy") + } + + return nil +} + +// Equal indicates if 2 JSON documents have the same structural equality. +func Equal(a, b []byte) bool { + ra := make(json.RawMessage, len(a)) + copy(ra, a) + la := newLazyNode(&ra) + + rb := make(json.RawMessage, len(b)) + copy(rb, b) + lb := newLazyNode(&rb) + + return la.equal(lb) +} + +// DecodePatch decodes the passed JSON document as an RFC 6902 patch. +func DecodePatch(buf []byte) (Patch, error) { + var p Patch + + err := json.Unmarshal(buf, &p) + + if err != nil { + return nil, err + } + + return p, nil +} + +// Apply mutates a JSON document according to the patch, and returns the new +// document. +func (p Patch) Apply(doc []byte) ([]byte, error) { + return p.ApplyIndent(doc, "") +} + +// ApplyIndent mutates a JSON document according to the patch, and returns the new +// document indented. +func (p Patch) ApplyIndent(doc []byte, indent string) ([]byte, error) { + if len(doc) == 0 { + return doc, nil + } + + var pd container + if doc[0] == '[' { + pd = &partialArray{} + } else { + pd = &partialDoc{} + } + + err := json.Unmarshal(doc, pd) + + if err != nil { + return nil, err + } + + err = nil + + var accumulatedCopySize int64 + + for _, op := range p { + switch op.Kind() { + case "add": + err = p.add(&pd, op) + case "remove": + err = p.remove(&pd, op) + case "replace": + err = p.replace(&pd, op) + case "move": + err = p.move(&pd, op) + case "test": + err = p.test(&pd, op) + case "copy": + err = p.copy(&pd, op, &accumulatedCopySize) + default: + err = fmt.Errorf("Unexpected kind: %s", op.Kind()) + } + + if err != nil { + return nil, err + } + } + + if indent != "" { + return json.MarshalIndent(pd, "", indent) + } + + return json.Marshal(pd) +} + +// From http://tools.ietf.org/html/rfc6901#section-4 : +// +// Evaluation of each reference token begins by decoding any escaped +// character sequence. This is performed by first transforming any +// occurrence of the sequence '~1' to '/', and then transforming any +// occurrence of the sequence '~0' to '~'. + +var ( + rfc6901Decoder = strings.NewReplacer("~1", "/", "~0", "~") +) + +func decodePatchKey(k string) string { + return rfc6901Decoder.Replace(k) +} diff --git a/vendor/modules.txt b/vendor/modules.txt index d69f0c4c10f..3019c7069e0 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -204,6 +204,9 @@ github.com/eapache/queue # github.com/edsrzf/mmap-go v1.1.0 ## explicit; go 1.17 github.com/edsrzf/mmap-go +# github.com/evanphx/json-patch v4.12.0+incompatible +## explicit +github.com/evanphx/json-patch # github.com/facette/natsort v0.0.0-20181210072756-2cd4dd1e2dcb ## explicit github.com/facette/natsort