Skip to content

Commit de99adb

Browse files
serathiusrh-roman
authored andcommitted
Implement streaming proto encoding
1 parent 583d3c7 commit de99adb

File tree

15 files changed

+667
-32
lines changed

15 files changed

+667
-32
lines changed

pkg/features/kube_features.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -703,9 +703,13 @@ const (
703703
StorageVersionMigrator featuregate.Feature = "StorageVersionMigrator"
704704

705705
// owner: @serathius
706-
// Allow API server to encode collections item by item, instead of all at once.
706+
// Allow API server JSON encoder to encode collections item by item, instead of all at once.
707707
StreamingCollectionEncodingToJSON featuregate.Feature = "StreamingCollectionEncodingToJSON"
708708

709+
// owner: serathius
710+
// Allow API server Protobuf encoder to encode collections item by item, instead of all at once.
711+
StreamingCollectionEncodingToProtobuf featuregate.Feature = "StreamingCollectionEncodingToProtobuf"
712+
709713
// owner: @robscott
710714
// kep: https://kep.k8s.io/2433
711715
//

pkg/features/versioned_kube_features.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -760,6 +760,10 @@ var defaultVersionedKubernetesFeatureGates = map[featuregate.Feature]featuregate
760760
{Version: version.MustParse("1.33"), Default: true, PreRelease: featuregate.Beta},
761761
},
762762

763+
StreamingCollectionEncodingToProtobuf: {
764+
{Version: version.MustParse("1.33"), Default: true, PreRelease: featuregate.Beta},
765+
},
766+
763767
SupplementalGroupsPolicy: {
764768
{Version: version.MustParse("1.31"), Default: false, PreRelease: featuregate.Alpha},
765769
},

