-
Notifications
You must be signed in to change notification settings - Fork 98
CloudEvents integration #404
Changes from 55 commits
cf400e5
81d60cb
51df7bd
c0bdb89
a2ce635
905fde5
03fd349
a5e7cea
e9df992
8ec5042
3279ff6
762cff0
ce49e51
4fb5991
7060f07
579655f
46f2a6d
e112c82
8363564
1625a02
38978ab
c2ded5e
88f54d7
0079e78
42ad46c
0c80f71
fd4178c
ab0a8a0
ecb4674
4400d63
a244ad5
92a8069
57a9646
d477bc0
92ec35f
c065b69
7bf1dfd
8dae643
423a226
f6e8f21
53102a4
a44628c
8e008f5
a2f5f24
bfbaeec
22a27b0
614e9c3
9b48a3e
e4984f4
f1df8b5
a5441c4
ba7969c
e4c9714
736037a
cb04faa
e025f7e
9d0aa38
00a1ca2
f1df44c
b9b2efc
afc1ba0
7de8e5f
f7242a3
f4ae257
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -16,23 +16,19 @@ for invoking function. By default Events API runs on `:4000` port. | |
|
||
### Event Definition | ||
|
||
All data that passes through the Event Gateway is formatted as an Event, based on our default Event schema: | ||
|
||
* `event` - `string` - the event name | ||
* `id` - `string` - the event's instance universally unique ID (provided by the event gateway) | ||
* `receivedAt` - `number` - the time (milliseconds) when the Event was received by the Event Gateway (provided by the event gateway) | ||
* `data` - type depends on `dataType` - the event payload | ||
* `dataType` - `string` - the mime type of `data` payload | ||
All data that passes through the Event Gateway is formatted as a CloudEvent, based on [CloudEvents v0.1 schema](https://github.com/cloudevents/spec/blob/master/spec.md): | ||
|
||
Example: | ||
|
||
```json | ||
{ | ||
"event": "myapp.user.created", | ||
"id": "66dfc31d-6844-42fd-b1a7-a489a49f65f3", | ||
"receivedAt": 1500897327098, | ||
"eventType": "myapp.user.created", | ||
"eventID": "66dfc31d-6844-42fd-b1a7-a489a49f65f3", | ||
"cloudEventsVersion": "0.1", | ||
"source": "https://slsgateway.com/", | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @elsteelbrain please update the example |
||
"eventTime": "1990-12-31T23:59:60Z", | ||
"data": { "foo": "bar" }, | ||
"dataType": "application/json" | ||
"contentType": "application/json" | ||
} | ||
``` | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,57 +2,136 @@ package event | |
|
||
import ( | ||
"encoding/json" | ||
"errors" | ||
"strings" | ||
"time" | ||
|
||
"go.uber.org/zap/zapcore" | ||
|
||
uuid "github.com/satori/go.uuid" | ||
"github.com/serverless/event-gateway/internal/zap" | ||
validator "gopkg.in/go-playground/validator.v9" | ||
) | ||
|
||
// Event is a default event structure. All data that passes through the Event Gateway is formatted as an Event, based on | ||
// this schema. | ||
// Type uniquely identifies an event type. | ||
type Type string | ||
|
||
// TypeInvoke is a special type of event for sync function invocation. | ||
const TypeInvoke = Type("invoke") | ||
|
||
// TypeHTTP is a special type of event for sync http subscriptions. | ||
const TypeHTTP = Type("http") | ||
|
||
const ( | ||
mimeJSON = "application/json" | ||
mimeFormMultipart = "multipart/form-data" | ||
mimeFormURLEncoded = "application/x-www-form-urlencoded" | ||
TransformationVersion = "0.1" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. let's define it outside of this mime const, under Type consts. |
||
) | ||
|
||
// Event is a default event structure. All data that passes through the Event Gateway | ||
// is formatted to a format defined CloudEvents v0.1 spec. | ||
type Event struct { | ||
Type Type `json:"event"` | ||
ID string `json:"id"` | ||
ReceivedAt uint64 `json:"receivedAt"` | ||
Data interface{} `json:"data"` | ||
DataType string `json:"dataType"` | ||
EventType Type `json:"eventType" validate:"required"` | ||
EventTypeVersion string `json:"eventTypeVersion"` | ||
CloudEventsVersion string `json:"cloudEventsVersion" validate:"required"` | ||
Source string `json:"source" validate:"url,required"` | ||
EventID string `json:"eventID" validate:"required"` | ||
EventTime time.Time `json:"eventTime"` | ||
SchemaURL string `json:"schemaURL"` | ||
ContentType string `json:"contentType"` | ||
Extensions zap.MapStringInterface `json:"extensions"` | ||
Data interface{} `json:"data"` | ||
} | ||
|
||
// New return new instance of Event. | ||
func New(eventType Type, mime string, payload interface{}) *Event { | ||
return &Event{ | ||
Type: eventType, | ||
ID: uuid.NewV4().String(), | ||
ReceivedAt: uint64(time.Now().UnixNano() / int64(time.Millisecond)), | ||
DataType: mime, | ||
Data: payload, | ||
event := &Event{ | ||
EventType: eventType, | ||
CloudEventsVersion: "0.1", | ||
Source: "https://serverless.com/event-gateway/#transformationVersion=" + TransformationVersion, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The correct address for the page is |
||
EventID: uuid.NewV4().String(), | ||
EventTime: time.Now(), | ||
ContentType: mime, | ||
Data: payload, | ||
} | ||
} | ||
|
||
// Type uniquely identifies an event type. | ||
type Type string | ||
// it's a custom event, possibly CloudEvent | ||
if eventType != TypeHTTP && eventType != TypeInvoke { | ||
cloudEvent, err := parseAsCloudEvent(eventType, mime, payload) | ||
if err == nil { | ||
event = cloudEvent | ||
} else { | ||
event.Extensions = zap.MapStringInterface{ | ||
"eventgateway": map[string]interface{}{ | ||
"transformed": true, | ||
"transformation-version": TransformationVersion, | ||
}, | ||
} | ||
} | ||
} | ||
|
||
// TypeInvoke is a special type of event for sync function invocation. | ||
const TypeInvoke = Type("invoke") | ||
// Because event.Data is []bytes here, it will be base64 encoded by default when being sent to remote function, | ||
// which is why we change the event.Data type to "string" for forms, so that, it is left intact. | ||
if eventbody, ok := event.Data.([]byte); ok && len(eventbody) > 0 { | ||
switch { | ||
case mime == mimeJSON: | ||
json.Unmarshal(eventbody, &event.Data) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can you verify that There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In that case There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think it should be already parsed as this is how AWS Lambda integration with AWS services looks like. That's why we need this There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. JSON is kind of first class citizen in EG. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I see. |
||
case strings.HasPrefix(mime, mimeFormMultipart), mime == mimeFormURLEncoded: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You used There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That is used in different PR. We'll import that when we merge, or do you want me to merge it now? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Oh, sorry, yeah, let's leave it out here. We will merge it as a part of this second PR. |
||
event.Data = string(eventbody) | ||
} | ||
} | ||
|
||
// TypeHTTP is a special type of event for sync http subscriptions. | ||
const TypeHTTP = Type("http") | ||
return event | ||
} | ||
|
||
// MarshalLogObject is a part of zapcore.ObjectMarshaler interface | ||
func (e Event) MarshalLogObject(enc zapcore.ObjectEncoder) error { | ||
enc.AddString("type", string(e.Type)) | ||
enc.AddString("id", e.ID) | ||
enc.AddUint64("receivedAt", e.ReceivedAt) | ||
enc.AddString("eventType", string(e.EventType)) | ||
enc.AddString("eventTypeVersion", e.EventTypeVersion) | ||
enc.AddString("cloudEventsVersion", e.CloudEventsVersion) | ||
enc.AddString("source", e.Source) | ||
enc.AddString("eventID", e.EventID) | ||
enc.AddString("eventTime", e.EventTime.String()) | ||
enc.AddString("schemaURL", e.SchemaURL) | ||
enc.AddString("contentType", e.ContentType) | ||
e.Extensions.MarshalLogObject(enc) | ||
payload, _ := json.Marshal(e.Data) | ||
enc.AddString("data", string(payload)) | ||
enc.AddString("dataType", e.DataType) | ||
|
||
return nil | ||
} | ||
|
||
// IsSystem indicates if th event is a system event. | ||
// IsSystem indicates if the event is a system event. | ||
func (e Event) IsSystem() bool { | ||
return strings.HasPrefix(string(e.Type), "gateway.") | ||
return strings.HasPrefix(string(e.EventType), "gateway.") | ||
} | ||
|
||
func parseAsCloudEvent(eventType Type, mime string, payload interface{}) (*Event, error) { | ||
if mime != mimeJSON { | ||
return nil, errors.New("content type is not json") | ||
} | ||
body, ok := payload.([]byte) | ||
if ok { | ||
validate := validator.New() | ||
|
||
customEvent := &Event{} | ||
err := json.Unmarshal(body, customEvent) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
err = validate.Struct(customEvent) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
if eventType != customEvent.EventType { | ||
return nil, errors.New("wrong event type") | ||
} | ||
|
||
return customEvent, nil | ||
} | ||
|
||
return nil, errors.New("couldn't cast to []byte") | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,145 @@ | ||
package event_test | ||
|
||
import ( | ||
"testing" | ||
|
||
eventpkg "github.com/serverless/event-gateway/event" | ||
"github.com/stretchr/testify/assert" | ||
"github.com/serverless/event-gateway/internal/zap" | ||
) | ||
|
||
func TestNew(t *testing.T) { | ||
for _, testCase := range newTests { | ||
result := eventpkg.New(testCase.eventType, testCase.mime, testCase.payload) | ||
|
||
assert.NotEqual(t, result.EventID, "") | ||
assert.Equal(t, testCase.expectedEvent.EventType, result.EventType) | ||
assert.Equal(t, testCase.expectedEvent.CloudEventsVersion, result.CloudEventsVersion) | ||
assert.Equal(t, testCase.expectedEvent.Source, result.Source) | ||
assert.Equal(t, testCase.expectedEvent.ContentType, result.ContentType) | ||
assert.Equal(t, testCase.expectedEvent.Data, result.Data) | ||
assert.Equal(t, testCase.expectedEvent.Extensions, result.Extensions) | ||
} | ||
} | ||
|
||
var newTests = []struct { | ||
eventType eventpkg.Type | ||
mime string | ||
payload interface{} | ||
expectedEvent eventpkg.Event | ||
}{ | ||
{ // not CloudEvent | ||
eventpkg.Type("user.created"), | ||
"application/json", | ||
[]byte("test"), | ||
eventpkg.Event{ | ||
EventType: eventpkg.Type("user.created"), | ||
CloudEventsVersion: eventpkg.TransformationVersion, | ||
Source: "https://slsgateway.com#transformationVersion=0.1", | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Tests have to be updated. |
||
ContentType: "application/json", | ||
Data: []byte("test"), | ||
Extensions: zap.MapStringInterface{ | ||
"eventgateway": map[string]interface{}{ | ||
"transformed": true, | ||
"transformation-version": eventpkg.TransformationVersion, | ||
}, | ||
}, | ||
}, | ||
}, | ||
{ // System event | ||
eventpkg.Type("user.created"), | ||
"application/json", | ||
eventpkg.SystemEventReceivedData{}, | ||
eventpkg.Event{ | ||
EventType: eventpkg.Type("user.created"), | ||
CloudEventsVersion: eventpkg.TransformationVersion, | ||
Source: "https://slsgateway.com#transformationVersion=0.1", | ||
ContentType: "application/json", | ||
Data: eventpkg.SystemEventReceivedData{}, | ||
Extensions: zap.MapStringInterface{ | ||
"eventgateway": map[string]interface{}{ | ||
"transformed": true, | ||
"transformation-version": eventpkg.TransformationVersion, | ||
}, | ||
}, | ||
}, | ||
}, | ||
{ | ||
// valid CloudEvent | ||
eventpkg.Type("user.created"), | ||
"application/json", | ||
[]byte(`{ | ||
"eventType": "user.created", | ||
"cloudEventsVersion": "`+ eventpkg.TransformationVersion +`", | ||
"source": "https://example.com/", | ||
"eventID": "6f6ada3b-0aa2-4b3c-989a-91ffc6405f11", | ||
"contentType": "text/plain", | ||
"data": "test" | ||
}`), | ||
eventpkg.Event{ | ||
EventType: eventpkg.Type("user.created"), | ||
CloudEventsVersion: eventpkg.TransformationVersion, | ||
Source: "https://example.com/", | ||
ContentType: "text/plain", | ||
Data: "test", | ||
}, | ||
}, | ||
{ | ||
// type mismatch | ||
eventpkg.Type("user.deleted"), | ||
"application/json", | ||
[]byte(`{ | ||
"eventType": "user.created", | ||
"cloudEventsVersion": "`+ eventpkg.TransformationVersion +`", | ||
"source": "https://example.com/", | ||
"eventID": "6f6ada3b-0aa2-4b3c-989a-91ffc6405f11", | ||
"contentType": "text/plain", | ||
"data": "test" | ||
}`), | ||
eventpkg.Event{ | ||
EventType: eventpkg.Type("user.deleted"), | ||
CloudEventsVersion: eventpkg.TransformationVersion, | ||
Source: "https://slsgateway.com#transformationVersion=0.1", | ||
ContentType: "application/json", | ||
Data: map[string]interface{}{ | ||
"eventType": "user.created", | ||
"cloudEventsVersion": eventpkg.TransformationVersion, | ||
"source": "https://example.com/", | ||
"eventID": "6f6ada3b-0aa2-4b3c-989a-91ffc6405f11", | ||
"contentType": "text/plain", | ||
"data": "test", | ||
}, | ||
Extensions: zap.MapStringInterface{ | ||
"eventgateway": map[string]interface{}{ | ||
"transformed": true, | ||
"transformation-version": eventpkg.TransformationVersion, | ||
}, | ||
}, | ||
}, | ||
}, | ||
{ | ||
// invalid CloudEvent (missing required fields) | ||
eventpkg.Type("user.created"), | ||
"application/json", | ||
[]byte(`{ | ||
"eventType": "user.created", | ||
"cloudEventsVersion": "0.1" | ||
}`), | ||
eventpkg.Event{ | ||
EventType: eventpkg.Type("user.created"), | ||
CloudEventsVersion: eventpkg.TransformationVersion, | ||
Source: "https://slsgateway.com#transformationVersion=0.1", | ||
ContentType: "application/json", | ||
Data: map[string]interface{}{ | ||
"eventType": "user.created", | ||
"cloudEventsVersion": eventpkg.TransformationVersion, | ||
}, | ||
Extensions: zap.MapStringInterface{ | ||
"eventgateway": map[string]interface{}{ | ||
"transformed": true, | ||
"transformation-version": eventpkg.TransformationVersion, | ||
}, | ||
}, | ||
}, | ||
}, | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,21 @@ | ||
package http | ||
|
||
import ( | ||
httppkg "net/http" | ||
"strings" | ||
) | ||
|
||
// FlattenHeader takes http.Header and flatten value array | ||
// (map[string][]string -> map[string]string) so it's easier | ||
// to access headers by user. | ||
func FlattenHeader(req httppkg.Header) map[string]string { | ||
headers := map[string]string{} | ||
for key, header := range req { | ||
headers[key] = header[0] | ||
if len(header) > 1 { | ||
headers[key] = strings.Join(header, ", ") | ||
} | ||
} | ||
|
||
return headers | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@alexdebrie can you talk a final look here. Should we add something?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@mthenw @elsteelbrain Docs are fine by me