Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ endif
install: manifests kustomize ## Install CRDs into the K8s cluster specified in ~/.kube/config.
$(KUSTOMIZE) build config/crd | $(KUBECTL) apply -f -
## helm creates objects without aibrix prefix, hence deploying gateway components outside of kustomization
helm install eg oci://docker.io/envoyproxy/gateway-helm --version v1.1.0 -n aibrix-system
helm install eg oci://docker.io/envoyproxy/gateway-helm --version v1.1.0 -n aibrix-system --create-namespace
$(KUBECTL) wait --timeout=5m -n aibrix-system deployment/envoy-gateway --for=condition=Available
$(KUBECTL) apply -f config/gateway/gateway.yaml

Expand Down
45 changes: 44 additions & 1 deletion docs/development/app/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,17 +28,28 @@ curl http://localhost:8000/v1/chat/completions \
}'
```

```shell
kubectl delete -f docs/development/app/deployment.yaml
```


## Test with envoy gateway

Install envoy gateway and setup HTTP Route
```shell
- if setting up from scratch

make docker-build && make docker-build-plugins
make install && make deploy

OR

- if only want to test gateway plugins

docker build -t aibrix/plugins:v0.1.0 -f gateway.Dockerfile .
kind load docker-image aibrix/plugins:v0.1.0

make install && make deploy
kubectl -n aibrix-system apply -f docs/development/app/gateway-plugin.yaml
```

Check status
Expand Down Expand Up @@ -73,11 +84,43 @@ curl -v http://localhost:8888/v1/chat/completions \
"messages": [{"role": "user", "content": "Say this is a test!"}],
"temperature": 0.7
}'

# least-request based
for i in {1..10}; do
curl -v http://localhost:8888/v1/chat/completions \
-H "user: your-user-name" \
-H "routing-strategy: least-request" \
-H "model: llama2-70b" \
-H "Content-Type: application/json" \
-H "Authorization: Bearer any_key" \
-d '{
"model": "llama2-70b",
"messages": [{"role": "user", "content": "Say this is a test!"}],
"temperature": 0.7
}' &
done

# throughput based
for i in {1..10}; do
curl -v http://localhost:8888/v1/chat/completions \
-H "user: your-user-name" \
-H "routing-strategy: throughput" \
-H "model: llama2-70b" \
-H "Content-Type: application/json" \
-H "Authorization: Bearer any_key" \
-d '{
"model": "llama2-70b",
"messages": [{"role": "user", "content": "Say this is a test!"}],
"temperature": 0.7
}' &
done
```


Delete envoy gateway and corresponding objects
```shell
kubectl -n aibrix-system delete -f docs/development/app/gateway-plugin.yaml
OR
make undeploy && make uninstall
```

Expand Down
6 changes: 3 additions & 3 deletions docs/development/app/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ roleRef:
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
namespace: default
namespace: aibrix-system
name: deployment-reader
rules:
- apiGroups: ["apps"]
Expand All @@ -110,11 +110,11 @@ apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
name: deployment-reader-binding
namespace: default
namespace: aibrix-system
subjects:
- kind: ServiceAccount
name: default
namespace: default
namespace: aibrix-system
roleRef:
kind: Role
name: deployment-reader
Expand Down
47 changes: 47 additions & 0 deletions docs/development/app/gateway-plugin.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
apiVersion: v1
kind: Service
metadata:
name: aibrix-gateway-plugins
namespace: aibrix-system
spec:
selector:
app: aibrix-gateway-plugins
ports:
- protocol: TCP
port: 50052
targetPort: 50052
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: aibrix-gateway-plugins
namespace: aibrix-system
spec:
replicas: 1
selector:
matchLabels:
app: aibrix-gateway-plugins
template:
metadata:
labels:
app: aibrix-gateway-plugins
spec:
containers:
- name: golang-app-container
image: aibrix/plugins:v0.1.0
ports:
- containerPort: 50052
env:
- name: REDIS_HOST
value: aibrix-redis-master
- name: REDIS_PORT
value: "6379"
- name: POD_NAME
valueFrom:
fieldRef:
fieldPath: metadata.name
- name: POD_NAMESPACE
valueFrom:
fieldRef:
fieldPath: metadata.namespace

