Skip to content

Commit eb77634

Browse files
committed
[filedao] get recent blocks from staging buffer (#4488)
1 parent 24a8545 commit eb77634

File tree

3 files changed

+44
-37
lines changed

3 files changed

+44
-37
lines changed

blockchain/filedao/filedao_v2_util.go

Lines changed: 19 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ import (
1919
)
2020

2121
func (fd *fileDAOv2) populateStagingBuffer() (*stagingBuffer, error) {
22-
buffer := newStagingBuffer(fd.header.BlockStoreSize, fd.deser)
22+
buffer := newStagingBuffer(fd.header.BlockStoreSize, fd.header.Start)
2323
blockStoreTip := fd.highestBlockOfStoreTip()
2424
for i := uint64(0); i < fd.header.BlockStoreSize; i++ {
2525
v, err := fd.kvStore.Get(_headerDataNs, byteutil.Uint64ToBytesBigEndian(i))
@@ -42,7 +42,7 @@ func (fd *fileDAOv2) populateStagingBuffer() (*stagingBuffer, error) {
4242
// populate to staging buffer, if the block is in latest round
4343
height := info.Block.Height()
4444
if height > blockStoreTip {
45-
if _, err = buffer.Put(stagingKey(height, fd.header), v); err != nil {
45+
if _, err = buffer.Put(height, info); err != nil {
4646
return nil, err
4747
}
4848
} else {
@@ -87,12 +87,12 @@ func (fd *fileDAOv2) putBlock(blk *block.Block) error {
8787
}
8888

8989
// add to staging buffer
90-
index := stagingKey(blk.Height(), fd.header)
91-
full, err := fd.blkBuffer.Put(index, ser)
90+
full, err := fd.blkBuffer.Put(blk.Height(), blkInfo)
9291
if err != nil {
9392
return err
9493
}
9594
if !full {
95+
index := fd.blkBuffer.slot(blk.Height())
9696
fd.batch.Put(_headerDataNs, byteutil.Uint64ToBytesBigEndian(index), blkBytes, "failed to put block")
9797
return nil
9898
}
@@ -151,11 +151,6 @@ func blockStoreKey(height uint64, header *FileHeader) uint64 {
151151
return (height - header.Start) / header.BlockStoreSize
152152
}
153153

154-
// stagingKey is the position of block in the staging buffer
155-
func stagingKey(height uint64, header *FileHeader) uint64 {
156-
return (height - header.Start) % header.BlockStoreSize
157-
}
158-
159154
// lowestBlockOfStoreTip is the lowest height of the tip of block storage
160155
// used in DeleteTipBlock(), once new tip height drops below this, the tip of block storage can be deleted
161156
func (fd *fileDAOv2) lowestBlockOfStoreTip() uint64 {
@@ -178,12 +173,7 @@ func (fd *fileDAOv2) getBlock(height uint64) (*block.Block, error) {
178173
return nil, db.ErrNotExist
179174
}
180175
// check whether block in staging buffer or not
181-
storeKey := blockStoreKey(height, fd.header)
182-
if storeKey >= fd.blkStore.Size() {
183-
blkStore, err := fd.blkBuffer.Get(stagingKey(height, fd.header))
184-
if err != nil {
185-
return nil, err
186-
}
176+
if blkStore := fd.getFromStagingBuffer(height); blkStore != nil {
187177
return blkStore.Block, nil
188178
}
189179
// read from storage DB
@@ -199,12 +189,7 @@ func (fd *fileDAOv2) getReceipt(height uint64) ([]*action.Receipt, error) {
199189
return nil, db.ErrNotExist
200190
}
201191
// check whether block in staging buffer or not
202-
storeKey := blockStoreKey(height, fd.header)
203-
if storeKey >= fd.blkStore.Size() {
204-
blkStore, err := fd.blkBuffer.Get(stagingKey(height, fd.header))
205-
if err != nil {
206-
return nil, err
207-
}
192+
if blkStore := fd.getFromStagingBuffer(height); blkStore != nil {
208193
return blkStore.Receipts, nil
209194
}
210195
// read from storage DB
@@ -215,12 +200,23 @@ func (fd *fileDAOv2) getReceipt(height uint64) ([]*action.Receipt, error) {
215200
return fd.deser.ReceiptsFromBlockStoreProto(blockStore)
216201
}
217202

203+
func (fd *fileDAOv2) getFromStagingBuffer(height uint64) *block.Store {
204+
if fd.loadTip().Height-height >= fd.header.BlockStoreSize {
205+
return nil
206+
}
207+
blkStore := fd.blkBuffer.Get(height)
208+
if blkStore == nil || blkStore.Block.Height() != height {
209+
return nil
210+
}
211+
return blkStore
212+
}
213+
218214
func (fd *fileDAOv2) getBlockStore(height uint64) (*iotextypes.BlockStore, error) {
219215
// check whether blockStore in read cache or not
220216
storeKey := blockStoreKey(height, fd.header)
221217
if value, ok := fd.blkStorePbCache.Get(storeKey); ok {
222218
pbInfos := value.(*iotextypes.BlockStores)
223-
return pbInfos.BlockStores[stagingKey(height, fd.header)], nil
219+
return pbInfos.BlockStores[fd.blkBuffer.slot(height)], nil
224220
}
225221
// read from storage DB
226222
value, err := fd.blkStore.Get(storeKey)
@@ -240,5 +236,5 @@ func (fd *fileDAOv2) getBlockStore(height uint64) (*iotextypes.BlockStore, error
240236
}
241237
// add to read cache
242238
fd.blkStorePbCache.Add(storeKey, pbStores)
243-
return pbStores.BlockStores[stagingKey(height, fd.header)], nil
239+
return pbStores.BlockStores[fd.blkBuffer.slot(height)], nil
244240
}

blockchain/filedao/staging_buffer.go

Lines changed: 23 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66
package filedao
77

88
import (
9+
"sync"
10+
911
"google.golang.org/protobuf/proto"
1012

1113
"github.com/iotexproject/iotex-proto/golang/iotextypes"
@@ -15,44 +17,52 @@ import (
1517

1618
type (
1719
stagingBuffer struct {
20+
lock sync.RWMutex
1821
size uint64
22+
start uint64
1923
buffer []*block.Store
20-
deser *block.Deserializer
2124
}
2225
)
2326

24-
func newStagingBuffer(size uint64, deser *block.Deserializer) *stagingBuffer {
27+
func newStagingBuffer(size, start uint64) *stagingBuffer {
2528
return &stagingBuffer{
2629
size: size,
30+
start: start,
2731
buffer: make([]*block.Store, size),
28-
deser: deser,
2932
}
3033
}
3134

32-
func (s *stagingBuffer) Get(pos uint64) (*block.Store, error) {
33-
if pos >= s.size {
34-
return nil, ErrNotSupported
35+
func (s *stagingBuffer) Get(height uint64) *block.Store {
36+
if height < s.start {
37+
return nil
3538
}
36-
return s.buffer[pos], nil
39+
s.lock.RLock()
40+
defer s.lock.RUnlock()
41+
return s.buffer[s.slot(height)]
3742
}
3843

39-
func (s *stagingBuffer) Put(pos uint64, blkBytes []byte) (bool, error) {
40-
if pos >= s.size {
44+
func (s *stagingBuffer) Put(height uint64, blk *block.Store) (bool, error) {
45+
if height < s.start {
4146
return false, ErrNotSupported
4247
}
43-
blk, err := s.deser.DeserializeBlockStore(blkBytes)
44-
if err != nil {
45-
return false, err
46-
}
48+
pos := s.slot(height)
49+
s.lock.Lock()
50+
defer s.lock.Unlock()
4751
s.buffer[pos] = blk
4852
return pos == s.size-1, nil
4953
}
5054

55+
func (s *stagingBuffer) slot(height uint64) uint64 {
56+
return (height - s.start) % s.size
57+
}
58+
5159
func (s *stagingBuffer) Serialize() ([]byte, error) {
5260
blkStores := []*iotextypes.BlockStore{}
61+
s.lock.RLock()
5362
for _, v := range s.buffer {
5463
blkStores = append(blkStores, v.ToProto())
5564
}
65+
s.lock.RUnlock()
5666
allBlks := &iotextypes.BlockStores{
5767
BlockStores: blkStores,
5868
}

blockchain/integrity/integrity_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -532,6 +532,7 @@ func TestGetBlockHash(t *testing.T) {
532532
// disable account-based testing
533533
cfg.Chain.TrieDBPath = ""
534534
cfg.Genesis.EnableGravityChainVoting = false
535+
cfg.Genesis.AleutianBlockHeight = 2
535536
cfg.Genesis.HawaiiBlockHeight = 4
536537
cfg.Genesis.MidwayBlockHeight = 9
537538
cfg.ActPool.MinGasPriceStr = "0"
@@ -692,7 +693,7 @@ func addTestingGetBlockHash(t *testing.T, g genesis.Genesis, bc blockchain.Block
692693
bcHash, err = dao.GetBlockHash(targetHeight)
693694
require.NoError(err)
694695
}
695-
require.Equal(r.Logs()[0].Topics[0], bcHash)
696+
require.Equal(r.Logs()[0].Topics[1], bcHash)
696697
nonce++
697698
}
698699
}

0 commit comments

Comments
 (0)