Skip to content

Commit 5bf455f

Browse files
fadamsZerpet
andauthored
Improve use of buffers (#142)
* Improve performance of Channel sendOpen Adding an sendUnflushed method to Connection that allows us to use the write buffer more efficiently by writing all the Frames of the message, and *then* flushing the buffer, rather than flushing each Frame. This significantly improves the performance of basicPublish for small messages where the bulk of the CPU load tends towards Syscall * Update comments in connection module Signed-off-by: Aitor Pérez Cedres <[email protected]> Co-authored-by: Aitor Pérez Cedres <[email protected]>
1 parent 1e67c9e commit 5bf455f

File tree

2 files changed

+76
-3
lines changed

2 files changed

+76
-3
lines changed

channel.go

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -237,14 +237,23 @@ func (ch *Channel) sendOpen(msg message) (err error) {
237237
return ch.sendClosed(msg)
238238
}
239239

240-
if err = ch.connection.send(&methodFrame{
240+
// We use sendUnflushed() in this method as sending the message requires
241+
// sending multiple Frames (methodFrame, headerFrame, N x bodyFrame).
242+
// Flushing after each Frame is inefficient, as it negates much of the
243+
// benefit of using a buffered writer and results in more syscalls than
244+
// necessary. Flushing buffers after every frame can have a significant
245+
// performance impact when sending (e.g. basicPublish) small messages,
246+
// so sendUnflushed() performs an *Unflushed* write, but is otherwise
247+
// equivalent to the send() method. We later use the separate flush
248+
// method to explicitly flush the buffer after all Frames are written.
249+
if err = ch.connection.sendUnflushed(&methodFrame{
241250
ChannelId: ch.id,
242251
Method: content,
243252
}); err != nil {
244253
return
245254
}
246255

247-
if err = ch.connection.send(&headerFrame{
256+
if err = ch.connection.sendUnflushed(&headerFrame{
248257
ChannelId: ch.id,
249258
ClassId: class,
250259
Size: uint64(len(body)),
@@ -259,13 +268,17 @@ func (ch *Channel) sendOpen(msg message) (err error) {
259268
j = len(body)
260269
}
261270

262-
if err = ch.connection.send(&bodyFrame{
271+
if err = ch.connection.sendUnflushed(&bodyFrame{
263272
ChannelId: ch.id,
264273
Body: body[i:j],
265274
}); err != nil {
266275
return
267276
}
268277
}
278+
279+
// Flush the buffer only after all the Frames that comprise the Message
280+
// have been written to maximise benefits of using a buffered writer.
281+
err = ch.connection.flush()
269282
} else {
270283
// If the channel is closed, use Channel.sendClosed()
271284
if ch.IsClosed() {

connection.go

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -447,6 +447,66 @@ func (c *Connection) send(f frame) error {
447447
return err
448448
}
449449

450+
// sendUnflushed performs an *Unflushed* write. It is otherwise equivalent to
451+
// send(), and we provide a separate flush() function to explicitly flush the
452+
// buffer after all Frames are written.
453+
//
454+
// Why is this a thing?
455+
//
456+
// send() method uses writer.WriteFrame(), which will write the Frame then
457+
// flush the buffer. For cases like the sendOpen() method on Channel, which
458+
// sends multiple Frames (methodFrame, headerFrame, N x bodyFrame), flushing
459+
// after each Frame is inefficient as it negates much of the benefit of using a
460+
// buffered writer, and results in more syscalls than necessary. Flushing buffers
461+
// after every frame can have a significant performance impact when sending
462+
// (basicPublish) small messages, so this method performs an *Unflushed* write
463+
// but is otherwise equivalent to send() method, and we provide a separate
464+
// flush method to explicitly flush the buffer after all Frames are written.
465+
func (c *Connection) sendUnflushed(f frame) error {
466+
if c.IsClosed() {
467+
return ErrClosed
468+
}
469+
470+
c.sendM.Lock()
471+
err := f.write(c.writer.w)
472+
c.sendM.Unlock()
473+
474+
if err != nil {
475+
// shutdown could be re-entrant from signaling notify chans
476+
go c.shutdown(&Error{
477+
Code: FrameError,
478+
Reason: err.Error(),
479+
})
480+
}
481+
482+
return err
483+
}
484+
485+
// This method is intended to be used with sendUnflushed() to explicitly flush
486+
// the buffer after all required Frames have been written to the buffer.
487+
func (c *Connection) flush() (err error) {
488+
if buf, ok := c.writer.w.(*bufio.Writer); ok {
489+
err = buf.Flush()
490+
491+
// Moving send notifier to flush increases basicPublish for the small message
492+
// case. As sendUnflushed + flush is used for the case of sending semantically
493+
// related Frames (e.g. a Message like basicPublish) there is no real advantage
494+
// to sending per Frame vice per "group of related Frames" and for the case of
495+
// small messages time.Now() is (relatively) expensive.
496+
if err == nil {
497+
// Broadcast we sent a frame, reducing heartbeats, only
498+
// if there is something that can receive - like a non-reentrant
499+
// call or if the heartbeater isn't running
500+
select {
501+
case c.sends <- time.Now():
502+
default:
503+
}
504+
}
505+
}
506+
507+
return
508+
}
509+
450510
func (c *Connection) shutdown(err *Error) {
451511
atomic.StoreInt32(&c.closed, 1)
452512

0 commit comments

Comments
 (0)