Skip to content

Commit 0c7af02

Browse files
Zerpetlukebakken
andauthored
Add constants for Queue arguments (#145)
* Add constants for Queue arguments Queues accept optional arguments during queue declaration. Most of those arguments can be set using a policy (recommended), however, queue type must be set at queue declaration. To pave the way for Quorum and Stream queue adoption, this commit adds constants to easily set the queue type. Other common used arguments are part of this commit. Additional queue arguments can be added as needed/requested by the community. Signed-off-by: Aitor Pérez Cedres <[email protected]> * Add value even though it is not strictly necessary Signed-off-by: Aitor Pérez Cedres <[email protected]> Co-authored-by: Luke Bakken <[email protected]>
1 parent 5bf455f commit 0c7af02

File tree

2 files changed

+98
-0
lines changed

2 files changed

+98
-0
lines changed

examples_test.go

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -503,3 +503,39 @@ func refreshJWToken(token string) (string, error) {
503503
// do OAuth2 things to refresh tokens
504504
return "so fresh!", nil
505505
}
506+
507+
func ExampleChannel_QueueDeclare_quorum() {
508+
conn, _ := amqp.Dial("amqp://localhost")
509+
ch, _ := conn.Channel()
510+
args := amqp.Table{ // queue args
511+
amqp.QueueTypeArg: amqp.QueueTypeQuorum,
512+
}
513+
q, _ := ch.QueueDeclare(
514+
"my-quorum-queue", // queue name
515+
true, // durable
516+
false, // auto-delete
517+
false, // exclusive
518+
false, // noWait
519+
args,
520+
)
521+
log.Printf("Declared queue: %s with arguments: %v", q.Name, args)
522+
}
523+
524+
func ExampleChannel_QueueDeclare_stream() {
525+
conn, _ := amqp.Dial("amqp://localhost")
526+
ch, _ := conn.Channel()
527+
q, _ := ch.QueueDeclare(
528+
"my-stream-queue", // queue name
529+
true, // durable
530+
false, // auto-delete
531+
false, // exclusive
532+
false, // noWait
533+
amqp.Table{ // queue args
534+
amqp.QueueTypeArg: amqp.QueueTypeStream,
535+
amqp.StreamMaxLenBytesArg: 5_000_000_000, // 5 Gb
536+
amqp.StreamMaxSegmentSizeBytesArg: 500_000_000, // 500 Mb
537+
amqp.StreamMaxAgeArg: "3D", // 3 days
538+
},
539+
)
540+
log.Printf("Declared queue: %s", q.Name)
541+
}

types.go

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -209,6 +209,68 @@ type Decimal struct {
209209
Value int32
210210
}
211211

212+
// Most common queue argument keys in queue declaration. For a comprehensive list
213+
// of queue arguments, visit [RabbitMQ Queue docs].
214+
//
215+
// QueueTypeArg queue argument is used to declare quorum and stream queues.
216+
// Accepted values are QueueTypeClassic (default), QueueTypeQuorum and
217+
// QueueTypeStream. [Quorum Queues] accept (almost) all queue arguments as their
218+
// Classic Queues counterparts. Check [feature comparison] docs for more
219+
// information.
220+
//
221+
// Queues can define their [max length] using QueueMaxLenArg and
222+
// QueueMaxLenBytesArg queue arguments. Overflow behaviour is set using
223+
// QueueOverflowArg. Accepted values are QueueOverflowDropHead (default),
224+
// QueueOverflowRejectPublish and QueueOverflowRejectPublishDLX.
225+
//
226+
// [Queue TTL] can be defined using QueueTTLArg. That is, the time-to-live for an
227+
// unused queue. [Queue Message TTL] can be defined using QueueMessageTTLArg.
228+
// This will set a time-to-live for **messages** in the queue.
229+
//
230+
// [Stream retention] can be configured using StreamMaxLenBytesArg, to set the
231+
// maximum size of the stream. Please note that stream queues always keep, at
232+
// least, one segment. [Stream retention] can also be set using StreamMaxAgeArg,
233+
// to set time-based retention. Values are string with unit suffix. Valid
234+
// suffixes are Y, M, D, h, m, s. E.g. "7D" for one week. The maximum segment
235+
// size can be set using StreamMaxSegmentSizeBytesArg. The default value is
236+
// 500_000_000 bytes ~= 500 megabytes
237+
//
238+
// [RabbitMQ Queue docs]: https://rabbitmq.com/queues.html
239+
// [Stream retention]: https://rabbitmq.com/streams.html#retention
240+
// [max length]: https://rabbitmq.com/maxlength.html
241+
// [Queue TTL]: https://rabbitmq.com/ttl.html#queue-ttl
242+
// [Queue Message TTL]: https://rabbitmq.com/ttl.html#per-queue-message-ttl
243+
// [Quorum Queues]: https://rabbitmq.com/quorum-queues.html
244+
// [feature comparison]: https://rabbitmq.com/quorum-queues.html#feature-comparison
245+
const (
246+
QueueTypeArg = "x-queue-type"
247+
QueueMaxLenArg = "x-max-length"
248+
QueueMaxLenBytesArg = "x-max-length-bytes"
249+
StreamMaxLenBytesArg = "x-max-length-bytes"
250+
QueueOverflowArg = "x-overflow"
251+
QueueMessageTTLArg = "x-message-ttl"
252+
QueueTTLArg = "x-expires"
253+
StreamMaxAgeArg = "x-max-age"
254+
StreamMaxSegmentSizeBytesArg = "x-stream-max-segment-size-bytes"
255+
)
256+
257+
// Values for queue arguments. Use as values for queue arguments during queue declaration.
258+
// The following argument table will create a classic queue, with max length set to 100 messages,
259+
// and a queue TTL of 30 minutes.
260+
// args := amqp.Table{
261+
// amqp.QueueTypeArg: QueueTypeClassic,
262+
// amqp.QueueMaxLenArg: 100,
263+
// amqp.QueueTTLArg: 1800000,
264+
// }
265+
const (
266+
QueueTypeClassic = "classic"
267+
QueueTypeQuorum = "quorum"
268+
QueueTypeStream = "stream"
269+
QueueOverflowDropHead = "drop-head"
270+
QueueOverflowRejectPublish = "reject-publish"
271+
QueueOverflowRejectPublishDLX = "reject-publish-dlx"
272+
)
273+
212274
// Table stores user supplied fields of the following types:
213275
//
214276
// bool

0 commit comments

Comments
 (0)