Skip to content

Commit 8de64b2

Browse files
charlieviethscode
authored andcommitted
Fix a race that can occur while closing a socket that is connecting (#41)
1 parent 446a62a commit 8de64b2

File tree

2 files changed

+30
-4
lines changed

2 files changed

+30
-4
lines changed

cluster.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,12 @@ type mongoCluster struct {
6262
sync chan bool
6363
dial dialer
6464
maxSocketReuseTime time.Duration
65+
// The maximum number of sockets connected to the same server in a cluster. If the limit is reached for a server,
66+
// the attempt to establish the connection will fail (not block).
67+
//
68+
// The purpose of the pool limit is primarily to protect the mongo cluster itself. Connections are expensive on
69+
// the server (thread-per-connection), and the failure modes of a mongo cluster once you begin hitting connection
70+
// limits can be catastrophic.
6571
poolLimit int
6672
minPoolSize int
6773
}

server.go

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,8 @@ type mongoServer struct {
4949
ResolvedAddr string
5050
tcpaddr *net.TCPAddr
5151
unusedSockets []*mongoSocket
52+
// Contains nils and pointers to connected sockets. nils may be here for the purpose of reserving a spot for the
53+
// purpose of counting towards the total connections per server limit.
5254
liveSockets []*mongoSocket
5355
closed bool
5456
abended bool
@@ -140,8 +142,9 @@ func (server *mongoServer) AcquireSocket(poolLimit int, minPoolSize int, timeout
140142
socket = &mongoSocket{
141143
socketState: Connecting,
142144
}
143-
// hold our spot in the liveSockets slice
144-
server.liveSockets = append(server.liveSockets, socket)
145+
// hold a spot in the liveSockets slice to ensure connecting sockets are counted
146+
// against the pool limit.
147+
server.liveSockets = append(server.liveSockets, nil)
145148
server.Unlock()
146149
// release server lock so we can initiate concurrent connections to mongodb
147150
err = server.Connect(timeout, socket)
@@ -155,11 +158,25 @@ func (server *mongoServer) AcquireSocket(poolLimit int, minPoolSize int, timeout
155158
socket.Close()
156159
return nil, abended, errServerClosed
157160
}
161+
// Replace a nil placeholder with the new socket,
162+
// it does not matter which nil-placeholder we replace.
163+
//
164+
// The reason we do not publish the socket in liveSockets until after
165+
// connection is completed, is that an unconnected socket is not safe
166+
// for concurrent use. Concurrent mongoServer.Close() calls may race with us
167+
// and otherwise call Close() on an unlocked not-yet-connected socket.
168+
for i, s := range server.liveSockets {
169+
if s == nil {
170+
server.liveSockets[i] = socket
171+
break
172+
}
173+
}
158174
server.Unlock()
159175
} else {
160176
// couldn't open connection to mongodb, releasing spot in liveSockets
161177
server.Lock()
162-
server.liveSockets = removeSocket(server.liveSockets, socket)
178+
// remove the nil placeholder, it does not matter which one
179+
server.liveSockets = removeSocket(server.liveSockets, nil)
163180
server.Unlock()
164181
}
165182
}
@@ -238,7 +255,10 @@ func (server *mongoServer) Close() {
238255
server.Unlock()
239256
logf("Connections to %s closing (%d live sockets).", server.Addr, len(liveSockets))
240257
for i, s := range liveSockets {
241-
s.Close()
258+
// s may be nil if the socket is currently connecting; see AcquireSocket() for details.
259+
if s != nil {
260+
s.Close()
261+
}
242262
liveSockets[i] = nil
243263
}
244264
for i := range unusedSockets {

0 commit comments

Comments
 (0)