|
13 | 13 | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
14 | 14 | // See the License for the specific language governing permissions and
|
15 | 15 | // limitations under the License.
|
| 16 | +// Copyright 2021 Nitric Pty Ltd. |
| 17 | + |
16 | 18 | package run
|
| 19 | + |
| 20 | +import ( |
| 21 | + "strings" |
| 22 | + "time" |
| 23 | + |
| 24 | + "github.com/fasthttp/router" |
| 25 | + "github.com/valyala/fasthttp" |
| 26 | + |
| 27 | + "github.com/nitrictech/nitric/pkg/plugins/gateway" |
| 28 | + "github.com/nitrictech/nitric/pkg/triggers" |
| 29 | + "github.com/nitrictech/nitric/pkg/utils" |
| 30 | + "github.com/nitrictech/nitric/pkg/worker" |
| 31 | +) |
| 32 | + |
| 33 | +type HttpMiddleware func(*fasthttp.RequestCtx, worker.WorkerPool) bool |
| 34 | + |
| 35 | +type BaseHttpGateway struct { |
| 36 | + address string |
| 37 | + server *fasthttp.Server |
| 38 | + gateway.UnimplementedGatewayPlugin |
| 39 | + |
| 40 | + pool worker.WorkerPool |
| 41 | +} |
| 42 | + |
| 43 | +//func apiWorkerFilter (apiName string) func(w worker.Worker) bool { |
| 44 | +// return func(w worker.Worker) bool { |
| 45 | +// if api, ok := w.(*worker.RouteWorker); ok { |
| 46 | +// return api.Api() == apiName |
| 47 | +// } |
| 48 | + |
| 49 | +// return false |
| 50 | +// } |
| 51 | +//} |
| 52 | + |
| 53 | +func (s *BaseHttpGateway) api(ctx *fasthttp.RequestCtx) { |
| 54 | + apiName := ctx.UserValue("name") |
| 55 | + // Rewrite the URL of the request to remove the /api/{name} subroute |
| 56 | + pathParts := utils.SplitPath(string(ctx.Path())) |
| 57 | + // remove first two path parts |
| 58 | + newPathParts := pathParts[2:] |
| 59 | + |
| 60 | + newPath := strings.Join(newPathParts, "/") |
| 61 | + |
| 62 | + // Rewrite the path |
| 63 | + ctx.URI().SetPath(newPath) |
| 64 | + |
| 65 | + httpReq := triggers.FromHttpRequest(ctx) |
| 66 | + |
| 67 | + s.pool.GetWorker(&worker.GetWorkerOptions{ |
| 68 | + Http: httpReq, |
| 69 | + //Filter: apiWorkerFilter(apiName), |
| 70 | + }) |
| 71 | + |
| 72 | + // Filter workers by a specific named API |
| 73 | + |
| 74 | +} |
| 75 | + |
| 76 | +func (s *BaseHttpGateway) schedule(ctx *fasthttp.RequestCtx) { |
| 77 | + scheduleName := ctx.UserValue("name") |
| 78 | + // Filter workers by schedule workers |
| 79 | +} |
| 80 | + |
| 81 | +//func (s *BaseHttpGateway) httpHandler(pool worker.WorkerPool) func(ctx *fasthttp.RequestCtx) { |
| 82 | +// return func(ctx *fasthttp.RequestCtx) { |
| 83 | +// if s.mw != nil { |
| 84 | +// if !s.mw(ctx, pool) { |
| 85 | +// // middleware has indicated that is has processed the request |
| 86 | +// // so we can exit here |
| 87 | +// return |
| 88 | +// } |
| 89 | +// } |
| 90 | + |
| 91 | +// httpTrigger := triggers.FromHttpRequest(ctx) |
| 92 | +// wrkr, err := pool.GetWorker(&worker.GetWorkerOptions{ |
| 93 | +// Http: httpTrigger, |
| 94 | +// }) |
| 95 | + |
| 96 | +// if err != nil { |
| 97 | +// ctx.Error("Unable to get worker to handle request", 500) |
| 98 | +// return |
| 99 | +// } |
| 100 | + |
| 101 | +// response, err := wrkr.HandleHttpRequest(httpTrigger) |
| 102 | + |
| 103 | +// if err != nil { |
| 104 | +// ctx.Error(fmt.Sprintf("Error handling HTTP Request: %v", err), 500) |
| 105 | +// return |
| 106 | +// } |
| 107 | + |
| 108 | +// if response.Header != nil { |
| 109 | +// response.Header.CopyTo(&ctx.Response.Header) |
| 110 | +// } |
| 111 | + |
| 112 | +// // Avoid content length header duplication |
| 113 | +// ctx.Response.Header.Del("Content-Length") |
| 114 | +// ctx.Response.SetStatusCode(response.StatusCode) |
| 115 | +// ctx.Response.SetBody(response.Body) |
| 116 | +// } |
| 117 | +//} |
| 118 | + |
| 119 | +func (s *BaseHttpGateway) Start(pool worker.WorkerPool) error { |
| 120 | + s.pool = pool |
| 121 | + |
| 122 | + // Setup routes |
| 123 | + r := router.New() |
| 124 | + // Make a request for an API gateway |
| 125 | + r.ANY("/apis/{name}/{any:*}", s.api) |
| 126 | + // TODO: Make a request to a specific registered function |
| 127 | + // r.ANY("/function/{name}/{any:*}", s.function) |
| 128 | + // Make a request to trigger a schedule |
| 129 | + r.POST("/schedules/{name}", s.schedule) |
| 130 | + |
| 131 | + s.server = &fasthttp.Server{ |
| 132 | + IdleTimeout: time.Second * 1, |
| 133 | + CloseOnShutdown: true, |
| 134 | + Handler: r.Handler, |
| 135 | + } |
| 136 | + |
| 137 | + return s.server.ListenAndServe(s.address) |
| 138 | +} |
| 139 | + |
| 140 | +func (s *BaseHttpGateway) Stop() error { |
| 141 | + if s.server != nil { |
| 142 | + return s.server.Shutdown() |
| 143 | + } |
| 144 | + return nil |
| 145 | +} |
| 146 | + |
| 147 | +// Create new HTTP gateway |
| 148 | +// XXX: No External Args for function atm (currently the plugin loader does not pass any argument information) |
| 149 | +func New(mw HttpMiddleware) (gateway.GatewayService, error) { |
| 150 | + address := utils.GetEnv("GATEWAY_ADDRESS", ":9001") |
| 151 | + |
| 152 | + return &BaseHttpGateway{ |
| 153 | + address: address, |
| 154 | + }, nil |
| 155 | +} |
0 commit comments