Skip to content

Circular buffer fixes #194

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 11 commits into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
126 changes: 85 additions & 41 deletions src/rt/circular_buffer.cpp
Original file line number Diff line number Diff line change
@@ -4,22 +4,13 @@

#include "rust_internal.h"

bool
is_power_of_two(size_t value) {
if (value > 0) {
return (value & (value - 1)) == 0;
}
return false;
}

circular_buffer::circular_buffer(rust_dom *dom, size_t unit_sz) :
dom(dom),
unit_sz(unit_sz),
_buffer_sz(next_power_of_two(
INITIAL_CIRCULAR_BUFFFER_SIZE_IN_UNITS * unit_sz)),
_buffer_sz(initial_size()),
_next(0),
_unread(0),
_buffer((uint8_t *)dom->calloc(_buffer_sz)) {
_buffer((uint8_t *)dom->malloc(_buffer_sz)) {

A(dom, unit_sz, "Unit size must be larger than zero.");

@@ -39,17 +30,38 @@ circular_buffer::~circular_buffer() {
dom->free(_buffer);
}

size_t
circular_buffer::initial_size() {
I(dom, unit_sz > 0);
return INITIAL_CIRCULAR_BUFFER_SIZE_IN_UNITS * unit_sz;
}

/**
* Copies the unread data from this buffer to the "dst" address.
*/
void
circular_buffer::transfer(void *dst) {
I(dom, dst);
I(dom, is_power_of_two(_buffer_sz));
I(dom, _unread <= _buffer_sz);

uint8_t *ptr = (uint8_t *) dst;
for (size_t i = 0; i < _unread; i += unit_sz) {
memcpy(&ptr[i], &_buffer[(_next + i) & (_buffer_sz - 1)], unit_sz);

// First copy from _next to either the end of the unread
// items or the end of the buffer
size_t head_sz;
if (_next + _unread <= _buffer_sz) {
head_sz = _unread;
} else {
head_sz = _buffer_sz - _next;
}
I(dom, _next + head_sz <= _buffer_sz);
memcpy(ptr, _buffer + _next, head_sz);

// Then copy any other items from the beginning of the buffer
I(dom, _unread >= head_sz);
size_t tail_sz = _unread - head_sz;
I(dom, head_sz + tail_sz <= _buffer_sz);
memcpy(ptr + head_sz, _buffer, tail_sz);
}

/**
@@ -60,35 +72,37 @@ void
circular_buffer::enqueue(void *src) {
I(dom, src);
I(dom, _unread <= _buffer_sz);
I(dom, _buffer);

// Grow if necessary.
if (_unread == _buffer_sz) {
size_t new_buffer_sz = _buffer_sz << 1;
I(dom, new_buffer_sz <= MAX_CIRCULAR_BUFFFER_SIZE);
void *new_buffer = dom->malloc(new_buffer_sz);
transfer(new_buffer);
dom->free(_buffer);
_buffer = (uint8_t *)new_buffer;
_next = 0;
_buffer_sz = new_buffer_sz;
grow();
}

dom->log(rust_log::MEM | rust_log::COMM,
"circular_buffer enqueue "
"unread: %d, buffer_sz: %d, unit_sz: %d",
_unread, _buffer_sz, unit_sz);
"unread: %d, next: %d, buffer_sz: %d, unit_sz: %d",
_unread, _next, _buffer_sz, unit_sz);

I(dom, is_power_of_two(_buffer_sz));
I(dom, _unread < _buffer_sz);
I(dom, _unread + unit_sz <= _buffer_sz);

// Copy data
size_t i = (_next + _unread) & (_buffer_sz - 1);
memcpy(&_buffer[i], src, unit_sz);
size_t dst_idx = _next + _unread;
I(dom, dst_idx >= _buffer_sz || dst_idx + unit_sz <= _buffer_sz);
if (dst_idx >= _buffer_sz) {
dst_idx -= _buffer_sz;

I(dom, _next >= unit_sz);
I(dom, dst_idx <= _next - unit_sz);
}

I(dom, dst_idx + unit_sz <= _buffer_sz);
memcpy(&_buffer[dst_idx], src, unit_sz);
_unread += unit_sz;

dom->log(rust_log::MEM | rust_log::COMM,
"circular_buffer pushed data at index: %d", i);
"circular_buffer pushed data at index: %d", dst_idx);
}

/**
@@ -105,34 +119,53 @@ circular_buffer::dequeue(void *dst) {

dom->log(rust_log::MEM | rust_log::COMM,
"circular_buffer dequeue "
"unread: %d, buffer_sz: %d, unit_sz: %d",
_unread, _buffer_sz, unit_sz);
"unread: %d, next: %d, buffer_sz: %d, unit_sz: %d",
_unread, _next, _buffer_sz, unit_sz);

I(dom, _next + unit_sz <= _buffer_sz);
if (dst != NULL) {
memcpy(dst, &_buffer[_next], unit_sz);
}
dom->log(rust_log::MEM | rust_log::COMM,
"shifted data from index %d", _next);
_unread -= unit_sz;
_next += unit_sz;
I(dom, _next <= _buffer_sz);
if (_next == _buffer_sz) {
_next = 0;
}

// Shrink if possible.
if (_buffer_sz >= INITIAL_CIRCULAR_BUFFFER_SIZE_IN_UNITS * unit_sz &&
_unread <= _buffer_sz / 4) {
dom->log(rust_log::MEM | rust_log::COMM,
"circular_buffer is shrinking to %d bytes", _buffer_sz / 2);
void *tmp = dom->malloc(_buffer_sz / 2);
transfer(tmp);
_buffer_sz >>= 1;
dom->free(_buffer);
_buffer = (uint8_t *)tmp;
_next = 0;
if (_buffer_sz > initial_size() && _unread <= _buffer_sz / 4) {
shrink();
}
}

void
circular_buffer::grow() {
size_t new_buffer_sz = _buffer_sz * 2;
I(dom, new_buffer_sz <= MAX_CIRCULAR_BUFFER_SIZE);
dom->log(rust_log::MEM | rust_log::COMM,
"circular_buffer is growing to %d bytes", new_buffer_sz);
void *new_buffer = dom->malloc(new_buffer_sz);
transfer(new_buffer);
dom->free(_buffer);
_buffer = (uint8_t *)new_buffer;
_next = 0;
_buffer_sz = new_buffer_sz;
}

void
circular_buffer::shrink() {
size_t new_buffer_sz = _buffer_sz / 2;
I(dom, initial_size() <= new_buffer_sz);
dom->log(rust_log::MEM | rust_log::COMM,
"circular_buffer is shrinking to %d bytes", new_buffer_sz);
void *new_buffer = dom->malloc(new_buffer_sz);
transfer(new_buffer);
dom->free(_buffer);
_buffer = (uint8_t *)new_buffer;
_next = 0;
_buffer_sz = new_buffer_sz;
}

uint8_t *
@@ -149,3 +182,14 @@ size_t
circular_buffer::size() {
return _unread;
}

//
// Local Variables:
// mode: C++
// fill-column: 78;
// indent-tabs-mode: nil
// c-basic-offset: 4
// buffer-file-coding-system: utf-8-unix
// compile-command: "make -k -C .. 2>&1 | sed -e 's/\\/x\\//x:\\//g'";
// End:
//
23 changes: 18 additions & 5 deletions src/rt/circular_buffer.h
Original file line number Diff line number Diff line change
@@ -7,8 +7,8 @@

class
circular_buffer : public dom_owned<circular_buffer> {
static const size_t INITIAL_CIRCULAR_BUFFFER_SIZE_IN_UNITS = 8;
static const size_t MAX_CIRCULAR_BUFFFER_SIZE = 1 << 24;
static const size_t INITIAL_CIRCULAR_BUFFER_SIZE_IN_UNITS = 8;
static const size_t MAX_CIRCULAR_BUFFER_SIZE = 1 << 24;

public:
rust_dom *dom;
@@ -24,9 +24,11 @@ circular_buffer : public dom_owned<circular_buffer> {
size_t size();

private:
// Size of the buffer in bytes, should always be a power of two so that
// modulo arithmetic (x % _buffer_sz) can optimized away with
// (x & (_buffer_sz - 1)).
size_t initial_size();
void grow();
void shrink();

// Size of the buffer in bytes.
size_t _buffer_sz;

// Byte offset within the buffer where to read the next unit of data.
@@ -39,4 +41,15 @@ circular_buffer : public dom_owned<circular_buffer> {
uint8_t *_buffer;
};

//
// Local Variables:
// mode: C++
// fill-column: 78;
// indent-tabs-mode: nil
// c-basic-offset: 4
// buffer-file-coding-system: utf-8-unix
// compile-command: "make -k -C .. 2>&1 | sed -e 's/\\/x\\//x:\\//g'";
// End:
//

#endif /* CIRCULAR_BUFFER_H */
36 changes: 0 additions & 36 deletions src/test/run-pass/chan-poweroftwo.rs

This file was deleted.

122 changes: 122 additions & 0 deletions src/test/run-pass/rt-circular-buffer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
// -*- rust -*-

// Regression tests for circular_buffer when using a unit
// that has a size that is not a power of two

use std;

import std.option;
import std._uint;
import std._vec;

// A 12-byte unit to send over the channel
type record = rec(u32 val1, u32 val2, u32 val3);

// Assuming that the default buffer size needs to hold 8 units,
// then the minimum buffer size needs to be 96. That's not a
// power of two so needs to be rounded up. Don't trigger any
// assertions.
impure fn test_init() {
let port[record] myport = port();
auto mychan = chan(myport);

let record val = rec(val1=0u32, val2=0u32, val3=0u32);

mychan <| val;
}

// Dump lots of items into the channel so it has to grow.
// Don't trigger any assertions.
impure fn test_grow() {
let port[record] myport = port();
auto mychan = chan(myport);

let record val = rec(val1=0u32, val2=0u32, val3=0u32);

for each (uint i in _uint.range(0u, 100u)) {
mychan <| val;
}
}

// Don't allow the buffer to shrink below it's original size
impure fn test_shrink1() {
let port[i8] myport = port();
auto mychan = chan(myport);

mychan <| 0i8;
auto x <- myport;
}

impure fn test_shrink2() {
let port[record] myport = port();
auto mychan = chan(myport);

let record val = rec(val1=0u32, val2=0u32, val3=0u32);

for each (uint i in _uint.range(0u, 100u)) {
mychan <| val;
}

for each (uint i in _uint.range(0u, 100u)) {
auto x <- myport;
}
}

// Test rotating the buffer when the unit size is not a power of two
impure fn test_rotate() {
let port[record] myport = port();
auto mychan = chan(myport);

for each (uint i in _uint.range(0u, 100u)) {
auto val = rec(val1=i as u32,
val2=i as u32,
val3=i as u32);
mychan <| val;

auto x <- myport;
check (x.val1 == i as u32);
check (x.val2 == i as u32);
check (x.val3 == i as u32);
}
}

// Test rotating and growing the buffer when
// the unit size is not a power of two
impure fn test_rotate_grow() {
let port[record] myport = port();
auto mychan = chan(myport);

for each (uint j in _uint.range(0u, 10u)) {
for each (uint i in _uint.range(0u, 10u)) {
let record val = rec(val1=i as u32,
val2=i as u32,
val3=i as u32);
mychan <| val;
}

for each (uint i in _uint.range(0u, 10u)) {
auto x <- myport;
check (x.val1 == i as u32);
check (x.val2 == i as u32);
check (x.val3 == i as u32);
}
}
}

impure fn main() {
test_init();
test_grow();
test_shrink1();
test_shrink2();
test_rotate();
test_rotate_grow();
}

// Local Variables:
// mode: rust;
// fill-column: 78;
// indent-tabs-mode: nil
// c-basic-offset: 4
// buffer-file-coding-system: utf-8-unix
// compile-command: "make -k -C .. 2>&1 | sed -e 's/\\/x\\//x:\\//g'";
// End: