@@ -10,7 +10,7 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1010See the License for the specific language governing permissions and
1111limitations under the License.
1212*/
13- package server
13+ package v1alpha1
1414
1515import (
1616 "context"
@@ -26,7 +26,7 @@ import (
2626 "strings"
2727
2828 "github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
29- packages "github.com/kubeapps/kubeapps/cmd/kubeapps-apis/gen/ core/packages/v1alpha1 "
29+ "github.com/kubeapps/kubeapps/cmd/kubeapps-apis/core"
3030 plugins "github.com/kubeapps/kubeapps/cmd/kubeapps-apis/gen/core/plugins/v1alpha1"
3131 "github.com/kubeapps/kubeapps/pkg/kube"
3232 "google.golang.org/grpc"
@@ -46,33 +46,24 @@ const (
4646 clustersCAFilesPrefix = "/etc/additional-clusters-cafiles"
4747)
4848
49- // KubernetesConfigGetter is a function type used by plugins to get a k8s config
50- type KubernetesConfigGetter func (ctx context.Context , cluster string ) (* rest.Config , error )
51-
52- // pkgsPluginWithServer stores the plugin detail together with its implementation.
53- type pkgsPluginWithServer struct {
54- plugin * plugins.Plugin
55- server packages.PackagesServiceServer
49+ // PluginWithServer keeps a record of a GRPC server and its plugin detail.
50+ type PluginWithServer struct {
51+ Plugin * plugins.Plugin
52+ Server interface {}
5653}
5754
5855// coreServer implements the API defined in cmd/kubeapps-api-service/core/core.proto
5956type pluginsServer struct {
6057 plugins.UnimplementedPluginsServiceServer
6158
62- // The slice of plugins is initialised when registering plugins during NewPluginsServer.
63- plugins []* plugins.Plugin
64-
65- // packagesPlugins contains plugin server implementations which satisfy
66- // the core server packages.v1alpha1 interface.
67- // TODO: Update the plugins server to be able to register different versions
68- // of core plugins.
69- packagesPlugins []* pkgsPluginWithServer
59+ // The slice of pluginsWithServers is initialised when registering pluginsWithServers during NewPluginsServer.
60+ pluginsWithServers []PluginWithServer
7061
7162 // The parsed config for clusters in a multi-cluster setup.
7263 clustersConfig kube.ClustersConfig
7364}
7465
75- func NewPluginsServer (serveOpts ServeOptions , registrar grpc.ServiceRegistrar , gwArgs gwHandlerArgs ) (* pluginsServer , error ) {
66+ func NewPluginsServer (serveOpts core. ServeOptions , registrar grpc.ServiceRegistrar , gwArgs core. GatewayHandlerArgs ) (* pluginsServer , error ) {
7667 // Store the serveOptions in the global 'pluginsServeOpts' variable
7768
7869 // Find all .so plugins in the specified plugins directory.
@@ -90,22 +81,18 @@ func NewPluginsServer(serveOpts ServeOptions, registrar grpc.ServiceRegistrar, g
9081 }
9182 ps .clustersConfig = clustersConfig
9283
93- pluginDetails , err : = ps .registerPlugins (pluginPaths , registrar , gwArgs , serveOpts )
84+ err = ps .registerPlugins (pluginPaths , registrar , gwArgs , serveOpts )
9485 if err != nil {
9586 return nil , fmt .Errorf ("failed to register plugins: %w" , err )
9687 }
9788
98- sortPlugins (pluginDetails )
99-
100- ps .plugins = pluginDetails
101-
10289 return ps , nil
10390}
10491
10592// sortPlugins returns a consistently ordered slice.
106- func sortPlugins (p []* plugins. Plugin ) {
93+ func sortPlugins (p []PluginWithServer ) {
10794 sort .Slice (p , func (i , j int ) bool {
108- return p [i ].Name < p [j ].Name || (p [i ].Name == p [j ].Name && p [i ].Version < p [j ].Version )
95+ return p [i ].Plugin . Name < p [j ].Plugin . Name || (p [i ].Plugin . Name == p [j ].Plugin . Name && p [i ].Plugin . Version < p [j ]. Plugin .Version )
10996 })
11097}
11198
@@ -114,95 +101,101 @@ func (s *pluginsServer) GetConfiguredPlugins(ctx context.Context, in *plugins.Ge
114101 // this gets logged twice (liveness and readiness checks) every 10 seconds and
115102 // really adds a lot of noise to the logs, so lowering verbosity
116103 log .V (4 ).Infof ("+core GetConfiguredPlugins" )
104+ pluginDetails := make ([]* plugins.Plugin , len (s .pluginsWithServers ))
105+ for i , p := range s .pluginsWithServers {
106+ pluginDetails [i ] = p .Plugin
107+ }
117108 return & plugins.GetConfiguredPluginsResponse {
118- Plugins : s . plugins ,
109+ Plugins : pluginDetails ,
119110 }, nil
120111}
121112
122113// registerPlugins opens each plugin, looks up the register function and calls it with the registrar.
123- func (s * pluginsServer ) registerPlugins (pluginPaths []string , grpcReg grpc.ServiceRegistrar , gwArgs gwHandlerArgs , serveOpts ServeOptions ) ([] * plugins. Plugin , error ) {
124- pluginDetails := []* plugins. Plugin {}
114+ func (s * pluginsServer ) registerPlugins (pluginPaths []string , grpcReg grpc.ServiceRegistrar , gwArgs core. GatewayHandlerArgs , serveOpts core. ServeOptions ) error {
115+ pluginsWithServers := []PluginWithServer {}
125116
126117 configGetter , err := createConfigGetter (serveOpts , s .clustersConfig )
127118 if err != nil {
128- return nil , fmt .Errorf ("unable to create a ClientGetter: %w" , err )
119+ fmt .Errorf ("unable to create a ClientGetter: %w" , err )
129120 }
130121
131122 for _ , pluginPath := range pluginPaths {
132123 p , err := plugin .Open (pluginPath )
133124 if err != nil {
134- return nil , fmt .Errorf ("unable to open plugin %q: %w" , pluginPath , err )
125+ fmt .Errorf ("unable to open plugin %q: %w" , pluginPath , err )
135126 }
136127
137128 var pluginDetail * plugins.Plugin
138129 if pluginDetail , err = getPluginDetail (p , pluginPath ); err != nil {
139- return nil , err
140- } else {
141- pluginDetails = append (pluginDetails , pluginDetail )
130+ return err
142131 }
143132
144- if err = s .registerGRPC (p , pluginDetail , grpcReg , configGetter , serveOpts .PluginConfigPath ); err != nil {
145- return nil , err
133+ if grpcServer , err := s .registerGRPC (p , pluginDetail , grpcReg , configGetter , serveOpts .PluginConfigPath ); err != nil {
134+ return err
135+ } else {
136+ pluginsWithServers = append (pluginsWithServers , PluginWithServer {
137+ Plugin : pluginDetail ,
138+ Server : grpcServer ,
139+ })
146140 }
147141
148142 if err = registerHTTP (p , pluginDetail , gwArgs ); err != nil {
149- return nil , err
143+ return err
150144 }
151145
152146 log .Infof ("Successfully registered plugin %q" , pluginPath )
153147 }
154- return pluginDetails , nil
148+
149+ sortPlugins (pluginsWithServers )
150+
151+ s .pluginsWithServers = pluginsWithServers
152+
153+ return nil
155154}
156155
157156// registerGRPC finds and calls the required function for registering the plugin for the GRPC server.
158157func (s * pluginsServer ) registerGRPC (p * plugin.Plugin , pluginDetail * plugins.Plugin , registrar grpc.ServiceRegistrar ,
159- clientGetter KubernetesConfigGetter , pluginConfigPath string ) error {
158+ clientGetter core. KubernetesConfigGetter , pluginConfigPath string ) ( interface {}, error ) {
160159 grpcRegFn , err := p .Lookup (grpcRegisterFunction )
161160 if err != nil {
162- return fmt .Errorf ("unable to lookup %q for %v: %w" , grpcRegisterFunction , pluginDetail , err )
161+ return nil , fmt .Errorf ("unable to lookup %q for %v: %w" , grpcRegisterFunction , pluginDetail , err )
163162 }
164- type grpcRegisterFunctionType = func (grpc.ServiceRegistrar , KubernetesConfigGetter , kube.ClustersConfig , string ) (interface {}, error )
163+ type grpcRegisterFunctionType = func (grpc.ServiceRegistrar , core. KubernetesConfigGetter , kube.ClustersConfig , string ) (interface {}, error )
165164
166165 grpcFn , ok := grpcRegFn .(grpcRegisterFunctionType )
167166 if ! ok {
168- var dummyFn grpcRegisterFunctionType = func (grpc.ServiceRegistrar , KubernetesConfigGetter , kube.ClustersConfig , string ) (interface {}, error ) {
167+ var dummyFn grpcRegisterFunctionType = func (grpc.ServiceRegistrar , core. KubernetesConfigGetter , kube.ClustersConfig , string ) (interface {}, error ) {
169168 return nil , nil
170169 }
171- return fmt .Errorf ("unable to use %q in plugin %v due to mismatched signature.\n want: %T\n got: %T" , grpcRegisterFunction , pluginDetail , dummyFn , grpcRegFn )
170+ return nil , fmt .Errorf ("unable to use %q in plugin %v due to mismatched signature.\n want: %T\n got: %T" , grpcRegisterFunction , pluginDetail , dummyFn , grpcRegFn )
172171 }
173172
174173 server , err := grpcFn (registrar , clientGetter , s .clustersConfig , pluginConfigPath )
175174 if err != nil {
176- return fmt .Errorf ("plug-in %q failed to register due to: %v" , pluginDetail , err )
175+ return nil , fmt .Errorf ("plug-in %q failed to register due to: %v" , pluginDetail , err )
177176 } else if server == nil {
178- return fmt .Errorf ("registration for plug-in %v failed due to: %T returned nil when non-nil value was expected" , pluginDetail , grpcFn )
177+ return nil , fmt .Errorf ("registration for plug-in %v failed due to: %T returned nil when non-nil value was expected" , pluginDetail , grpcFn )
179178 }
180179
181- return s . registerPluginsSatisfyingCoreAPIs ( server , pluginDetail )
180+ return server , nil
182181}
183182
184- // registerPluginsImplementingCoreAPIs checks a plugin implementation to see
185- // if it implements a core api (such as `packages.v1alpha1`) and if so,
186- // keeps a (typed) reference to the implementation for use on aggregate APIs.
187- func (s * pluginsServer ) registerPluginsSatisfyingCoreAPIs (pluginSrv interface {}, pluginDetail * plugins.Plugin ) error {
188- // The following check if the service implements an interface is what
189- // grpc-go itself does, see:
190- // https://github.com/grpc/grpc-go/blob/v1.38.0/server.go#L621
191- serverType := reflect .TypeOf (pluginSrv )
192- corePackagesType := reflect .TypeOf ((* packages .PackagesServiceServer )(nil )).Elem ()
193-
194- if serverType .Implements (corePackagesType ) {
195- pkgsSrv , ok := pluginSrv .(packages.PackagesServiceServer )
196- if ! ok {
197- return fmt .Errorf ("Unable to convert plugin %v to core PackagesServicesServer although it implements the same." , pluginDetail )
183+ // GetPluginsSatisfyingInterface returns the registered plugins which satisfy a
184+ // particular interface. Currently this is used to return the plugins that satisfy
185+ // the core.packaging interface for the core packaging server.
186+ func (s * pluginsServer ) GetPluginsSatisfyingInterface (targetInterface reflect.Type ) []PluginWithServer {
187+ satisfiedPlugins := []PluginWithServer {}
188+ for _ , pluginSrv := range s .pluginsWithServers {
189+ // The following check if the service implements an interface is what
190+ // grpc-go itself does, see:
191+ // https://github.com/grpc/grpc-go/blob/v1.38.0/server.go#L621
192+ serverType := reflect .TypeOf (pluginSrv .Server )
193+
194+ if serverType .Implements (targetInterface ) {
195+ satisfiedPlugins = append (satisfiedPlugins , pluginSrv )
198196 }
199- s .packagesPlugins = append (s .packagesPlugins , & pkgsPluginWithServer {
200- plugin : pluginDetail ,
201- server : pkgsSrv ,
202- })
203- log .Infof ("Plugin %v implements core.packages.v1alpha1. Registered for aggregation." , pluginDetail )
204197 }
205- return nil
198+ return satisfiedPlugins
206199}
207200
208201// getPluginDetail returns a core.plugins.Plugin as defined by the plugin itself.
@@ -224,7 +217,7 @@ func getPluginDetail(p *plugin.Plugin, pluginPath string) (*plugins.Plugin, erro
224217}
225218
226219// registerHTTP finds and calls the required function for registering the plugin for the HTTP gateway server.
227- func registerHTTP (p * plugin.Plugin , pluginDetail * plugins.Plugin , gwArgs gwHandlerArgs ) error {
220+ func registerHTTP (p * plugin.Plugin , pluginDetail * plugins.Plugin , gwArgs core. GatewayHandlerArgs ) error {
228221 gwRegFn , err := p .Lookup (gatewayRegisterFunction )
229222 if err != nil {
230223 return fmt .Errorf ("unable to lookup %q for %v: %w" , gatewayRegisterFunction , pluginDetail , err )
@@ -237,7 +230,7 @@ func registerHTTP(p *plugin.Plugin, pluginDetail *plugins.Plugin, gwArgs gwHandl
237230 var dummyFn gatewayRegisterFunctionType = func (context.Context , * runtime.ServeMux , string , []grpc.DialOption ) error { return nil }
238231 return fmt .Errorf ("unable to use %q in plugin %v due to mismatched signature.\n want: %T\n got: %T" , gatewayRegisterFunction , pluginDetail , dummyFn , gwRegFn )
239232 }
240- return gwfn (gwArgs .ctx , gwArgs .mux , gwArgs .addr , gwArgs .dialOptions )
233+ return gwfn (gwArgs .Ctx , gwArgs .Mux , gwArgs .Addr , gwArgs .DialOptions )
241234}
242235
243236// listSOFiles returns the absolute paths of all .so files found in any of the provided plugin directories.
@@ -274,7 +267,7 @@ func listSOFiles(fsys fs.FS, pluginDirs []string) ([]string, error) {
274267// createConfigGetter returns a function closure for creating the k8s config to interact with the cluster.
275268// The returned function utilizes the user credential present in the request context.
276269// The plugins just have to call this function passing the context in order to retrieve the configured k8s client
277- func createConfigGetter (serveOpts ServeOptions , clustersConfig kube.ClustersConfig ) (KubernetesConfigGetter , error ) {
270+ func createConfigGetter (serveOpts core. ServeOptions , clustersConfig kube.ClustersConfig ) (core. KubernetesConfigGetter , error ) {
278271 var restConfig * rest.Config
279272 var err error
280273
@@ -305,7 +298,7 @@ func createConfigGetter(serveOpts ServeOptions, clustersConfig kube.ClustersConf
305298
306299// createClientGetter takes the required params and returns the closure fuction.
307300// it's splitted for testing this fn separately
308- func createConfigGetterWithParams (inClusterConfig * rest.Config , serveOpts ServeOptions , clustersConfig kube.ClustersConfig ) (KubernetesConfigGetter , error ) {
301+ func createConfigGetterWithParams (inClusterConfig * rest.Config , serveOpts core. ServeOptions , clustersConfig kube.ClustersConfig ) (core. KubernetesConfigGetter , error ) {
309302 // return the closure fuction that takes the context, but preserving the required scope,
310303 // 'inClusterConfig' and 'config'
311304 return func (ctx context.Context , cluster string ) (* rest.Config , error ) {
@@ -360,7 +353,7 @@ func extractToken(ctx context.Context) (string, error) {
360353
361354// getClustersConfigFromServeOpts get the serveOptions and calls parseClusterConfig with the proper values
362355// returning a kube.ClustersConfig
363- func getClustersConfigFromServeOpts (serveOpts ServeOptions ) (kube.ClustersConfig , error ) {
356+ func getClustersConfigFromServeOpts (serveOpts core. ServeOptions ) (kube.ClustersConfig , error ) {
364357 if serveOpts .ClustersConfigPath == "" {
365358 if serveOpts .UnsafeLocalDevKubeconfig {
366359 // if using a local kubeconfig (dev purposes), this ClusterConfig file is not strictly required
0 commit comments