Skip to content

Commit 2189357

Browse files
Merge branch 'development' of github.com:globalsign/mgo into connection_pool_waiter
2 parents 2c43232 + 860240e commit 2189357

File tree

5 files changed

+210
-24
lines changed

5 files changed

+210
-24
lines changed

cluster.go

Lines changed: 20 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -48,21 +48,23 @@ import (
4848

4949
type mongoCluster struct {
5050
sync.RWMutex
51-
serverSynced sync.Cond
52-
userSeeds []string
53-
dynaSeeds []string
54-
servers mongoServers
55-
masters mongoServers
56-
references int
57-
syncing bool
58-
direct bool
59-
failFast bool
60-
syncCount uint
61-
setName string
62-
cachedIndex map[string]bool
63-
sync chan bool
64-
dial dialer
65-
appName string
51+
serverSynced sync.Cond
52+
userSeeds []string
53+
dynaSeeds []string
54+
servers mongoServers
55+
masters mongoServers
56+
references int
57+
syncing bool
58+
direct bool
59+
failFast bool
60+
syncCount uint
61+
setName string
62+
cachedIndex map[string]bool
63+
sync chan bool
64+
dial dialer
65+
appName string
66+
minPoolSize int
67+
maxIdleTimeMS int
6668
}
6769

6870
func newCluster(userSeeds []string, direct, failFast bool, dial dialer, setName string, appName string) *mongoCluster {
@@ -437,11 +439,13 @@ func (cluster *mongoCluster) syncServersLoop() {
437439
func (cluster *mongoCluster) server(addr string, tcpaddr *net.TCPAddr) *mongoServer {
438440
cluster.RLock()
439441
server := cluster.servers.Search(tcpaddr.String())
442+
minPoolSize := cluster.minPoolSize
443+
maxIdleTimeMS := cluster.maxIdleTimeMS
440444
cluster.RUnlock()
441445
if server != nil {
442446
return server
443447
}
444-
return newServer(addr, tcpaddr, cluster.sync, cluster.dial)
448+
return newServer(addr, tcpaddr, cluster.sync, cluster.dial, minPoolSize, maxIdleTimeMS)
445449
}
446450

447451
func resolveAddr(addr string) (*net.TCPAddr, error) {

server.go

Lines changed: 63 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,8 @@ type mongoServer struct {
5656
closed bool
5757
abended bool
5858
poolWaiter *sync.Cond
59+
minPoolSize int
60+
maxIdleTimeMS int
5961
}
6062

6163
type dialer struct {
@@ -77,18 +79,23 @@ type mongoServerInfo struct {
7779

7880
var defaultServerInfo mongoServerInfo
7981

80-
func newServer(addr string, tcpaddr *net.TCPAddr, syncChan chan bool, dial dialer) *mongoServer {
82+
func newServer(addr string, tcpaddr *net.TCPAddr, syncChan chan bool, dial dialer, minPoolSize, maxIdleTimeMS int) *mongoServer {
8183
server := &mongoServer{
82-
Addr: addr,
83-
ResolvedAddr: tcpaddr.String(),
84-
tcpaddr: tcpaddr,
85-
sync: syncChan,
86-
dial: dial,
87-
info: &defaultServerInfo,
88-
pingValue: time.Hour, // Push it back before an actual ping.
84+
Addr: addr,
85+
ResolvedAddr: tcpaddr.String(),
86+
tcpaddr: tcpaddr,
87+
sync: syncChan,
88+
dial: dial,
89+
info: &defaultServerInfo,
90+
pingValue: time.Hour, // Push it back before an actual ping.
91+
minPoolSize: minPoolSize,
92+
maxIdleTimeMS: maxIdleTimeMS,
8993
}
9094
server.poolWaiter = sync.NewCond(server)
9195
go server.pinger(true)
96+
if maxIdleTimeMS != 0 {
97+
go server.poolShrinker()
98+
}
9299
return server
93100
}
94101

@@ -277,6 +284,7 @@ func (server *mongoServer) close(waitForIdle bool) {
277284
func (server *mongoServer) RecycleSocket(socket *mongoSocket) {
278285
server.Lock()
279286
if !server.closed {
287+
socket.lastTimeUsed = time.Now()
280288
server.unusedSockets = append(server.unusedSockets, socket)
281289
}
282290
// If anybody is waiting for a connection, they should try now.
@@ -410,6 +418,53 @@ func (server *mongoServer) pinger(loop bool) {
410418
}
411419
}
412420

421+
func (server *mongoServer) poolShrinker() {
422+
ticker := time.NewTicker(1 * time.Minute)
423+
for _ = range ticker.C {
424+
if server.closed {
425+
ticker.Stop()
426+
return
427+
}
428+
server.Lock()
429+
unused := len(server.unusedSockets)
430+
if unused < server.minPoolSize {
431+
server.Unlock()
432+
continue
433+
}
434+
now := time.Now()
435+
end := 0
436+
reclaimMap := map[*mongoSocket]struct{}{}
437+
// Because the acquisition and recycle are done at the tail of array,
438+
// the head is always the oldest unused socket.
439+
for _, s := range server.unusedSockets[:unused-server.minPoolSize] {
440+
if s.lastTimeUsed.Add(time.Duration(server.maxIdleTimeMS) * time.Millisecond).After(now) {
441+
break
442+
}
443+
end++
444+
reclaimMap[s] = struct{}{}
445+
}
446+
tbr := server.unusedSockets[:end]
447+
if end > 0 {
448+
next := make([]*mongoSocket, unused-end)
449+
copy(next, server.unusedSockets[end:])
450+
server.unusedSockets = next
451+
remainSockets := []*mongoSocket{}
452+
for _, s := range server.liveSockets {
453+
if _, ok := reclaimMap[s]; !ok {
454+
remainSockets = append(remainSockets, s)
455+
}
456+
}
457+
server.liveSockets = remainSockets
458+
stats.conn(-1*end, server.info.Master)
459+
}
460+
server.Unlock()
461+
462+
for _, s := range tbr {
463+
s.Close()
464+
}
465+
}
466+
}
467+
413468
type mongoServerSlice []*mongoServer
414469

415470
func (s mongoServerSlice) Len() int {

session.go

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -272,6 +272,16 @@ const (
272272
// Defines the per-server socket pool limit. Defaults to 4096.
273273
// See Session.SetPoolLimit for details.
274274
//
275+
// minPoolSize=<limit>
276+
//
277+
// Defines the per-server socket pool minium size. Defaults to 0.
278+
//
279+
// maxIdleTimeMS=<millisecond>
280+
//
281+
// The maximum number of milliseconds that a connection can remain idle in the pool
282+
// before being removed and closed. If maxIdleTimeMS is 0, connections will never be
283+
// closed due to inactivity.
284+
//
275285
// appName=<appName>
276286
//
277287
// The identifier of this client application. This parameter is used to
@@ -323,6 +333,8 @@ func ParseURL(url string) (*DialInfo, error) {
323333
appName := ""
324334
readPreferenceMode := Primary
325335
var readPreferenceTagSets []bson.D
336+
minPoolSize := 0
337+
maxIdleTimeMS := 0
326338
for _, opt := range uinfo.options {
327339
switch opt.key {
328340
case "authSource":
@@ -369,6 +381,22 @@ func ParseURL(url string) (*DialInfo, error) {
369381
doc = append(doc, bson.DocElem{Name: strings.TrimSpace(kvp[0]), Value: strings.TrimSpace(kvp[1])})
370382
}
371383
readPreferenceTagSets = append(readPreferenceTagSets, doc)
384+
case "minPoolSize":
385+
minPoolSize, err = strconv.Atoi(opt.value)
386+
if err != nil {
387+
return nil, errors.New("bad value for minPoolSize: " + opt.value)
388+
}
389+
if minPoolSize < 0 {
390+
return nil, errors.New("bad value (negtive) for minPoolSize: " + opt.value)
391+
}
392+
case "maxIdleTimeMS":
393+
maxIdleTimeMS, err = strconv.Atoi(opt.value)
394+
if err != nil {
395+
return nil, errors.New("bad value for maxIdleTimeMS: " + opt.value)
396+
}
397+
if maxIdleTimeMS < 0 {
398+
return nil, errors.New("bad value (negtive) for maxIdleTimeMS: " + opt.value)
399+
}
372400
case "connect":
373401
if opt.value == "direct" {
374402
direct = true
@@ -403,6 +431,8 @@ func ParseURL(url string) (*DialInfo, error) {
403431
TagSets: readPreferenceTagSets,
404432
},
405433
ReplicaSetName: setName,
434+
MinPoolSize: minPoolSize,
435+
MaxIdleTimeMS: maxIdleTimeMS,
406436
}
407437
return &info, nil
408438
}
@@ -481,6 +511,14 @@ type DialInfo struct {
481511
// cluster and establish connections with further servers too.
482512
Direct bool
483513

514+
// MinPoolSize defines The minimum number of connections in the connection pool.
515+
// Defaults to 0.
516+
MinPoolSize int
517+
518+
//The maximum number of milliseconds that a connection can remain idle in the pool
519+
// before being removed and closed.
520+
MaxIdleTimeMS int
521+
484522
// DialServer optionally specifies the dial function for establishing
485523
// connections with the MongoDB servers.
486524
DialServer func(addr *ServerAddr) (net.Conn, error)
@@ -563,6 +601,8 @@ func DialWithInfo(info *DialInfo) (*Session, error) {
563601
if info.PoolTimeout > 0 {
564602
session.poolTimeout = info.PoolTimeout
565603
}
604+
cluster.minPoolSize = info.MinPoolSize
605+
cluster.maxIdleTimeMS = info.MaxIdleTimeMS
566606
cluster.Release()
567607

568608
// People get confused when we return a session that is not actually

session_test.go

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,11 +30,13 @@ import (
3030
"flag"
3131
"fmt"
3232
"math"
33+
"math/rand"
3334
"os"
3435
"runtime"
3536
"sort"
3637
"strconv"
3738
"strings"
39+
"sync"
3840
"testing"
3941
"time"
4042

@@ -166,6 +168,90 @@ func (s *S) TestURLInvalidReadPreference(c *C) {
166168
}
167169
}
168170

171+
func (s *S) TestMinPoolSize(c *C) {
172+
tests := []struct {
173+
url string
174+
size int
175+
fail bool
176+
}{
177+
{"localhost:40001?minPoolSize=0", 0, false},
178+
{"localhost:40001?minPoolSize=1", 1, false},
179+
{"localhost:40001?minPoolSize=-1", -1, true},
180+
{"localhost:40001?minPoolSize=-.", 0, true},
181+
}
182+
for _, test := range tests {
183+
info, err := mgo.ParseURL(test.url)
184+
if test.fail {
185+
c.Assert(err, NotNil)
186+
} else {
187+
c.Assert(err, IsNil)
188+
c.Assert(info.MinPoolSize, Equals, test.size)
189+
}
190+
}
191+
}
192+
193+
func (s *S) TestMaxIdleTimeMS(c *C) {
194+
tests := []struct {
195+
url string
196+
size int
197+
fail bool
198+
}{
199+
{"localhost:40001?maxIdleTimeMS=0", 0, false},
200+
{"localhost:40001?maxIdleTimeMS=1", 1, false},
201+
{"localhost:40001?maxIdleTimeMS=-1", -1, true},
202+
{"localhost:40001?maxIdleTimeMS=-.", 0, true},
203+
}
204+
for _, test := range tests {
205+
info, err := mgo.ParseURL(test.url)
206+
if test.fail {
207+
c.Assert(err, NotNil)
208+
} else {
209+
c.Assert(err, IsNil)
210+
c.Assert(info.MaxIdleTimeMS, Equals, test.size)
211+
}
212+
}
213+
}
214+
215+
func (s *S) TestPoolShrink(c *C) {
216+
if *fast {
217+
c.Skip("-fast")
218+
}
219+
oldSocket := mgo.GetStats().SocketsAlive
220+
221+
session, err := mgo.Dial("localhost:40001?minPoolSize=1&maxIdleTimeMS=1000")
222+
c.Assert(err, IsNil)
223+
defer session.Close()
224+
225+
parallel := 10
226+
res := make(chan error, parallel+1)
227+
wg := &sync.WaitGroup{}
228+
for i := 1; i < parallel; i++ {
229+
wg.Add(1)
230+
go func() {
231+
s := session.Copy()
232+
defer s.Close()
233+
result := struct{}{}
234+
err := s.Run("ping", &result)
235+
236+
//sleep random time to make the allocate and release in different sequence
237+
time.Sleep(time.Duration(rand.Intn(parallel)*100) * time.Millisecond)
238+
res <- err
239+
wg.Done()
240+
}()
241+
}
242+
wg.Wait()
243+
stats := mgo.GetStats()
244+
c.Logf("living socket: After queries: %d, before queries: %d", stats.SocketsAlive, oldSocket)
245+
246+
// give some time for shrink the pool, the tick is set to 1 minute
247+
c.Log("Sleeping... 1 minute to for pool shrinking")
248+
time.Sleep(60 * time.Second)
249+
250+
stats = mgo.GetStats()
251+
c.Logf("living socket: After shrinking: %d, at the beginning of the test: %d", stats.SocketsAlive, oldSocket)
252+
c.Assert(stats.SocketsAlive-oldSocket > 1, Equals, false)
253+
}
254+
169255
func (s *S) TestURLReadPreferenceTags(c *C) {
170256
type test struct {
171257
url string

socket.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ type mongoSocket struct {
5454
dead error
5555
serverInfo *mongoServerInfo
5656
closeAfterIdle bool
57+
lastTimeUsed time.Time // for time based idle socket release
5758
sendMeta sync.Once
5859
}
5960

0 commit comments

Comments
 (0)