2323import org .hyperledger .besu .ethereum .trie .diffbased .common .trielog .TrieLogManager ;
2424import org .hyperledger .besu .plugin .services .trielogs .TrieLog ;
2525
26- import java .util .Comparator ;
26+ import java .util .Collections ;
27+ import java .util .Map ;
2728import java .util .Optional ;
29+ import java .util .TreeMap ;
2830import java .util .concurrent .atomic .AtomicInteger ;
2931import java .util .function .Consumer ;
3032
31- import com .google .common .collect .ArrayListMultimap ;
32- import com .google .common .collect .Multimap ;
33- import com .google .common .collect .TreeMultimap ;
3433import org .apache .tuweni .bytes .Bytes ;
3534import org .slf4j .Logger ;
3635import org .slf4j .LoggerFactory ;
@@ -49,12 +48,12 @@ public class BonsaiArchiveFreezer implements BlockAddedObserver {
4948 private final DiffBasedWorldStateKeyValueStorage rootWorldStateStorage ;
5049 private final Blockchain blockchain ;
5150 private final Consumer <Runnable > executeAsync ;
52- private static final int PRELOAD_LIMIT = 1000 ;
51+ private static final int CATCHUP_LIMIT = 1000 ;
5352 private static final int DISTANCE_FROM_HEAD_BEFORE_FREEZING_OLD_STATE = 10 ;
5453 private final TrieLogManager trieLogManager ;
5554
56- private final Multimap <Long , Hash > blocksToMoveToFreezer =
57- TreeMultimap . create ( Comparator . reverseOrder (), Comparator . naturalOrder ());
55+ private final Map <Long , Hash > pendingBlocksToArchive =
56+ Collections . synchronizedMap ( new TreeMap <> ());
5857
5958 public BonsaiArchiveFreezer (
6059 final DiffBasedWorldStateKeyValueStorage rootWorldStateStorage ,
@@ -67,9 +66,7 @@ public BonsaiArchiveFreezer(
6766 this .trieLogManager = trieLogManager ;
6867 }
6968
70- public void initialize () {
71- // On startup there will be recent blocks whose state and storage hasn't been archived yet.
72- // Pre-load them ready for freezing state once enough new blocks have been added to the chain.
69+ private void preloadCatchupBlocks () {
7370 Optional <Long > frozenBlocksHead = Optional .empty ();
7471
7572 Optional <Long > latestFrozenBlock = rootWorldStateStorage .getLatestArchiveFrozenBlock ();
@@ -86,7 +83,7 @@ public void initialize() {
8683 if (frozenBlocksHead .isPresent ()) {
8784 int preLoadedBlocks = 0 ;
8885 Optional <Block > nextBlock = blockchain .getBlockByNumber (frozenBlocksHead .get ());
89- for (int i = 0 ; i < PRELOAD_LIMIT ; i ++) {
86+ for (int i = 0 ; i < CATCHUP_LIMIT ; i ++) {
9087 if (nextBlock .isPresent ()) {
9188 addToFreezerQueue (
9289 nextBlock .get ().getHeader ().getNumber (), nextBlock .get ().getHeader ().getHash ());
@@ -97,13 +94,27 @@ public void initialize() {
9794 }
9895 }
9996 LOG .atInfo ()
100- .setMessage ("Preloaded {} blocks to move their state and storage to the archive freezer" )
97+ .setMessage (
98+ "Preloaded {} blocks from {} to move their state and storage to the archive freezer" )
10199 .addArgument (preLoadedBlocks )
100+ .addArgument (frozenBlocksHead .get ())
102101 .log ();
103102 }
103+ }
104+
105+ public void initialize () {
106+ // On startup there will be recent blocks whose state and storage hasn't been archived yet.
107+ // Pre-load them ready for freezing state once enough new blocks have been added to the chain.
108+ preloadCatchupBlocks ();
109+
110+ // Keep catching up until we move less to the freezer than the catchup limit
111+ while (moveBlockStateToFreezer () == CATCHUP_LIMIT ) {
112+ preloadCatchupBlocks ();
113+ }
114+ }
104115
105- // Start processing any backlog on startup - don't wait for a new block to be imported.
106- moveBlockStateToFreezer ();
116+ public int getPendingBlocksCount () {
117+ return pendingBlocksToArchive . size ();
107118 }
108119
109120 public synchronized void addToFreezerQueue (final long blockNumber , final Hash blockHash ) {
@@ -113,10 +124,17 @@ public synchronized void addToFreezerQueue(final long blockNumber, final Hash bl
113124 .addArgument (blockNumber )
114125 .addArgument (blockHash )
115126 .log ();
116- blocksToMoveToFreezer .put (blockNumber , blockHash );
127+ pendingBlocksToArchive .put (blockNumber , blockHash );
128+ }
129+
130+ private synchronized void removeArchivedFromQueue (final Map <Long , Hash > archivedBlocks ) {
131+ archivedBlocks .keySet ().forEach (e -> pendingBlocksToArchive .remove (e ));
117132 }
118133
119- public synchronized int moveBlockStateToFreezer () {
134+ // Move state and storage entries from their primary DB segments to the freezer segments. This is
135+ // intended to maintain good performance for new block imports by keeping the primary DB segments
136+ // to live state only. Returns the number of state and storage entries moved.
137+ public int moveBlockStateToFreezer () {
120138 final long retainAboveThisBlock =
121139 blockchain .getChainHeadBlockNumber () - DISTANCE_FROM_HEAD_BEFORE_FREEZING_OLD_STATE ;
122140
@@ -135,100 +153,99 @@ public synchronized int moveBlockStateToFreezer() {
135153 .addArgument (retainAboveThisBlock )
136154 .log ();
137155
138- final var accountsToMove =
139- blocksToMoveToFreezer .asMap ().entrySet ().stream ()
140- .dropWhile ((e ) -> e .getKey () > retainAboveThisBlock );
156+ // Typically we will move all storage and state for a single block i.e. when a new block is
157+ // imported, move state for block-N. There are cases where we catch-up and move old state
158+ // for a number of blocks so we may iterate over a number of blocks freezing their state,
159+ // not just a single one.
141160
142- final Multimap <Long , Hash > accountStateFreezerActionsComplete = ArrayListMultimap .create ();
143- final Multimap <Long , Hash > accountStorageFreezerActionsComplete = ArrayListMultimap .create ();
161+ final Map <Long , Hash > blocksToFreeze = new TreeMap <>();
162+ pendingBlocksToArchive .entrySet ().stream ()
163+ .filter ((e ) -> e .getKey () <= retainAboveThisBlock )
164+ .forEach (
165+ (e ) -> {
166+ blocksToFreeze .put (e .getKey (), e .getValue ());
167+ });
144168
145169 // Determine which world state keys have changed in the last N blocks by looking at the
146170 // trie logs for the blocks. Then move the old keys to the freezer segment (if and only if they
147171 // have changed)
148- accountsToMove
149- .parallel ()
172+ blocksToFreeze
173+ .entrySet ()
150174 .forEach (
151175 (block ) -> {
152- for (Hash blockHash : block .getValue ()) {
153- Optional <TrieLog > trieLog = trieLogManager .getTrieLogLayer (blockHash );
154- if (trieLog .isPresent ()) {
155- trieLog
156- .get ()
157- .getAccountChanges ()
158- .forEach (
159- (address , change ) -> {
160- // Move any previous state for this account
161- frozenAccountStateCount .addAndGet (
162- rootWorldStateStorage .freezePreviousAccountState (
163- blockchain .getBlockHeader (
164- blockchain .getBlockHeader (blockHash ).get ().getParentHash ()),
165- address .addressHash ()));
166- });
167- }
168- accountStateFreezerActionsComplete .put (block .getKey (), blockHash );
176+ if (pendingBlocksToArchive .size () > 0 && pendingBlocksToArchive .size () % 100 == 0 ) {
177+ // Log progress in case catching up causes there to be a large number of keys
178+ // to move
179+ LOG .atInfo ()
180+ .setMessage ("state for blocks {} to {} archived" )
181+ .addArgument (block .getKey ())
182+ .addArgument (block .getKey () + pendingBlocksToArchive .size ())
183+ .log ();
169184 }
170- });
171-
172- final var storageToMove =
173- blocksToMoveToFreezer .asMap ().entrySet ().stream ()
174- .dropWhile ((e ) -> e .getKey () > retainAboveThisBlock );
175-
176- storageToMove
177- .parallel ()
178- .forEach (
179- (block ) -> {
180- for (Hash blockHash : block .getValue ()) {
181- Optional <TrieLog > trieLog = trieLogManager .getTrieLogLayer (blockHash );
182- if (trieLog .isPresent ()) {
183- trieLog
184- .get ()
185- .getStorageChanges ()
186- .forEach (
187- (address , storageSlotKey ) -> {
188- storageSlotKey .forEach (
189- (slotKey , slotValue ) -> {
190- // Move any previous state for this account
191- frozenAccountStorageCount .addAndGet (
192- rootWorldStateStorage .freezePreviousStorageState (
193- blockchain .getBlockHeader (
194- blockchain
195- .getBlockHeader (blockHash )
196- .get ()
197- .getParentHash ()),
198- Bytes .concatenate (
199- address .addressHash (), slotKey .getSlotHash ())));
200- });
201- });
202- }
203- accountStorageFreezerActionsComplete .put (block .getKey (), blockHash );
185+ Hash blockHash = block .getValue ();
186+ LOG .atDebug ()
187+ .setMessage ("Freezing all account state for block {}" )
188+ .addArgument (block .getKey ())
189+ .log ();
190+ Optional <TrieLog > trieLog = trieLogManager .getTrieLogLayer (blockHash );
191+ if (trieLog .isPresent ()) {
192+ trieLog
193+ .get ()
194+ .getAccountChanges ()
195+ .forEach (
196+ (address , change ) -> {
197+ // Move any previous state for this account
198+ frozenAccountStateCount .addAndGet (
199+ rootWorldStateStorage .freezePreviousAccountState (
200+ blockchain .getBlockHeader (
201+ blockchain .getBlockHeader (blockHash ).get ().getParentHash ()),
202+ address .addressHash ()));
203+ });
204+ LOG .atDebug ()
205+ .setMessage ("Freezing all storage state for block {}" )
206+ .addArgument (block .getKey ())
207+ .log ();
208+ trieLog
209+ .get ()
210+ .getStorageChanges ()
211+ .forEach (
212+ (address , storageSlotKey ) -> {
213+ storageSlotKey .forEach (
214+ (slotKey , slotValue ) -> {
215+ // Move any previous state for this account
216+ frozenAccountStorageCount .addAndGet (
217+ rootWorldStateStorage .freezePreviousStorageState (
218+ blockchain .getBlockHeader (
219+ blockchain
220+ .getBlockHeader (blockHash )
221+ .get ()
222+ .getParentHash ()),
223+ Bytes .concatenate (
224+ address .addressHash (), slotKey .getSlotHash ())));
225+ });
226+ });
204227 }
228+ LOG .atDebug ()
229+ .setMessage ("All account state and storage frozen for block {}" )
230+ .addArgument (block .getKey ())
231+ .log ();
232+ rootWorldStateStorage .setLatestArchiveFrozenBlock (block .getKey ());
205233 });
206234
207- // For us to consider all state and storage changes for a block complete, it must have been
208- // recorded in both accountState and accountStorage lists. If only one finished we need to try
209- // freezing state/storage for that block again on the next loop
210- AtomicInteger frozenBlocksCompleted = new AtomicInteger ();
211- accountStateFreezerActionsComplete
212- .keySet ()
213- .forEach (
214- (b ) -> {
215- if (accountStorageFreezerActionsComplete .containsKey (b )) {
216- frozenBlocksCompleted .getAndIncrement ();
217- rootWorldStateStorage .setLatestArchiveFrozenBlock (b );
218- blocksToMoveToFreezer .removeAll (b );
219- }
220- });
235+ LOG .atDebug ()
236+ .setMessage (
237+ "finished moving cold state to freezer storage for range (chainHeadNumber: {} - numberOfBlocksToKeepInWarmStorage: {}) = {}. Froze {} account state entries, {} account storage entries from {} blocks" )
238+ .addArgument (blockchain ::getChainHeadBlockNumber )
239+ .addArgument (DISTANCE_FROM_HEAD_BEFORE_FREEZING_OLD_STATE )
240+ .addArgument (retainAboveThisBlock )
241+ .addArgument (frozenAccountStateCount .get ())
242+ .addArgument (frozenAccountStorageCount .get ())
243+ .addArgument (blocksToFreeze .size ())
244+ .log ();
221245
222- if (frozenAccountStateCount .get () > 0 || frozenAccountStorageCount .get () > 0 ) {
223- LOG .atDebug ()
224- .setMessage ("Froze {} account state entries, {} account storage entries for {} blocks" )
225- .addArgument (frozenAccountStateCount .get ())
226- .addArgument (frozenAccountStorageCount .get ())
227- .addArgument (frozenBlocksCompleted .get ())
228- .log ();
229- }
246+ removeArchivedFromQueue (blocksToFreeze );
230247
231- return frozenBlocksCompleted .get ();
248+ return frozenAccountStateCount . get () + frozenAccountStorageCount .get ();
232249 }
233250
234251 @ Override
0 commit comments