Skip to content
This repository was archived by the owner on Dec 9, 2024. It is now read-only.

CloudEvents integration #404

Merged
merged 64 commits into from
Apr 12, 2018
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
Show all changes
64 commits
Select commit Hold shift + click to select a range
cf400e5
Update Event type struct
RaeesBhatti Mar 29, 2018
81d60cb
go fmt
RaeesBhatti Mar 29, 2018
51df7bd
Fix linting
RaeesBhatti Mar 29, 2018
c0bdb89
Fix a type
RaeesBhatti Mar 29, 2018
a2ce635
Update the schema
RaeesBhatti Mar 30, 2018
905fde5
Update README to reflect the CloudEvent
RaeesBhatti Mar 30, 2018
03fd349
Merge branch 'master' into cloudevents
RaeesBhatti Mar 30, 2018
a5e7cea
Write test for CloudEvents
RaeesBhatti Mar 30, 2018
e9df992
Merge branch 'master' into cloudevents
RaeesBhatti Apr 2, 2018
8ec5042
Update README to move definitons for API
RaeesBhatti Apr 2, 2018
3279ff6
Add reference links to API doc
RaeesBhatti Apr 2, 2018
762cff0
Reformat a bit
RaeesBhatti Apr 3, 2018
ce49e51
Update docs
RaeesBhatti Apr 3, 2018
4fb5991
Docs: Remove CloudEvents definition fields
RaeesBhatti Apr 3, 2018
7060f07
Add link to CloudEvents
RaeesBhatti Apr 3, 2018
579655f
var notation
RaeesBhatti Apr 3, 2018
46f2a6d
Try to parse custom events as CloudEvent
RaeesBhatti Apr 3, 2018
e112c82
Fix CloudEvent parsing
RaeesBhatti Apr 3, 2018
8363564
Update gometalinter cyclo-over param
RaeesBhatti Apr 3, 2018
1625a02
Add validation to Event struct
RaeesBhatti Apr 4, 2018
38978ab
Add default Source because it is required
RaeesBhatti Apr 4, 2018
c2ded5e
Move some checks out of parseCustomEventAsCloudEvent
RaeesBhatti Apr 4, 2018
88f54d7
Revert "Update gometalinter cyclo-over param"
RaeesBhatti Apr 4, 2018
0079e78
Move event parsing to Event.New func
RaeesBhatti Apr 4, 2018
42ad46c
Create NewHTTPEvent func
RaeesBhatti Apr 4, 2018
0c80f71
Fix a test
RaeesBhatti Apr 4, 2018
fd4178c
Change convention
RaeesBhatti Apr 4, 2018
ab0a8a0
Camelcase
RaeesBhatti Apr 4, 2018
ecb4674
Change convention
RaeesBhatti Apr 4, 2018
4400d63
Change comment to reflect that we're using CloudEvents format
RaeesBhatti Apr 4, 2018
a244ad5
Add comment ot NewHTTPEvent
RaeesBhatti Apr 4, 2018
92a8069
Update a comment
RaeesBhatti Apr 4, 2018
57a9646
Update source field in API docs
RaeesBhatti Apr 4, 2018
d477bc0
Validate Event struct
RaeesBhatti Apr 4, 2018
92ec35f
Add tests for event/event.go
RaeesBhatti Apr 4, 2018
c065b69
Correct a test
RaeesBhatti Apr 4, 2018
7bf1dfd
Fix a typo bug in Event.New
RaeesBhatti Apr 4, 2018
8dae643
Revert "Revert "Update gometalinter cyclo-over param""
RaeesBhatti Apr 4, 2018
423a226
Move TransformHeaders function
RaeesBhatti Apr 5, 2018
f6e8f21
add tests for internal http package
Apr 5, 2018
53102a4
add tests for event package
Apr 5, 2018
a44628c
Add extensions field if custom event not cloudevent
RaeesBhatti Apr 6, 2018
8e008f5
Update field names to camelcase
RaeesBhatti Apr 9, 2018
a2f5f24
Update some field names
RaeesBhatti Apr 9, 2018
bfbaeec
Update event example
RaeesBhatti Apr 9, 2018
22a27b0
Accept source param in new event func
RaeesBhatti Apr 9, 2018
614e9c3
Fix linter warning
RaeesBhatti Apr 10, 2018
9b48a3e
Fix test to include tranformationversion anchor
RaeesBhatti Apr 10, 2018
e4984f4
Change a field name
RaeesBhatti Apr 10, 2018
f1df8b5
Declare transformationVersion as const
RaeesBhatti Apr 10, 2018
a5441c4
Fix a loop
RaeesBhatti Apr 10, 2018
ba7969c
Fix tests
RaeesBhatti Apr 10, 2018
e4c9714
Use const
RaeesBhatti Apr 10, 2018
736037a
Use a fixed value for Source
RaeesBhatti Apr 11, 2018
cb04faa
Use pkg constant in test
RaeesBhatti Apr 11, 2018
e025f7e
Update source value in tests
RaeesBhatti Apr 11, 2018
9d0aa38
Update source in docs
RaeesBhatti Apr 11, 2018
00a1ca2
Move TransformationVersion const up
RaeesBhatti Apr 11, 2018
f1df44c
Ignore test generated files
RaeesBhatti Apr 11, 2018
b9b2efc
add more tests
Apr 5, 2018
afc1ba0
minor cleanup
Apr 12, 2018
7de8e5f
add omitempty tag for optional fields
Apr 12, 2018
f7242a3
don't log empty fields
Apr 12, 2018
f4ae257
rename var
Apr 12, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 14 additions & 10 deletions docs/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,23 +16,27 @@ for invoking function. By default Events API runs on `:4000` port.