pkg/registry/core/rest/storage_core_generic.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,9 @@ func (c *GenericConfig) NewRESTStorage(apiResourceConfigSource serverstorage.API
8080
if utilfeature.DefaultFeatureGate.Enabled(features.StreamingCollectionEncodingToJSON) {
8181
opts = append(opts, serializer.WithStreamingCollectionEncodingToJSON())
8282
}
83+
if utilfeature.DefaultFeatureGate.Enabled(features.StreamingCollectionEncodingToProtobuf) {
84+
opts = append(opts, serializer.WithStreamingCollectionEncodingToProtobuf())
85+
}
8386
if len(opts) != 0 {
8487
apiGroupInfo.NegotiatedSerializer = serializer.NewCodecFactory(legacyscheme.Scheme, opts...)
8588
}

staging/src/k8s.io/apiextensions-apiserver/pkg/apiserver/customresource_handler.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -895,7 +895,9 @@ func (r *crdHandler) getOrCreateServingInfoFor(uid types.UID, name string) (*crd
895895
MediaType: "application/vnd.kubernetes.protobuf",
896896
MediaTypeType: "application",
897897
MediaTypeSubType: "vnd.kubernetes.protobuf",
898-
Serializer: protobuf.NewSerializer(creator, typer),
898+
Serializer: protobuf.NewSerializerWithOptions(creator, typer, protobuf.SerializerOptions{
899+
StreamingCollectionsEncoding: utilfeature.DefaultFeatureGate.Enabled(features.StreamingCollectionEncodingToProtobuf),
900+
}),
899901
StreamSerializer: &runtime.StreamSerializerInfo{
900902
Serializer: protobuf.NewRawSerializer(creator, typer),
901903
Framer: protobuf.LengthDelimitedFramer,
@@ -978,6 +980,9 @@ func (r *crdHandler) getOrCreateServingInfoFor(uid types.UID, name string) (*crd
978980
if utilfeature.DefaultFeatureGate.Enabled(features.StreamingCollectionEncodingToJSON) {
979981
opts = append(opts, serializer.WithStreamingCollectionEncodingToJSON())
980982
}
983+
if utilfeature.DefaultFeatureGate.Enabled(features.StreamingCollectionEncodingToProtobuf) {
984+
opts = append(opts, serializer.WithStreamingCollectionEncodingToProtobuf())
985+
}
981986
scaleScope.Serializer = serializer.NewCodecFactory(scaleConverter.Scheme(), opts...)
982987
scaleScope.Kind = autoscalingv1.SchemeGroupVersion.WithKind("Scale")
983988
scaleScope.Namer = handlers.ContextBasedNaming{

staging/src/k8s.io/apimachinery/pkg/runtime/serializer/codec_factory.go

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,9 @@ func newSerializersForScheme(scheme *runtime.Scheme, mf json.MetaFactory, option
6161
mf, scheme, scheme,
6262
json.SerializerOptions{Yaml: true, Pretty: false, Strict: true},
6363
)
64-
protoSerializer := protobuf.NewSerializer(scheme, scheme)
64+
protoSerializer := protobuf.NewSerializerWithOptions(scheme, scheme, protobuf.SerializerOptions{
65+
StreamingCollectionsEncoding: options.StreamingCollectionsEncodingToProtobuf,
66+
})
6567
protoRawSerializer := protobuf.NewRawSerializer(scheme, scheme)
6668

6769
serializers := []runtime.SerializerInfo{
@@ -113,7 +115,8 @@ type CodecFactoryOptions struct {
113115
// Pretty includes a pretty serializer along with the non-pretty one
114116
Pretty bool
115117

116-
StreamingCollectionsEncodingToJSON bool
118+
StreamingCollectionsEncodingToJSON bool
119+
StreamingCollectionsEncodingToProtobuf bool
117120

118121
serializers []func(runtime.ObjectCreater, runtime.ObjectTyper) runtime.SerializerInfo
119122
}
@@ -155,6 +158,12 @@ func WithStreamingCollectionEncodingToJSON() CodecFactoryOptionsMutator {
155158
}
156159
}
157160

161+
func WithStreamingCollectionEncodingToProtobuf() CodecFactoryOptionsMutator {
162+
return func(options *CodecFactoryOptions) {
163+
options.StreamingCollectionsEncodingToProtobuf = true
164+
}
165+
}
166+
158167
// NewCodecFactory provides methods for retrieving serializers for the supported wire formats
159168
// and conversion wrappers to define preferred internal and external versions. In the future,
160169
// as the internal version is used less, callers may instead use a defaulting serializer and
Lines changed: 174 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,174 @@
1+
/*
2+
Copyright 2025 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package protobuf
18+
19+
import (
20+
"errors"
21+
"io"
22+
"math/bits"
23+
24+
"github.com/gogo/protobuf/proto"
25+
26+
"k8s.io/apimachinery/pkg/api/meta"
27+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
28+
"k8s.io/apimachinery/pkg/conversion"
29+
"k8s.io/apimachinery/pkg/runtime"
30+
)
31+
32+
var (
33+
errFieldCount = errors.New("expected ListType to have 3 fields")
34+
errTypeMetaField = errors.New("expected TypeMeta field to have TypeMeta type")
35+
errTypeMetaProtobufTag = errors.New(`expected TypeMeta protobuf field tag to be ""`)
36+
errListMetaField = errors.New("expected ListMeta field to have ListMeta type")
37+
errListMetaProtobufTag = errors.New(`expected ListMeta protobuf field tag to be "bytes,1,opt,name=metadata"`)
38+
errItemsProtobufTag = errors.New(`expected Items protobuf field tag to be "bytes,2,rep,name=items"`)
39+
errItemsSizer = errors.New(`expected Items elements to implement proto.Sizer`)
40+
)
41+
42+
// getStreamingListData implements list extraction logic for protobuf stream serialization.
43+
//
44+
// Reason for a custom logic instead of reusing accessors from meta package:
45+
// * Validate proto tags to prevent incompatibility with proto standard package.
46+
// * ListMetaAccessor doesn't distinguish empty from nil value.
47+
// * TypeAccessor reparsing "apiVersion" and serializing it with "{group}/{version}"
48+
func getStreamingListData(list runtime.Object) (data streamingListData, err error) {
49+
listValue, err := conversion.EnforcePtr(list)
50+
if err != nil {
51+
return data, err
52+
}
53+
listType := listValue.Type()
54+
if listType.NumField() != 3 {
55+
return data, errFieldCount
56+
}
57+
// TypeMeta: validated, but not returned as is not serialized.
58+
_, ok := listValue.Field(0).Interface().(metav1.TypeMeta)
59+
if !ok {
60+
return data, errTypeMetaField
61+
}
62+
if listType.Field(0).Tag.Get("protobuf") != "" {
63+
return data, errTypeMetaProtobufTag
64+
}
65+
// ListMeta
66+
listMeta, ok := listValue.Field(1).Interface().(metav1.ListMeta)
67+
if !ok {
68+
return data, errListMetaField
69+
}
70+
// if we were ever to relax the protobuf tag check we should update the hardcoded `0xa` below when writing ListMeta.
71+
if listType.Field(1).Tag.Get("protobuf") != "bytes,1,opt,name=metadata" {
72+
return data, errListMetaProtobufTag
73+
}
74+
data.listMeta = listMeta
75+
// Items; if we were ever to relax the protobuf tag check we should update the hardcoded `0x12` below when writing Items.
76+
if listType.Field(2).Tag.Get("protobuf") != "bytes,2,rep,name=items" {
77+
return data, errItemsProtobufTag
78+
}
79+
items, err := meta.ExtractList(list)
80+
if err != nil {
81+
return data, err
82+
}
83+
data.items = items
84+
data.totalSize, data.listMetaSize, data.itemsSizes, err = listSize(listMeta, items)
85+
return data, err
86+
}
87+
88+
type streamingListData struct {
89+
// totalSize is the total size of the serialized List object, including their proto headers/size bytes
90+
totalSize int
91+
92+
// listMetaSize caches results from .Size() call to listMeta, doesn't include header bytes (field identifier, size)
93+
listMetaSize int
94+
listMeta metav1.ListMeta
95+
96+
// itemsSizes caches results from .Size() call to items, doesn't include header bytes (field identifier, size)
97+
itemsSizes []int
98+
items []runtime.Object
99+
}
100+
101+
// listSize return size of ListMeta and items to be later used for preallocations.
102+
// listMetaSize and itemSizes do not include header bytes (field identifier, size).
103+
func listSize(listMeta metav1.ListMeta, items []runtime.Object) (totalSize, listMetaSize int, itemSizes []int, err error) {
104+
// ListMeta
105+
listMetaSize = listMeta.Size()
106+
totalSize += 1 + sovGenerated(uint64(listMetaSize)) + listMetaSize
107+
// Items
108+
itemSizes = make([]int, len(items))
109+
for i, item := range items {
110+
sizer, ok := item.(proto.Sizer)
111+
if !ok {
112+
return totalSize, listMetaSize, nil, errItemsSizer
113+
}
114+
n := sizer.Size()
115+
itemSizes[i] = n
116+
totalSize += 1 + sovGenerated(uint64(n)) + n
117+
}
118+
return totalSize, listMetaSize, itemSizes, nil
119+
}
120+
121+
func streamingEncodeUnknownList(w io.Writer, unk runtime.Unknown, listData streamingListData, memAlloc runtime.MemoryAllocator) error {
122+
_, err := w.Write(protoEncodingPrefix)
123+
if err != nil {
124+
return err
125+
}
126+
// encodeList is responsible for encoding the List into the unknown Raw.
127+
encodeList := func(writer io.Writer) (int, error) {
128+
return streamingEncodeList(writer, listData, memAlloc)
129+
}
130+
_, err = unk.MarshalToWriter(w, listData.totalSize, encodeList)
131+
return err
132+
}
133+
134+
func streamingEncodeList(w io.Writer, listData streamingListData, memAlloc runtime.MemoryAllocator) (size int, err error) {
135+
// ListMeta; 0xa = (1 << 3) | 2; field number: 1, type: 2 (LEN). https://protobuf.dev/programming-guides/encoding/#structure
136+
n, err := doEncodeWithHeader(&listData.listMeta, w, 0xa, listData.listMetaSize, memAlloc)
137+
size += n
138+
if err != nil {
139+
return size, err
140+
}
141+
// Items; 0x12 = (2 << 3) | 2; field number: 2, type: 2 (LEN). https://protobuf.dev/programming-guides/encoding/#structure
142+
for i, item := range listData.items {
143+
n, err := doEncodeWithHeader(item, w, 0x12, listData.itemsSizes[i], memAlloc)
144+
size += n
145+
if err != nil {
146+
return size, err
147+
}
148+
}
149+
return size, nil
150+
}
151+
152+
func writeVarintGenerated(w io.Writer, v int) (int, error) {
153+
buf := make([]byte, sovGenerated(uint64(v)))
154+
encodeVarintGenerated(buf, len(buf), uint64(v))
155+
return w.Write(buf)
156+
}
157+
158+
// sovGenerated is copied from `generated.pb.go` returns size of varint.
159+
func sovGenerated(v uint64) int {
160+
return (bits.Len64(v|1) + 6) / 7
161+
}
162+
163+
// encodeVarintGenerated is copied from `generated.pb.go` encodes varint.
164+
func encodeVarintGenerated(dAtA []byte, offset int, v uint64) int {
165+
offset -= sovGenerated(v)
166+
base := offset
167+
for v >= 1<<7 {
168+
dAtA[offset] = uint8(v&0x7f | 0x80)
169+
v >>= 7
170+
offset++
171+
}
172+
dAtA[offset] = uint8(v)
173+
return base
174+
}

0 commit comments

Comments
 (0)