Skip to content

Commit 8f5d67b

Browse files
committed
merge pruning block for fleet mode
Signed-off-by: Karim TAAM <karim.t2am@gmail.com>
2 parents cf5138e + 9a3166f commit 8f5d67b

File tree

4 files changed

+77
-18
lines changed

4 files changed

+77
-18
lines changed

besu/src/main/java/org/hyperledger/besu/cli/options/unstable/ChainPruningOptions.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ public class ChainPruningOptions implements CLIOptions<ChainPrunerConfiguration>
3131
"--Xchain-pruning-blocks-retained";
3232
private static final String CHAIN_PRUNING_FREQUENCY_FLAG = "--Xchain-pruning-frequency";
3333
/** The constant DEFAULT_CHAIN_DATA_PRUNING_MIN_BLOCKS_RETAINED. */
34-
public static final long DEFAULT_CHAIN_DATA_PRUNING_MIN_BLOCKS_RETAINED = 7200;
34+
public static final long DEFAULT_CHAIN_DATA_PRUNING_MIN_BLOCKS_RETAINED = 512;
3535
/** The constant DEFAULT_CHAIN_DATA_PRUNING_FREQUENCY. */
3636
public static final int DEFAULT_CHAIN_DATA_PRUNING_FREQUENCY = 256;
3737

besu/src/main/java/org/hyperledger/besu/controller/BesuControllerBuilder.java

Lines changed: 19 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -629,22 +629,6 @@ public BesuController build() {
629629
transactionSelectorFactory);
630630
validateContext(protocolContext);
631631

632-
if (chainPrunerConfiguration.getChainPruningEnabled()) {
633-
protocolContext
634-
.safeConsensusContext(MergeContext.class)
635-
.ifPresent(
636-
mergeContext -> {
637-
mergeContext.setIsChainPruningEnabled(true);
638-
});
639-
final ChainDataPruner chainDataPruner = createChainPruner(blockchainStorage);
640-
blockchain.observeBlockAdded(chainDataPruner);
641-
LOG.info(
642-
"Chain data pruning enabled with recent blocks retained to be: "
643-
+ chainPrunerConfiguration.getChainPruningBlocksRetained()
644-
+ " and frequency to be: "
645-
+ chainPrunerConfiguration.getChainPruningBlocksFrequency());
646-
}
647-
648632
protocolSchedule.setPublicWorldStateArchiveForPrivacyBlockProcessor(
649633
protocolContext.getWorldStateArchive());
650634

