Skip to content

Commit 6c89940

Browse files
committed
fix #1338
1 parent 8595bce commit 6c89940

File tree

1 file changed

+18
-9
lines changed

1 file changed

+18
-9
lines changed

common/mux/server.go

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -44,15 +44,14 @@ func (s *Server) Dispatch(ctx context.Context, dest net.Destination) (*vio.Link,
4444
uplinkReader, uplinkWriter := pipe.New(opts...)
4545
downlinkReader, downlinkWriter := pipe.New(opts...)
4646

47-
worker := &ServerWorker{
48-
dispatcher: s.dispatcher,
49-
link: &vio.Link{
50-
Reader: uplinkReader,
51-
Writer: downlinkWriter,
52-
},
53-
sessionManager: NewSessionManager(),
47+
_, err := NewServerWorker(ctx, s.dispatcher, &vio.Link{
48+
Reader: uplinkReader,
49+
Writer: downlinkWriter,
50+
})
51+
if err != nil {
52+
return nil, err
5453
}
55-
go worker.run(ctx)
54+
5655
return &vio.Link{Reader: downlinkReader, Writer: uplinkWriter}, nil
5756
}
5857

@@ -72,6 +71,16 @@ type ServerWorker struct {
7271
sessionManager *SessionManager
7372
}
7473

74+
func NewServerWorker(ctx context.Context, d routing.Dispatcher, link *vio.Link) (*ServerWorker, error) {
75+
worker := &ServerWorker{
76+
dispatcher: d,
77+
link: link,
78+
sessionManager: NewSessionManager(),
79+
}
80+
go worker.run(ctx)
81+
return worker, nil
82+
}
83+
7584
func handle(ctx context.Context, s *Session, output buf.Writer) {
7685
writer := NewResponseWriter(s.ID, output, s.transferType)
7786
if err := buf.Copy(s.input, writer); err != nil {
@@ -142,7 +151,7 @@ func (w *ServerWorker) handleStatusKeep(meta *FrameMetadata, reader *buf.Buffere
142151

143152
s, found := w.sessionManager.Get(meta.SessionID)
144153
if !found {
145-
buf.Copy(NewStreamReader(reader), buf.Discard)
154+
return buf.Copy(NewStreamReader(reader), buf.Discard)
146155
}
147156

148157
rr := s.NewReader(reader)

0 commit comments

Comments
 (0)