Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 19 additions & 19 deletions server/mqtt.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,8 +263,9 @@ type mqttAccountSessionManager struct {
retmsgs map[string]*mqttRetainedMsgRef // retained messages
rmsCache *sync.Map // map[subject]mqttRetainedMsg
jsa mqttJSA
rrmLastSeq uint64 // Restore retained messages expected last sequence
rrmDoneCh chan struct{} // To notify the caller that all retained messages have been loaded
rrmNum uint64 // Number of restored retained messages.
rrmTotal uint64 // Total of retained messages to restore.
rrmDoneCh chan struct{} // To notify the caller that all retained messages have been loaded.
domainTk string // Domain (with trailing "."), or possibly empty. This is added to session subject.
}

Expand Down Expand Up @@ -1475,16 +1476,13 @@ func (s *Server) mqttCreateAccountSessionManager(acc *Account, quitCh chan struc
return nil, err
}

var lastSeq uint64
var rmTotal uint64
var rmDoneCh chan struct{}
st := si.State
if st.Msgs > 0 {
lastSeq = st.LastSeq
if lastSeq > 0 {
rmDoneCh = make(chan struct{})
as.rrmLastSeq = lastSeq
as.rrmDoneCh = rmDoneCh
}
if rmTotal = st.Msgs; rmTotal > 0 {
rmDoneCh = make(chan struct{})
as.rrmTotal = rmTotal
as.rrmDoneCh = rmDoneCh
}

// Opportunistically delete the old (legacy) consumer, from v2.10.10 and
Expand All @@ -1509,14 +1507,14 @@ func (s *Server) mqttCreateAccountSessionManager(acc *Account, quitCh chan struc
return nil, fmt.Errorf("create retained messages consumer for account %q: %v", accName, err)
}

if lastSeq > 0 {
if rmTotal > 0 {
ttl := time.NewTimer(mqttJSAPITimeout)
defer ttl.Stop()

select {
case <-rmDoneCh:
case <-ttl.C:
s.Warnf("Timing out waiting to load %v retained messages", st.Msgs)
s.Warnf("Timing out waiting to load %v retained messages", rmTotal)
case <-quitCh:
return nil, ErrServerNotRunning
}
Expand Down Expand Up @@ -1998,10 +1996,10 @@ func (as *mqttAccountSessionManager) processRetainedMsg(_ *subscription, c *clie
if err != nil {
return
}
// If lastSeq is 0 (nothing to recover, or done doing it) and this is
// If rrmTotal is 0 (nothing to recover, or done doing it) and this is
// from our own server, ignore.
as.mu.RLock()
if as.rrmLastSeq == 0 && rm.Origin == as.jsa.id {
if as.rrmTotal == 0 && rm.Origin == as.jsa.id {
as.mu.RUnlock()
return
}
Expand All @@ -2013,12 +2011,14 @@ func (as *mqttAccountSessionManager) processRetainedMsg(_ *subscription, c *clie
// Handle this retained message, no need to copy the bytes.
as.handleRetainedMsg(rm.Subject, &mqttRetainedMsgRef{sseq: seq}, rm, false)

// If we were recovering (lastSeq > 0), then check if we are done.
// If we were recovering (rrmTotal > 0), then check if we are done.
as.mu.Lock()
if as.rrmLastSeq > 0 && seq >= as.rrmLastSeq {
as.rrmLastSeq = 0
close(as.rrmDoneCh)
as.rrmDoneCh = nil
if as.rrmTotal > 0 {
if as.rrmNum++; as.rrmNum == as.rrmTotal {
as.rrmTotal = 0
close(as.rrmDoneCh)
as.rrmDoneCh = nil
}
}
as.mu.Unlock()
}
Expand Down
48 changes: 48 additions & 0 deletions server/mqtt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5232,6 +5232,54 @@ func TestMQTTRetainedMsgCleanup(t *testing.T) {
}
}

func TestMQTTRestoreRetainedMsgs(t *testing.T) {
o := testMQTTDefaultOptions()
s := testMQTTRunServer(t, o)
defer testMQTTShutdownServer(s)

ci := &mqttConnInfo{clientID: "retain", cleanSess: true}
c, r := testMQTTConnect(t, ci, o.MQTT.Host, o.MQTT.Port)
defer c.Close()
testMQTTCheckConnAck(t, r, mqttConnAckRCConnectionAccepted, false)

// Send several retained messages on different topic
testMQTTPublish(t, c, r, 1, false, true, "foo", 1, []byte("msg1"))
testMQTTPublish(t, c, r, 1, false, true, "bar", 1, []byte("msg2"))
testMQTTPublish(t, c, r, 1, false, true, "baz", 1, []byte("msg3"))

// Remove the two last ones (by sending empty body)
testMQTTPublish(t, c, r, 1, false, true, "bar", 1, []byte(""))
testMQTTPublish(t, c, r, 1, false, true, "baz", 1, []byte(""))
testMQTTFlush(t, c, nil, r)
testMQTTDisconnect(t, c, nil)

// Now restart the server. We had a bug where we would wait to restore retained
// messages based on stream last sequence, which was wrong.
s.Shutdown()
s, err := NewServer(o)
require_NoError(t, err)
l := &captureWarnLogger{warn: make(chan string, 10)}
s.SetLogger(l, false, false)
s.Start()
// s = testMQTTRunServer(t, o)
// defer testMQTTShutdownServer(s)

time.Sleep(500 * time.Millisecond)
start := time.Now()
c, r = testMQTTConnect(t, ci, o.MQTT.Host, o.MQTT.Port)
defer c.Close()
testMQTTCheckConnAck(t, r, mqttConnAckRCConnectionAccepted, false)
dur := time.Since(start)
if dur > 2*time.Second {
var warnMsg string
select {
case warnMsg = <-l.warn:
default:
}
t.Fatalf("Likely timing out restoring retained messages (%s)", warnMsg)
}
}

func TestMQTTConnAckFirstPacket(t *testing.T) {
o := testMQTTDefaultOptions()
o.NoLog, o.Debug, o.Trace = true, false, false
Expand Down