78 changes: 66 additions & 12 deletions pkg/plugins/gateway/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
openai "github.com/sashabaranov/go-openai"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/klog"
Expand Down Expand Up @@ -95,7 +96,7 @@ func (s *Server) Process(srv extProcPb.ExternalProcessor_ProcessServer) error {
resp = s.HandleResponseHeaders(req, targetPodIP)

case *extProcPb.ProcessingRequest_ResponseBody:
resp = s.HandleResponseBody(ctx, req, user)
resp = s.HandleResponseBody(ctx, req, user, targetPodIP)

default:
log.Printf("Unknown Request type %+v\n", v)
Expand All @@ -109,33 +110,32 @@ func (s *Server) Process(srv extProcPb.ExternalProcessor_ProcessServer) error {

func (s *Server) HandleRequestHeaders(ctx context.Context, req *extProcPb.ProcessingRequest) (*extProcPb.ProcessingResponse, string, string) {
log.Println("--- In RequestHeaders processing ...")
var user, model, targetPodIP string
var user, model, routingStrategy, targetPodIP string
r := req.Request
h := r.(*extProcPb.ProcessingRequest_RequestHeaders)

log.Printf("Headers: %+v\n", h)
log.Printf("EndOfStream: %v\n", h.RequestHeaders.EndOfStream)

for _, n := range h.RequestHeaders.Headers.Headers {
if strings.ToLower(n.Key) == "user" {
user = string(n.RawValue)
}
if strings.ToLower(n.Key) == "model" {
model = string(n.RawValue)
}
if strings.ToLower(n.Key) == "routing-strategy" {
routingStrategy = string(n.RawValue)
}
if strings.ToLower(n.Key) == "target-pod" {
targetPodIP = string(n.RawValue)
}
}

klog.Infof("user: %v", user)

// TODO (varun): add check if user exists in backend storage
// if no user name present in the request headers
if user == "" {
klog.Infoln("user does not exists")
return nil, user, targetPodIP
}
klog.Infof("user: %v", user)
code, err := s.checkRPM(ctx, user)
if err != nil {
return &extProcPb.ProcessingResponse{
Expand Down Expand Up @@ -211,9 +211,21 @@ func (s *Server) HandleRequestHeaders(ctx context.Context, req *extProcPb.Proces
}, user, targetPodIP
}

// TODO (varun): evaluate how to enable selection of routing algorithm
route := routing.NewRandomRouter()
targetPodIP, _ = route.Get(ctx, pods.Items)
targetPodIP, err = s.SelectTargetPod(ctx, routingStrategy, pods.Items)
if err != nil {
return &extProcPb.ProcessingResponse{
Response: &extProcPb.ProcessingResponse_ImmediateResponse{
ImmediateResponse: &extProcPb.ImmediateResponse{
Status: &envoyTypePb.HttpStatus{
Code: envoyTypePb.StatusCode_InternalServerError,
},
Details: err.Error(),
Body: "error on selecting target pod",
},
},
}, user, targetPodIP
}

headers := []*configPb.HeaderValueOption{{
Header: &configPb.HeaderValue{
Key: "x-went-into-req-headers",
Expand All @@ -227,6 +239,22 @@ func (s *Server) HandleRequestHeaders(ctx context.Context, req *extProcPb.Proces
RawValue: []byte(targetPodIP),
},
})

reqCount, err := s.ratelimiter.Incr(ctx, fmt.Sprintf("%v_REQUEST_COUNT", targetPodIP), 1)
if err != nil {
return &extProcPb.ProcessingResponse{
Response: &extProcPb.ProcessingResponse_ImmediateResponse{
ImmediateResponse: &extProcPb.ImmediateResponse{
Status: &envoyTypePb.HttpStatus{
Code: envoyTypePb.StatusCode_InternalServerError,
},
Details: err.Error(),
Body: "error on updating request count",
},
},
}, user, targetPodIP
}
klog.Infof("RequestStart: SelectedTargetPodIP: %s, PodRequestCount: %v", targetPodIP, reqCount)
}

resp := &extProcPb.ProcessingResponse{
Expand Down Expand Up @@ -308,7 +336,7 @@ func (s *Server) HandleResponseHeaders(req *extProcPb.ProcessingRequest, targetP
}
}

func (s *Server) HandleResponseBody(ctx context.Context, req *extProcPb.ProcessingRequest, user string) *extProcPb.ProcessingResponse {
func (s *Server) HandleResponseBody(ctx context.Context, req *extProcPb.ProcessingRequest, user string, targetPodIP string) *extProcPb.ProcessingResponse {
log.Println("--- In ResponseBody processing")

r := req.Request
Expand Down Expand Up @@ -358,6 +386,15 @@ func (s *Server) HandleResponseBody(ctx context.Context, req *extProcPb.Processi
}
klog.Infof("Updated RPM: %v, TPM: %v for user: %v", rpm, tpm, user)

if targetPodIP != "" {
podTpm, err := s.ratelimiter.Incr(ctx, fmt.Sprintf("%v_THROUGHPUT", targetPodIP), int64(res.Usage.TotalTokens))
if err != nil {
klog.Error(err)
} else {
klog.Infof("RequestEnd: SelectedTargetPodIP: %s, PodThroughput: %v", targetPodIP, podTpm)
}
}

return &extProcPb.ProcessingResponse{
Response: &extProcPb.ProcessingResponse_ResponseBody{
ResponseBody: &extProcPb.BodyResponse{
Expand Down Expand Up @@ -397,7 +434,7 @@ func (s *Server) checkRPM(ctx context.Context, user string) (envoyTypePb.StatusC
}
klog.Infof("rmpCurrent: %v, rpmLimit: %v", rpmCurrent, rpmLimit)
if rpmCurrent >= rpmLimit {
err := fmt.Errorf("requests per limit of:%v, reached for user: %v", rpmLimit, user)
err := fmt.Errorf("requests per limit of: %v, reached for user: %v", rpmLimit, user)
klog.Errorln(err)
return envoyTypePb.StatusCode_TooManyRequests, err
}
Expand Down Expand Up @@ -425,3 +462,20 @@ func (s *Server) checkTPM(ctx context.Context, user string) (envoyTypePb.StatusC

return envoyTypePb.StatusCode_OK, nil
}

func (s *Server) SelectTargetPod(ctx context.Context, routingStrategy string, pods []corev1.Pod) (string, error) {
// TODO (varun): evaluate how to enable selection of routing algorithm
var route routing.Router
switch routingStrategy {
case "random":
route = routing.NewRandomRouter()
case "least-request":
route = routing.NewLeastRequestRouter(s.ratelimiter)
case "throughput":
route = routing.NewThroughputRouter(s.ratelimiter)
default:
return "", nil
}

return route.Get(ctx, pods)
}
57 changes: 57 additions & 0 deletions pkg/plugins/gateway/routing_algorithms/least_request.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
Copyright 2024 The Aibrix Team.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package routingalgorithms

import (
"context"
"fmt"
"math"

ratelimiter "github.com/aibrix/aibrix/pkg/plugins/gateway/rate_limiter"
v1 "k8s.io/api/core/v1"
"k8s.io/klog/v2"
)

type leastRequestRouter struct {
ratelimiter ratelimiter.AccountRateLimiter
}

func NewLeastRequestRouter(ratelimiter ratelimiter.AccountRateLimiter) Router {
return leastRequestRouter{
ratelimiter: ratelimiter,
}
}

func (r leastRequestRouter) Get(ctx context.Context, pods []v1.Pod) (string, error) {
var targetPodIP string
minCount := math.MaxInt

for _, pod := range pods {
podIP := pod.Status.PodIP + ":8000"
reqCount, err := r.ratelimiter.Get(ctx, fmt.Sprintf("%v_REQUEST_COUNT", podIP))
if err != nil {
return "", err
}
klog.Infof("PodIP: %s, PodRequestCount: %v", podIP, reqCount)
if reqCount <= int64(minCount) {
minCount = int(reqCount)
targetPodIP = podIP
}
}

return targetPodIP, nil // TODO (varun): remove static port
}
Loading