Skip to content

Commit ab9becb

Browse files
committed
fix: [TKC-5111] use env id for queue/start workflow events (#7339)
(cherry picked from commit 79d3565)
1 parent 3b59c95 commit ab9becb

File tree

8 files changed

+27
-22
lines changed

8 files changed

+27
-22
lines changed

cmd/api-server/services/controlplane.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ func CreateControlPlane(ctx context.Context, cfg *config.Config, eventsEmitter *
6666
commands := controlplane.CreateCommands(cfg.StorageBucket, storageClient, testWorkflowOutputRepository, testWorkflowResultsRepository, artifactStorage)
6767

6868
enqueuer := scheduling.NewEnqueuer(log.DefaultLogger, testWorkflowsClient, testWorkflowTemplatesClient, testWorkflowResultsRepository, eventsEmitter,
69-
cfg.GlobalWorkflowTemplateName, cfg.GlobalWorkflowTemplateInline != "")
69+
envID, cfg.GlobalWorkflowTemplateName, cfg.GlobalWorkflowTemplateInline != "")
7070
scheduler := factory.NewScheduler()
7171
executionController := factory.NewExecutionController()
7272
executionQuerier := factory.NewExecutionQuerier()

pkg/api/v1/testkube/model_event_extended.go

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -31,12 +31,7 @@ func NewEvent(t *EventType, resource *EventResource, id string) Event {
3131
}
3232
}
3333

