28 changes: 24 additions & 4 deletions server.go
@@ -49,6 +49,8 @@ type mongoServer struct {
ResolvedAddr string
tcpaddr *net.TCPAddr
unusedSockets []*mongoSocket
// Contains nils and pointers to connected sockets. A nil entry reserves a
// spot for a socket that is still connecting, so that in-flight connections
// count towards the per-server connection limit.
liveSockets []*mongoSocket
closed bool
abended bool
@@ -140,8 +142,9 @@ func (server *mongoServer) AcquireSocket(poolLimit int, minPoolSize int, timeout
socket = &mongoSocket{
socketState: Connecting,
}
// hold our spot in the liveSockets slice
server.liveSockets = append(server.liveSockets, socket)
// hold a spot in the liveSockets slice to ensure connecting sockets are counted

Can you explain why this is important, and why closing a socket that is no longer needed (even if we are over the pool limit) would be detrimental? This is the thundering herd problem, I am assuming (where we run out of connections), so it might be a good comment to leave here for future readers.


Pushing a comment explaining the pool limit on the mongoCluster socket, and adjusting the terminology here to more obviously relate it to the poolLimit parameter, which can be traced back to mongoCluster.

In case it matters to anyone in the future, here's the relevant change that changed how pool limits work.

// against the total connection cap.
server.liveSockets = append(server.liveSockets, nil)

I would prefer an atomic counter for the liveSockets count for bookkeeping, but that might need more code changes than this isolated one.

@charlievieth (Author) Jul 24, 2019

I thought about that, but it requires additional coordination, since the following example is flaky when n < limit - 1 and more than two processes are concurrently checking the limit:

	const limit = 8
	var p int64
	n := atomic.LoadInt64(&p)
	if n < limit && atomic.CompareAndSwapInt64(&p, n, n+1) {
		// do the thing
	} else {
		// if another process incremented p, we would need a 
		// bounded loop to do this properly and even that
		// isn't great since to be truly correct we would
		// also need a sync.Cond.
	}
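To make the bounded-loop caveat in the else branch concrete, here is a rough sketch (illustrative only, not code from this PR; the package name, function name, and retry bound are invented). Even with retries on CAS failure, a caller that finds the counter at the cap has no way to wait for a slot to free up without something like a sync.Cond, which is why an atomics-only approach amounts to a sub-par mutex:

package sketch

import "sync/atomic"

// tryReserve attempts to reserve one connection slot against a cap using
// only atomics. CAS failures caused by concurrent increments are retried,
// but a false return cannot block until a slot frees up, so callers would
// still need extra coordination (e.g. a sync.Cond) to do this properly.
func tryReserve(p *int64, limit int64) bool {
	for i := 0; i < 8; i++ { // arbitrary retry bound
		n := atomic.LoadInt64(p)
		if n >= limit {
			return false // at the connection cap
		}
		if atomic.CompareAndSwapInt64(p, n, n+1) {
			return true // slot reserved; caller may now dial
		}
		// another goroutine changed p between the load and the CAS; retry
	}
	return false // persistent contention; treat as no slot available
}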

@charlievieth (Author)

Basically, if we used atomics we'd end up with a sub-par mutex or a racy solution.


Yeah, I don't think an atomic counter really helps us.

Normally I'd prefer we do separate, explicit bookkeeping of sockets in connecting states rather than sticking nils into liveSockets, but given that the upstream project is dead, our limited expected usage, and the lack of good testing, I think the minimal surgical change is preferred.
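For comparison, a rough sketch of the explicit-bookkeeping alternative mentioned above (illustrative only; it assumes a hypothetical connecting int field on mongoServer and invented method names, none of which this PR adds):

// reserveSlot reserves capacity for a socket that is about to be dialed by
// incrementing a hypothetical connecting counter instead of appending a nil
// placeholder to liveSockets.
func (server *mongoServer) reserveSlot(poolLimit int) bool {
	server.Lock()
	defer server.Unlock()
	if poolLimit > 0 && len(server.liveSockets)+server.connecting >= poolLimit {
		return false // cap reached, counting sockets still being dialed
	}
	server.connecting++
	return true
}

// finishConnect releases the reservation and, on success, publishes the now
// connected socket in liveSockets.
func (server *mongoServer) finishConnect(socket *mongoSocket, err error) {
	server.Lock()
	defer server.Unlock()
	server.connecting--
	if err == nil {
		server.liveSockets = append(server.liveSockets, socket)
	}
}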

server.Unlock()
// release server lock so we can initiate concurrent connections to mongodb
err = server.Connect(timeout, socket)
@@ -155,11 +158,25 @@ func (server *mongoServer) AcquireSocket(poolLimit int, minPoolSize int, timeout
socket.Close()
return nil, abended, errServerClosed
}
// Replace a nil placeholder with the new socket,
// it does not matter which nil-placeholder we replace.
//
// The reason we do not publish the socket in liveSockets until after the
// connection is completed is that an unconnected socket is not safe for
// concurrent use: a concurrent mongoServer.Close() call may race with us
// and call Close() on an unlocked, not-yet-connected socket.
for i, s := range server.liveSockets {
@scode Jul 24, 2019

I feel slightly uncomfortable with the fact that this is O(n) during connection acquisition (i.e. O(n^2) to open n new connections).

However, the constant cost is very low compared to the actual connection establishment and the connection caps we use are relatively small. I'd rather us pay this small cost than go with a more complicated fix with a higher risk of introducing a bug. Just wanted to call it out as a decision.

@charlievieth (Author) Jul 24, 2019

I thought about that, but I think we'll have much bigger problems if the connection count becomes so large that this is unreasonably slow. I'm pretty sure it would need to be around two million before this takes anywhere near a millisecond (searching 1 million pointers took ~500μs on my machine; example test code below).

package main

import (
	"math/rand"
	"testing"
	"time"
)

func init() {
	rand.Seed(time.Now().UnixNano())
}

func FindNil(a []*uint64) int {
	for i, p := range a {
		if p == nil {
			return i
		}
	}
	return -1
}

func BenchmarkPointer(b *testing.B) {
	const N = 1000000
	a := make([]*uint64, N)
	for i := range a {
		u := uint64(i)
		a[i] = &u
	}
	n := rand.Intn(1000) // place near end
	a[len(a)-n-1] = nil
	b.ResetTimer()

	for i := 0; i < b.N; i++ {
		FindNil(a)
	}
}


Agreed. Sorry, I didn't mean to make you do a test :)

@charlievieth (Author)

No worries, I was actually curious how long it would take.

if s == nil {
server.liveSockets[i] = socket
break
}
}
server.Unlock()
} else {
// couldn't open connection to mongodb, releasing spot in liveSockets
server.Lock()
server.liveSockets = removeSocket(server.liveSockets, socket)
// remove the nil placeholder, it does not matter which one
server.liveSockets = removeSocket(server.liveSockets, nil)
server.Unlock()
}
}
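For context on the "it does not matter which one" comments above: removeSocket is an existing helper elsewhere in server.go that this diff does not show. Assuming it drops the first entry equal to its argument, every nil placeholder is interchangeable. A plausible shape, as a sketch only (the real helper may differ):

func removeSocket(sockets []*mongoSocket, socket *mongoSocket) []*mongoSocket {
	for i, s := range sockets {
		if s == socket {
			copy(sockets[i:], sockets[i+1:])
			sockets[len(sockets)-1] = nil // avoid retaining a stale pointer
			return sockets[:len(sockets)-1]
		}
	}
	return sockets
}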
@@ -238,7 +255,10 @@ func (server *mongoServer) Close() {
server.Unlock()
logf("Connections to %s closing (%d live sockets).", server.Addr, len(liveSockets))
for i, s := range liveSockets {
s.Close()
// s may be nil if the socket is currently connecting; see AcquireSocket() for details.
if s != nil {
s.Close()
}
liveSockets[i] = nil
}
for i := range unusedSockets {