### Event Definition

All data that passes through the Event Gateway is formatted as an Event, based on our default Event schema:
All data that passes through the Event Gateway is formatted as an CloudEvent, based on CloudEvent schema:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"as a CloudEvent"

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please specify spec version here.


* `event` - `string` - the event name
* `id` - `string` - the event's instance universally unique ID (provided by the event gateway)
* `receivedAt` - `number` - the time (milliseconds) when the Event was received by the Event Gateway (provided by the event gateway)
* `data` - type depends on `dataType` - the event payload
* `dataType` - `string` - the mime type of `data` payload
* `event-type` - `string` - the event name
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's remove list of fields from here and leave only example and link to original spec.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you want me to remove the example too or just this list of fields?

* `event-id` - `string` - the event's instance universally unique ID (provided by the event gateway)
* `cloud-events-version` - `string` - the version of CloudEvent definition
* `source` - `string` - source for the event
* `event-time` - `string` - RFC 3339 formatted time when the Event was received by the Event Gateway (provided by the event gateway)
* `data` - type depends on `content-type` - the event payload
* `content-type` - `string` - the mime type of `data` payload

Example:

```json
{
"event": "myapp.user.created",
"id": "66dfc31d-6844-42fd-b1a7-a489a49f65f3",
"receivedAt": 1500897327098,
"event-type": "myapp.user.created",
"event-id": "66dfc31d-6844-42fd-b1a7-a489a49f65f3",
"cloud-events-version": "0.1",
"source": "", // TBD
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This need to be updated.

"event-time": "1990-12-31T23:59:60Z",
"data": { "foo": "bar" },
"dataType": "application/json"
"content-type": "application/json"
}
```

Expand Down
43 changes: 28 additions & 15 deletions event/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,26 +8,34 @@ import (
"go.uber.org/zap/zapcore"

uuid "github.com/satori/go.uuid"
"github.com/serverless/event-gateway/internal/zap"
)

// Event is a default event structure. All data that passes through the Event Gateway is formatted as an Event, based on
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you update the comment here that this struct is based on CloudEvents spec?

