Skip to content
Merged
6 changes: 2 additions & 4 deletions besu/src/main/java/org/hyperledger/besu/cli/BesuCommand.java
Original file line number Diff line number Diff line change
Expand Up @@ -1806,10 +1806,8 @@ public BesuControllerBuilder setupControllerBuilder() {
if (DataStorageFormat.BONSAI.equals(getDataStorageConfiguration().getDataStorageFormat())) {
final DiffBasedSubStorageConfiguration subStorageConfiguration =
getDataStorageConfiguration().getDiffBasedSubStorageConfiguration();
if (subStorageConfiguration.getLimitTrieLogsEnabled()) {
besuControllerBuilder.isParallelTxProcessingEnabled(
subStorageConfiguration.getUnstable().isParallelTxProcessingEnabled());
}
besuControllerBuilder.isParallelTxProcessingEnabled(
subStorageConfiguration.getUnstable().isParallelTxProcessingEnabled());
}
return besuControllerBuilder;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.hyperledger.besu.ethereum.core.Transaction;
import org.hyperledger.besu.ethereum.core.TransactionReceipt;
import org.hyperledger.besu.ethereum.core.Withdrawal;
import org.hyperledger.besu.ethereum.mainnet.AbstractBlockProcessor.PreprocessingFunction.NoPreprocessing;
import org.hyperledger.besu.ethereum.mainnet.requests.ProcessRequestContext;
import org.hyperledger.besu.ethereum.mainnet.requests.RequestProcessorCoordinator;
import org.hyperledger.besu.ethereum.privacy.storage.PrivateMetadataUpdater;
Expand Down Expand Up @@ -102,6 +103,26 @@ public BlockProcessingResult processBlock(
final List<BlockHeader> ommers,
final Optional<List<Withdrawal>> maybeWithdrawals,
final PrivateMetadataUpdater privateMetadataUpdater) {
return processBlock(
blockchain,
worldState,
blockHeader,
transactions,
ommers,
maybeWithdrawals,
privateMetadataUpdater,
new NoPreprocessing());
}

protected BlockProcessingResult processBlock(
final Blockchain blockchain,
final MutableWorldState worldState,
final BlockHeader blockHeader,
final List<Transaction> transactions,
final List<BlockHeader> ommers,
final Optional<List<Withdrawal>> maybeWithdrawals,
final PrivateMetadataUpdater privateMetadataUpdater,
final PreprocessingFunction preprocessingBlockFunction) {
final List<TransactionReceipt> receipts = new ArrayList<>();
long currentGasUsed = 0;
long currentBlobGasUsed = 0;
Expand All @@ -127,7 +148,7 @@ public BlockProcessingResult processBlock(
.orElse(Wei.ZERO);

final Optional<PreprocessingContext> preProcessingContext =
runBlockPreProcessing(
preprocessingBlockFunction.run(
worldState,
privateMetadataUpdater,
blockHeader,
Expand Down Expand Up @@ -246,17 +267,6 @@ public BlockProcessingResult processBlock(
Optional.of(new BlockProcessingOutputs(worldState, receipts, maybeRequests)));
}

protected Optional<PreprocessingContext> runBlockPreProcessing(
final MutableWorldState worldState,
final PrivateMetadataUpdater privateMetadataUpdater,
final BlockHeader blockHeader,
final List<Transaction> transactions,
final Address miningBeneficiary,
final BlockHashOperation.BlockHashLookup blockHashLookup,
final Wei blobGasPrice) {
return Optional.empty();
}

protected TransactionProcessingResult getTransactionProcessingResult(
final Optional<PreprocessingContext> preProcessingContext,
final MutableWorldState worldState,
Expand Down Expand Up @@ -309,5 +319,30 @@ abstract boolean rewardCoinbase(
final boolean skipZeroBlockRewards);

public interface PreprocessingContext {}
;

public interface PreprocessingFunction {
Optional<PreprocessingContext> run(
final MutableWorldState worldState,
final PrivateMetadataUpdater privateMetadataUpdater,
final BlockHeader blockHeader,
final List<Transaction> transactions,
final Address miningBeneficiary,
final BlockHashOperation.BlockHashLookup blockHashLookup,
final Wei blobGasPrice);

class NoPreprocessing implements PreprocessingFunction {

@Override
public Optional<PreprocessingContext> run(
final MutableWorldState worldState,
final PrivateMetadataUpdater privateMetadataUpdater,
final BlockHeader blockHeader,
final List<Transaction> transactions,
final Address miningBeneficiary,
final BlockHashLookup blockHashLookup,
final Wei blobGasPrice) {
return Optional.empty();
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,13 @@

import org.hyperledger.besu.datatypes.Address;
import org.hyperledger.besu.datatypes.Wei;
import org.hyperledger.besu.ethereum.BlockProcessingResult;
import org.hyperledger.besu.ethereum.chain.Blockchain;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.core.MutableWorldState;
import org.hyperledger.besu.ethereum.core.Transaction;
import org.hyperledger.besu.ethereum.core.Withdrawal;
import org.hyperledger.besu.ethereum.mainnet.AbstractBlockProcessor.PreprocessingFunction.NoPreprocessing;
import org.hyperledger.besu.ethereum.mainnet.BlockProcessor;
import org.hyperledger.besu.ethereum.mainnet.MainnetBlockProcessor;
import org.hyperledger.besu.ethereum.mainnet.MainnetTransactionProcessor;
Expand All @@ -37,8 +41,13 @@
import java.util.List;
import java.util.Optional;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MainnetParallelBlockProcessor extends MainnetBlockProcessor {

private static final Logger LOG = LoggerFactory.getLogger(MainnetParallelBlockProcessor.class);

private final Optional<MetricsSystem> metricsSystem;
private final Optional<Counter> confirmedParallelizedTransactionCounter;
private final Optional<Counter> conflictingButCachedTransactionCounter;
Expand Down Expand Up @@ -78,34 +87,6 @@ public MainnetParallelBlockProcessor(
"Counter for the number of conflicted transactions during block processing"));
}

@Override
protected Optional<PreprocessingContext> runBlockPreProcessing(
final MutableWorldState worldState,
final PrivateMetadataUpdater privateMetadataUpdater,
final BlockHeader blockHeader,
final List<Transaction> transactions,
final Address miningBeneficiary,
final BlockHashOperation.BlockHashLookup blockHashLookup,
final Wei blobGasPrice) {
if ((worldState instanceof DiffBasedWorldState)) {
ParallelizedConcurrentTransactionProcessor parallelizedConcurrentTransactionProcessor =
new ParallelizedConcurrentTransactionProcessor(transactionProcessor);
// runAsyncBlock, if activated, facilitates the non-blocking parallel execution of
// transactions in the background through an optimistic strategy.
parallelizedConcurrentTransactionProcessor.runAsyncBlock(
worldState,
blockHeader,
transactions,
miningBeneficiary,
blockHashLookup,
blobGasPrice,
privateMetadataUpdater);
return Optional.of(
new ParallelizedPreProcessingContext(parallelizedConcurrentTransactionProcessor));
}
return Optional.empty();
}

@Override
protected TransactionProcessingResult getTransactionProcessingResult(
final Optional<PreprocessingContext> preProcessingContext,
Expand All @@ -126,7 +107,7 @@ protected TransactionProcessingResult getTransactionProcessingResult(
(ParallelizedPreProcessingContext) preProcessingContext.get();
transactionProcessingResult =
parallelizedPreProcessingContext
.getParallelizedConcurrentTransactionProcessor()
.parallelizedConcurrentTransactionProcessor()
.applyParallelizedTransactionResult(
worldState,
miningBeneficiary,
Expand Down Expand Up @@ -154,21 +135,48 @@ protected TransactionProcessingResult getTransactionProcessingResult(
}
}

static class ParallelizedPreProcessingContext implements PreprocessingContext {
final ParallelizedConcurrentTransactionProcessor parallelizedConcurrentTransactionProcessor;

public ParallelizedPreProcessingContext(
final ParallelizedConcurrentTransactionProcessor
parallelizedConcurrentTransactionProcessor) {
this.parallelizedConcurrentTransactionProcessor = parallelizedConcurrentTransactionProcessor;
}

public ParallelizedConcurrentTransactionProcessor
getParallelizedConcurrentTransactionProcessor() {
return parallelizedConcurrentTransactionProcessor;
@Override
public BlockProcessingResult processBlock(
final Blockchain blockchain,
final MutableWorldState worldState,
final BlockHeader blockHeader,
final List<Transaction> transactions,
final List<BlockHeader> ommers,
final Optional<List<Withdrawal>> maybeWithdrawals,
final PrivateMetadataUpdater privateMetadataUpdater) {
final BlockProcessingResult blockProcessingResult =
super.processBlock(
blockchain,
worldState,
blockHeader,
transactions,
ommers,
maybeWithdrawals,
privateMetadataUpdater,
new ParallelTransactionPreprocessing());
if (blockProcessingResult.isFailed()) {
// Fallback to non-parallel processing if there is a block processing exception .
LOG.info(
"Block processing failed. Falling back to non-parallel processing for block #{} ({})",
blockHeader.getNumber(),
blockHeader.getBlockHash());
return super.processBlock(
blockchain,
worldState,
blockHeader,
transactions,
ommers,
maybeWithdrawals,
privateMetadataUpdater,
new NoPreprocessing());
}
return blockProcessingResult;
}

record ParallelizedPreProcessingContext(
ParallelizedConcurrentTransactionProcessor parallelizedConcurrentTransactionProcessor)
implements PreprocessingContext {}

public static class ParallelBlockProcessorBuilder
implements ProtocolSpecBuilder.BlockProcessorBuilder {

Expand Down Expand Up @@ -196,4 +204,35 @@ public BlockProcessor apply(
metricsSystem);
}
}

class ParallelTransactionPreprocessing implements PreprocessingFunction {

@Override
public Optional<PreprocessingContext> run(
final MutableWorldState worldState,
final PrivateMetadataUpdater privateMetadataUpdater,
final BlockHeader blockHeader,
final List<Transaction> transactions,
final Address miningBeneficiary,
final BlockHashOperation.BlockHashLookup blockHashLookup,
final Wei blobGasPrice) {
if ((worldState instanceof DiffBasedWorldState)) {
ParallelizedConcurrentTransactionProcessor parallelizedConcurrentTransactionProcessor =
new ParallelizedConcurrentTransactionProcessor(transactionProcessor);
// runAsyncBlock, if activated, facilitates the non-blocking parallel execution of
// transactions in the background through an optimistic strategy.
parallelizedConcurrentTransactionProcessor.runAsyncBlock(
worldState,
blockHeader,
transactions,
miningBeneficiary,
blockHashLookup,
blobGasPrice,
privateMetadataUpdater);
return Optional.of(
new ParallelizedPreProcessingContext(parallelizedConcurrentTransactionProcessor));
}
return Optional.empty();
}
}
}
Loading