
Commit 8dc4155

z1cheng authored and deads2k committed
UPSTREAM: 130281: Implement chunking for gzip encoder in deferredResponseWriter
Signed-off-by: z1cheng <[email protected]>
1 parent 0c14b39 commit 8dc4155

2 files changed: +310 −6 lines

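The change below teaches deferredResponseWriter to buffer very small leading writes (as produced by streaming encoders that emit the payload in tiny fragments), so the decision to gzip is made on the accumulated size rather than on the first fragment alone. As a rough orientation before reading the diff, here is a minimal, self-contained sketch of that buffering idea; the names bufferingWriter, flushThreshold, and firstWriteThreshold are illustrative only and merely mirror defaultGzipThresholdBytes and firstWriteStreamingThresholdBytes from the real code, which additionally handles the non-gzip path, header writing, and tracing.

// Hypothetical, simplified sketch of the buffering decision added by this commit.
// It is not the upstream implementation.
package main

import (
    "bytes"
    "compress/gzip"
    "fmt"
    "io"
)

const (
    flushThreshold      = 128 * 1024 // mirrors defaultGzipThresholdBytes
    firstWriteThreshold = 1          // mirrors firstWriteStreamingThresholdBytes
)

// bufferingWriter accumulates a tiny first write so the compression decision
// can be made on the combined size rather than on the first fragment alone.
type bufferingWriter struct {
    dst        io.Writer
    buf        []byte
    hasWritten bool
}

func (w *bufferingWriter) Write(p []byte) (int, error) {
    // Writes after the first flush, or a first write already bigger than the
    // 1-byte streaming fragment, bypass the buffer entirely.
    if w.hasWritten || (w.buf == nil && len(p) > firstWriteThreshold) {
        return w.flush(p)
    }
    w.buf = append(w.buf, p...)
    if len(w.buf) > flushThreshold {
        // Enough data accumulated to justify compression: flush and stop buffering.
        if _, err := w.flush(w.buf); err != nil {
            return 0, err
        }
        w.buf = nil
    }
    return len(p), nil
}

func (w *bufferingWriter) flush(p []byte) (int, error) {
    w.hasWritten = true
    return w.dst.Write(p)
}

// Close drains anything still buffered (the real code skips gzip entirely in
// this case, because the threshold was never reached).
func (w *bufferingWriter) Close() error {
    if !w.hasWritten && len(w.buf) > 0 {
        _, err := w.flush(w.buf)
        w.buf = nil
        return err
    }
    return nil
}

func main() {
    var compressed bytes.Buffer
    gz := gzip.NewWriter(&compressed)
    bw := &bufferingWriter{dst: gz}

    // A 1-byte first write (typical of a streaming JSON encoder) is buffered
    // instead of forcing the gzip decision on a single byte.
    bw.Write([]byte("{"))
    bw.Write(bytes.Repeat([]byte("a"), 200*1024))
    bw.Close()
    gz.Close()

    fmt.Printf("compressed to %d bytes\n", compressed.Len())
}

In this sketch the two writes end up in a single gzip stream because the buffered total exceeds the 128 KiB threshold.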

staging/src/k8s.io/apiserver/pkg/endpoints/handlers/responsewriters/writers.go

Lines changed: 54 additions & 6 deletions
@@ -156,6 +156,9 @@ const (
    // (usually the entire object), and if the size is smaller no gzipping will be performed
    // if the client requests it.
    defaultGzipThresholdBytes = 128 * 1024
+   // Use the length of the first write of streaming implementations.
+   // TODO: Update when streaming proto is implemented
+   firstWriteStreamingThresholdBytes = 1
 )

 // negotiateContentEncoding returns a supported client-requested content encoding for the
@@ -191,14 +194,53 @@ type deferredResponseWriter struct {
    statusCode      int
    contentEncoding string

-   hasWritten bool
-   hw         http.ResponseWriter
-   w          io.Writer
+   hasBuffered bool
+   buffer      []byte
+   hasWritten  bool
+   hw          http.ResponseWriter
+   w           io.Writer

    ctx context.Context
 }

 func (w *deferredResponseWriter) Write(p []byte) (n int, err error) {
+   switch {
+   case w.hasWritten:
+       // already written, cannot buffer
+       return w.unbufferedWrite(p)
+
+   case w.contentEncoding != "gzip":
+       // non-gzip, no need to buffer
+       return w.unbufferedWrite(p)
+
+   case !w.hasBuffered && len(p) > defaultGzipThresholdBytes:
+       // not yet buffered, first write is long enough to trigger gzip, no need to buffer
+       return w.unbufferedWrite(p)
+
+   case !w.hasBuffered && len(p) > firstWriteStreamingThresholdBytes:
+       // not yet buffered, first write is longer than expected for streaming scenarios that would require buffering, no need to buffer
+       return w.unbufferedWrite(p)
+
+   default:
+       if !w.hasBuffered {
+           w.hasBuffered = true
+           // Start at 80 bytes to avoid rapid reallocation of the buffer.
+           // The minimum size of a 0-item serialized list object is 80 bytes:
+           // {"kind":"List","apiVersion":"v1","metadata":{"resourceVersion":"1"},"items":[]}\n
+           w.buffer = make([]byte, 0, max(80, len(p)))
+       }
+       w.buffer = append(w.buffer, p...)
+       var err error
+       if len(w.buffer) > defaultGzipThresholdBytes {
+           // we've accumulated enough to trigger gzip, write and clear buffer
+           _, err = w.unbufferedWrite(w.buffer)
+           w.buffer = nil
+       }
+       return len(p), err
+   }
+}
+
+func (w *deferredResponseWriter) unbufferedWrite(p []byte) (n int, err error) {
    ctx := w.ctx
    span := tracing.SpanFromContext(ctx)
    // This Step usually wraps in-memory object serialization.
@@ -244,11 +286,17 @@ func (w *deferredResponseWriter) Write(p []byte) (n int, err error) {
    return w.w.Write(p)
 }

-func (w *deferredResponseWriter) Close() error {
+func (w *deferredResponseWriter) Close() (err error) {
    if !w.hasWritten {
-       return nil
+       if !w.hasBuffered {
+           return nil
+       }
+       // never reached defaultGzipThresholdBytes, no need to do the gzip writer cleanup
+       _, err := w.unbufferedWrite(w.buffer)
+       w.buffer = nil
+       return err
    }
-   var err error
+
    switch t := w.w.(type) {
    case *gzip.Writer:
        err = t.Close()
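In short, the new Write path buffers only when gzip has been negotiated and the first write is no larger than firstWriteStreamingThresholdBytes (one byte, as emitted by streaming encoders ahead of the rest of the object). Once the accumulated buffer exceeds defaultGzipThresholdBytes (128 KiB) it is flushed through unbufferedWrite, which makes the actual gzip decision, and Close flushes any remainder uncompressed if the threshold was never reached. For example, a 1-byte write followed by a write of defaultGzipThresholdBytes-1 bytes totals exactly the threshold and is flushed uncompressed at Close, while one extra leading byte pushes the total over the threshold and the response is gzipped; both cases are covered by the tests below.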

staging/src/k8s.io/apiserver/pkg/endpoints/handlers/responsewriters/writers_test.go

Lines changed: 256 additions & 0 deletions
@@ -44,6 +44,7 @@ import (
    "k8s.io/apimachinery/pkg/runtime"
    "k8s.io/apimachinery/pkg/runtime/schema"
    jsonserializer "k8s.io/apimachinery/pkg/runtime/serializer/json"
+   rand2 "k8s.io/apimachinery/pkg/util/rand"
    "k8s.io/apimachinery/pkg/util/uuid"
    "k8s.io/apiserver/pkg/features"
    utilfeature "k8s.io/apiserver/pkg/util/feature"
@@ -378,6 +379,261 @@ func TestSerializeObject(t *testing.T) {
    }
 }

+func TestDeferredResponseWriter_Write(t *testing.T) {
+   smallChunk := bytes.Repeat([]byte("b"), defaultGzipThresholdBytes-1)
+   largeChunk := bytes.Repeat([]byte("b"), defaultGzipThresholdBytes+1)
+
+   tests := []struct {
+       name          string
+       chunks        [][]byte
+       expectGzip    bool
+       expectHeaders http.Header
+   }{
+       {
+           name:          "no writes",
+           chunks:        nil,
+           expectGzip:    false,
+           expectHeaders: http.Header{},
+       },
+       {
+           name:       "one empty write",
+           chunks:     [][]byte{{}},
+           expectGzip: false,
+           expectHeaders: http.Header{
+               "Content-Type": []string{"text/plain"},
+           },
+       },
+       {
+           name:       "one single byte write",
+           chunks:     [][]byte{{'{'}},
+           expectGzip: false,
+           expectHeaders: http.Header{
+               "Content-Type": []string{"text/plain"},
+           },
+       },
+       {
+           name:       "one small chunk write",
+           chunks:     [][]byte{smallChunk},
+           expectGzip: false,
+           expectHeaders: http.Header{
+               "Content-Type": []string{"text/plain"},
+           },
+       },
+       {
+           name:       "two small chunk writes",
+           chunks:     [][]byte{smallChunk, smallChunk},
+           expectGzip: false,
+           expectHeaders: http.Header{
+               "Content-Type": []string{"text/plain"},
+           },
+       },
+       {
+           name:       "one single byte and one small chunk write",
+           chunks:     [][]byte{{'{'}, smallChunk},
+           expectGzip: false,
+           expectHeaders: http.Header{
+               "Content-Type": []string{"text/plain"},
+           },
+       },
+       {
+           name:       "two single bytes and one small chunk write",
+           chunks:     [][]byte{{'{'}, {'{'}, smallChunk},
+           expectGzip: true,
+           expectHeaders: http.Header{
+               "Content-Type":     []string{"text/plain"},
+               "Content-Encoding": []string{"gzip"},
+               "Vary":             []string{"Accept-Encoding"},
+           },
+       },
+       {
+           name:       "one large chunk write",
+           chunks:     [][]byte{largeChunk},
+           expectGzip: true,
+           expectHeaders: http.Header{
+               "Content-Type":     []string{"text/plain"},
+               "Content-Encoding": []string{"gzip"},
+               "Vary":             []string{"Accept-Encoding"},
+           },
+       },
+       {
+           name:       "two large chunk writes",
+           chunks:     [][]byte{largeChunk, largeChunk},
+           expectGzip: true,
+           expectHeaders: http.Header{
+               "Content-Type":     []string{"text/plain"},
+               "Content-Encoding": []string{"gzip"},
+               "Vary":             []string{"Accept-Encoding"},
+           },
+       },
+       {
+           name:       "one small chunk and one large chunk write",
+           chunks:     [][]byte{smallChunk, largeChunk},
+           expectGzip: false,
+           expectHeaders: http.Header{
+               "Content-Type": []string{"text/plain"},
+           },
+       },
+   }
+
+   for _, tt := range tests {
+       t.Run(tt.name, func(t *testing.T) {
+           mockResponseWriter := httptest.NewRecorder()
+
+           drw := &deferredResponseWriter{
+               mediaType:       "text/plain",
+               statusCode:      200,
+               contentEncoding: "gzip",
+               hw:              mockResponseWriter,
+               ctx:             context.Background(),
+           }
+
+           fullPayload := []byte{}
+
+           for _, chunk := range tt.chunks {
+               n, err := drw.Write(chunk)
+
+               if err != nil {
+                   t.Fatalf("unexpected error while writing chunk: %v", err)
+               }
+               if n != len(chunk) {
+                   t.Errorf("write is not complete, expected: %d bytes, written: %d bytes", len(chunk), n)
+               }
+
+               fullPayload = append(fullPayload, chunk...)
+           }
+
+           err := drw.Close()
+           if err != nil {
+               t.Fatalf("unexpected error when closing deferredResponseWriter: %v", err)
+           }
+
+           res := mockResponseWriter.Result()
+
+           if res.StatusCode != http.StatusOK {
+               t.Fatalf("status code is not written properly, expected: 200, got: %d", res.StatusCode)
+           }
+           if !reflect.DeepEqual(res.Header, tt.expectHeaders) {
+               t.Fatal(cmp.Diff(tt.expectHeaders, res.Header))
+           }
+
+           resBytes, err := io.ReadAll(res.Body)
+           if err != nil {
+               t.Fatalf("unexpected error occurred while reading response body: %v", err)
+           }
+
+           if tt.expectGzip {
+               gr, err := gzip.NewReader(bytes.NewReader(resBytes))
+               if err != nil {
+                   t.Fatalf("failed to create gzip reader: %v", err)
+               }
+
+               decompressed, err := io.ReadAll(gr)
+               if err != nil {
+                   t.Fatalf("failed to decompress: %v", err)
+               }
+
+               if !bytes.Equal(fullPayload, decompressed) {
+                   t.Errorf("payload mismatch, expected: %s, got: %s", fullPayload, decompressed)
+               }
+           } else {
+               if !bytes.Equal(fullPayload, resBytes) {
+                   t.Errorf("payload mismatch, expected: %s, got: %s", fullPayload, resBytes)
+               }
+           }
+       })
+   }
+}
+
+func benchmarkChunkingGzip(b *testing.B, count int, chunk []byte) {
+   mockResponseWriter := httptest.NewRecorder()
+   mockResponseWriter.Body = nil
+
+   drw := &deferredResponseWriter{
+       mediaType:       "text/plain",
+       statusCode:      200,
+       contentEncoding: "gzip",
+       hw:              mockResponseWriter,
+       ctx:             context.Background(),
+   }
+   b.ResetTimer()
+   for i := 0; i < count; i++ {
+       n, err := drw.Write(chunk)
+       if err != nil {
+           b.Fatalf("unexpected error while writing chunk: %v", err)
+       }
+       if n != len(chunk) {
+           b.Errorf("write is not complete, expected: %d bytes, written: %d bytes", len(chunk), n)
+       }
+   }
+   err := drw.Close()
+   if err != nil {
+       b.Fatalf("unexpected error when closing deferredResponseWriter: %v", err)
+   }
+   res := mockResponseWriter.Result()
+   if res.StatusCode != http.StatusOK {
+       b.Fatalf("status code is not written properly, expected: 200, got: %d", res.StatusCode)
+   }
+}
+
+func BenchmarkChunkingGzip(b *testing.B) {
+   tests := []struct {
+       count int
+       size  int
+   }{
+       {
+           count: 100,
+           size:  1_000,
+       },
+       {
+           count: 100,
+           size:  100_000,
+       },
+       {
+           count: 1_000,
+           size:  100_000,
+       },
+       {
+           count: 1_000,
+           size:  1_000_000,
+       },
+       {
+           count: 10_000,
+           size:  100_000,
+       },
+       {
+           count: 100_000,
+           size:  10_000,
+       },
+       {
+           count: 1,
+           size:  100_000,
+       },
+       {
+           count: 1,
+           size:  1_000_000,
+       },
+       {
+           count: 1,
+           size:  10_000_000,
+       },
+       {
+           count: 1,
+           size:  100_000_000,
+       },
+       {
+           count: 1,
+           size:  1_000_000_000,
+       },
+   }
+
+   for _, t := range tests {
+       b.Run(fmt.Sprintf("Count=%d/Size=%d", t.count, t.size), func(b *testing.B) {
+           chunk := []byte(rand2.String(t.size))
+           benchmarkChunkingGzip(b, t.count, chunk)
+       })
+   }
+}
+
 func randTime(t *time.Time, r *rand.Rand) {
    *t = time.Unix(r.Int63n(1000*365*24*60*60), r.Int63())
 }
