Skip to content

Commit 3fb0272

Browse files
committed
chore: add comments
1 parent 60b5c3b commit 3fb0272

File tree

6 files changed

+78
-10
lines changed

6 files changed

+78
-10
lines changed

v2/pkg/engine/datasource/graphql_datasource/graphql_subscription_client_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2437,7 +2437,7 @@ func TestWebSocketUpgradeFailures(t *testing.T) {
24372437
w.Header().Set(key, value)
24382438
}
24392439
w.WriteHeader(tc.statusCode)
2440-
fmt.Fprintf(w, `{"error": "WebSocket upgrade failed", "status": %d}`, tc.statusCode)
2440+
_, _ = fmt.Fprintf(w, `{"error": "WebSocket upgrade failed", "status": %d}`, tc.statusCode)
24412441
}))
24422442
defer server.Close()
24432443

v2/pkg/engine/datasource/httpclient/nethttpclient.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,9 @@ const (
136136
sizeHintKey httpClientContext = "size-hint"
137137
)
138138

139+
// WithHTTPClientSizeHint allows the engine to keep track of response sizes per subgraph fetch
140+
// If a hint is supplied, we can create a buffer of size close to the required size
141+
// This reduces allocations by reducing the buffer grow calls, which always copies the buffer
139142
func WithHTTPClientSizeHint(ctx context.Context, size int) context.Context {
140143
return context.WithValue(ctx, sizeHintKey, size)
141144
}
@@ -144,6 +147,9 @@ func buffer(ctx context.Context) *bytes.Buffer {
144147
if sizeHint, ok := ctx.Value(sizeHintKey).(int); ok && sizeHint > 0 {
145148
return bytes.NewBuffer(make([]byte, 0, sizeHint))
146149
}
150+
// if we start with zero, doubling will take a while until we reach the required size
151+
// if we start with a high number, e.g. 1024, we just increase the memory usage of the engine
152+
// 64 seems to be a healthy middle ground
147153
return bytes.NewBuffer(make([]byte, 0, 64))
148154
}
149155

