Skip to content

Commit 7b33da6

Browse files
authored
Merge pull request globalsign#16 from joomcode/feature/busy-socket-reuse
Fix busy socket reuse and kill iterator from another connection
2 parents ec7e089 + 917c501 commit 7b33da6

File tree

2 files changed

+28
-2
lines changed

2 files changed

+28
-2
lines changed

session.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4279,10 +4279,20 @@ func (iter *Iter) Close() error {
42794279
}
42804280
return err
42814281
}
4282+
42824283
socket, err := iter.acquireSocket()
42834284
if err == nil {
4285+
var assassin *mongoSocket
42844286
// TODO Batch kills.
4285-
err = socket.Query(&killCursorsOp{[]int64{cursorId}})
4287+
if socket.isBusy() {
4288+
assassin, _, _ = iter.server.acquireSocketInternal(socket.dialInfo, false)
4289+
}
4290+
if assassin != nil {
4291+
err = assassin.Query(&killCursorsOp{[]int64{cursorId}})
4292+
assassin.Release()
4293+
} else {
4294+
err = socket.Query(&killCursorsOp{[]int64{cursorId}})
4295+
}
42864296
socket.Release()
42874297
}
42884298

socket.go

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -204,7 +204,12 @@ func newSocket(server *mongoServer, conn net.Conn, info *DialInfo) *mongoSocket
204204
stats.socketsAlive(+1)
205205
debugf("Socket %p to %s: initialized", socket, socket.addr)
206206
socket.resetNonce()
207-
go socket.readLoop()
207+
go func() {
208+
socket.readLoop()
209+
for _, _ = range socket.replyFuncs {
210+
socket.Release()
211+
}
212+
}()
208213
return socket
209214
}
210215

@@ -643,6 +648,7 @@ func (socket *mongoSocket) Query(ops ...interface{}) (err error) {
643648
request := &requests[i]
644649
setInt32(buf, request.bufferPos+4, int32(requestId))
645650
socket.replyFuncs[requestId] = request.replyFunc
651+
socket.references++
646652
requestId++
647653
}
648654
socket.Unlock()
@@ -767,10 +773,20 @@ func (socket *mongoSocket) readLoop() {
767773
}
768774
socket.Unlock()
769775

776+
if replyFunc != nil {
777+
socket.Release()
778+
}
770779
// XXX Do bound checking against totalLen.
771780
}
772781
}
773782

783+
func (socket *mongoSocket) isBusy() bool {
784+
socket.Lock()
785+
busy := len(socket.replyFuncs) != 0
786+
socket.Unlock()
787+
return busy
788+
}
789+
774790
var emptyHeader = []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}
775791

776792
func addHeader(b []byte, opcode int) []byte {

0 commit comments

Comments
 (0)