Skip to content

Commit c8147a8

Browse files
matkt authored and NickSneo committed
Fix snapsync heal (besu-eth#5838)
Signed-off-by: Karim TAAM <karim.t2am@gmail.com>
1 parent a7f8f51 commit c8147a8

15 files changed

+415
-144
lines changed

ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/checkpointsync/CheckpointDownloaderFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ public static Optional<FastSyncDownloader<?>> createCheckpointDownloader(
8888
.getAccountToRepair()
8989
.ifPresent(
9090
address ->
91-
snapContext.addAccountsToBeRepaired(
91+
snapContext.addAccountToHealingList(
9292
CompactEncoding.bytesToPath(address.addressHash())));
9393
} else if (fastSyncState.getPivotBlockHeader().isEmpty()
9494
&& protocolContext.getBlockchain().getChainHeadBlockNumber()

ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/SnapDownloaderFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ public static Optional<FastSyncDownloader<?>> createSnapDownloader(
8383
.getAccountToRepair()
8484
.ifPresent(
8585
address ->
86-
snapContext.addAccountsToBeRepaired(
86+
snapContext.addAccountToHealingList(
8787
CompactEncoding.bytesToPath(address.addressHash())));
8888
} else if (fastSyncState.getPivotBlockHeader().isEmpty()
8989
&& protocolContext.getBlockchain().getChainHeadBlockNumber()

ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/SnapWorldDownloadState.java

Lines changed: 75 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
import org.hyperledger.besu.ethereum.eth.sync.snapsync.request.heal.AccountFlatDatabaseHealingRangeRequest;
2929
import org.hyperledger.besu.ethereum.eth.sync.snapsync.request.heal.StorageFlatDatabaseHealingRangeRequest;
3030
import org.hyperledger.besu.ethereum.eth.sync.worldstate.WorldDownloadState;
31-
import org.hyperledger.besu.ethereum.worldstate.DataStorageFormat;
3231
import org.hyperledger.besu.ethereum.worldstate.FlatDbMode;
3332
import org.hyperledger.besu.ethereum.worldstate.WorldStateStorage;
3433
import org.hyperledger.besu.metrics.BesuMetricCategory;
@@ -43,6 +42,7 @@
4342
import java.util.List;
4443
import java.util.Map;
4544
import java.util.OptionalLong;
45+
import java.util.concurrent.atomic.AtomicBoolean;
4646
import java.util.function.Consumer;
4747
import java.util.function.Predicate;
4848
import java.util.stream.Stream;
@@ -72,7 +72,7 @@ public class SnapWorldDownloadState extends WorldDownloadState<SnapDataRequest>
7272

7373
protected final InMemoryTasksPriorityQueues<SnapDataRequest>
7474
pendingStorageFlatDatabaseHealingRequests = new InMemoryTasksPriorityQueues<>();
75-
private HashSet<Bytes> accountsToBeRepaired = new HashSet<>();
75+
private HashSet<Bytes> accountsHealingList = new HashSet<>();
7676
private DynamicPivotBlockSelector pivotBlockSelector;
7777

7878
private final SnapSyncStatePersistenceManager snapContext;
@@ -156,6 +156,7 @@ protected synchronized void markAsStalled(final int maxNodeRequestRetries) {
156156
@Override
157157
public synchronized boolean checkCompletion(final BlockHeader header) {
158158

159+
// Check if all snapsync tasks are completed
159160
if (!internalFuture.isDone()
160161
&& pendingAccountRequests.allTasksCompleted()
161162
&& pendingCodeRequests.allTasksCompleted()
@@ -164,29 +165,50 @@ public synchronized boolean checkCompletion(final BlockHeader header) {
164165
&& pendingTrieNodeRequests.allTasksCompleted()
165166
&& pendingAccountFlatDatabaseHealingRequests.allTasksCompleted()
166167
&& pendingStorageFlatDatabaseHealingRequests.allTasksCompleted()) {
168+
169+
// if all snapsync tasks are completed and the healing process was not running
167170
if (!snapSyncState.isHealTrieInProgress()) {
171+
// Register blockchain observer if not already registered
172+
blockObserverId =
173+
blockObserverId.isEmpty()
174+
? OptionalLong.of(blockchain.observeBlockAdded(createBlockchainObserver()))
175+
: blockObserverId;
176+
// Start the healing process
168177
startTrieHeal();
169-
} else if (pivotBlockSelector.isBlockchainBehind()) {
178+
}
179+
// if all snapsync tasks are completed and the healing was running and blockchain is behind
180+
// the pivot block
181+
else if (pivotBlockSelector.isBlockchainBehind()) {
170182
LOG.info("Pausing world state download while waiting for sync to complete");
171-
if (blockObserverId.isEmpty())
172-
blockObserverId = OptionalLong.of(blockchain.observeBlockAdded(getBlockAddedListener()));
183+
// Set the snapsync to wait for the blockchain to catch up
173184
snapSyncState.setWaitingBlockchain(true);
174-
} else if (!snapSyncState.isHealFlatDatabaseInProgress()
175-
&& worldStateStorage.getFlatDbMode().equals(FlatDbMode.FULL)) {
176-
// only doing a flat db heal for bonsai
177-
startFlatDatabaseHeal(header);
178-
} else {
179-
final WorldStateStorage.Updater updater = worldStateStorage.updater();
180-
updater.saveWorldState(header.getHash(), header.getStateRoot(), rootNodeData);
181-
updater.commit();
182-
metricsManager.notifySnapSyncCompleted();
183-
snapContext.clear();
184-
internalFuture.complete(null);
185-
186-
return true;
185+
}
186+
// if all snapsync tasks are completed and the healing was running and the blockchain is not
187+
// behind the pivot block
188+
else {
189+
// Remove the blockchain observer
190+
blockObserverId.ifPresent(blockchain::removeObserver);
191+
// If the flat database healing process is not in progress and the flat database mode is
192+
// FULL
193+
if (!snapSyncState.isHealFlatDatabaseInProgress()
194+
&& worldStateStorage.getFlatDbMode().equals(FlatDbMode.FULL)) {
195+
// Start the flat database healing process
196+
startFlatDatabaseHeal(header);
197+
}
198+
// If the flat database healing process is in progress or the flat database mode is not FULL
199+
else {
200+
final WorldStateStorage.Updater updater = worldStateStorage.updater();
201+
updater.saveWorldState(header.getHash(), header.getStateRoot(), rootNodeData);
202+
updater.commit();
203+
// Notify that the snap sync has completed
204+
metricsManager.notifySnapSyncCompleted();
205+
// Clear the snap context
206+
snapContext.clear();
207+
internalFuture.complete(null);
208+
return true;
209+
}
187210
}
188211
}
189-
190212
return false;
191213
}
192214

@@ -200,10 +222,11 @@ protected synchronized void cleanupQueues() {
200222
pendingTrieNodeRequests.clear();
201223
}
202224

225+
/** Method to start the healing process of the trie */
203226
public synchronized void startTrieHeal() {
204227
snapContext.clearAccountRangeTasks();
205228
snapSyncState.setHealTrieStatus(true);
206-
// try to find new pivot block before healing
229+
// Try to find a new pivot block before starting the healing process
207230
pivotBlockSelector.switchToNewPivotBlock(
208231
(blockHeader, newPivotBlockFound) -> {
209232
snapContext.clearAccountRangeTasks();
@@ -212,21 +235,25 @@ public synchronized void startTrieHeal() {
212235
blockHeader.getNumber());
213236
enqueueRequest(
214237
createAccountTrieNodeDataRequest(
215-
blockHeader.getStateRoot(), Bytes.EMPTY, accountsToBeRepaired));
238+
blockHeader.getStateRoot(), Bytes.EMPTY, accountsHealingList));
216239
});
217240
}
218241

242+
/** Method to reload the healing process of the trie */
219243
public synchronized void reloadTrieHeal() {
244+
// Clear the flat database and trie log from the world state storage if needed
220245
worldStateStorage.clearFlatDatabase();
221246
worldStateStorage.clearTrieLog();
247+
// Clear pending trie node and code requests
222248
pendingTrieNodeRequests.clear();
223249
pendingCodeRequests.clear();
250+
224251
snapSyncState.setHealTrieStatus(false);
225252
checkCompletion(snapSyncState.getPivotBlockHeader().orElseThrow());
226253
}
227254

228255
public synchronized void startFlatDatabaseHeal(final BlockHeader header) {
229-
LOG.info("Running flat database heal process");
256+
LOG.info("Initiating the healing process for the flat database");
230257
snapSyncState.setHealFlatDatabaseInProgress(true);
231258
final Map<Bytes32, Bytes32> ranges = RangeManager.generateAllRanges(16);
232259
ranges.forEach(
@@ -235,10 +262,6 @@ public synchronized void startFlatDatabaseHeal(final BlockHeader header) {
235262
createAccountFlatHealingRangeRequest(header.getStateRoot(), key, value)));
236263
}
237264

238-
public boolean isBonsaiStorageFormat() {
239-
return worldStateStorage.getDataStorageFormat().equals(DataStorageFormat.BONSAI);
240-
}
241-
242265
@Override
243266
public synchronized void enqueueRequest(final SnapDataRequest request) {
244267
if (!internalFuture.isDone()) {
@@ -263,8 +286,8 @@ public synchronized void enqueueRequest(final SnapDataRequest request) {
263286
}
264287
}
265288

266-
public synchronized void setAccountsToBeRepaired(final HashSet<Bytes> accountsToBeRepaired) {
267-
this.accountsToBeRepaired = accountsToBeRepaired;
289+
public synchronized void setAccountsHealingList(final HashSet<Bytes> addAccountToHealingList) {
290+
this.accountsHealingList = addAccountToHealingList;
268291
}
269292

270293
/**
@@ -274,15 +297,15 @@ public synchronized void setAccountsToBeRepaired(final HashSet<Bytes> accountsTo
274297
*
275298
* @param account The account to be added for repair.
276299
*/
277-
public synchronized void addAccountsToBeRepaired(final Bytes account) {
278-
if (!accountsToBeRepaired.contains(account)) {
279-
snapContext.addAccountsToBeRepaired(account);
280-
accountsToBeRepaired.add(account);
300+
public synchronized void addAccountToHealingList(final Bytes account) {
301+
if (!accountsHealingList.contains(account)) {
302+
snapContext.addAccountToHealingList(account);
303+
accountsHealingList.add(account);
281304
}
282305
}
283306

284-
public HashSet<Bytes> getAccountsToBeRepaired() {
285-
return accountsToBeRepaired;
307+
public HashSet<Bytes> getAccountsHealingList() {
308+
return accountsHealingList;
286309
}
287310

288311
@Override
@@ -385,25 +408,25 @@ public void setPivotBlockSelector(final DynamicPivotBlockSelector pivotBlockSele
385408
this.pivotBlockSelector = pivotBlockSelector;
386409
}
387410

388-
public BlockAddedObserver getBlockAddedListener() {
411+
public BlockAddedObserver createBlockchainObserver() {
389412
return addedBlockContext -> {
390-
if (snapSyncState.isWaitingBlockchain()) {
391-
// if we receive a new pivot block we can restart the heal
392-
pivotBlockSelector.check(
393-
(____, isNewPivotBlock) -> {
394-
if (isNewPivotBlock) {
395-
snapSyncState.setWaitingBlockchain(false);
396-
}
397-
});
398-
// if we are close to the head we can also restart the heal and finish snapsync
399-
if (!pivotBlockSelector.isBlockchainBehind()) {
400-
snapSyncState.setWaitingBlockchain(false);
401-
}
402-
if (!snapSyncState.isWaitingBlockchain()) {
403-
blockObserverId.ifPresent(blockchain::removeObserver);
404-
blockObserverId = OptionalLong.empty();
405-
reloadTrieHeal();
406-
}
413+
final AtomicBoolean foundNewPivotBlock = new AtomicBoolean(false);
414+
pivotBlockSelector.check(
415+
(____, isNewPivotBlock) -> {
416+
if (isNewPivotBlock) {
417+
foundNewPivotBlock.set(true);
418+
}
419+
});
420+
421+
final boolean isNewPivotBlockFound = foundNewPivotBlock.get();
422+
final boolean isBlockchainCaughtUp =
423+
snapSyncState.isWaitingBlockchain() && !pivotBlockSelector.isBlockchainBehind();
424+
425+
if (isNewPivotBlockFound
426+
|| isBlockchainCaughtUp) { // restart heal if we found a new pivot block or if close to
427+
// head again
428+
snapSyncState.setWaitingBlockchain(false);
429+
reloadTrieHeal();
407430
}
408431
};
409432
}

ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/SnapWorldStateDownloader.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -153,10 +153,10 @@ public CompletableFuture<Void> run(
153153

154154
final List<AccountRangeDataRequest> currentAccountRange =
155155
snapContext.getCurrentAccountRange();
156-
final HashSet<Bytes> inconsistentAccounts = snapContext.getAccountsToBeRepaired();
156+
final HashSet<Bytes> inconsistentAccounts = snapContext.getAccountsHealingList();
157157

158158
if (!currentAccountRange.isEmpty()) { // continue to download worldstate ranges
159-
newDownloadState.setAccountsToBeRepaired(inconsistentAccounts);
159+
newDownloadState.setAccountsHealingList(inconsistentAccounts);
160160
snapContext
161161
.getCurrentAccountRange()
162162
.forEach(
@@ -165,14 +165,14 @@ public CompletableFuture<Void> run(
165165
DOWNLOAD, snapDataRequest.getStartKeyHash(), snapDataRequest.getEndKeyHash());
166166
newDownloadState.enqueueRequest(snapDataRequest);
167167
});
168-
} else if (!snapContext.getAccountsToBeRepaired().isEmpty()) { // restart only the heal step
168+
} else if (!snapContext.getAccountsHealingList().isEmpty()) { // restart only the heal step
169169
snapSyncState.setHealTrieStatus(true);
170170
worldStateStorage.clearFlatDatabase();
171171
worldStateStorage.clearTrieLog();
172-
newDownloadState.setAccountsToBeRepaired(inconsistentAccounts);
172+
newDownloadState.setAccountsHealingList(inconsistentAccounts);
173173
newDownloadState.enqueueRequest(
174174
SnapDataRequest.createAccountTrieNodeDataRequest(
175-
stateRoot, Bytes.EMPTY, snapContext.getAccountsToBeRepaired()));
175+
stateRoot, Bytes.EMPTY, snapContext.getAccountsHealingList()));
176176
} else {
177177
// start from scratch
178178
worldStateStorage.clear();

ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/context/SnapSyncStatePersistenceManager.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@
4141
*/
4242
public class SnapSyncStatePersistenceManager {
4343

44-
private final byte[] SNAP_ACCOUNT_TO_BE_REPAIRED_INDEX =
44+
private final byte[] SNAP_ACCOUNT_HEALING_LIST_INDEX =
4545
"snapInconsistentAccountsStorageIndex".getBytes(StandardCharsets.UTF_8);
4646

4747
private final GenericKeyValueStorageFacade<BigInteger, AccountRangeDataRequest>
@@ -104,20 +104,20 @@ public void updatePersistedTasks(final List<? extends SnapDataRequest> accountRa
104104
}
105105

106106
/**
107-
* Persists the current accounts to be repaired in the database.
107+
* Persists the current accounts to heal in the database.
108108
*
109-
* @param accountsToBeRepaired The current list of accounts to persist.
109+
* @param accountsHealingList The current list of accounts to heal.
110110
*/
111-
public void addAccountsToBeRepaired(final Bytes accountsToBeRepaired) {
111+
public void addAccountToHealingList(final Bytes accountsHealingList) {
112112
final BigInteger index =
113113
healContext
114-
.get(SNAP_ACCOUNT_TO_BE_REPAIRED_INDEX)
114+
.get(SNAP_ACCOUNT_HEALING_LIST_INDEX)
115115
.map(bytes -> new BigInteger(bytes.toArrayUnsafe()).add(BigInteger.ONE))
116116
.orElse(BigInteger.ZERO);
117117
healContext.putAll(
118118
keyValueStorageTransaction -> {
119-
keyValueStorageTransaction.put(SNAP_ACCOUNT_TO_BE_REPAIRED_INDEX, index.toByteArray());
120-
keyValueStorageTransaction.put(index.toByteArray(), accountsToBeRepaired.toArrayUnsafe());
119+
keyValueStorageTransaction.put(SNAP_ACCOUNT_HEALING_LIST_INDEX, index.toByteArray());
120+
keyValueStorageTransaction.put(index.toByteArray(), accountsHealingList.toArrayUnsafe());
121121
});
122122
}
123123

@@ -127,9 +127,9 @@ public List<AccountRangeDataRequest> getCurrentAccountRange() {
127127
.collect(Collectors.toList());
128128
}
129129

130-
public HashSet<Bytes> getAccountsToBeRepaired() {
130+
public HashSet<Bytes> getAccountsHealingList() {
131131
return healContext
132-
.streamValuesFromKeysThat(notEqualsTo(SNAP_ACCOUNT_TO_BE_REPAIRED_INDEX))
132+
.streamValuesFromKeysThat(notEqualsTo(SNAP_ACCOUNT_HEALING_LIST_INDEX))
133133
.collect(Collectors.toCollection(HashSet::new));
134134
}
135135

ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/request/StorageRangeDataRequest.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,11 @@ public void addResponse(
124124
if (!slots.isEmpty() || !proofs.isEmpty()) {
125125
if (!worldStateProofProvider.isValidRangeProof(
126126
startKeyHash, endKeyHash, storageRoot, proofs, slots)) {
127+
// If the proof is invalid, it means that the storage will be a mix of several blocks.
128+
// Therefore, it will be necessary to heal the account's storage subsequently
129+
downloadState.addAccountToHealingList(CompactEncoding.bytesToPath(accountHash));
130+
// We will request the new storage root of the account because it is apparently no longer
131+
// valid with the new pivot block.
127132
downloadState.enqueueRequest(
128133
createAccountDataRequest(
129134
getRootHash(), Hash.wrap(accountHash), startKeyHash, endKeyHash));
@@ -173,7 +178,7 @@ public Stream<SnapDataRequest> getChildRequests(
173178
});
174179
if (startKeyHash.equals(MIN_RANGE) && endKeyHash.equals(MAX_RANGE)) {
175180
// need to heal this account storage
176-
downloadState.addAccountsToBeRepaired(CompactEncoding.bytesToPath(accountHash));
181+
downloadState.addAccountToHealingList(CompactEncoding.bytesToPath(accountHash));
177182
}
178183
});
179184

0 commit comments

Comments
 (0)