Skip to content

Commit bae773b

Browse files
jyecuschtjholm
authored andcommitted
feat(run): Add local events plugin.
1 parent 7a93849 commit bae773b

File tree

2 files changed

+92
-0
lines changed

2 files changed

+92
-0
lines changed

pkg/run/events.go

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
// Copyright Nitric Pty Ltd.
2+
//
3+
// SPDX-License-Identifier: Apache-2.0
4+
//
5+
// Licensed under the Apache License, Version 2.0 (the "License");
6+
// you may not use this file except in compliance with the License.
7+
// You may obtain a copy of the License at:
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing, software
12+
// distributed under the License is distributed on an "AS IS" BASIS,
13+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
// See the License for the specific language governing permissions and
15+
// limitations under the License.
16+
package run
17+
18+
import (
19+
"encoding/json"
20+
"fmt"
21+
22+
"github.com/nitrictech/nitric/pkg/plugins/errors"
23+
"github.com/nitrictech/nitric/pkg/plugins/errors/codes"
24+
"github.com/nitrictech/nitric/pkg/plugins/events"
25+
"github.com/nitrictech/nitric/pkg/triggers"
26+
"github.com/nitrictech/nitric/pkg/worker"
27+
)
28+
29+
type WorkerPoolEventService struct {
30+
events.UnimplementedeventsPlugin
31+
pool worker.WorkerPool
32+
}
33+
34+
// Publish a message to a given topic
35+
func (s *WorkerPoolEventService) Publish(topic string, event *events.NitricEvent) error {
36+
newErr := errors.ErrorsWithScope(
37+
"WorkerPoolEventService.Publish",
38+
map[string]interface{}{
39+
"topic": topic,
40+
"event": event,
41+
},
42+
)
43+
44+
requestId := event.ID
45+
// payloadType := event.PayloadType
46+
payload := event.Payload
47+
48+
marshaledPayload, err := json.Marshal(payload)
49+
if err != nil {
50+
return newErr(
51+
codes.Internal,
52+
"error marshalling event payload",
53+
err,
54+
)
55+
}
56+
57+
evt := &triggers.Event{
58+
ID: requestId,
59+
Topic: topic,
60+
Payload: marshaledPayload,
61+
}
62+
63+
// get all scribers to this event
64+
targets := s.pool.GetWorkers(&worker.GetWorkerOptions{
65+
Event: evt,
66+
})
67+
68+
fmt.Println(fmt.Sprintf("Publishing event to: %s", targets))
69+
for _, target := range targets {
70+
err = target.HandleEvent(evt)
71+
if err != nil {
72+
// this is likely an error in the user's handler, we don't want it to bring the server down.
73+
// just log and move on.
74+
fmt.Println(err)
75+
}
76+
}
77+
78+
return nil
79+
}
80+
81+
// Create new Dev EventService
82+
func NewEvents(pool worker.WorkerPool) (events.EventService, error) {
83+
return &WorkerPoolEventService{
84+
pool: pool,
85+
}, nil
86+
}

pkg/run/run.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,11 @@ func (l *localServices) Start() error {
114114
MaxWorkers: 100,
115115
})
116116

117+
ev, err := NewEvents(pool)
118+
if err != nil {
119+
return err
120+
}
121+
117122
// Start a new gateway plugin
118123
gw, err := NewGateway()
119124
if err != nil {
@@ -130,6 +135,7 @@ func (l *localServices) Start() error {
130135
StoragePlugin: sp,
131136
DocumentPlugin: dp,
132137
GatewayPlugin: gw,
138+
EventsPlugin: ev,
133139
Pool: pool,
134140
TolerateMissingServices: true,
135141
})

0 commit comments

Comments
 (0)