Skip to content

Commit 3a5cdc3

Browse files
kozlovicneilalexander
authored andcommitted
[FIXED] MQTT: Timing out waiting to load x retained messages
This could happen even if the server restored all retained messages. The issue was caused by the fact that the check to see if all were restored was based on the message sequence matching the last sequence of the retained message stream, as opposed to the number of messages in the stream. Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
1 parent 6574da4 commit 3a5cdc3

File tree

2 files changed

+67
-19
lines changed

2 files changed

+67
-19
lines changed

server/mqtt.go

Lines changed: 19 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -263,8 +263,9 @@ type mqttAccountSessionManager struct {
263263
retmsgs map[string]*mqttRetainedMsgRef // retained messages
264264
rmsCache *sync.Map // map[subject]mqttRetainedMsg
265265
jsa mqttJSA
266-
rrmLastSeq uint64 // Restore retained messages expected last sequence
267-
rrmDoneCh chan struct{} // To notify the caller that all retained messages have been loaded
266+
rrmNum uint64 // Number of restored retained messages.
267+
rrmTotal uint64 // Total of retained messages to restore.
268+
rrmDoneCh chan struct{} // To notify the caller that all retained messages have been loaded.
268269
domainTk string // Domain (with trailing "."), or possibly empty. This is added to session subject.
269270
}
270271

@@ -1475,16 +1476,13 @@ func (s *Server) mqttCreateAccountSessionManager(acc *Account, quitCh chan struc
14751476
return nil, err
14761477
}
14771478