34-
func NewEventQueueTestWorkflow(execution *TestWorkflowExecution) Event {
35-
groupId := ""
36-
if execution != nil {
37-
groupId = execution.GroupId
38-
}
39-
34+
func NewEventQueueTestWorkflow(execution *TestWorkflowExecution, groupId string) Event {
4035
return Event{
4136
Id: uuid.NewString(),
4237
GroupId: groupId,
@@ -45,12 +40,7 @@ func NewEventQueueTestWorkflow(execution *TestWorkflowExecution) Event {
4540
}
4641
}
4742

48-
func NewEventStartTestWorkflow(execution *TestWorkflowExecution) Event {
49-
groupId := ""
50-
if execution != nil {
51-
groupId = execution.GroupId
52-
}
53-
43+
func NewEventStartTestWorkflow(execution *TestWorkflowExecution, groupId string) Event {
5444
return Event{
5545
Id: uuid.NewString(),
5646
GroupId: groupId,

pkg/api/v1/testkube/model_event_extended_test.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,3 +122,15 @@ func TestEvent_IsSuccess(t *testing.T) {
122122
}
123123
})
124124
}
125+
126+
func TestNewWorkflowEvents_UseExplicitRoutingGroup(t *testing.T) {
127+
t.Parallel()
128+
129+
execution := &TestWorkflowExecution{Id: "execution-1", GroupId: "execution-group-1"}
130+
131+
queueEvent := NewEventQueueTestWorkflow(execution, "env-123")
132+
startEvent := NewEventStartTestWorkflow(execution, "env-123")
133+
134+
assert.Equal(t, "env-123", queueEvent.GroupId)
135+
assert.Equal(t, "env-123", startEvent.GroupId)
136+
}

pkg/controlplane/agent_grpc_execution_updates.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -77,8 +77,8 @@ func (s *Server) GetExecutionUpdates(ctx context.Context, _ *executionv1.GetExec
7777
log.Warnw("error marking execution as starting", "err", err)
7878
}
7979

80-
// Dispatch event for WebHooks and friends
81-
s.emitter.Notify(testkube.NewEventStartTestWorkflow(&exe))
80+
// Dispatch event for WebHooks and friends using the environment ID as the routing group.
81+
s.emitter.Notify(testkube.NewEventStartTestWorkflow(&exe, s.envID))
8282
default:
8383
log.Warnw("unexpected state", "id", exe.Id, "status", *exe.Result.Status)
8484
}

pkg/controlplane/scheduling/enqueuer.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ type Enqueuer struct {
2828
templateRepository testworkflowtemplateclient.TestWorkflowTemplateClient
2929
executionRepository testworkflow.Repository
3030
emitter *event.Emitter
31+
envID string
3132
globalTemplateName string
3233
hasInlinedGlobalTemplate bool
3334
}
@@ -38,6 +39,7 @@ func NewEnqueuer(
3839
templateRepository testworkflowtemplateclient.TestWorkflowTemplateClient,
3940
executionRepository testworkflow.Repository,
4041
emitter *event.Emitter,
42+
envID string,
4143
globalTemplateName string,
4244
hasInlinedGlobalTemplate bool,
4345
) Enqueuer {
@@ -47,6 +49,7 @@ func NewEnqueuer(
4749
templateRepository: templateRepository,
4850
executionRepository: executionRepository,
4951
emitter: emitter,
52+
envID: envID,
5053
globalTemplateName: globalTemplateName,
5154
hasInlinedGlobalTemplate: hasInlinedGlobalTemplate,
5255
}
@@ -393,7 +396,7 @@ func (e *Enqueuer) persistExecution(ctx context.Context, executions []*testworkf
393396
// dispatchExecutionEvents dispatches events related to queueing.
394397
func (e *Enqueuer) dispatchExecutionEvents(executions []testkube.TestWorkflowExecution) {
395398
for _, execution := range executions {
396-
e.emitter.Notify(testkube.NewEventQueueTestWorkflow(&execution))
399+
e.emitter.Notify(testkube.NewEventQueueTestWorkflow(&execution, e.envID))
397400
}
398401
}
399402

pkg/event/bus/nats_integration_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ func TestNATS_Integration(t *testing.T) {
8282

8383
// given event
8484

85-
event := testkube.NewEventStartTestWorkflow(testkube.NewQueuedExecution())
85+
event := testkube.NewEventStartTestWorkflow(testkube.NewQueuedExecution(), "")
8686
event.Id = "123"
8787

8888
// and connection

pkg/event/emitter_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -215,10 +215,10 @@ func TestEmitter_GroupedListenersReceiveQueuedAndStartedWorkflowEvents(t *testin
215215
time.Sleep(50 * time.Millisecond)
216216

217217
execution := testkube.NewExecutionWithID("executionID-grouped", "grouped")
218-
execution.GroupId = "env-123"
218+
execution.GroupId = "execution-group-123"
219219

220-
emitter.Notify(testkube.NewEventQueueTestWorkflow(execution))
221-
emitter.Notify(testkube.NewEventStartTestWorkflow(execution))
220+
emitter.Notify(testkube.NewEventQueueTestWorkflow(execution, "env-123"))
221+
emitter.Notify(testkube.NewEventStartTestWorkflow(execution, "env-123"))
222222

223223
assert.Eventually(t, func() bool {
224224
return listener.GetNotificationCount() == 2 && len(listener.GetReceivedEventTypes()) == 2

pkg/event/kind/websocket/listener_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ func TestWebsocketListener(t *testing.T) {
2020
}}
2121

2222
// when
23-
result := l.Notify(testkube.NewEventStartTestWorkflow(testkube.NewQueuedExecution()))
23+
result := l.Notify(testkube.NewEventStartTestWorkflow(testkube.NewQueuedExecution(), ""))
2424

2525
// then
2626
assert.Equal(t, "", result.Error_)
@@ -31,7 +31,7 @@ func TestWebsocketListenerNoClients(t *testing.T) {
3131
l := NewWebsocketListener()
3232

3333
// when
34-
result := l.Notify(testkube.NewEventStartTestWorkflow(testkube.NewQueuedExecution()))
34+
result := l.Notify(testkube.NewEventStartTestWorkflow(testkube.NewQueuedExecution(), ""))
3535

3636
// then - not an error when no clients are connected
3737
assert.Equal(t, "", result.Error_)

0 commit comments

Comments
 (0)