Skip to content

Commit 8e6ade9

Browse files
committed
Change CRDB driver to use new method for getting transaction timestamp
The existing call disables most optimizations on write transactions, but was necessary for legacy reasons. Following this change, any CRDB version 23 (or later) will use the newer, better call Also changes the stats system to use the CRDB stats instead of a custom table
1 parent 66a871c commit 8e6ade9

File tree

10 files changed

+212
-82
lines changed

10 files changed

+212
-82
lines changed

internal/datastore/crdb/crdb.go

Lines changed: 33 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -68,8 +68,10 @@ const (
6868
errUnableToInstantiate = "unable to instantiate datastore"
6969
errRevision = "unable to find revision: %w"
7070

71-
querySelectNow = "SELECT cluster_logical_timestamp()"
72-
queryShowZoneConfig = "SHOW ZONE CONFIGURATION FOR RANGE default;"
71+
querySelectNow = "SELECT cluster_logical_timestamp()"
72+
queryTransactionNowPreV23 = querySelectNow
73+
queryTransactionNow = "SHOW COMMIT TIMESTAMP"
74+
queryShowZoneConfig = "SHOW ZONE CONFIGURATION FOR RANGE default;"
7375

7476
livingTupleConstraint = "pk_relation_tuple"
7577
)
@@ -122,6 +124,12 @@ func newCRDBDatastore(ctx context.Context, url string, options ...Option) (datas
122124
changefeedQuery = queryChangefeedPreV22
123125
}
124126

