Skip to content

Commit 2ec933c

Browse files
authored
[ADDED] FetchHeartbeat option for Fetch and FetchBytes (#1548)
Signed-off-by: Piotr Piotrowski <[email protected]>
1 parent 1c24aa7 commit 2ec933c

File tree

4 files changed

+301
-17
lines changed

4 files changed

+301
-17
lines changed

jetstream/consumer.go

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,12 @@ type (
5252
// defaults to 30 seconds and can be configured using FetchMaxWait
5353
// option.
5454
//
55+
// By default, Fetch uses a 5s idle heartbeat for requests longer than
56+
// 10 seconds. For shorter requests, the idle heartbeat is disabled.
57+
// This can be configured using FetchHeartbeat option. If a client does
58+
// not receive a heartbeat message from a stream for more than 2 times
59+
// the idle heartbeat setting, Fetch will return [ErrNoHeartbeat].
60+
//
5561
// Fetch is non-blocking and returns MessageBatch, exposing a channel
5662
// for delivered messages.
5763
//
@@ -65,6 +71,12 @@ type (
6571
// timeout defaults to 30 seconds and can be configured using
6672
// FetchMaxWait option.
6773
//
74+
// By default, FetchBytes uses a 5s idle heartbeat for requests longer than
75+
// 10 seconds. For shorter requests, the idle heartbeat is disabled.
76+
// This can be configured using FetchHeartbeat option. If a client does
77+
// not receive a heartbeat message from a stream for more than 2 times
78+
// the idle heartbeat setting, Fetch will return ErrNoHeartbeat.
79+
//
6880
// FetchBytes is non-blocking and returns MessageBatch, exposing a channel
6981
// for delivered messages.
7082
//
@@ -75,9 +87,7 @@ type (
7587
// FetchNoWait is used to retrieve up to a provided number of messages
7688
// from a stream. Unlike Fetch, FetchNoWait will only deliver messages
7789
// that are currently available in the stream and will not wait for new
78-
// messages to arrive, even if batch size is not met. FetchNoWait
79-
// timeout defaults to 30 seconds and can be configured using
80-
// FetchMaxWait option.
90+
// messages to arrive, even if batch size is not met.
8191
//
8292
// FetchNoWait is non-blocking and returns MessageBatch, exposing a
8393
// channel for delivered messages.

jetstream/jetstream_options.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -260,6 +260,8 @@ func WithMessagesErrOnMissingHeartbeat(hbErr bool) PullMessagesOpt {
260260
}
261261

262262
// FetchMaxWait sets custom timeout for fetching predefined batch of messages.
263+
//
264+
// If not provided, a default of 30 seconds will be used.
263265
func FetchMaxWait(timeout time.Duration) FetchOpt {
264266
return func(req *pullRequest) error {
265267
if timeout <= 0 {
@@ -270,6 +272,24 @@ func FetchMaxWait(timeout time.Duration) FetchOpt {
270272
}
271273
}
272274

275+
// FetchHeartbeat sets custom heartbeat for individual fetch request. If a
276+
// client does not receive a heartbeat message from a stream for more than 2
277+
// times the idle heartbeat setting, Fetch will return [ErrNoHeartbeat].
278+
//
279+
// Heartbeat value has to be lower than FetchMaxWait / 2.
280+
//
281+
// If not provided, heartbeat will is set to 5s for requests with FetchMaxWait > 30s
282+
// and disabled otherwise.
283+
func FetchHeartbeat(hb time.Duration) FetchOpt {
284+
return func(req *pullRequest) error {
285+
if hb <= 0 {
286+
return fmt.Errorf("%w: timeout value must be greater than 0", ErrInvalidOption)
287+
}
288+
req.Heartbeat = hb
289+
return nil
290+
}
291+
}
292+
273293
// WithDeletedDetails can be used to display the information about messages
274294
// deleted from a stream on a stream info request
275295
func WithDeletedDetails(deletedDetails bool) StreamInfoOpt {

jetstream/pull.go

Lines changed: 35 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -729,17 +729,26 @@ func (s *pullSubscription) Drain() {
729729
// It will wait up to provided expiry time if not all messages are available.
730730
func (p *pullConsumer) Fetch(batch int, opts ...FetchOpt) (MessageBatch, error) {
731731
req := &pullRequest{
732-
Batch: batch,
733-
Expires: DefaultExpires,
732+
Batch: batch,
733+
Expires: DefaultExpires,
734+
Heartbeat: unset,
734735
}
735736
for _, opt := range opts {
736737
if err := opt(req); err != nil {
737738
return nil, err
738739
}
739740
}
740-
// for longer pulls, set heartbeat value
741-
if req.Expires >= 10*time.Second {
742-
req.Heartbeat = 5 * time.Second
741+
// if heartbeat was not explicitly set, set it to 5 seconds for longer pulls
742+
// and disable it for shorter pulls
743+
if req.Heartbeat == unset {
744+
if req.Expires >= 10*time.Second {
745+
req.Heartbeat = 5 * time.Second
746+
} else {
747+
req.Heartbeat = 0
748+
}
749+
}
750+
if req.Expires < 2*req.Heartbeat {
751+
return nil, fmt.Errorf("%w: expiry time should be at least 2 times the heartbeat", ErrInvalidOption)
743752
}
744753

745754
return p.fetch(req)
@@ -748,26 +757,35 @@ func (p *pullConsumer) Fetch(batch int, opts ...FetchOpt) (MessageBatch, error)
748757
// FetchBytes is used to retrieve up to a provided bytes from the stream.
749758
func (p *pullConsumer) FetchBytes(maxBytes int, opts ...FetchOpt) (MessageBatch, error) {
750759
req := &pullRequest{
751-
Batch: 1000000,
752-
MaxBytes: maxBytes,
753-
Expires: DefaultExpires,
760+
Batch: 1000000,
761+
MaxBytes: maxBytes,
762+
Expires: DefaultExpires,
763+
Heartbeat: unset,
754764
}
755765
for _, opt := range opts {
756766
if err := opt(req); err != nil {
757767
return nil, err
758768
}
759769
}
760-
// for longer pulls, set heartbeat value
761-
if req.Expires >= 10*time.Second {
762-
req.Heartbeat = 5 * time.Second
770+
// if heartbeat was not explicitly set, set it to 5 seconds for longer pulls
771+
// and disable it for shorter pulls
772+
if req.Heartbeat == unset {
773+
if req.Expires >= 10*time.Second {
774+
req.Heartbeat = 5 * time.Second
775+
} else {
776+
req.Heartbeat = 0
777+
}
778+
}
779+
if req.Expires < 2*req.Heartbeat {
780+
return nil, fmt.Errorf("%w: expiry time should be at least 2 times the heartbeat", ErrInvalidOption)
763781
}
764782

765783
return p.fetch(req)
766784
}
767785

768786
// FetchNoWait sends a single request to retrieve given number of messages.
769-
// If there are any messages available at the time of sending request,
770-
// FetchNoWait will return immediately.
787+
// FetchNoWait will only return messages that are available at the time of the
788+
// request. It will not wait for more messages to arrive.
771789
func (p *pullConsumer) FetchNoWait(batch int) (MessageBatch, error) {
772790
req := &pullRequest{
773791
Batch: batch,
@@ -842,6 +860,10 @@ func (p *pullConsumer) fetch(req *pullRequest) (MessageBatch, error) {
842860
return
843861
}
844862
p.Unlock()
863+
case err := <-sub.errs:
864+
res.err = err
865+
res.done = true
866+
return
845867
case <-time.After(req.Expires + 1*time.Second):
846868
res.done = true
847869
return

0 commit comments

Comments
 (0)