Skip to content

Commit 3375f58

Browse files
[FIXED] Gateway RS+/- blocks on account fetch (#7449)
The methods `processGatewayRSub` and `processGatewayRUnsub` can call into `updateInterestForAccountOnGateway`, which in turn calls `s.LookupAccount`. If the account is not known it will be fetched if there's an account resolver. But, since this is done while in the read loop of the gateway, this blocks not only receiving the response for the account fetch (when using a NATS resolver) but also any other operation like PING/PONG, which then could lead to a stale connection. The fix proposed by this PR is to not allow fetching the account inline of the gateway read loop. Signed-off-by: Maurice van Veen <github@mauricevanveen.com>
2 parents 98bfe6d + c54d016 commit 3375f58

File tree

4 files changed

+69
-4
lines changed

4 files changed

+69
-4
lines changed

server/gateway_test.go

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ import (
3333

3434
"github.com/nats-io/nats-server/v2/logger"
3535
"github.com/nats-io/nats.go"
36+
"github.com/nats-io/nkeys"
3637
"golang.org/x/crypto/ocsp"
3738

3839
. "github.com/nats-io/nats-server/v2/internal/ocsp"
@@ -7557,3 +7558,59 @@ func TestGatewayConfigureWriteDeadline(t *testing.T) {
75577558
require_Equal(t, r.out.wdl, 5*time.Second)
75587559
})
75597560
}
7561+
7562+
func TestGatewayProcessRSubNoBlockingAccountFetch(t *testing.T) {
7563+
createAccountPubKey := func() string {
7564+
kp, err := nkeys.CreateAccount()
7565+
require_NoError(t, err)
7566+
pubkey, err := kp.PublicKey()
7567+
require_NoError(t, err)
7568+
return pubkey
7569+
}
7570+
sysPub := createAccountPubKey()
7571+
accPub := createAccountPubKey()
7572+
dir := t.TempDir()
7573+
conf := createConfFile(t, []byte(fmt.Sprintf(`
7574+
listen: 127.0.0.1:-1
7575+
server_name: srv
7576+
operator: %s
7577+
system_account: %s
7578+
resolver: {
7579+
type: cache
7580+
dir: '%s'
7581+
timeout: "2s"
7582+
}
7583+
gateway: {
7584+
name: "clust-B"
7585+
listen: 127.0.0.1:-1
7586+
}
7587+
`, ojwt, sysPub, dir)))
7588+
s, _ := RunServerWithConfig(conf)
7589+
defer s.Shutdown()
7590+
7591+
// Set up a mock gateway client.
7592+
c := s.createInternalAccountClient()
7593+
c.mu.Lock()
7594+
c.gw = &gateway{}
7595+
c.gw.outsim = &sync.Map{}
7596+
c.nc = &net.IPConn{}
7597+
c.mu.Unlock()
7598+
7599+
// Receiving a R+ should not be blocking, since we're in the gateway's readLoop.
7600+
start := time.Now()
7601+
require_NoError(t, c.processGatewayRSub(fmt.Appendf(nil, "%s subj queue 0", accPub)))
7602+
c.mu.Lock()
7603+
subs := len(c.subs)
7604+
c.mu.Unlock()
7605+
require_Len(t, subs, 1)
7606+
require_LessThan(t, time.Since(start), 100*time.Millisecond)
7607+
7608+
// Receiving a R- should not be blocking, since we're in the gateway's readLoop.
7609+
start = time.Now()
7610+
require_NoError(t, c.processGatewayRUnsub(fmt.Appendf(nil, "%s subj queue", accPub)))
7611+
c.mu.Lock()
7612+
subs = len(c.subs)
7613+
c.mu.Unlock()
7614+
require_Len(t, subs, 0)
7615+
require_LessThan(t, time.Since(start), 100*time.Millisecond)
7616+
}

server/jetstream_super_cluster_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4411,7 +4411,7 @@ func TestJetStreamSuperClusterMixedModeSwitchToInterestOnlyOperatorConfig(t *tes
44114411
if gw.Name == opts.Gateway.Name {
44124412
continue
44134413
}
4414-
checkGWInterestOnlyMode(t, s, gw.Name, apub)
4414+
checkGWInterestOnlyModeOrNotPresent(t, s, gw.Name, apub, true)
44154415
}
44164416
}
44174417
s = sc.serverByName(si.Cluster.Leader)

server/leafnode.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2426,7 +2426,8 @@ func (s *Server) initLeafNodeSmapAndSendSubs(c *client) {
24262426

24272427
// updateInterestForAccountOnGateway called from gateway code when processing RS+ and RS-.
24282428
func (s *Server) updateInterestForAccountOnGateway(accName string, sub *subscription, delta int32) {
2429-
acc, err := s.LookupAccount(accName)
2429+
// Since we're in the gateway's readLoop, and we would otherwise block, don't allow fetching.
2430+
acc, err := s.lookupOrFetchAccount(accName, false)
24302431
if acc == nil || err != nil {
24312432
s.Debugf("No or bad account for %q, failed to update interest from gateway", accName)
24322433
return

server/server.go

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2051,6 +2051,13 @@ func (s *Server) setRouteInfo(acc *Account) {
20512051
// associated with an account name.
20522052
// Lock MUST NOT be held upon entry.
20532053
func (s *Server) lookupAccount(name string) (*Account, error) {
2054+
return s.lookupOrFetchAccount(name, true)
2055+
}
2056+
2057+
// lookupOrFetchAccount is a function to return the account structure
2058+
// associated with an account name.
2059+
// Lock MUST NOT be held upon entry.
2060+
func (s *Server) lookupOrFetchAccount(name string, fetch bool) (*Account, error) {
20542061
var acc *Account
20552062
if v, ok := s.accounts.Load(name); ok {
20562063
acc = v.(*Account)
@@ -2060,7 +2067,7 @@ func (s *Server) lookupAccount(name string) (*Account, error) {
20602067
// return the latest information from the resolver.
20612068
if acc.IsExpired() {
20622069
s.Debugf("Requested account [%s] has expired", name)
2063-
if s.AccountResolver() != nil {
2070+
if s.AccountResolver() != nil && fetch {
20642071
if err := s.updateAccount(acc); err != nil {
20652072
// This error could mask expired, so just return expired here.
20662073
return nil, ErrAccountExpired
@@ -2072,7 +2079,7 @@ func (s *Server) lookupAccount(name string) (*Account, error) {
20722079
return acc, nil
20732080
}
20742081
// If we have a resolver see if it can fetch the account.
2075-
if s.AccountResolver() == nil {
2082+
if s.AccountResolver() == nil || !fetch {
20762083
return nil, ErrMissingAccount
20772084
}
20782085
return s.fetchAccount(name)

0 commit comments

Comments
 (0)