Skip to content

Commit b05597d

Browse files
committed
fix passive connection in mux. fix #1167
1 parent c9e35bb commit b05597d

File tree

8 files changed

+50
-30
lines changed

8 files changed

+50
-30
lines changed

app/dispatcher/default.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ type cachedReader struct {
2828
}
2929

3030
func (r *cachedReader) Cache(b *buf.Buffer) {
31-
mb, _ := r.reader.ReadMultiBufferWithTimeout(time.Millisecond * 100)
31+
mb, _ := r.reader.ReadMultiBufferTimeout(time.Millisecond * 100)
3232
if !mb.IsEmpty() {
3333
common.Must(r.cache.WriteMultiBuffer(mb))
3434
}
@@ -47,6 +47,16 @@ func (r *cachedReader) ReadMultiBuffer() (buf.MultiBuffer, error) {
4747
return r.reader.ReadMultiBuffer()
4848
}
4949

50+
func (r *cachedReader) ReadMultiBufferTimeout(timeout time.Duration) (buf.MultiBuffer, error) {
51+
if !r.cache.IsEmpty() {
52+
mb := r.cache
53+
r.cache = nil
54+
return mb, nil
55+
}
56+
57+
return r.reader.ReadMultiBufferTimeout(timeout)
58+
}
59+
5060
func (r *cachedReader) CloseError() {
5161
r.cache.Release()
5262
r.reader.CloseError()

app/proxyman/mux/mux.go

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -147,17 +147,17 @@ func (m *Client) monitor() {
147147
}
148148
}
149149

150-
func copyFirstPayload(reader *pipe.Reader, writer *Writer) error {
151-
data, err := reader.ReadMultiBufferWithTimeout(time.Millisecond * 200)
152-
if err == buf.ErrReadTimeout {
153-
return writer.writeMetaOnly()
150+
func writeFirstPayload(reader buf.Reader, writer *Writer) error {
151+
err := buf.CopyOnceTimeout(reader, writer, time.Millisecond*200)
152+
if err == buf.ErrNotTimeoutReader || err == buf.ErrReadTimeout {
153+
return writer.WriteMultiBuffer(buf.MultiBuffer{})
154154
}
155155

156156
if err != nil {
157157
return err
158158
}
159159

160-
return writer.WriteMultiBuffer(data)
160+
return nil
161161
}
162162

163163
func fetchInput(ctx context.Context, s *Session, output buf.Writer) {
@@ -172,13 +172,11 @@ func fetchInput(ctx context.Context, s *Session, output buf.Writer) {
172172
defer writer.Close() // nolint: errcheck
173173

174174
newError("dispatching request to ", dest).WriteToLog(session.ExportIDToError(ctx))
175-
if pReader, ok := s.input.(*pipe.Reader); ok {
176-
if err := copyFirstPayload(pReader, writer); err != nil {
177-
newError("failed to fetch first payload").Base(err).WriteToLog(session.ExportIDToError(ctx))
178-
writer.hasError = true
179-
pipe.CloseError(s.input)
180-
return
181-
}
175+
if err := writeFirstPayload(s.input, writer); err != nil {
176+
newError("failed to write first payload").Base(err).WriteToLog(session.ExportIDToError(ctx))
177+
writer.hasError = true
178+
pipe.CloseError(s.input)
179+
return
182180
}
183181

184182
if err := buf.Copy(s.input, writer); err != nil {

common/buf/copy.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package buf
22

33
import (
44
"io"
5+
"time"
56

67
"v2ray.com/core/common/errors"
78
"v2ray.com/core/common/signal"
@@ -112,3 +113,17 @@ func Copy(reader Reader, writer Writer, options ...CopyOption) error {
112113
}
113114
return nil
114115
}
116+
117+
var ErrNotTimeoutReader = newError("not a TimeoutReader")
118+
119+
func CopyOnceTimeout(reader Reader, writer Writer, timeout time.Duration) error {
120+
timeoutReader, ok := reader.(TimeoutReader)
121+
if !ok {
122+
return ErrNotTimeoutReader
123+
}
124+
mb, err := timeoutReader.ReadMultiBufferTimeout(timeout)
125+
if err != nil {
126+
return err
127+
}
128+
return writer.WriteMultiBuffer(mb)
129+
}

common/buf/io.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ var ErrReadTimeout = newError("IO timeout")
1616

1717
// TimeoutReader is a reader that returns error if Read() operation takes longer than the given timeout.
1818
type TimeoutReader interface {
19-
ReadTimeout(time.Duration) (MultiBuffer, error)
19+
ReadMultiBufferTimeout(time.Duration) (MultiBuffer, error)
2020
}
2121

2222
// Writer extends io.Writer with MultiBuffer.

proxy/vmess/outbound/outbound.go

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,6 @@ import (
99
"v2ray.com/core/common/session"
1010
"v2ray.com/core/common/task"
1111

12-
"v2ray.com/core/transport/pipe"
13-
1412
"v2ray.com/core"
1513
"v2ray.com/core/common"
1614
"v2ray.com/core/common/buf"
@@ -118,16 +116,8 @@ func (v *Handler) Process(ctx context.Context, link *core.Link, dialer proxy.Dia
118116
}
119117

120118
bodyWriter := session.EncodeRequestBody(request, writer)
121-
if tReader, ok := input.(*pipe.Reader); ok {
122-
firstPayload, err := tReader.ReadMultiBufferWithTimeout(time.Millisecond * 500)
123-
if err != nil && err != buf.ErrReadTimeout {
124-
return newError("failed to get first payload").Base(err)
125-
}
126-
if !firstPayload.IsEmpty() {
127-
if err := bodyWriter.WriteMultiBuffer(firstPayload); err != nil {
128-
return newError("failed to write first payload").Base(err)
129-
}
130-
}
119+
if err := buf.CopyOnceTimeout(input, bodyWriter, time.Millisecond*500); err != nil && err != buf.ErrNotTimeoutReader && err != buf.ErrReadTimeout {
120+
return newError("failed to write first payload").Base(err)
131121
}
132122

133123
if err := writer.SetBuffered(false); err != nil {

transport/pipe/impl.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ func (p *pipe) ReadMultiBuffer() (buf.MultiBuffer, error) {
8181
}
8282
}
8383

84-
func (p *pipe) ReadMultiBufferWithTimeout(d time.Duration) (buf.MultiBuffer, error) {
84+
func (p *pipe) ReadMultiBufferTimeout(d time.Duration) (buf.MultiBuffer, error) {
8585
timer := time.After(d)
8686
for {
8787
data, err := p.readMultiBufferInternal()

transport/pipe/pipe_test.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,3 +118,10 @@ func TestPipeWriteMultiThread(t *testing.T) {
118118
assert(err, IsNil)
119119
assert(b[0].Bytes(), Equals, []byte{'a', 'b', 'c', 'd'})
120120
}
121+
122+
func TestInterfaces(t *testing.T) {
123+
assert := With(t)
124+
125+
assert((*Reader)(nil), Implements, (*buf.Reader)(nil))
126+
assert((*Reader)(nil), Implements, (*buf.TimeoutReader)(nil))
127+
}

transport/pipe/reader.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,9 @@ func (r *Reader) ReadMultiBuffer() (buf.MultiBuffer, error) {
1616
return r.pipe.ReadMultiBuffer()
1717
}
1818

19-
// ReadMultiBufferWithTimeout reads content from a pipe within the given duration, or returns buf.ErrTimeout otherwise.
20-
func (r *Reader) ReadMultiBufferWithTimeout(d time.Duration) (buf.MultiBuffer, error) {
21-
return r.pipe.ReadMultiBufferWithTimeout(d)
19+
// ReadMultiBufferTimeout reads content from a pipe within the given duration, or returns buf.ErrTimeout otherwise.
20+
func (r *Reader) ReadMultiBufferTimeout(d time.Duration) (buf.MultiBuffer, error) {
21+
return r.pipe.ReadMultiBufferTimeout(d)
2222
}
2323

2424
// CloseError sets the pipe to error state. Both reading and writing from/to the pipe will return io.ErrClosedPipe.

0 commit comments

Comments
 (0)