// this schema.
type Event struct {
Type Type `json:"event"`
ID string `json:"id"`
ReceivedAt uint64 `json:"receivedAt"`
Data interface{} `json:"data"`
DataType string `json:"dataType"`
EventType Type `json:"event-type"`
EventTypeVersion string `json:"event-type-version"`
CloudEventsVersion string `json:"cloud-events-version"`
Source string `json:"source"`
EventID string `json:"event-id"`
EventTime time.Time `json:"event-time"`
SchemaURL string `json:"schema-url"`
ContentType string `json:"content-type"`
Extensions zap.MapStringInterface `json:"extensions"`
Data interface{} `json:"data"`
}

// New return new instance of Event.
func New(eventType Type, mime string, payload interface{}) *Event {
return &Event{
Type: eventType,
ID: uuid.NewV4().String(),
ReceivedAt: uint64(time.Now().UnixNano() / int64(time.Millisecond)),
DataType: mime,
Data: payload,
EventType: eventType,
CloudEventsVersion: "0.1",
Source: "",
EventID: uuid.NewV4().String(),
EventTime: time.Now(),
ContentType: mime,
Data: payload,
}
}

Expand All @@ -42,17 +50,22 @@ const TypeHTTP = Type("http")

// MarshalLogObject is a part of zapcore.ObjectMarshaler interface
func (e Event) MarshalLogObject(enc zapcore.ObjectEncoder) error {
enc.AddString("type", string(e.Type))
enc.AddString("id", e.ID)
enc.AddUint64("receivedAt", e.ReceivedAt)
enc.AddString("event-type", string(e.EventType))
enc.AddString("event-type-version", e.EventTypeVersion)
enc.AddString("cloud-events-version", e.CloudEventsVersion)
enc.AddString("source", e.Source)
enc.AddString("event-id", e.EventID)
enc.AddString("event-time", e.EventTime.String())
enc.AddString("schema-url", e.SchemaURL)
enc.AddString("content-type", e.ContentType)
e.Extensions.MarshalLogObject(enc)
payload, _ := json.Marshal(e.Data)
enc.AddString("data", string(payload))
enc.AddString("dataType", e.DataType)

return nil
}

// IsSystem indicates if th event is a system event.
func (e Event) IsSystem() bool {
return strings.HasPrefix(string(e.Type), "gateway.")
return strings.HasPrefix(string(e.EventType), "gateway.")
}
21 changes: 20 additions & 1 deletion internal/zap/strings.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,33 @@
package zap

import "go.uber.org/zap/zapcore"
import (
"encoding/json"
"go.uber.org/zap/zapcore"
)

// Strings is a string array that implements MarshalLogArray.
type Strings []string

// MapStringInterface is a map that implements MarshalLogObject.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please move it under MarshalLogArray function

type MapStringInterface map[string]interface{}

// MarshalLogArray implementation
func (ss Strings) MarshalLogArray(enc zapcore.ArrayEncoder) error {
for _, s := range ss {
enc.AppendString(s)
}
return nil
}