1478-
var lastSeq uint64
1479+
var rmTotal uint64
14791480
var rmDoneCh chan struct{}
14801481
st := si.State
1481-
if st.Msgs > 0 {
1482-
lastSeq = st.LastSeq
1483-
if lastSeq > 0 {
1484-
rmDoneCh = make(chan struct{})
1485-
as.rrmLastSeq = lastSeq
1486-
as.rrmDoneCh = rmDoneCh
1487-
}
1482+
if rmTotal = st.Msgs; rmTotal > 0 {
1483+
rmDoneCh = make(chan struct{})
1484+
as.rrmTotal = rmTotal
1485+
as.rrmDoneCh = rmDoneCh
14881486
}
14891487

14901488
// Opportunistically delete the old (legacy) consumer, from v2.10.10 and
@@ -1509,14 +1507,14 @@ func (s *Server) mqttCreateAccountSessionManager(acc *Account, quitCh chan struc
15091507
return nil, fmt.Errorf("create retained messages consumer for account %q: %v", accName, err)
15101508
}
15111509

1512-
if lastSeq > 0 {
1510+
if rmTotal > 0 {
15131511
ttl := time.NewTimer(mqttJSAPITimeout)
15141512
defer ttl.Stop()
15151513

15161514
select {
15171515
case <-rmDoneCh:
15181516
case <-ttl.C:
1519-
s.Warnf("Timing out waiting to load %v retained messages", st.Msgs)
1517+
s.Warnf("Timing out waiting to load %v retained messages", rmTotal)
15201518
case <-quitCh:
15211519
return nil, ErrServerNotRunning
15221520
}
@@ -1998,10 +1996,10 @@ func (as *mqttAccountSessionManager) processRetainedMsg(_ *subscription, c *clie
19981996
if err != nil {
19991997
return
20001998
}
2001-
// If lastSeq is 0 (nothing to recover, or done doing it) and this is
1999+
// If rrmTotal is 0 (nothing to recover, or done doing it) and this is
20022000
// from our own server, ignore.
20032001
as.mu.RLock()
2004-
if as.rrmLastSeq == 0 && rm.Origin == as.jsa.id {
2002+
if as.rrmTotal == 0 && rm.Origin == as.jsa.id {
20052003
as.mu.RUnlock()
20062004
return
20072005
}
@@ -2013,12 +2011,14 @@ func (as *mqttAccountSessionManager) processRetainedMsg(_ *subscription, c *clie
20132011
// Handle this retained message, no need to copy the bytes.
20142012
as.handleRetainedMsg(rm.Subject, &mqttRetainedMsgRef{sseq: seq}, rm, false)
20152013

2016-
// If we were recovering (lastSeq > 0), then check if we are done.
2014+
// If we were recovering (rrmTotal > 0), then check if we are done.
20172015
as.mu.Lock()
2018-
if as.rrmLastSeq > 0 && seq >= as.rrmLastSeq {
2019-
as.rrmLastSeq = 0
2020-
close(as.rrmDoneCh)
2021-
as.rrmDoneCh = nil
2016+
if as.rrmTotal > 0 {
2017+
if as.rrmNum++; as.rrmNum == as.rrmTotal {
2018+
as.rrmTotal = 0
2019+
close(as.rrmDoneCh)
2020+
as.rrmDoneCh = nil
2021+
}
20222022
}
20232023
as.mu.Unlock()
20242024
}

server/mqtt_test.go

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5232,6 +5232,54 @@ func TestMQTTRetainedMsgCleanup(t *testing.T) {
52325232
}
52335233
}
52345234

5235+
func TestMQTTRestoreRetainedMsgs(t *testing.T) {
5236+
o := testMQTTDefaultOptions()
5237+
s := testMQTTRunServer(t, o)
5238+
defer testMQTTShutdownServer(s)
5239+
5240+
ci := &mqttConnInfo{clientID: "retain", cleanSess: true}
5241+
c, r := testMQTTConnect(t, ci, o.MQTT.Host, o.MQTT.Port)
5242+
defer c.Close()
5243+
testMQTTCheckConnAck(t, r, mqttConnAckRCConnectionAccepted, false)
5244+
5245+
// Send several retained messages on different topic
5246+
testMQTTPublish(t, c, r, 1, false, true, "foo", 1, []byte("msg1"))
5247+
testMQTTPublish(t, c, r, 1, false, true, "bar", 1, []byte("msg2"))
5248+
testMQTTPublish(t, c, r, 1, false, true, "baz", 1, []byte("msg3"))
5249+
5250+
// Remove the two last ones (by sending empty body)
5251+
testMQTTPublish(t, c, r, 1, false, true, "bar", 1, []byte(""))
5252+
testMQTTPublish(t, c, r, 1, false, true, "baz", 1, []byte(""))
5253+
testMQTTFlush(t, c, nil, r)
5254+
testMQTTDisconnect(t, c, nil)
5255+
5256+
// Now restart the server. We had a bug where we would wait to restore retained
5257+
// messages based on stream last sequence, which was wrong.
5258+
s.Shutdown()
5259+
s, err := NewServer(o)
5260+
require_NoError(t, err)
5261+
l := &captureWarnLogger{warn: make(chan string, 10)}
5262+
s.SetLogger(l, false, false)
5263+
s.Start()
5264+
// s = testMQTTRunServer(t, o)
5265+
// defer testMQTTShutdownServer(s)
5266+
5267+
time.Sleep(500 * time.Millisecond)
5268+
start := time.Now()
5269+
c, r = testMQTTConnect(t, ci, o.MQTT.Host, o.MQTT.Port)
5270+
defer c.Close()
5271+
testMQTTCheckConnAck(t, r, mqttConnAckRCConnectionAccepted, false)
5272+
dur := time.Since(start)
5273+
if dur > 2*time.Second {
5274+
var warnMsg string
5275+
select {
5276+
case warnMsg = <-l.warn:
5277+
default:
5278+
}
5279+
t.Fatalf("Likely timing out restoring retained messages (%s)", warnMsg)
5280+
}
5281+
}
5282+
52355283
func TestMQTTConnAckFirstPacket(t *testing.T) {
52365284
o := testMQTTDefaultOptions()
52375285
o.NoLog, o.Debug, o.Trace = true, false, false

0 commit comments

Comments
 (0)