@@ -715,6 +699,22 @@ public BesuController build() {
715699
final boolean fullSyncDisabled = !SyncMode.isFullSync(syncConfig.getSyncMode());
716700
final SyncState syncState = new SyncState(blockchain, ethPeers, fullSyncDisabled, checkpoint);
717701

702+
if (chainPrunerConfiguration.getChainPruningEnabled()) {
703+
protocolContext
704+
.safeConsensusContext(MergeContext.class)
705+
.ifPresent(
706+
mergeContext -> {
707+
mergeContext.setIsChainPruningEnabled(true);
708+
});
709+
final ChainDataPruner chainDataPruner = createChainPruner(syncState, blockchainStorage);
710+
blockchain.observeBlockAdded(chainDataPruner);
711+
LOG.info(
712+
"Chain data pruning enabled with recent blocks retained to be: "
713+
+ chainPrunerConfiguration.getChainPruningBlocksRetained()
714+
+ " and frequency to be: "
715+
+ chainPrunerConfiguration.getChainPruningBlocksFrequency());
716+
}
717+
718718
final TransactionPool transactionPool =
719719
TransactionPoolFactory.createTransactionPool(
720720
protocolSchedule,
@@ -1089,14 +1089,16 @@ WorldStateArchive createWorldStateArchive(
10891089
}
10901090
}
10911091

1092-
private ChainDataPruner createChainPruner(final BlockchainStorage blockchainStorage) {
1092+
private ChainDataPruner createChainPruner(
1093+
final SyncState syncState, final BlockchainStorage blockchainStorage) {
10931094
return new ChainDataPruner(
10941095
blockchainStorage,
10951096
new ChainDataPrunerStorage(
10961097
storageProvider.getStorageBySegmentIdentifier(
10971098
KeyValueSegmentIdentifier.CHAIN_PRUNER_STATE)),
10981099
chainPrunerConfiguration.getChainPruningBlocksRetained(),
10991100
chainPrunerConfiguration.getChainPruningBlocksFrequency(),
1101+
syncState::isInitialSyncPhaseDone,
11001102
MonitoredExecutors.newBoundedThreadPool(
11011103
ChainDataPruner.class.getSimpleName(),
11021104
1,

ethereum/core/src/main/java/org/hyperledger/besu/ethereum/chain/ChainDataPruner.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import java.util.Collection;
2222
import java.util.concurrent.ExecutorService;
23+
import java.util.function.Supplier;
2324

2425
import org.slf4j.Logger;
2526
import org.slf4j.LoggerFactory;
@@ -32,22 +33,29 @@ public class ChainDataPruner implements BlockAddedObserver {
3233
private final long blocksToRetain;
3334
private final long pruningFrequency;
3435
private final ExecutorService pruningExecutor;
36+
private final Supplier<Boolean> isInitialSyncPhaseDoneSupplier;
3537

3638
public ChainDataPruner(
3739
final BlockchainStorage blockchainStorage,
3840
final ChainDataPrunerStorage prunerStorage,
3941
final long blocksToRetain,
4042
final long pruningFrequency,
43+
final Supplier<Boolean> isInitialSyncPhaseDoneSupplier,
4144
final ExecutorService pruningExecutor) {
4245
this.blockchainStorage = blockchainStorage;
4346
this.prunerStorage = prunerStorage;
4447
this.blocksToRetain = blocksToRetain;
4548
this.pruningFrequency = pruningFrequency;
49+
this.isInitialSyncPhaseDoneSupplier = isInitialSyncPhaseDoneSupplier;
4650
this.pruningExecutor = pruningExecutor;
4751
}
4852

4953
@Override
5054
public void onBlockAdded(final BlockAddedEvent event) {
55+
if (!isInitialSyncPhaseDoneSupplier.get()) {
56+
// not run block pruning because the initial synchronization is still in progress.
57+
return;
58+
}
5159
final long blockNumber = event.getBlock().getHeader().getNumber();
5260
final long storedPruningMark = prunerStorage.getPruningMark().orElse(blockNumber);
5361
if (blockNumber < storedPruningMark) {

ethereum/core/src/test/java/org/hyperledger/besu/ethereum/chain/ChainDataPrunerTest.java

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,8 @@ public void singleChainPruning() {
4848
new ChainDataPrunerStorage(new InMemoryKeyValueStorage()),
4949
512,
5050
0,
51+
() -> true, // Set to 'true' to indicate that the initial synchronization phase has
52+
// completed
5153
new BlockingExecutor());
5254
Block genesisBlock = gen.genesisBlock();
5355
final MutableBlockchain blockchain =
@@ -86,6 +88,8 @@ public void forkPruning() {
8688
new ChainDataPrunerStorage(new InMemoryKeyValueStorage()),
8789
512,
8890
0,
91+
() -> true, // Set to 'true' to indicate that the initial synchronization phase has
92+
// completed
8993
new BlockingExecutor());
9094
Block genesisBlock = gen.genesisBlock();
9195
final MutableBlockchain blockchain =
@@ -117,6 +121,51 @@ public void forkPruning() {
117121
}
118122
}
119123

124+
@Test
125+
public void disablePruningWhenInitialSyncPhaseRunning() {
126+
final BlockDataGenerator gen = new BlockDataGenerator();
127+
final BlockchainStorage blockchainStorage =
128+
new KeyValueStoragePrefixedKeyBlockchainStorage(
129+
new InMemoryKeyValueStorage(),
130+
new VariablesKeyValueStorage(new InMemoryKeyValueStorage()),
131+
new MainnetBlockHeaderFunctions());
132+
final ChainDataPruner chainDataPruner =
133+
new ChainDataPruner(
134+
blockchainStorage,
135+
new ChainDataPrunerStorage(new InMemoryKeyValueStorage()),
136+
512,
137+
0,
138+
() -> false, // Set to 'false' to indicate that the initial synchronization phase is
139+
// running.
140+
new BlockingExecutor());
141+
Block genesisBlock = gen.genesisBlock();
142+
final MutableBlockchain blockchain =
143+
DefaultBlockchain.createMutable(
144+
genesisBlock, blockchainStorage, new NoOpMetricsSystem(), 0);
145+
blockchain.observeBlockAdded(chainDataPruner);
146+
147+
List<Block> canonicalChain = gen.blockSequence(genesisBlock, 1000);
148+
List<Block> forkChain = gen.blockSequence(genesisBlock, 16);
149+
for (Block blk : forkChain) {
150+
blockchain.storeBlock(blk, gen.receipts(blk));
151+
}
152+
for (int i = 0; i < 512; i++) {
153+
Block blk = canonicalChain.get(i);
154+
blockchain.appendBlock(blk, gen.receipts(blk));
155+
}
156+
// No prune happened
157+
assertThat(blockchain.getBlockByHash(canonicalChain.get(0).getHash())).isPresent();
158+
assertThat(blockchain.getBlockByHash(forkChain.get(0).getHash())).isPresent();
159+
for (int i = 512; i < 527; i++) {
160+
Block blk = canonicalChain.get(i);
161+
blockchain.appendBlock(blk, gen.receipts(blk));
162+
assertThat(blockchain.getBlockByHash(canonicalChain.get(i - 512).getHash())).isPresent();
163+
assertThat(blockchain.getBlockByHash(canonicalChain.get(i - 511).getHash())).isPresent();
164+
assertThat(blockchain.getBlockByHash(canonicalChain.get(i - 512).getHash())).isPresent();
165+
assertThat(blockchain.getBlockByHash(forkChain.get(i - 511).getHash())).isPresent();
166+
}
167+
}
168+
120169
protected static class BlockingExecutor extends AbstractExecutorService {
121170
@Override
122171
public void shutdown() {}

0 commit comments

Comments
 (0)