// MarshalLogObject implementation
func (msi MapStringInterface) MarshalLogObject(enc zapcore.ObjectEncoder) error {
for key, val := range msi {
v, err := json.Marshal(val)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need json.Marshal here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because val is type interface{}, we cannot use it in enc.AddString()

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not casting to string?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because It can be a nested object or an array.

if err != nil {
enc.AddString(key, string(v))
} else {
return err
}
}
return nil
}
4 changes: 2 additions & 2 deletions plugin/example/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@ func (s *Simple) Subscriptions() []plugin.Subscription {

// React is called for every event that plugin subscribed to.
func (s *Simple) React(instance event.Event) error {
switch instance.Type {
switch instance.EventType {
case event.SystemEventReceivedType:
received := instance.Data.(event.SystemEventReceivedData)
log.Printf("received gateway.received.event for event: %q", received.Event.Type)
log.Printf("received gateway.received.event for event: %q", received.Event.EventType)
break
}

Expand Down
2 changes: 1 addition & 1 deletion plugin/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func (m *Manager) Kill() {
func (m *Manager) React(event *event.Event) error {
for _, plugin := range m.Plugins {
for _, subscription := range plugin.Subscriptions {
if subscription.EventType == event.Type {
if subscription.EventType == event.EventType {
err := plugin.Reacter.React(*event)
if err != nil {
m.Log.Debug("Plugin returned error.",
Expand Down
2 changes: 1 addition & 1 deletion router/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func (router *Router) eventFromRequest(r *http.Request) (*eventpkg.Event, string
}
}

if event.Type == eventpkg.TypeHTTP {
if event.EventType == eventpkg.TypeHTTP {
event.Data = &eventpkg.HTTPEvent{
Headers: headers,
Query: r.URL.Query(),
Expand Down
8 changes: 4 additions & 4 deletions router/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func (router *Router) ServeHTTP(w http.ResponseWriter, r *http.Request) {
return
}

if event.Type == eventpkg.TypeInvoke {
if event.EventType == eventpkg.TypeInvoke {
functionID := function.ID(r.Header.Get(headerFunctionID))
space := r.Header.Get(headerSpace)
if space == "" {
Expand All @@ -100,7 +100,7 @@ func (router *Router) ServeHTTP(w http.ResponseWriter, r *http.Request) {

metricEventsProcessed.WithLabelValues(space, "invoke").Inc()
} else if !event.IsSystem() {
reportReceivedEvent(event.ID)
reportReceivedEvent(event.EventID)

router.enqueueWork(path, event)
w.WriteHeader(http.StatusAccepted)
Expand Down Expand Up @@ -447,9 +447,9 @@ func (router *Router) loop() {

// processEvent call all functions subscribed for an event
func (router *Router) processEvent(e backlogEvent) {
reportEventOutOfQueue(e.event.ID)
reportEventOutOfQueue(e.event.EventID)

subscribers := router.targetCache.SubscribersOfEvent(e.path, e.event.Type)
subscribers := router.targetCache.SubscribersOfEvent(e.path, e.event.EventType)
for _, subscriber := range subscribers {
router.callFunction(subscriber.Space, subscriber.ID, e.event)
}
Expand Down
38 changes: 38 additions & 0 deletions router/router_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,44 @@ func TestRouterServeHTTP_Encoding(t *testing.T) {
}
}

func TestRouterServeHTTP_CloudEvents(t *testing.T) {
var contentType = "application/json"
ctrl := gomock.NewController(t)
defer ctrl.Finish()
testListServer := httptest.NewServer(http.HandlerFunc(
func(w http.ResponseWriter, r *http.Request) {
testevent := event.Event{}
json.NewDecoder(r.Body).Decode(&testevent)
defer r.Body.Close()

assert.Equal(t, testevent.EventType, event.TypeHTTP)
assert.Equal(t, testevent.CloudEventsVersion, "0.1")
assert.NotEqual(t, testevent.Source, nil)
assert.NotEqual(t, len(testevent.EventID), 0)
assert.NotEqual(t, testevent.EventTime, nil)
assert.Equal(t, testevent.ContentType, contentType)
}))
defer testListServer.Close()
target := mock.NewMockTargeter(ctrl)
someFunc := function.Function{
Space: "",
ID: "somefunc",
ProviderType: httpprovider.Type,
Provider: httpprovider.HTTP{
URL: testListServer.URL,
},
}
target.EXPECT().HTTPBackingFunction(http.MethodPost, "/").Return("", &someFunc.ID, pathtree.Params{}, nil)
target.EXPECT().Function("", someFunc.ID).Return(&someFunc)
target.EXPECT().SubscribersOfEvent(gomock.Any(), gomock.Any()).Return([]router.FunctionInfo{}).MaxTimes(3)
router := testrouter(target)

req, _ := http.NewRequest(http.MethodPost, "/", strings.NewReader(`{"some": "thing"}`))
req.Header.Set("content-type", contentType)
recorder := httptest.NewRecorder()
router.ServeHTTP(recorder, req)
}

func TestRouterServeHTTP_ErrorOnCustomEventEmittedWithNonPostMethod(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
Expand Down