127+
transactionNowQuery := queryTransactionNow
128+
if version.Major < 23 {
129+
log.Info().Object("version", version).Msg("using transaction now query for CRDB version < 23")
130+
transactionNowQuery = queryTransactionNowPreV23
131+
}
132+
125133
clusterTTLNanos, err := readClusterTTLNanos(initCtx, initPool)
126134
if err != nil {
127135
return nil, fmt.Errorf("unable to read cluster gc window: %w", err)
@@ -172,8 +180,9 @@ func newCRDBDatastore(ctx context.Context, url string, options ...Option) (datas
172180
watchBufferWriteTimeout: config.watchBufferWriteTimeout,
173181
writeOverlapKeyer: keyer,
174182
overlapKeyInit: keySetInit,
175-
disableStats: config.disableStats,
176183
beginChangefeedQuery: changefeedQuery,
184+
transactionNowQuery: transactionNowQuery,
185+
analyzeBeforeStatistics: config.analyzeBeforeStatistics,
177186
}
178187
ds.RemoteClockRevisions.SetNowFunc(ds.headRevisionInternal)
179188

@@ -254,9 +263,10 @@ type crdbDatastore struct {
254263
watchBufferWriteTimeout time.Duration
255264
writeOverlapKeyer overlapKeyer
256265
overlapKeyInit func(ctx context.Context) keySet
257-
disableStats bool
266+
analyzeBeforeStatistics bool
258267

259268
beginChangefeedQuery string
269+
transactionNowQuery string
260270

261271
featureGroup singleflight.Group[string, *datastore.Features]
262272

@@ -321,21 +331,11 @@ func (cds *crdbDatastore) ReadWriteTx(
321331
}
322332
}
323333

324-
if cds.disableStats {
325-
var err error
326-
commitTimestamp, err = readCRDBNow(ctx, querier)
327-
if err != nil {
328-
return fmt.Errorf("error getting commit timestamp: %w", err)
329-
}
330-
return nil
331-
}
332-
333334
var err error
334-
commitTimestamp, err = updateCounter(ctx, tx, rwt.relCountChange)
335+
commitTimestamp, err = cds.readTransactionCommitRev(ctx, querier)
335336
if err != nil {
336-
return fmt.Errorf("error updating relationship counter: %w", err)
337+
return fmt.Errorf("error getting commit timestamp: %w", err)
337338
}
338-
339339
return nil
340340
})
341341
if err != nil {
@@ -371,7 +371,9 @@ func (cds *crdbDatastore) ReadyState(ctx context.Context) (datastore.ReadyState,
371371
return datastore.ReadyState{}, err
372372
}
373373

374-
if version != headMigration {
374+
// TODO(jschorr): Remove the check for the older migration once we are confident
375+
// that all users have migrated past it.
376+
if version != headMigration && version != "add-caveats" {
375377
return datastore.ReadyState{
376378
Message: fmt.Sprintf(
377379
"datastore is not migrated: currently at revision `%s`, but requires `%s`. Please run `spicedb migrate`.",
@@ -467,6 +469,20 @@ func (cds *crdbDatastore) features(ctx context.Context) (*datastore.Features, er
467469
return &features, nil
468470
}
469471

472+
func (cds *crdbDatastore) readTransactionCommitRev(ctx context.Context, reader pgxcommon.DBFuncQuerier) (datastore.Revision, error) {
473+
ctx, span := tracer.Start(ctx, "readTransactionCommitRev")
474+
defer span.End()
475+
476+
var hlcNow decimal.Decimal
477+
if err := reader.QueryRowFunc(ctx, func(ctx context.Context, row pgx.Row) error {
478+
return row.Scan(&hlcNow)
479+
}, cds.transactionNowQuery); err != nil {
480+
return datastore.NoRevision, fmt.Errorf("unable to read timestamp: %w", err)
481+
}
482+
483+
return revisions.NewForHLC(hlcNow)
484+
}
485+
470486
func readCRDBNow(ctx context.Context, reader pgxcommon.DBFuncQuerier) (datastore.Revision, error) {
471487
ctx, span := tracer.Start(ctx, "readCRDBNow")
472488
defer span.End()

internal/datastore/crdb/crdb_test.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ func TestCRDBDatastore(t *testing.T) {
5353
RevisionQuantization(revisionQuantization),
5454
WatchBufferLength(watchBufferLength),
5555
OverlapStrategy(overlapStrategyPrefix),
56+
DebugAnalyzeBeforeStatistics(),
5657
)
5758
require.NoError(t, err)
5859
return ds
@@ -84,6 +85,7 @@ func TestCRDBDatastoreWithFollowerReads(t *testing.T) {
8485
GCWindow(gcWindow),
8586
RevisionQuantization(quantization),
8687
FollowerReadDelay(followerReadDelay),
88+
DebugAnalyzeBeforeStatistics(),
8789
)
8890
require.NoError(err)
8991
return ds
@@ -134,15 +136,19 @@ func TestWatchFeatureDetection(t *testing.T) {
134136
require.NoError(t, err)
135137
},
136138
expectEnabled: false,
137-
expectMessage: "Range feeds must be enabled in CockroachDB and the user must have permission to create them in order to enable the Watch API: ERROR: user unprivileged does not have CHANGEFEED privilege on relation relation_tuple (SQLSTATE 42501)",
139+
expectMessage: "(SQLSTATE 42501)",
138140
},
139141
{
140142
name: "rangefeeds enabled, user has permission",
141143
postInit: func(ctx context.Context, adminConn *pgx.Conn) {
142144
_, err = adminConn.Exec(ctx, `SET CLUSTER SETTING kv.rangefeed.enabled = true;`)
143145
require.NoError(t, err)
146+
144147
_, err = adminConn.Exec(ctx, fmt.Sprintf(`GRANT CHANGEFEED ON TABLE testspicedb.%s TO unprivileged;`, tableTuple))
145148
require.NoError(t, err)
149+
150+
_, err = adminConn.Exec(ctx, fmt.Sprintf(`GRANT SELECT ON TABLE testspicedb.%s TO unprivileged;`, tableTuple))
151+
require.NoError(t, err)
146152
},
147153
expectEnabled: true,
148154
},
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
package migrations
2+
3+
import (
4+
"context"
5+
6+
"github.com/jackc/pgx/v5"
7+
)
8+
9+
const (
10+
dropStatsTable = `DROP TABLE relationship_estimate_counters;`
11+
)
12+
13+
func init() {
14+
err := CRDBMigrations.Register("remove-stats-table", "add-caveats", removeStatsTable, noAtomicMigration)
15+
if err != nil {
16+
panic("failed to register migration: " + err.Error())
17+
}
18+
}
19+
20+
func removeStatsTable(ctx context.Context, conn *pgx.Conn) error {
21+
if _, err := conn.Exec(ctx, dropStatsTable); err != nil {
22+
return err
23+
}
24+
return nil
25+
}

internal/datastore/crdb/options.go

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,8 @@ type crdbOptions struct {
2020
maxRetries uint8
2121
overlapStrategy string
2222
overlapKey string
23-
disableStats bool
2423
enableConnectionBalancing bool
24+
analyzeBeforeStatistics bool
2525

2626
enablePrometheusStats bool
2727
}
@@ -65,7 +65,6 @@ func generateConfig(options []Option) (crdbOptions, error) {
6565
maxRetries: defaultMaxRetries,
6666
overlapKey: defaultOverlapKey,
6767
overlapStrategy: defaultOverlapStrategy,
68-
disableStats: false,
6968
enablePrometheusStats: defaultEnablePrometheusStats,
7069
enableConnectionBalancing: defaultEnableConnectionBalancing,
7170
connectRate: defaultConnectRate,
@@ -283,11 +282,6 @@ func OverlapKey(key string) Option {
283282
return func(po *crdbOptions) { po.overlapKey = key }
284283
}
285284

286-
// DisableStats disables recording counts to the stats table
287-
func DisableStats(disable bool) Option {
288-
return func(po *crdbOptions) { po.disableStats = disable }
289-
}
290-
291285
// WithEnablePrometheusStats marks whether Prometheus metrics provided by the Postgres
292286
// clients being used by the datastore are enabled.
293287
//
@@ -303,3 +297,12 @@ func WithEnablePrometheusStats(enablePrometheusStats bool) Option {
303297
func WithEnableConnectionBalancing(connectionBalancing bool) Option {
304298
return func(po *crdbOptions) { po.enableConnectionBalancing = connectionBalancing }
305299
}
300+
301+
// DebugAnalyzeBeforeStatistics signals to the Statistics method that it should
302+
// run Analyze on the database before returning statistics. This should only be
303+
// used for debug and testing.
304+
//
305+
// Disabled by default.
306+
func DebugAnalyzeBeforeStatistics() Option {
307+
return func(po *crdbOptions) { po.analyzeBeforeStatistics = true }
308+
}

internal/datastore/crdb/stats.go

Lines changed: 79 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -3,39 +3,24 @@ package crdb
33
import (
44
"context"
55
"fmt"
6-
"math/rand"
7-
"time"
6+
"slices"
87

98
"github.com/Masterminds/squirrel"
109
"github.com/jackc/pgx/v5"
11-
"github.com/shopspring/decimal"
10+
"github.com/rs/zerolog/log"
1211

1312
pgxcommon "github.com/authzed/spicedb/internal/datastore/postgres/common"
14-
"github.com/authzed/spicedb/internal/datastore/revisions"
1513
"github.com/authzed/spicedb/pkg/datastore"
1614
)
1715

1816
const (
1917
tableMetadata = "metadata"
2018
colUniqueID = "unique_id"
21-
22-
tableCounters = "relationship_estimate_counters"
23-
colID = "id"
24-
colCount = "count"
2519
)
2620

2721
var (
28-
queryReadUniqueID = psql.Select(colUniqueID).From(tableMetadata)
29-
queryRelationshipEstimate = fmt.Sprintf("SELECT COALESCE(SUM(%s), 0) FROM %s AS OF SYSTEM TIME follower_read_timestamp()", colCount, tableCounters)
30-
31-
upsertCounterQuery = psql.Insert(tableCounters).Columns(
32-
colID,
33-
colCount,
34-
).Suffix(fmt.Sprintf("ON CONFLICT (%[1]s) DO UPDATE SET %[2]s = %[3]s.%[2]s + EXCLUDED.%[2]s RETURNING cluster_logical_timestamp()", colID, colCount, tableCounters))
35-
36-
rng = rand.NewSource(time.Now().UnixNano())
37-
38-
uniqueID string
22+
queryReadUniqueID = psql.Select(colUniqueID).From(tableMetadata)
23+
uniqueID string
3924
)
4025

4126
func (cds *crdbDatastore) Statistics(ctx context.Context) (datastore.Stats, error) {
@@ -52,14 +37,6 @@ func (cds *crdbDatastore) Statistics(ctx context.Context) (datastore.Stats, erro
5237
}
5338

5439
var nsDefs []datastore.RevisionedNamespace
55-
var relCount int64
56-
57-
if err := cds.readPool.QueryRowFunc(ctx, func(ctx context.Context, row pgx.Row) error {
58-
return row.Scan(&relCount)
59-
}, queryRelationshipEstimate); err != nil {
60-
return datastore.Stats{}, fmt.Errorf("unable to read relationship count: %w", err)
61-
}
62-
6340
if err := cds.readPool.BeginTxFunc(ctx, pgx.TxOptions{AccessMode: pgx.ReadOnly}, func(tx pgx.Tx) error {
6441
_, err := tx.Exec(ctx, "SET TRANSACTION AS OF SYSTEM TIME follower_read_timestamp()")
6542
if err != nil {
@@ -76,37 +53,85 @@ func (cds *crdbDatastore) Statistics(ctx context.Context) (datastore.Stats, erro
7653
return datastore.Stats{}, err
7754
}
7855

79-
// NOTE: this is a stop-gap solution to prevent panics in telemetry collection
80-
if relCount < 0 {
81-
relCount = 0
82-
}
83-
84-
return datastore.Stats{
85-
UniqueID: uniqueID,
86-
EstimatedRelationshipCount: uint64(relCount),
87-
ObjectTypeStatistics: datastore.ComputeObjectTypeStats(nsDefs),
88-
}, nil
89-
}
56+
if cds.analyzeBeforeStatistics {
57+
if err := cds.readPool.BeginTxFunc(ctx, pgx.TxOptions{AccessMode: pgx.ReadOnly}, func(tx pgx.Tx) error {
58+
if _, err := tx.Exec(ctx, "ANALYZE "+tableTuple); err != nil {
59+
return fmt.Errorf("unable to analyze tuple table: %w", err)
60+
}
9061

91-
func updateCounter(ctx context.Context, tx pgx.Tx, change int64) (datastore.Revision, error) {
92-
counterID := make([]byte, 2)
93-
// nolint:gosec
94-
// G404 use of non cryptographically secure random number generator is not concern here,
95-
// as this is only used to randomly distributed the counters across multiple rows and reduce write row contention
96-
_, err := rand.New(rng).Read(counterID)
97-
if err != nil {
98-
return datastore.NoRevision, fmt.Errorf("unable to select random counter: %w", err)
62+
return nil
63+
}); err != nil {
64+
return datastore.Stats{}, err
65+
}
9966
}
10067

101-
sql, args, err := upsertCounterQuery.Values(counterID, change).ToSql()
102-
if err != nil {
103-
return datastore.NoRevision, fmt.Errorf("unable to prepare upsert counter sql: %w", err)
104-
}
68+
var estimatedRelCount uint64
69+
if err := cds.readPool.QueryFunc(ctx, func(ctx context.Context, rows pgx.Rows) error {
70+
hasRows := false
71+
72+
for rows.Next() {
73+
hasRows = true
74+
values, err := rows.Values()
75+
if err != nil {
76+
log.Warn().Err(err).Msg("unable to read statistics")
77+
return nil
78+
}
79+
80+
// Find the row whose column_names contains the expected columns for the
81+
// full relationship.
82+
isFullRelationshipRow := false
83+
for index, fd := range rows.FieldDescriptions() {
84+
if fd.Name != "column_names" {
85+
continue
86+
}
87+
88+
columnNames, ok := values[index].([]any)
89+
if !ok {
90+
log.Warn().Msg("unable to read column names")
91+
return nil
92+
}
93+
94+
if slices.Contains(columnNames, "namespace") &&
95+
slices.Contains(columnNames, "object_id") &&
96+
slices.Contains(columnNames, "relation") &&
97+
slices.Contains(columnNames, "userset_namespace") &&
98+
slices.Contains(columnNames, "userset_object_id") &&
99+
slices.Contains(columnNames, "userset_relation") {
100+
isFullRelationshipRow = true
101+
break
102+
}
103+
}
104+
105+
if !isFullRelationshipRow {
106+
continue
107+
}
108+
109+
// Read the estimated relationship count.
110+
for index, fd := range rows.FieldDescriptions() {
111+
if fd.Name != "row_count" {
112+
continue
113+
}
114+
115+
rowCount, ok := values[index].(int64)
116+
if !ok {
117+
log.Warn().Msg("unable to read row count")
118+
return nil
119+
}
120+
121+
estimatedRelCount = uint64(rowCount)
122+
return nil
123+
}
124+
}
105125

106-
var timestamp decimal.Decimal
107-
if err := tx.QueryRow(ctx, sql, args...).Scan(&timestamp); err != nil {
108-
return datastore.NoRevision, fmt.Errorf("unable to executed upsert counter query: %w", err)
126+
log.Warn().Bool("has-rows", hasRows).Msg("unable to find row count in statistics query result")
127+
return nil
128+
}, "SHOW STATISTICS FOR TABLE relation_tuple;"); err != nil {
129+
return datastore.Stats{}, fmt.Errorf("unable to query unique estimated row count: %w", err)
109130
}
110131

111-
return revisions.NewForHLC(timestamp)
132+
return datastore.Stats{
133+
UniqueID: uniqueID,
134+
EstimatedRelationshipCount: estimatedRelCount,
135+
ObjectTypeStatistics: datastore.ComputeObjectTypeStats(nsDefs),
136+
}, nil
112137
}

internal/testserver/datastore/crdb.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ import (
2020
)
2121

2222
const (
23-
CRDBTestVersionTag = "v22.2.0"
23+
CRDBTestVersionTag = "v23.1.16"
2424

2525
enableRangefeeds = `SET CLUSTER SETTING kv.rangefeed.enabled = true;`
2626
)

pkg/cmd/datastore/datastore.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -382,7 +382,6 @@ func newCRDBDatastore(ctx context.Context, opts Config) (datastore.Datastore, er
382382
crdb.OverlapStrategy(opts.OverlapStrategy),
383383
crdb.WatchBufferLength(opts.WatchBufferLength),
384384
crdb.WatchBufferWriteTimeout(opts.WatchBufferWriteTimeout),
385-
crdb.DisableStats(opts.DisableStats),
386385
crdb.WithEnablePrometheusStats(opts.EnableDatastoreMetrics),
387386
crdb.WithEnableConnectionBalancing(opts.EnableConnectionBalancing),
388387
crdb.ConnectRate(opts.ConnectRate),

0 commit comments

Comments
 (0)