Skip to content

Reapply "Sort the tail log messages (#1046)" #1102

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 26 additions & 26 deletions src/pkg/clouds/aws/ecs/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"fmt"
"io"
"strings"
"sync"
"time"

"github.com/DefangLabs/defang/src/pkg/clouds/aws"
Expand All @@ -29,39 +28,40 @@ func getLogGroupIdentifier(arnOrId string) string {
func QueryAndTailLogGroups(ctx context.Context, start, end time.Time, logGroups ...LogGroupInput) (LiveTailStream, error) {
ctx, cancel := context.WithCancel(ctx)

e := &eventStream{
cancel: cancel,
ch: make(chan types.StartLiveTailResponseStream),
}
errCh := make(chan error)

// We must close the channel when all log groups are done
var wg sync.WaitGroup
var err error
var eventCh chan LogEvent
for _, lgi := range logGroups {
var es LiveTailStream
es, err = QueryAndTailLogGroup(ctx, lgi, start, end)
es, err := QueryAndTailLogGroup(ctx, lgi, start, end)
if err != nil {
break // abort if there is any fatal error
cancel()
return nil, err
}
wg.Add(1)
go func() {
defer es.Close()
defer wg.Done()
// FIXME: this should *merge* the events from all log groups
e.err = e.pipeEvents(ctx, es)
}()
newCh := LiveTailStreamToChannel(ctx, es, errCh)
eventCh = mergeLogEventChan(eventCh, newCh)
}

go func() {
wg.Wait()
close(e.ch)
}()

if err != nil {
cancel() // abort any goroutines (caller won't call Close)
return nil, err
e := &eventStream{
cancel: cancel,
ch: make(chan types.StartLiveTailResponseStream),
}

go func() {
defer close(e.ch)
for {
select {
case event := <-eventCh:
e.ch <- &types.StartLiveTailResponseStreamMemberSessionUpdate{
Value: types.LiveTailSessionUpdate{SessionResults: []types.LiveTailSessionLogEvent{event}},
}
case err := <-errCh:
e.err = err
return // defered close of e.ch will unblock the caller to pick up the error
case <-ctx.Done():
return
}
}
}()
return e, nil
}

Expand Down
39 changes: 39 additions & 0 deletions src/pkg/clouds/aws/ecs/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,3 +143,42 @@ func (es *eventStream) pipeEvents(ctx context.Context, tailStream LiveTailStream
}
}
}

func LiveTailStreamToChannel(ctx context.Context, tailStream LiveTailStream, errCh chan<- error) chan LogEvent {
eventCh := make(chan LogEvent)
go func() {
defer close(eventCh)
defer tailStream.Close()
for {
// Double select to make sure context cancellation is not blocked by either the receive or send
// See: https://stackoverflow.com/questions/60030756/what-does-it-mean-when-one-channel-uses-two-arrows-to-write-to-another-channel
select {
case stream := <-tailStream.Events(): // blocking
if err := tailStream.Err(); err != nil {
errCh <- err
return
}
if stream == nil {
continue
}
events, err := GetLogEvents(stream)
if err != nil {
errCh <- err
return
}
for _, event := range events {
select {
case eventCh <- event:
case <-ctx.Done():
errCh <- ctx.Err()
return
}
}
case <-ctx.Done(): // blocking
errCh <- ctx.Err()
return
}
}
}()
return eventCh
}
1 change: 0 additions & 1 deletion src/protos/io/defang/v1/fabric.proto
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,6 @@ message GenerateFilesRequest {
bool agree_tos = 3;
bool training_opt_out = 4; // only valid for Pro users
string model_id = 5; // only valid for Pro users

}

message File {
Expand Down
Loading