Skip to content

Commit c54d016

Browse files
[FIXED] Gateway RS+/- blocks on account fetch
Signed-off-by: Maurice van Veen <github@mauricevanveen.com>
1 parent 192d2bf commit c54d016

File tree

4 files changed

+69
-4
lines changed

4 files changed

+69
-4
lines changed

server/gateway_test.go

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ import (
3333

3434
"github.com/nats-io/nats-server/v2/logger"
3535
"github.com/nats-io/nats.go"
36+
"github.com/nats-io/nkeys"
3637
"golang.org/x/crypto/ocsp"
3738

3839
. "github.com/nats-io/nats-server/v2/internal/ocsp"
@@ -7557,3 +7558,59 @@ func TestGatewayConfigureWriteDeadline(t *testing.T) {
75577558
require_Equal(t, r.out.wdl, 5*time.Second)
75587559
})
75597560
}
7561+
7562+
func TestGatewayProcessRSubNoBlockingAccountFetch(t *testing.T) {
7563+
createAccountPubKey := func() string {
7564+
kp, err := nkeys.CreateAccount()
7565+
require_NoError(t, err)
7566+
pubkey, err := kp.PublicKey()
7567+
require_NoError(t, err)
7568+
return pubkey
7569+
}
7570+
sysPub := createAccountPubKey()
7571+
accPub := createAccountPubKey()
7572+
dir := t.TempDir()
7573+
conf := createConfFile(t, []byte(fmt.Sprintf(`
7574+
listen: 127.0.0.1:-1
7575+
server_name: srv
7576+
operator: %s
7577+
system_account: %s
7578+
resolver: {
7579+
type: cache
7580+
dir: '%s'
7581+
timeout: "2s"
7582+
}
7583+
gateway: {
7584+
name: "clust-B"
7585+
listen: 127.0.0.1:-1
7586+
}
7587+
`, ojwt, sysPub, dir)))
7588+
s, _ := RunServerWithConfig(conf)
7589+
defer s.Shutdown()
7590+
7591+
// Set up a mock gateway client.
7592+
c := s.createInternalAccountClient()
7593+
c.mu.Lock()
7594+
c.gw = &gateway{}
7595+
c.gw.outsim = &sync.Map{}
7596+
c.nc = &net.IPConn{}
7597+
c.mu.Unlock()
7598+
7599+
// Receiving a R+ should not be blocking, since we're in the gateway's readLoop.
7600+
start := time.Now()
7601+
require_NoError(t, c.processGatewayRSub(fmt.Appendf(nil, "%s subj queue 0", accPub)))
7602+
c.mu.Lock()
7603+
subs := len(c.subs)
7604+
c.mu.Unlock()
7605+
require_Len(t, subs, 1)
7606+
require_LessThan(t, time.Since(start), 100*time.Millisecond)
7607+
7608+
// Receiving a R- should not be blocking, since we're in the gateway's readLoop.
7609+
start = time.Now()
7610+
require_NoError(t, c.processGatewayRUnsub(fmt.Appendf(nil, "%s subj queue", accPub)))
7611+
c.mu.Lock()
7612+
subs = len(c.subs)
7613+
c.mu.Unlock()
7614+
require_Len(t, subs, 0)
7615+
require_LessThan(t, time.Since(start), 100*time.Millisecond)
7616+
}

server/jetstream_super_cluster_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4411,7 +4411,7 @@ func TestJetStreamSuperClusterMixedModeSwitchToInterestOnlyOperatorConfig(t *tes
44114411
if gw.Name == opts.Gateway.Name {
44124412
continue
44134413
}
4414-
checkGWInterestOnlyMode(t, s, gw.Name, apub)
4414+
checkGWInterestOnlyModeOrNotPresent(t, s, gw.Name, apub, true)
44154415
}
44164416
}
44174417
s = sc.serverByName(si.Cluster.Leader)

server/leafnode.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2426,7 +2426,8 @@ func (s *Server) initLeafNodeSmapAndSendSubs(c *client) {
24262426

24272427
// updateInterestForAccountOnGateway called from gateway code when processing RS+ and RS-.
24282428
func (s *Server) updateInterestForAccountOnGateway(accName string, sub *subscription, delta int32) {
2429-
acc, err := s.LookupAccount(accName)
2429+
// Since we're in the gateway's readLoop, and we would otherwise block, don't allow fetching.
2430+
acc, err := s.lookupOrFetchAccount(accName, false)
24302431
if acc == nil || err != nil {
24312432
s.Debugf("No or bad account for %q, failed to update interest from gateway", accName)
24322433
return

server/server.go

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2049,6 +2049,13 @@ func (s *Server) setRouteInfo(acc *Account) {
20492049
// associated with an account name.
20502050
// Lock MUST NOT be held upon entry.
20512051
func (s *Server) lookupAccount(name string) (*Account, error) {
2052+
return s.lookupOrFetchAccount(name, true)
2053+
}
2054+
2055+
// lookupOrFetchAccount is a function to return the account structure
2056+
// associated with an account name.
2057+
// Lock MUST NOT be held upon entry.
2058+
func (s *Server) lookupOrFetchAccount(name string, fetch bool) (*Account, error) {
20522059
var acc *Account
20532060
if v, ok := s.accounts.Load(name); ok {
20542061
acc = v.(*Account)
@@ -2058,7 +2065,7 @@ func (s *Server) lookupAccount(name string) (*Account, error) {
20582065
// return the latest information from the resolver.
20592066
if acc.IsExpired() {
20602067
s.Debugf("Requested account [%s] has expired", name)
2061-
if s.AccountResolver() != nil {
2068+
if s.AccountResolver() != nil && fetch {
20622069
if err := s.updateAccount(acc); err != nil {
20632070
// This error could mask expired, so just return expired here.
20642071
return nil, ErrAccountExpired
@@ -2070,7 +2077,7 @@ func (s *Server) lookupAccount(name string) (*Account, error) {
20702077
return acc, nil
20712078
}
20722079
// If we have a resolver see if it can fetch the account.
2073-
if s.AccountResolver() == nil {
2080+
if s.AccountResolver() == nil || !fetch {
20742081
return nil, ErrMissingAccount
20752082
}
20762083
return s.fetchAccount(name)

0 commit comments

Comments
 (0)