Skip to content
Merged
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ This RC is still pending burn in for mainnet.
- Implement optional sender balance checks in the layered txpool [#9176](https://github.com/hyperledger/besu/pull/9176)
- Add `--cache-last-block-headers` flag to cache the last n block headers persisted to the blockchain [#9223](https://github.com/hyperledger/besu/pull/9223)
- Manage unexpected exceptions during block creation [#9208](https://github.com/hyperledger/besu/pull/9208)
- Upgrade to execution-spec-tests v5.1.0 [#9226](https://github.com/hyperledger/besu/pull/9226)
- Add `--cache-last-block-headers-preload-enabled` flag to enable preloading the block headers cache during startup time [#9248](https://github.com/hyperledger/besu/pull/9248)
- Upgrade to execution-spec-tests v5.2.0 [#9226](https://github.com/hyperledger/besu/pull/9226), [#9242](https://github.com/hyperledger/besu/pull/9242)

### Bug fixes
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
/*
* Copyright contributors to Besu.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.chainimport;

import org.hyperledger.besu.ethereum.chain.Blockchain;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Service for preloading block headers from the blockchain into cache.
 *
 * <p>This class preloads a specified number of recent block headers by processing them in chunks to
 * avoid memory exhaustion while maximizing concurrency. It uses a semaphore-based backpressure
 * mechanism to control resource usage.
 *
 * <p>Note: the semaphore is acquired on the thread driving {@link #preloadCache()}, so that thread
 * is throttled (blocked) while a chunk is saturated; the header reads themselves run as {@link
 * EthScheduler} service tasks.
 */
public class BlockHeadersCachePreload {

  private static final Logger LOG = LoggerFactory.getLogger(BlockHeadersCachePreload.class);

  /** Per-header read timeout after which the preload of that header is abandoned. */
  private static final long HEADER_READ_TIMEOUT_SECONDS = 30;

  private final Blockchain blockchain;
  private final EthScheduler ethScheduler;
  private final int numberOfBlockHeadersToCache;

  /**
   * Creates a new block headers cache preloader.
   *
   * @param blockchain the blockchain to retrieve block headers from disk
   * @param ethScheduler the scheduler for executing preload tasks
   * @param numberOfBlockHeadersToCache the number of recent block headers to cache
   */
  public BlockHeadersCachePreload(
      final Blockchain blockchain,
      final EthScheduler ethScheduler,
      final int numberOfBlockHeadersToCache) {
    this.blockchain = blockchain;
    this.ethScheduler = ethScheduler;
    this.numberOfBlockHeadersToCache = numberOfBlockHeadersToCache;
  }

  /**
   * Preloads block headers into the cache asynchronously.
   *
   * <p>Headers are read from {@code chainHead - 1} downwards (the chain head header itself is not
   * re-read here) until {@code numberOfBlockHeadersToCache} headers back, or genesis, whichever
   * comes first.
   *
   * @return a CompletableFuture that completes when all block headers have been successfully loaded
   *     into cache, or completes exceptionally if the process fails (including interruption of the
   *     driving thread). The future returns null on successful completion.
   */
  public CompletableFuture<Void> preloadCache() {
    final BlockHeader chainHead = blockchain.getChainHeadHeader();
    final long chainHeadNumber = chainHead.getNumber();
    // Never go below genesis (block 0).
    final long lastBlockToCache = Math.max(0, chainHeadNumber - numberOfBlockHeadersToCache);
    final int maxConcurrent = Runtime.getRuntime().availableProcessors() * 2;
    final int chunkSize = maxConcurrent * 10; // Process in reasonable chunks

    return processChunksSequentially(
        chainHeadNumber - 1, lastBlockToCache, chunkSize, maxConcurrent);
  }

  /**
   * Processes block headers in sequential chunks to maintain constant memory usage.
   *
   * @param startBlock the highest block number to process in this and subsequent chunks
   * @param endBlock the lowest block number to process (exclusive, will not be processed)
   * @param chunkSize the maximum number of blocks to process in each chunk
   * @param maxConcurrent the maximum number of concurrent operations within each chunk
   * @return a CompletableFuture that completes when all chunks have been processed
   */
  private CompletableFuture<Void> processChunksSequentially(
      final long startBlock, final long endBlock, final int chunkSize, final int maxConcurrent) {
    if (startBlock <= endBlock) {
      return CompletableFuture.completedFuture(null);
    }

    final long chunkEnd = Math.max(endBlock + 1, startBlock - chunkSize + 1);

    // A failed chunk (e.g. interruption) short-circuits the chain: thenCompose is skipped and the
    // failure propagates to the caller.
    return processChunk(startBlock, chunkEnd, maxConcurrent)
        .thenCompose(
            v -> processChunksSequentially(chunkEnd - 1, endBlock, chunkSize, maxConcurrent));
  }

  /**
   * Processes a single chunk of block headers with controlled concurrency.
   *
   * @param startBlock the highest block number to process (inclusive)
   * @param endBlock the lowest block number to process (inclusive)
   * @param maxConcurrent the maximum number of concurrent block header retrievals
   * @return a CompletableFuture that completes when all block headers in the chunk have been
   *     processed (successfully or with failures logged), or completes exceptionally if the
   *     driving thread was interrupted before the chunk was fully scheduled
   */
  private CompletableFuture<Void> processChunk(
      final long startBlock, final long endBlock, final int maxConcurrent) {
    final Semaphore semaphore = new Semaphore(maxConcurrent);
    final List<CompletableFuture<Void>> futures = new ArrayList<>();
    InterruptedException interruption = null;

    for (long blockNumber = startBlock; blockNumber >= endBlock; blockNumber--) {
      final long currentBlockNumber = blockNumber;

      try {
        // Backpressure: block the driving thread until a permit is free.
        semaphore.acquire();
      } catch (InterruptedException e) {
        // Stop scheduling new reads. Do NOT cancel the already-scheduled futures: cancelling a
        // CompletableFuture does not stop its underlying task (which still runs and releases its
        // permit) but would poison allOf() with a misleading CancellationException.
        Thread.currentThread().interrupt();
        interruption = e;
        break;
      }

      final CompletableFuture<Void> future =
          ethScheduler
              .scheduleServiceTask(
                  () -> {
                    try {
                      // The returned header is ignored: reading by number is done for its
                      // cache-population side effect — presumably the blockchain implementation
                      // inserts the header into its cache on read; confirm against the
                      // Blockchain implementation in use.
                      blockchain.getBlockHeader(currentBlockNumber);
                    } catch (Exception e) {
                      LOG.warn(
                          "Failed to preload block header {}: {}",
                          currentBlockNumber,
                          e.getMessage());
                    } finally {
                      semaphore.release();
                    }
                  })
              .orTimeout(HEADER_READ_TIMEOUT_SECONDS, TimeUnit.SECONDS)
              .exceptionally(
                  throwable -> {
                    if (throwable instanceof TimeoutException) {
                      LOG.warn("Timeout preloading block header {}", currentBlockNumber);
                    }
                    // Per-header failures are logged and swallowed so the chunk still completes.
                    return null;
                  });

      futures.add(future);
    }

    final CompletableFuture<Void> scheduled =
        CompletableFuture.allOf(futures.toArray(CompletableFuture[]::new));
    if (interruption == null) {
      return scheduled;
    }
    // Interrupted: wait for the in-flight reads to drain (they always complete normally because
    // failures are handled per-future above), then fail the chunk so no further chunks start.
    final InterruptedException cause = interruption;
    return scheduled.thenCompose(unused -> CompletableFuture.failedFuture(cause));
  }
}
6 changes: 6 additions & 0 deletions app/src/main/java/org/hyperledger/besu/cli/BesuCommand.java
Original file line number Diff line number Diff line change
Expand Up @@ -616,6 +616,11 @@ void setUserName(final String userName) {
"Specifies the number of last block headers to cache (default: ${DEFAULT-VALUE})")
private final Integer numberOfBlockHeadersToCache = 0;

@CommandLine.Option(
names = {"--cache-last-block-headers-preload-enabled"},
description = "Enable preloading of the block header cache (default: ${DEFAULT-VALUE})")
private final Boolean isCacheLastBlockHeadersPreloadEnabled = false;

@CommandLine.Option(
names = {"--cache-precompiles"},
description = "Specifies whether to cache precompile results (default: ${DEFAULT-VALUE})")
Expand Down Expand Up @@ -1830,6 +1835,7 @@ public BesuControllerBuilder setupControllerBuilder() {
.chainPruningConfiguration(unstableChainPruningOptions.toDomainObject())
.cacheLastBlocks(numberOfBlocksToCache)
.cacheLastBlockHeaders(numberOfBlockHeadersToCache)
.isCacheLastBlockHeadersPreloadEnabled(isCacheLastBlockHeadersPreloadEnabled)
.genesisStateHashCacheEnabled(genesisStateHashCacheEnabled)
.apiConfiguration(apiConfigurationSupplier.get())
.besuComponent(besuComponent);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import static com.google.common.base.Preconditions.checkNotNull;

import org.hyperledger.besu.chainimport.BlockHeadersCachePreload;
import org.hyperledger.besu.components.BesuComponent;
import org.hyperledger.besu.config.CheckpointConfigOptions;
import org.hyperledger.besu.config.GenesisConfig;
Expand Down Expand Up @@ -209,6 +210,7 @@ public abstract class BesuControllerBuilder implements MiningParameterOverrides

private int numberOfBlocksToCache = 0;
private int numberOfBlockHeadersToCache = 0;
private boolean isCacheLastBlockHeadersPreloadEnabled;

/** whether parallel transaction processing is enabled or not */
protected boolean isParallelTxProcessingEnabled;
Expand Down Expand Up @@ -514,6 +516,19 @@ public BesuControllerBuilder cacheLastBlockHeaders(final Integer numberOfBlockHe
return this;
}

/**
 * Sets whether the block header cache should be preloaded.
 *
 * @param isCacheLastBlockHeadersPreloadEnabled {@code true} to enable preloading of the block
 *     header cache; {@code false} or {@code null} to disable it
 * @return this builder instance
 */
public BesuControllerBuilder isCacheLastBlockHeadersPreloadEnabled(
    final Boolean isCacheLastBlockHeadersPreloadEnabled) {
  // Boolean.TRUE.equals guards against an NPE from auto-unboxing a null Boolean into the
  // primitive boolean field; null is treated as "disabled".
  this.isCacheLastBlockHeadersPreloadEnabled =
      Boolean.TRUE.equals(isCacheLastBlockHeadersPreloadEnabled);
  return this;
}

/**
* sets the networkConfiguration in the builder
*
Expand Down Expand Up @@ -623,6 +638,13 @@ public BesuController build() {
protocolSchedule,
codeCache);

final EthScheduler scheduler =
new EthScheduler(
syncConfig.getDownloaderParallelism(),
syncConfig.getTransactionsParallelism(),
syncConfig.getComputationParallelism(),
metricsSystem);

final MutableBlockchain blockchain =
DefaultBlockchain.createMutable(
genesisState.getBlock(),
Expand All @@ -632,6 +654,13 @@ public BesuController build() {
dataDirectory.toString(),
numberOfBlocksToCache,
numberOfBlockHeadersToCache);

if (isCacheLastBlockHeadersPreloadEnabled && numberOfBlockHeadersToCache > 0) {
LOG.info(
"--cache-last-block-headers and --cache-last-block-headers-preload-enabled are enabled, start preloading block headers cache");
preloadBlockHeaderCache(blockchain, scheduler);
}

final BonsaiCachedMerkleTrieLoader bonsaiCachedMerkleTrieLoader =
besuComponent
.map(BesuComponent::getCachedMerkleTrieLoader)
Expand Down Expand Up @@ -696,13 +725,6 @@ public BesuController build() {
final EthMessages ethMessages = new EthMessages();
final EthMessages snapMessages = new EthMessages();

final EthScheduler scheduler =
new EthScheduler(
syncConfig.getDownloaderParallelism(),
syncConfig.getTransactionsParallelism(),
syncConfig.getComputationParallelism(),
metricsSystem);

Optional<Checkpoint> checkpoint = Optional.empty();
if (genesisConfigOptions.getCheckpointOptions().isValid()) {
checkpoint =
Expand Down Expand Up @@ -881,6 +903,33 @@ public BesuController build() {
transactionSimulator);
}

/**
 * Kicks off asynchronous preloading of the block header cache and logs how long it took, whether
 * it succeeded or failed.
 *
 * @param blockchain the blockchain whose recent headers are preloaded
 * @param scheduler the scheduler used to run the preload tasks
 */
private void preloadBlockHeaderCache(
    final MutableBlockchain blockchain, final EthScheduler scheduler) {
  final long preloadStartNanos = System.nanoTime();
  final BlockHeadersCachePreload preloader =
      new BlockHeadersCachePreload(blockchain, scheduler, numberOfBlockHeadersToCache);
  preloader
      .preloadCache()
      .thenRun(
          () ->
              LOG.info(
                  "Preloading {} block headers to the cache finished in {} ms",
                  numberOfBlockHeadersToCache,
                  (System.nanoTime() - preloadStartNanos) / 1_000_000))
      .exceptionally(
          failure -> {
            LOG.error(
                "Preloading {} block headers to the cache failed after {} ms",
                numberOfBlockHeadersToCache,
                (System.nanoTime() - preloadStartNanos) / 1_000_000,
                failure);
            return null;
          });
}

private GenesisState getGenesisState(
final Optional<BlockHeader> maybeGenesisBlockHeader,
final ProtocolSchedule protocolSchedule,
Expand Down
13 changes: 13 additions & 0 deletions app/src/test/java/org/hyperledger/besu/cli/BesuCommandTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -2686,6 +2686,19 @@ public void cacheLastBlockHeadersOptionShouldWork() {
assertThat(commandErrorOutput.toString(UTF_8)).isEmpty();
}

@Test
public void isPreloadBlockHeadersCacheEnabledOptionShouldWork() {
  parseCommand("--cache-last-block-headers-preload-enabled=true");

  // The flag value must be handed to the controller builder before build() is invoked.
  verify(mockControllerBuilder)
      .isCacheLastBlockHeadersPreloadEnabled(booleanArgumentCaptor.capture());
  verify(mockControllerBuilder).build();

  assertThat(booleanArgumentCaptor.getValue()).isTrue();
  assertThat(commandOutput.toString(UTF_8)).isEmpty();
  assertThat(commandErrorOutput.toString(UTF_8)).isEmpty();
}

@Test
public void genesisStateHashCacheEnabledShouldWork() throws IOException {
final Path genesisFile = createFakeGenesisFile(GENESIS_VALID_JSON);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,7 @@ public abstract class CommandTestAbstract {
@Captor protected ArgumentCaptor<String> stringArgumentCaptor;
@Captor protected ArgumentCaptor<Integer> intArgumentCaptor;
@Captor protected ArgumentCaptor<Long> longArgumentCaptor;
@Captor protected ArgumentCaptor<Boolean> booleanArgumentCaptor;
@Captor protected ArgumentCaptor<EthNetworkConfig> ethNetworkConfigArgumentCaptor;
@Captor protected ArgumentCaptor<SynchronizerConfiguration> syncConfigurationCaptor;
@Captor protected ArgumentCaptor<JsonRpcConfiguration> jsonRpcConfigArgumentCaptor;
Expand Down Expand Up @@ -304,6 +305,8 @@ public void initMocks() throws Exception {
when(mockControllerBuilder.besuComponent(any())).thenReturn(mockControllerBuilder);
when(mockControllerBuilder.cacheLastBlocks(any())).thenReturn(mockControllerBuilder);
when(mockControllerBuilder.cacheLastBlockHeaders(any())).thenReturn(mockControllerBuilder);
when(mockControllerBuilder.isCacheLastBlockHeadersPreloadEnabled(any()))
.thenReturn(mockControllerBuilder);
when(mockControllerBuilder.genesisStateHashCacheEnabled(any()))
.thenReturn(mockControllerBuilder);
when(mockControllerBuilder.apiConfiguration(any())).thenReturn(mockControllerBuilder);
Expand Down
1 change: 1 addition & 0 deletions app/src/test/resources/everything_config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ rpc-max-logs-range=100
json-pretty-print-enabled=false
cache-last-blocks=512
cache-last-block-headers=5000
cache-last-block-headers-preload-enabled=true
cache-precompiles=true
rpc-gas-cap = 50000000
rpc-max-trace-filter-range=100
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,6 @@ private DefaultBlockchain(

this.blockchainStorage = blockchainStorage;
genesisBlock.ifPresent(block -> this.setGenesis(block, dataDirectory));

final Hash chainHead = blockchainStorage.getChainHead().get();
chainHeader = blockchainStorage.getBlockHeader(chainHead).get();
totalDifficulty = blockchainStorage.getTotalDifficulty(chainHead).get();
Expand Down Expand Up @@ -288,7 +287,7 @@ public static MutableBlockchain createMutable(
final long reorgLoggingThreshold,
final String dataDirectory,
final int numberOfBlocksToCache,
final int numberOgBlockHeadersToCache) {
final int numberOfBlockHeadersToCache) {
checkNotNull(genesisBlock);
return new DefaultBlockchain(
Optional.of(genesisBlock),
Expand All @@ -297,7 +296,7 @@ public static MutableBlockchain createMutable(
reorgLoggingThreshold,
dataDirectory,
numberOfBlocksToCache,
numberOgBlockHeadersToCache);
numberOfBlockHeadersToCache);
}

public static Blockchain create(
Expand Down Expand Up @@ -376,9 +375,19 @@ public Optional<BlockHeader> getBlockHeader(final long blockNumber) {
public Optional<BlockHeader> getBlockHeader(final Hash blockHeaderHash) {
return blockHeadersCache
.map(
cache ->
Optional.ofNullable(cache.getIfPresent(blockHeaderHash))
.or(() -> blockchainStorage.getBlockHeader(blockHeaderHash)))
cache -> {
final BlockHeader cached = cache.getIfPresent(blockHeaderHash);
if (cached != null) {
return Optional.of(cached);
}
return blockchainStorage
.getBlockHeader(blockHeaderHash)
.map(
header -> {
cache.put(blockHeaderHash, header);
return header;
});
})
.orElseGet(() -> blockchainStorage.getBlockHeader(blockHeaderHash));
}

Expand Down