Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions pkg/features/kube_features.go
Original file line number Diff line number Diff line change
Expand Up @@ -840,6 +840,11 @@ const (
// Enables a StatefulSet to start from an arbitrary non zero ordinal
StatefulSetStartOrdinal featuregate.Feature = "StatefulSetStartOrdinal"


// owner: @serathius
// Allow API server to encode collections item by item, instead of all at once.
StreamingCollectionEncodingToJSON featuregate.Feature = "StreamingCollectionEncodingToJSON"

// owner: @robscott
// kep: https://kep.k8s.io/2433
// alpha: v1.21
Expand Down
11 changes: 11 additions & 0 deletions pkg/registry/core/rest/storage_core_generic.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,14 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/apiserver/pkg/authentication/authenticator"
"k8s.io/apiserver/pkg/features"
"k8s.io/apiserver/pkg/registry/generic"
"k8s.io/apiserver/pkg/registry/rest"
genericapiserver "k8s.io/apiserver/pkg/server"
serverstorage "k8s.io/apiserver/pkg/server/storage"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/informers"
restclient "k8s.io/client-go/rest"

Expand Down Expand Up @@ -69,6 +72,14 @@ func (c *GenericConfig) NewRESTStorage(apiResourceConfigSource serverstorage.API
NegotiatedSerializer: legacyscheme.Codecs,
}

opts := []serializer.CodecFactoryOptionsMutator{}
if utilfeature.DefaultFeatureGate.Enabled(features.StreamingCollectionEncodingToJSON) {
opts = append(opts, serializer.WithStreamingCollectionEncodingToJSON())
}
if len(opts) != 0 {
apiGroupInfo.NegotiatedSerializer = serializer.NewCodecFactory(legacyscheme.Scheme, opts...)
}

eventStorage, err := eventstore.NewREST(restOptionsGetter, uint64(c.EventTTL.Seconds()))
if err != nil {
return genericapiserver.APIGroupInfo{}, err
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import (
"sync/atomic"
"time"

"k8s.io/apiserver/pkg/features"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"sigs.k8s.io/structured-merge-diff/v4/fieldpath"

apiextensionshelpers "k8s.io/apiextensions-apiserver/pkg/apihelpers"
Expand Down Expand Up @@ -826,6 +828,7 @@ func (r *crdHandler) getOrCreateServingInfoFor(uid types.UID, name string) (*crd
clusterScoped := crd.Spec.Scope == apiextensionsv1.ClusterScoped

// CRDs explicitly do not support protobuf, but some objects returned by the API server do
streamingCollections := utilfeature.DefaultFeatureGate.Enabled(features.StreamingCollectionEncodingToJSON)
negotiatedSerializer := unstructuredNegotiatedSerializer{
typer: typer,
creator: creator,
Expand All @@ -839,10 +842,11 @@ func (r *crdHandler) getOrCreateServingInfoFor(uid types.UID, name string) (*crd
MediaTypeType: "application",
MediaTypeSubType: "json",
EncodesAsText: true,
Serializer: json.NewSerializer(json.DefaultMetaFactory, creator, typer, false),
PrettySerializer: json.NewSerializer(json.DefaultMetaFactory, creator, typer, true),
Serializer: json.NewSerializerWithOptions(json.DefaultMetaFactory, creator, typer, json.SerializerOptions{StreamingCollectionsEncoding: streamingCollections}),
PrettySerializer: json.NewSerializerWithOptions(json.DefaultMetaFactory, creator, typer, json.SerializerOptions{Pretty: true}),
StrictSerializer: json.NewSerializerWithOptions(json.DefaultMetaFactory, creator, typer, json.SerializerOptions{
Strict: true,
Strict: true,
StreamingCollectionsEncoding: streamingCollections,
}),
StreamSerializer: &runtime.StreamSerializerInfo{
EncodesAsText: true,
Expand Down Expand Up @@ -936,7 +940,11 @@ func (r *crdHandler) getOrCreateServingInfoFor(uid types.UID, name string) (*crd
scaleScope := *requestScopes[v.Name]
scaleConverter := scale.NewScaleConverter()
scaleScope.Subresource = "scale"
scaleScope.Serializer = serializer.NewCodecFactory(scaleConverter.Scheme())
var opts []serializer.CodecFactoryOptionsMutator
if utilfeature.DefaultFeatureGate.Enabled(features.StreamingCollectionEncodingToJSON) {
opts = append(opts, serializer.WithStreamingCollectionEncodingToJSON())
}
scaleScope.Serializer = serializer.NewCodecFactory(scaleConverter.Scheme(), opts...)
scaleScope.Kind = autoscalingv1.SchemeGroupVersion.WithKind("Scale")
scaleScope.Namer = handlers.ContextBasedNaming{
Namer: meta.NewAccessor(),
Expand Down
3 changes: 3 additions & 0 deletions staging/src/k8s.io/apimachinery/pkg/api/meta/help.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,9 @@ func extractList(obj runtime.Object, allocNew bool) ([]runtime.Object, error) {
if err != nil {
return nil, err
}
if items.IsNil() {
return nil, nil
}
list := make([]runtime.Object, items.Len())
if len(list) == 0 {
return list, nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,17 +52,25 @@ type serializerType struct {
func newSerializersForScheme(scheme *runtime.Scheme, mf json.MetaFactory, options CodecFactoryOptions) []serializerType {
jsonSerializer := json.NewSerializerWithOptions(
mf, scheme, scheme,
json.SerializerOptions{Yaml: false, Pretty: false, Strict: options.Strict},
json.SerializerOptions{Yaml: false, Pretty: false, Strict: options.Strict, StreamingCollectionsEncoding: options.StreamingCollectionsEncodingToJSON},
)
jsonSerializerType := serializerType{
AcceptContentTypes: []string{runtime.ContentTypeJSON},
ContentType: runtime.ContentTypeJSON,
FileExtensions: []string{"json"},
EncodesAsText: true,
Serializer: jsonSerializer,

Framer: json.Framer,
StreamSerializer: jsonSerializer,
StrictSerializer: json.NewSerializerWithOptions(
mf, scheme, scheme,
json.SerializerOptions{Yaml: false, Pretty: false, Strict: true, StreamingCollectionsEncoding: options.StreamingCollectionsEncodingToJSON},
),

Framer: json.Framer,
StreamSerializer: &runtime.StreamSerializerInfo{
EncodesAsText: true,
Serializer: jsonSerializer,
Framer: json.Framer,
},
}
if options.Pretty {
jsonSerializerType.PrettySerializer = json.NewSerializerWithOptions(
Expand Down Expand Up @@ -136,6 +144,10 @@ type CodecFactoryOptions struct {
Strict bool
// Pretty includes a pretty serializer along with the non-pretty one
Pretty bool

StreamingCollectionsEncodingToJSON bool

serializers []func(runtime.ObjectCreater, runtime.ObjectTyper) runtime.SerializerInfo
}

// CodecFactoryOptionsMutator takes a pointer to an options struct and then modifies it.
Expand All @@ -162,6 +174,19 @@ func DisableStrict(options *CodecFactoryOptions) {
options.Strict = false
}

// WithSerializer configures a serializer to be supported in addition to the default serializers.
func WithSerializer(f func(runtime.ObjectCreater, runtime.ObjectTyper) runtime.SerializerInfo) CodecFactoryOptionsMutator {
return func(options *CodecFactoryOptions) {
options.serializers = append(options.serializers, f)
}
}

func WithStreamingCollectionEncodingToJSON() CodecFactoryOptionsMutator {
return func(options *CodecFactoryOptions) {
options.StreamingCollectionsEncodingToJSON = true
}
}

// NewCodecFactory provides methods for retrieving serializers for the supported wire formats
// and conversion wrappers to define preferred internal and external versions. In the future,
// as the internal version is used less, callers may instead use a defaulting serializer and
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,229 @@
/*
Copyright 2025 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package json

import (
"encoding/json"
"fmt"
"io"
"sort"

"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/conversion"
"k8s.io/apimachinery/pkg/util/sets"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
)

func streamEncodeCollections(obj runtime.Object, w io.Writer) (bool, error) {
list, ok := obj.(*unstructured.UnstructuredList)
if ok {
return true, streamingEncodeUnstructuredList(w, list)
}
if _, ok := obj.(json.Marshaler); ok {
return false, nil
}
typeMeta, listMeta, items, err := getListMeta(obj)
if err == nil {
return true, streamingEncodeList(w, typeMeta, listMeta, items)
}
return false, nil
}

// getListMeta implements list extraction logic for json stream serialization.
//
// Reason for a custom logic instead of reusing accessors from meta package:
// * Validate json tags to prevent incompatibility with json standard package.
// * ListMetaAccessor doesn't distinguish empty from nil value.
// * TypeAccessort reparsing "apiVersion" and serializing it with "{group}/{version}"
func getListMeta(list runtime.Object) (metav1.TypeMeta, metav1.ListMeta, []runtime.Object, error) {
listValue, err := conversion.EnforcePtr(list)
if err != nil {
return metav1.TypeMeta{}, metav1.ListMeta{}, nil, err
}
listType := listValue.Type()
if listType.NumField() != 3 {
return metav1.TypeMeta{}, metav1.ListMeta{}, nil, fmt.Errorf("expected ListType to have 3 fields")
}
// TypeMeta
typeMeta, ok := listValue.Field(0).Interface().(metav1.TypeMeta)
if !ok {
return metav1.TypeMeta{}, metav1.ListMeta{}, nil, fmt.Errorf("expected TypeMeta field to have TypeMeta type")
}
if listType.Field(0).Tag.Get("json") != ",inline" {
return metav1.TypeMeta{}, metav1.ListMeta{}, nil, fmt.Errorf(`expected TypeMeta json field tag to be ",inline"`)
}
// ListMeta
listMeta, ok := listValue.Field(1).Interface().(metav1.ListMeta)
if !ok {
return metav1.TypeMeta{}, metav1.ListMeta{}, nil, fmt.Errorf("expected ListMeta field to have ListMeta type")
}
if listType.Field(1).Tag.Get("json") != "metadata,omitempty" {
return metav1.TypeMeta{}, metav1.ListMeta{}, nil, fmt.Errorf(`expected ListMeta json field tag to be "metadata,omitempty"`)
}
// Items
items, err := meta.ExtractList(list)
if err != nil {
return metav1.TypeMeta{}, metav1.ListMeta{}, nil, err
}
if listType.Field(2).Tag.Get("json") != "items" {
return metav1.TypeMeta{}, metav1.ListMeta{}, nil, fmt.Errorf(`expected Items json field tag to be "items"`)
}
return typeMeta, listMeta, items, nil
}

func streamingEncodeList(w io.Writer, typeMeta metav1.TypeMeta, listMeta metav1.ListMeta, items []runtime.Object) error {
// Start
if _, err := w.Write([]byte(`{`)); err != nil {
return err
}

// TypeMeta
if typeMeta.Kind != "" {
if err := encodeKeyValuePair(w, "kind", typeMeta.Kind, []byte(",")); err != nil {
return err
}
}
if typeMeta.APIVersion != "" {
if err := encodeKeyValuePair(w, "apiVersion", typeMeta.APIVersion, []byte(",")); err != nil {
return err
}
}

// ListMeta
if err := encodeKeyValuePair(w, "metadata", listMeta, []byte(",")); err != nil {
return err
}

// Items
if err := encodeItemsObjectSlice(w, items); err != nil {
return err
}

// End
_, err := w.Write([]byte("}\n"))
return err
}

func encodeItemsObjectSlice(w io.Writer, items []runtime.Object) (err error) {
if items == nil {
err := encodeKeyValuePair(w, "items", nil, nil)
return err
}
_, err = w.Write([]byte(`"items":[`))
if err != nil {
return err
}
suffix := []byte(",")
for i, item := range items {
if i == len(items)-1 {
suffix = nil
}
err := encodeValue(w, item, suffix)
if err != nil {
return err
}
}
_, err = w.Write([]byte("]"))
if err != nil {
return err
}
return err
}

func streamingEncodeUnstructuredList(w io.Writer, list *unstructured.UnstructuredList) error {
_, err := w.Write([]byte(`{`))
if err != nil {
return err
}
keys := sets.List(sets.KeySet(list.Object))
if _, exists := list.Object["items"]; !exists {
keys = append(keys, "items")
}
sort.Strings(keys)

suffix := []byte(",")
for i, key := range keys {
if i == len(keys)-1 {
suffix = nil
}
if key == "items" {
err = encodeItemsUnstructuredSlice(w, list.Items, suffix)
} else {
err = encodeKeyValuePair(w, key, list.Object[key], suffix)
}
if err != nil {
return err
}
}
_, err = w.Write([]byte("}\n"))
return err
}

func encodeItemsUnstructuredSlice(w io.Writer, items []unstructured.Unstructured, suffix []byte) (err error) {
_, err = w.Write([]byte(`"items":[`))
if err != nil {
return err
}
comma := []byte(",")
for i, item := range items {
if i == len(items)-1 {
comma = nil
}
err := encodeValue(w, item.Object, comma)
if err != nil {
return err
}
}
_, err = w.Write([]byte("]"))
if err != nil {
return err
}
if len(suffix) > 0 {
_, err = w.Write(suffix)
}
return err
}

func encodeKeyValuePair(w io.Writer, key string, value any, suffix []byte) (err error) {
err = encodeValue(w, key, []byte(":"))
if err != nil {
return err
}
err = encodeValue(w, value, suffix)
if err != nil {
return err
}
return err
}

func encodeValue(w io.Writer, value any, suffix []byte) error {
data, err := json.Marshal(value)
if err != nil {
return err
}
_, err = w.Write(data)
if err != nil {
return err
}
if len(suffix) > 0 {
_, err = w.Write(suffix)
}
return err
}
Loading