Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ configurable via the throughput_bytes_slo field, and it will populate op="traces
* [BUGFIX] TraceQL results caching bug for floats ending in .0 [#4539](https://github.com/grafana/tempo/pull/4539) (@carles-grafana)
* [BUGFIX] Rhythm: fix sorting order for partition consumption [#4747](https://github.com/grafana/tempo/pull/4747) (@javiermolinar)
* [BUGFIX] Rhythm: fix block builder to not reuse a block ID if it was already flushed, to prevent read errors [#4872](https://github.com/grafana/tempo/pull/4872) (@mdisibio)
* [BUGFIX] Fix rare panic during compaction [#4915](https://github.com/grafana/tempo/pull/4915) (@joe-elliott)
* [BUGFIX] Fix metrics streaming for all non-trivial metrics [#4624](https://github.com/grafana/tempo/pull/4624) (@joe-elliott)
* [BUGFIX] Fix starting consuming log [#4539](https://github.com/grafana/tempo/pull/4539) (@javiermolinar)
* [BUGFIX] Rhythm - fix adjustment of the start and end range for livetraces blocks [#4746](https://github.com/grafana/tempo/pull/4746) (@javiermolinar)
Expand Down
150 changes: 30 additions & 120 deletions pkg/io/buffered.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,19 @@
package io

import (
"fmt"
"errors"
"io"
"sync"

"go.uber.org/atomic"
)

// BufferedReaderAt implements io.ReaderAt but extends and buffers reads up to the given buffer size.
// Subsequent reads are returned from the buffers. Additionally it supports concurrent readers
// by maintaining multiple buffers at different offsets, and matching up reads with existing
// buffers where possible. When needed the least-recently-used buffer is overwritten with new reads.
//
// Note that the locking effectively serializes reads to the underlying io.ReaderAt. This reader
// is not suitable for high-concurrency workloads, but does nicely reduce memory usage and backend calls
// for batch workloads.
type BufferedReaderAt struct {
mtx sync.Mutex
ra io.ReaderAt
Expand All @@ -22,7 +24,6 @@ type BufferedReaderAt struct {
}

type readerBuffer struct {
mtx sync.RWMutex
buf []byte
off int64
count int64
Expand Down Expand Up @@ -62,8 +63,16 @@ func (r *BufferedReaderAt) prep(buf *readerBuffer, offset, length int64) {
}

func (r *BufferedReaderAt) populate(buf *readerBuffer) (int, error) {
// Read
// read
n, err := r.ra.ReadAt(buf.buf, buf.off)

// if err is fatal we need to invalidate the buffer by setting it back to 0s (uninitialized)
if isFatalError(err) {
buf.buf = buf.buf[:0]
buf.count = 0
buf.off = 0
}

return n, err
}

Expand All @@ -89,23 +98,10 @@ func calculateBounds(offset, length int64, bufferSize int, readerAtSize int64) (
}

func (r *BufferedReaderAt) ReadAt(b []byte, offset int64) (int, error) {
// There are two-levels of locking: the top-level governs the
// the reader and the arrangement and position of the buffers.
// Then each individual buffer has its own lock for populating
// and reading it.

// The main reason for this is to support concurrent activity
// while solving the stampeding herd issue for fresh reads:
// The first read will prep the offset/length of the buffer
// and then switch to the buffer's write-lock while populating it.
// The second read will inspect the offset/length and know
// that it will satisfy, but by taking the read-lock will
// wait until the first call has finished populating the buffer .

r.mtx.Lock()
defer r.mtx.Unlock()

if len(r.buffers) == 0 {
r.mtx.Unlock()
return r.ra.ReadAt(b, offset)
}

Expand All @@ -128,29 +124,19 @@ func (r *BufferedReaderAt) ReadAt(b []byte, offset int64) (int, error) {
// No buffer satisfied read, overwrite least-recently-used
buf = lru

// Here we exchange the top-level lock for
// the buffer's individual write lock
buf.mtx.Lock()
defer buf.mtx.Unlock()
r.prep(buf, offset, int64(len(b)))
buf.count = r.count
r.mtx.Unlock()

if _, err := r.populate(buf); err != nil {
var err error
if _, err = r.populate(buf); isFatalError(err) {
return 0, err
}

r.read(buf, b, offset)
return len(b), nil
return len(b), err
Comment thread
joe-elliott marked this conversation as resolved.
}

// Here we exchange the top-level lock for
// the buffer's individual read lock
buf.mtx.RLock()
defer buf.mtx.RUnlock()
buf.count = r.count
r.mtx.Unlock()

r.read(buf, b, offset)
return len(b), nil
}
Expand All @@ -160,6 +146,12 @@ type BufferedWriter struct {
buf []byte
}

type BufferedWriteFlusher interface {
io.WriteCloser
Len() int
Flush() error
}

func NewBufferedWriter(w io.Writer) BufferedWriteFlusher {
return &BufferedWriter{w, nil}
}
Expand Down Expand Up @@ -190,96 +182,14 @@ func (b *BufferedWriter) Close() error {
return nil
}

type BufferedWriteFlusher interface {
io.WriteCloser
Len() int
Flush() error
}

// BufferedWriterWithQueue is an attempt at writing an async queue of outgoing data to the underlying writer.
// As a tradeoff for removing flushes from the hot path, this writer does not provide guarantees about
// bubbling up errors and takes a best effort approach to signal a failure.
//
// Note: This is not used at the moment due to concerns with error handling when writing async data to the backend.
type BufferedWriterWithQueue struct {
w io.Writer
buf []byte

flushCh chan []byte
doneCh chan struct{}
err atomic.Error
}

var _ BufferedWriteFlusher = (*BufferedWriterWithQueue)(nil)

func NewBufferedWriterWithQueue(w io.Writer) BufferedWriteFlusher {
b := &BufferedWriterWithQueue{
w: w,
buf: nil,
flushCh: make(chan []byte, 10), // todo: guess better?
doneCh: make(chan struct{}, 1),
func isFatalError(err error) bool {
if err == nil {
return false
}

go b.flushLoop()

return b
}

func (b *BufferedWriterWithQueue) Write(p []byte) (n int, err error) {
b.buf = append(b.buf, p...)
return len(p), nil
}

func (b *BufferedWriterWithQueue) Len() int {
return len(b.buf)
}

func (b *BufferedWriterWithQueue) Flush() error {
if err := b.err.Load(); err != nil {
return fmt.Errorf("error in async write using buffered writer: %w", err)
if errors.Is(err, io.EOF) {
return false
}

bufCopy := make([]byte, 0, len(b.buf))
bufCopy = append(bufCopy, b.buf...)

// reset/resize buffer
b.buf = b.buf[:0]

// will only block if the entire buffered channel is full
b.flushCh <- bufCopy
return nil
}

func (b *BufferedWriterWithQueue) flushLoop() {
defer close(b.doneCh)

// for-range will exit once channel is closed
// https://dave.cheney.net/tag/golang-3
for buf := range b.flushCh {
_, err := b.w.Write(buf)
if err != nil {
b.err.Store(err)
return
}
}
}

func (b *BufferedWriterWithQueue) Close() error {
if err := b.err.Load(); err != nil {
return err
}

var flushErr error
if len(b.buf) > 0 {
flushErr = b.Flush()
b.buf = nil
}

// close out flushLoop
close(b.flushCh)

// blocking wait on doneCh
<-b.doneCh

return flushErr
return true
}
97 changes: 73 additions & 24 deletions pkg/io/buffered_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@ package io

import (
"bytes"
"errors"
"io"
"math/rand"
"testing"
"time"

"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -98,37 +99,85 @@ func TestBufferedReaderAt(t *testing.T) {
}
}

func TestBufferedReaderConcurrency(t *testing.T) {
input := make([]byte, 1024)
inputReader := bytes.NewReader(input)
func TestBufferedReaderConcurrencyAndFuzz(t *testing.T) {
const minLen = 100

r := NewBufferedReaderAt(inputReader, int64(len(input)), 50, 1)
for i := 0; i < 100; i++ {
inputLen := rand.Intn(1024) + minLen
input := make([]byte, inputLen)
inputReader := bytes.NewReader(input)

for i := 0; i < 1000; i++ {
length := rand.Intn(100)
offset := rand.Intn(len(input) - length)
b := make([]byte, length)
// fill input with incrementing byte values (wrapping at 256) so reads can be verified
for i := range input {
input[i] = byte(i)
}

go func() {
_, err := r.ReadAt(b, int64(offset))
require.NoError(t, err)
}()
r := NewBufferedReaderAt(inputReader, int64(len(input)), 50, 1)

for i := 0; i < 1000; i++ {
go func() {
length := rand.Intn(minLen)
offset := rand.Intn(len(input) - length)

b := make([]byte, length)
_, err := r.ReadAt(b, int64(offset))
require.NoError(t, err)
// require actual to be expected
require.Equal(t, input[offset:offset+length], b)
}()
}
}
}

func TestBufferedWriterWithQueueWritesToBackend(t *testing.T) {
buf := bytes.NewBuffer(make([]byte, 0, 10))
type erroringReaderAt struct {
err error
}

b := NewBufferedWriterWithQueue(buf)
func (e *erroringReaderAt) ReadAt(p []byte, off int64) (n int, err error) {
if e.err != nil && !errors.Is(e.err, io.EOF) {
// set all bytes to 0
for i := range p {
p[i] = 0
}
return 0, e.err
}

n, err := b.Write([]byte{0x01})
require.NoError(t, err)
require.Equal(t, 1, n)
// set the bytes to the offset
for i := range p {
p[i] = byte(off + int64(i))
}

return len(p), e.err
}

require.NoError(t, b.Flush())
require.NoError(t, b.Close())
func TestBufferedReaderInvalidatesBufferOnErr(t *testing.T) {
erroringReaderAt := &erroringReaderAt{
err: nil,
}

r := NewBufferedReaderAt(erroringReaderAt, 100, 50, 1)

// eventual consistency :)
time.Sleep(100 * time.Millisecond)
require.Equal(t, []byte{0x01}, buf.Bytes())
// force the reader to return an error
erroringReaderAt.err = errors.New("error")
actual := make([]byte, 10)
read, err := r.ReadAt(actual, 0)
require.Error(t, err)
require.Equal(t, 0, read)
require.Equal(t, []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, actual) // first 10 bytes should be zeroed

// clear the error and read the first 10 bytes again
erroringReaderAt.err = nil
actual = make([]byte, 10)
read, err = r.ReadAt(actual, 0)
require.NoError(t, err)
require.Equal(t, 10, read)
require.Equal(t, []byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, actual) // first 10 bytes should be read

// force the reader to return io.EOF and see it handled correctly
erroringReaderAt.err = io.EOF
actual = make([]byte, 10)
read, err = r.ReadAt(actual, 90)
require.ErrorIs(t, err, io.EOF)
require.Equal(t, 10, read)
require.Equal(t, []byte{90, 91, 92, 93, 94, 95, 96, 97, 98, 99}, actual) // last 10 bytes should be read
}
6 changes: 4 additions & 2 deletions tempodb/encoding/vparquet4/block_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ import (
"github.com/grafana/tempo/tempodb/encoding/common"
)

func (b *backendBlock) open(ctx context.Context) (*parquet.File, *parquet.Reader, error) { //nolint:all //deprecated
// openForIteration opens and returns the parquet file. Note that this is currently only used for compaction and is tuned for this kind of workload.
// In particular it uses the tempo_io.BufferedReaderAt which is slower but uses less memory and incurs fewer backend calls.
func (b *backendBlock) openForIteration(ctx context.Context) (*parquet.File, *parquet.Reader, error) { //nolint:all //deprecated
rr := NewBackendReaderAt(ctx, b.r, DataFileName, b.meta)

// 128 MB memory buffering
Expand All @@ -36,7 +38,7 @@ func (b *backendBlock) open(ctx context.Context) (*parquet.File, *parquet.Reader
}

func (b *backendBlock) rawIter(ctx context.Context, pool *rowPool) (*rawIterator, error) {
pf, r, err := b.open(ctx)
pf, r, err := b.openForIteration(ctx)
if err != nil {
return nil, err
}
Expand Down