@@ -211,6 +217,8 @@ func makeHTTPRequest(client *http.Client, ctx context.Context, baseHeaders http.
211217
request.Header.Set(AcceptEncodingHeader, EncodingGzip)
212218
request.Header.Add(AcceptEncodingHeader, EncodingDeflate)
213219
if contentLength > 0 {
220+
// always set the Content-Length Header so that chunking can be avoided
221+
// and other parties can more efficiently parse
214222
request.Header.Set(ContentLengthHeader, fmt.Sprintf("%d", contentLength))
215223
}
216224

@@ -229,6 +237,12 @@ func makeHTTPRequest(client *http.Client, ctx context.Context, baseHeaders http.
229237
return nil, err
230238
}
231239

240+
// we intentionally don't use a pool of sorts here
241+
// we're buffering the response and then later, in the engine,
242+
// parse it into an JSON AST with the use of an arena, which is quite efficient
243+
// Through trial and error it turned out that it's best to leave this buffer to the GC
244+
// It'll know best the lifecycle of the buffer
245+
// Using an arena here just increased overall memory usage
232246
out := buffer(ctx)
233247
_, err = out.ReadFrom(respReader)
234248
if err != nil {

v2/pkg/engine/resolve/context.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,10 +36,17 @@ type Context struct {
3636
SubgraphHeadersBuilder SubgraphHeadersBuilder
3737
}
3838

39+
// SubgraphHeadersBuilder allows the user of the engine to "define" the headers for a subgraph request
40+
// Instead of going back and forth between engine & transport,
41+
// you can simply define a function that returns headers for a Subgraph request
42+
// In addition to just the header, the implementer can return a hash for the header which will be used by request deduplication
3943
type SubgraphHeadersBuilder interface {
44+
// HeadersForSubgraph must return the headers and a hash for a Subgraph Request
45+
// The hash will be used for request deduplication
4046
HeadersForSubgraph(subgraphName string) (http.Header, uint64)
4147
}
4248

49+
// HeadersForSubgraphRequest returns headers and a hash for a request that the engine will make to a subgraph
4350
func (c *Context) HeadersForSubgraphRequest(subgraphName string) (http.Header, uint64) {
4451
if c.SubgraphHeadersBuilder == nil {
4552
return nil, 0

v2/pkg/engine/resolve/loader.go

Lines changed: 42 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -137,8 +137,9 @@ type result struct {
137137
loaderHookContext context.Context
138138

139139
httpResponseContext *httpclient.ResponseContext
140-
out []byte
141-
singleFlightStats *singleFlightStats
140+
// out is the subgraph response body
141+
out []byte
142+
singleFlightStats *singleFlightStats
142143
}
143144

144145
func (r *result) init(postProcessing PostProcessingConfiguration, info *FetchInfo) {
@@ -182,6 +183,14 @@ type Loader struct {
182183

183184
taintedObjs taintedObjects
184185

186+
// jsonArena is the arena to allocation json, supplied by the Resolver
187+
// Disclaimer: this arena is NOT thread safe!
188+
// Only use from main goroutine
189+
// Don't Reset or Release, the Resolver handles this
190+
// Disclaimer: When parsing json into the arena, the underlying bytes must also be allocated on the arena!
191+
// This is very important to "tie" their lifecycles together
192+
// If you're not doing this, you will see segfaults
193+
// Example of correct usage in func "mergeResult"
185194
jsonArena arena.Arena
186195
sf *SingleFlight
187196
}
@@ -773,9 +782,11 @@ func (l *Loader) mergeErrors(res *result, fetchItem *FetchItem, value *astjson.V
773782
return err
774783
}
775784
}
776-
777-
// If the error propagation mode is pass-through, we append the errors to the root array
785+
// for efficiency purposes, resolvable.errors is not initialized
786+
// don't change this, it's measurable
787+
// downside: we have to verify it's initialized before appending to it
778788
l.resolvable.ensureErrorsInitialized()
789+
// If the error propagation mode is pass-through, we append the errors to the root array
779790
l.resolvable.errors.AppendArrayItems(value)
780791
return nil
781792
}
@@ -811,7 +822,9 @@ func (l *Loader) mergeErrors(res *result, fetchItem *FetchItem, value *astjson.V
811822
if err := l.addApolloRouterCompatibilityError(res); err != nil {
812823
return err
813824
}
814-
825+
// for efficiency purposes, resolvable.errors is not initialized
826+
// don't change this, it's measurable
827+
// downside: we have to verify it's initialized before appending to it
815828
l.resolvable.ensureErrorsInitialized()
816829
astjson.AppendToArray(l.resolvable.errors, errorObject)
817830

@@ -1066,7 +1079,9 @@ func (l *Loader) addApolloRouterCompatibilityError(res *result) error {
10661079
if err != nil {
10671080
return err
10681081
}
1069-
1082+
// for efficiency purposes, resolvable.errors is not initialized
1083+
// don't change this, it's measurable
1084+
// downside: we have to verify it's initialized before appending to it
10701085
l.resolvable.ensureErrorsInitialized()
10711086
astjson.AppendToArray(l.resolvable.errors, apolloRouterStatusError)
10721087

@@ -1081,6 +1096,9 @@ func (l *Loader) renderErrorsFailedDeps(fetchItem *FetchItem, res *result) error
10811096
return err
10821097
}
10831098
l.setSubgraphStatusCode([]*astjson.Value{errorObject}, res.statusCode)
1099+
// for efficiency purposes, resolvable.errors is not initialized
1100+
// don't change this, it's measurable
1101+
// downside: we have to verify it's initialized before appending to it
10841102
l.resolvable.ensureErrorsInitialized()
10851103
astjson.AppendToArray(l.resolvable.errors, errorObject)
10861104
return nil
@@ -1093,6 +1111,9 @@ func (l *Loader) renderErrorsFailedToFetch(fetchItem *FetchItem, res *result, re
10931111
return err
10941112
}
10951113
l.setSubgraphStatusCode([]*astjson.Value{errorObject}, res.statusCode)
1114+
// for efficiency purposes, resolvable.errors is not initialized
1115+
// don't change this, it's measurable
1116+
// downside: we have to verify it's initialized before appending to it
10961117
l.resolvable.ensureErrorsInitialized()
10971118
astjson.AppendToArray(l.resolvable.errors, errorObject)
10981119
return nil
@@ -1112,6 +1133,9 @@ func (l *Loader) renderErrorsStatusFallback(fetchItem *FetchItem, res *result, s
11121133
}
11131134

11141135
l.setSubgraphStatusCode([]*astjson.Value{errorObject}, res.statusCode)
1136+
// for efficiency purposes, resolvable.errors is not initialized
1137+
// don't change this, it's measurable
1138+
// downside: we have to verify it's initialized before appending to it
11151139
l.resolvable.ensureErrorsInitialized()
11161140
astjson.AppendToArray(l.resolvable.errors, errorObject)
11171141
return nil
@@ -1137,6 +1161,9 @@ func (l *Loader) renderAuthorizationRejectedErrors(fetchItem *FetchItem, res *re
11371161
}
11381162
pathPart := l.renderAtPathErrorPart(fetchItem.ResponsePath)
11391163
extensionErrorCode := fmt.Sprintf(`"extensions":{"code":"%s"}`, errorcodes.UnauthorizedFieldOrType)
1164+
// for efficiency purposes, resolvable.errors is not initialized
1165+
// don't change this, it's measurable
1166+
// downside: we have to verify it's initialized before appending to it
11401167
l.resolvable.ensureErrorsInitialized()
11411168
if res.ds.Name == "" {
11421169
for _, reason := range res.authorizationRejectedReasons {
@@ -1216,6 +1243,9 @@ func (l *Loader) renderRateLimitRejectedErrors(fetchItem *FetchItem, res *result
12161243
return err
12171244
}
12181245
}
1246+
// for efficiency purposes, resolvable.errors is not initialized
1247+
// don't change this, it's measurable
1248+
// downside: we have to verify it's initialized before appending to it
12191249
l.resolvable.ensureErrorsInitialized()
12201250
astjson.AppendToArray(l.resolvable.errors, errorObject)
12211251
return nil
@@ -1417,7 +1447,7 @@ func (l *Loader) loadBatchEntityFetch(ctx context.Context, fetchItem *FetchItem,
14171447
}
14181448
}
14191449
}
1420-
1450+
// I tried using arena here but it only worsened the situation
14211451
preparedInput := bytes.NewBuffer(make([]byte, 0, 64))
14221452
itemInput := bytes.NewBuffer(make([]byte, 0, 32))
14231453
keyGen := pool.Hash64.Get()
@@ -1579,6 +1609,7 @@ const (
15791609
operationTypeContextKey loaderContextKey = "operationType"
15801610
)
15811611

1612+
// GetOperationTypeFromContext can be used, e.g. by the transport, to check if the operation is a Mutation
15821613
func GetOperationTypeFromContext(ctx context.Context) ast.OperationType {
15831614
if ctx == nil {
15841615
return ast.OperationTypeQuery
@@ -1638,6 +1669,7 @@ func (l *Loader) loadByContext(ctx context.Context, source DataSource, fetchItem
16381669
return nil
16391670
}
16401671

1672+
// helps the http client to create buffers at the right size
16411673
ctx = httpclient.WithHTTPClientSizeHint(ctx, item.sizeHint)
16421674

16431675
defer l.sf.Finish(sfKey, fetchKey, item)
@@ -1851,6 +1883,9 @@ func (l *Loader) compactJSON(data []byte) ([]byte, error) {
18511883
return nil, err
18521884
}
18531885
out := dst.Bytes()
1886+
// don't use arena here or segfault
1887+
// it's also not a hot path and not important to optimize
1888+
// arena requires the parsed content to be on the arena as well
18541889
v, err := astjson.ParseBytes(out)
18551890
if err != nil {
18561891
return nil, err

v2/pkg/engine/resolve/resolvable.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,8 @@ type Resolvable struct {
3131
errors *astjson.Value
3232
valueCompletion *astjson.Value
3333
skipAddingNullErrors bool
34-
34+
// astjsonArena is the arena to handle json, supplied by Resolver
35+
// not thread safe, but Resolvable is single threaded anyways
3536
astjsonArena arena.Arena
3637
parsers []*astjson.Parser
3738

@@ -111,6 +112,7 @@ func (r *Resolvable) Init(ctx *Context, initialData []byte, operationType ast.Op
111112
r.operationType = operationType
112113
r.renameTypeNames = ctx.RenameTypeNames
113114
r.data = astjson.ObjectValue(r.astjsonArena)
115+
// don't init errors! It will heavily increase memory usage
114116
r.errors = nil
115117
if initialData != nil {
116118
initialValue, err := astjson.ParseBytesWithArena(r.astjsonArena, initialData)
@@ -129,6 +131,7 @@ func (r *Resolvable) InitSubscription(ctx *Context, initialData []byte, postProc
129131
r.ctx = ctx
130132
r.operationType = ast.OperationTypeSubscription
131133
r.renameTypeNames = ctx.RenameTypeNames
134+
// don't init errors! It will heavily increase memory usage
132135
r.errors = nil
133136
if initialData != nil {
134137
initialValue, err := astjson.ParseBytesWithArena(r.astjsonArena, initialData)
@@ -167,6 +170,7 @@ func (r *Resolvable) ResolveNode(node Node, data *astjson.Value, out io.Writer)
167170
r.print = false
168171
r.printErr = nil
169172
r.authorizationError = nil
173+
// don't init errors! It will heavily increase memory usage
170174
r.errors = nil
171175

172176
hasErrors := r.walkNode(node, data)
@@ -233,6 +237,7 @@ func (r *Resolvable) Resolve(ctx context.Context, rootData *Object, fetchTree *F
233237
return r.printErr
234238
}
235239

240+
// ensureErrorsInitialized is used to lazily init r.errors if needed
236241
func (r *Resolvable) ensureErrorsInitialized() {
237242
if r.errors == nil {
238243
r.errors = astjson.ArrayValue(r.astjsonArena)

v2/pkg/engine/resolve/resolve.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,13 @@ type Resolver struct {
7272
// maxSubscriptionFetchTimeout defines the maximum time a subscription fetch can take before it is considered timed out
7373
maxSubscriptionFetchTimeout time.Duration
7474

75-
resolveArenaPool *ArenaPool
75+
// resolveArenaPool is the arena pool dedicated for Loader & Resolvable
76+
// ArenaPool automatically adjusts arena buffer sizes per workload
77+
// resolving & response buffering are very different tasks
78+
// as such, it was best to have two arena pools in terms of memory usage
79+
// A single pool for both was much less efficient
80+
resolveArenaPool *ArenaPool
81+
// responseBufferPool is the arena pool dedicated for response buffering before sending to the client
7682
responseBufferPool *ArenaPool
7783

7884
// Single flight cache for deduplicating requests across all loaders
@@ -246,6 +252,7 @@ func New(ctx context.Context, options ResolverOptions) *Resolver {
246252

247253
func newTools(options ResolverOptions, allowedExtensionFields map[string]struct{}, allowedErrorFields map[string]struct{}, sf *SingleFlight) *tools {
248254
return &tools{
255+
// we set the arena manually
249256
resolvable: NewResolvable(nil, options.ResolvableOptions),
250257
loader: &Loader{
251258
propagateSubgraphErrors: options.PropagateSubgraphErrors,

0 commit comments

Comments
 (0)