forked from grafana/tempo
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathreaders.go
More file actions
98 lines (78 loc) · 2.79 KB
/
readers.go
File metadata and controls
98 lines (78 loc) · 2.79 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
package vparquet
import (
"context"
"encoding/binary"
"io"
"github.com/google/uuid"
"go.uber.org/atomic"
"github.com/grafana/tempo/tempodb/backend"
"github.com/grafana/tempo/tempodb/encoding/common"
)
type BackendReaderAt struct {
ctx context.Context
r backend.Reader
name string
blockID uuid.UUID
tenantID string
TotalBytesRead atomic.Uint64
}
var _ io.ReaderAt = (*BackendReaderAt)(nil)
func NewBackendReaderAt(ctx context.Context, r backend.Reader, name string, blockID uuid.UUID, tenantID string) *BackendReaderAt {
return &BackendReaderAt{ctx, r, name, blockID, tenantID, atomic.Uint64{}}
}
func (b *BackendReaderAt) ReadAt(p []byte, off int64) (int, error) {
b.TotalBytesRead.Add(uint64(len(p)))
err := b.r.ReadRange(b.ctx, b.name, b.blockID, b.tenantID, uint64(off), p, false)
return len(p), err
}
func (b *BackendReaderAt) ReadAtWithCache(p []byte, off int64) (int, error) {
err := b.r.ReadRange(b.ctx, b.name, b.blockID, b.tenantID, uint64(off), p, true)
return len(p), err
}
type parquetOptimizedReaderAt struct {
r io.ReaderAt
br *BackendReaderAt
readerSize int64
footerSize uint32
cacheControl common.CacheControl
cachedObjects map[int64]int64 // storing offsets and length of objects we want to cache
}
var _ io.ReaderAt = (*parquetOptimizedReaderAt)(nil)
func newParquetOptimizedReaderAt(br io.ReaderAt, rr *BackendReaderAt, size int64, footerSize uint32, cc common.CacheControl) *parquetOptimizedReaderAt {
return &parquetOptimizedReaderAt{br, rr, size, footerSize, cc, map[int64]int64{}}
}
// called by parquet-go in OpenFile() to set offset and length of footer section
func (r *parquetOptimizedReaderAt) SetFooterSection(offset, length int64) {
if r.cacheControl.Footer {
r.cachedObjects[offset] = length
}
}
// called by parquet-go in OpenFile() to set offset and length of column indexes
func (r *parquetOptimizedReaderAt) SetColumnIndexSection(offset, length int64) {
if r.cacheControl.ColumnIndex {
r.cachedObjects[offset] = length
}
}
// called by parquet-go in OpenFile() to set offset and length of offset index section
func (r *parquetOptimizedReaderAt) SetOffsetIndexSection(offset, length int64) {
if r.cacheControl.OffsetIndex {
r.cachedObjects[offset] = length
}
}
func (r *parquetOptimizedReaderAt) ReadAt(p []byte, off int64) (int, error) {
if len(p) == 4 && off == 0 {
// Magic header
return copy(p, []byte("PAR1")), nil
}
if len(p) == 8 && off == r.readerSize-8 && r.footerSize > 0 /* not present in previous block metas */ {
// Magic footer
binary.LittleEndian.PutUint32(p, r.footerSize)
copy(p[4:8], []byte("PAR1"))
return 8, nil
}
// check if the offset and length is stored as a special object
if r.cachedObjects[off] == int64(len(p)) {
return r.br.ReadAtWithCache(p, off)
}
return r.r.ReadAt(p, off)
}