@@ -18,7 +18,9 @@ package server
1818import (
1919 "encoding/json"
2020 "fmt"
21+ "math"
2122 "math/rand"
23+ "strconv"
2224 "sync"
2325 "sync/atomic"
2426 "testing"
@@ -991,6 +993,88 @@ func BenchmarkJetStreamPublish(b *testing.B) {
991993 }
992994}
993995
996+ func BenchmarkJetStreamMetaSnapshot (b * testing.B ) {
997+ c := createJetStreamClusterExplicit (b , "R3S" , 3 )
998+ defer c .shutdown ()
999+
1000+ setup := func (reqLevel string ) * jetStream {
1001+ ml := c .leader ()
1002+ acc , js := ml .globalAccount (), ml .getJetStream ()
1003+ n := js .getMetaGroup ()
1004+
1005+ // Create all streams and consumers.
1006+ numStreams := 200
1007+ numConsumers := 500
1008+ ci := & ClientInfo {Cluster : "R3S" , Account : globalAccountName }
1009+ js .mu .Lock ()
1010+ metadata := map [string ]string {JSRequiredLevelMetadataKey : reqLevel }
1011+ for i := 0 ; i < numStreams ; i ++ {
1012+ scfg := & StreamConfig {
1013+ Name : fmt .Sprintf ("STREAM-%d" , i ),
1014+ Subjects : []string {fmt .Sprintf ("SUBJECT-%d" , i )},
1015+ Metadata : metadata ,
1016+ }
1017+ cfg , _ := ml .checkStreamCfg (scfg , acc , false )
1018+ rg , _ := js .createGroupForStream (ci , & cfg )
1019+ sa := & streamAssignment {Group : rg , Sync : syncSubjForStream (), Config : & cfg , Client : ci , Created : time .Now ().UTC ()}
1020+ n .Propose (encodeAddStreamAssignment (sa ))
1021+
1022+ for j := 0 ; j < numConsumers ; j ++ {
1023+ ccfg := & ConsumerConfig {
1024+ Durable : fmt .Sprintf ("CONSUMER-%d" , j ),
1025+ Metadata : metadata ,
1026+ }
1027+ selectedLimits , _ , _ , _ := acc .selectLimits (ccfg .replicas (& cfg ))
1028+ srvLim := & ml .getOpts ().JetStreamLimits
1029+ setConsumerConfigDefaults (ccfg , & cfg , srvLim , selectedLimits , false )
1030+ rg = js .cluster .createGroupForConsumer (ccfg , sa )
1031+ ca := & consumerAssignment {Group : rg , Stream : cfg .Name , Name : ccfg .Durable , Config : ccfg , Client : ci , Created : time .Now ().UTC ()}
1032+ n .Propose (encodeAddConsumerAssignment (ca ))
1033+ }
1034+ }
1035+ js .mu .Unlock ()
1036+
1037+ // Wait for all servers to have created all assets.
1038+ checkFor (b , 20 * time .Second , 200 * time .Millisecond , func () error {
1039+ for _ , s := range c .servers {
1040+ sjs := s .getJetStream ()
1041+ sjs .mu .RLock ()
1042+ streams := sjs .cluster .streams [globalAccountName ]
1043+ if len (streams ) != numStreams {
1044+ sjs .mu .RUnlock ()
1045+ return fmt .Errorf ("expected %d streams, got %d" , numStreams , len (streams ))
1046+ }
1047+ for _ , sa := range streams {
1048+ if nc := len (sa .consumers ); nc != numConsumers {
1049+ sjs .mu .RUnlock ()
1050+ return fmt .Errorf ("expected %d consumers, got %d" , numConsumers , nc )
1051+ }
1052+ }
1053+ sjs .mu .RUnlock ()
1054+ }
1055+ return nil
1056+ })
1057+ return js
1058+ }
1059+
1060+ for _ , t := range []struct {
1061+ title string
1062+ reqLevel string
1063+ }{
1064+ {title : "Default" , reqLevel : "0" },
1065+ {title : "AllUnsupported" , reqLevel : strconv .Itoa (math .MaxInt )},
1066+ } {
1067+ b .Run (t .title , func (b * testing.B ) {
1068+ js := setup (t .reqLevel )
1069+ b .ResetTimer ()
1070+ for range b .N {
1071+ js .metaSnapshot ()
1072+ }
1073+ b .StopTimer ()
1074+ })
1075+ }
1076+ }
1077+
9941078func BenchmarkJetStreamCounters (b * testing.B ) {
9951079 const (
9961080 verbose = false
0 commit comments