Skip to content

Commit a494a9f

Browse files
committed
Avoid keeping txpool lock during block creation
Signed-off-by: Fabio Di Fabio <fabio.difabio@consensys.net>
1 parent 2d59f4d commit a494a9f

File tree

14 files changed

+191
-131
lines changed

14 files changed

+191
-131
lines changed

ethereum/blockcreation/src/main/java/org/hyperledger/besu/ethereum/blockcreation/txselection/BlockTransactionSelector.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,7 @@ private List<AbstractTransactionSelector> createTransactionSelectors(
164164
public TransactionSelectionResults buildTransactionListForBlock() {
165165
LOG.atDebug()
166166
.setMessage("Transaction pool stats {}")
167-
.addArgument(blockSelectionContext.transactionPool().logStats())
167+
.addArgument(blockSelectionContext.transactionPool()::logStats)
168168
.log();
169169
timeLimitedSelection();
170170
LOG.atTrace()

ethereum/core/src/test/java/org/hyperledger/besu/ethereum/trie/diffbased/bonsai/AbstractIsolationTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,8 @@ public abstract class AbstractIsolationTests {
137137
txPoolMetrics,
138138
transactionReplacementTester,
139139
new BlobCache(),
140-
MiningParameters.newDefault()));
140+
MiningParameters.newDefault()),
141+
ethScheduler);
141142

142143
protected final List<GenesisAllocation> accounts =
143144
GenesisConfigFile.fromResource("/dev.json")

ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPoolFactory.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -357,6 +357,7 @@ private static PendingTransactions createLayeredPendingTransactions(
357357
miningParameters);
358358
}
359359

360-
return new LayeredPendingTransactions(transactionPoolConfiguration, pendingTransactionsSorter);
360+
return new LayeredPendingTransactions(
361+
transactionPoolConfiguration, pendingTransactionsSorter, ethScheduler);
361362
}
362363
}

ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/AbstractPrioritizedTransactions.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,13 +24,13 @@
2424
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPoolConfiguration;
2525
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPoolMetrics;
2626

27+
import java.util.HashSet;
2728
import java.util.List;
2829
import java.util.Map;
2930
import java.util.NavigableMap;
3031
import java.util.TreeSet;
3132
import java.util.function.BiFunction;
3233
import java.util.function.Predicate;
33-
import java.util.stream.Stream;
3434

3535
/**
3636
* Holds the current set of executable pending transactions, that are candidate for inclusion on
@@ -168,8 +168,16 @@ protected int[] getRemainingPromotionsPerType() {
168168
}
169169

170170
@Override
171-
public Stream<PendingTransaction> stream() {
172-
return orderByFee.descendingSet().stream();
171+
public List<SenderPendingTransactions> getBySender() {
172+
final var sendersToAdd = new HashSet<>(txsBySender.keySet());
173+
return orderByFee.descendingSet().stream()
174+
.map(PendingTransaction::getSender)
175+
.filter(sendersToAdd::remove)
176+
.map(
177+
sender ->
178+
new SenderPendingTransactions(
179+
sender, List.copyOf(txsBySender.get(sender).values())))
180+
.toList();
173181
}
174182

175183
@Override

ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/AbstractTransactionsLayer.java

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
*/
1515
package org.hyperledger.besu.ethereum.eth.transactions.layered;
1616

17+
import static java.util.Collections.unmodifiableList;
1718
import static org.hyperledger.besu.ethereum.eth.transactions.TransactionAddedResult.ADDED;
1819
import static org.hyperledger.besu.ethereum.eth.transactions.TransactionAddedResult.ALREADY_KNOWN;
1920
import static org.hyperledger.besu.ethereum.eth.transactions.TransactionAddedResult.REJECTED_UNDERPRICED_REPLACEMENT;
@@ -54,7 +55,6 @@
5455
import java.util.function.BinaryOperator;
5556
import java.util.function.Function;
5657
import java.util.stream.Collectors;
57-
import java.util.stream.Stream;
5858

5959
import org.slf4j.Logger;
6060
import org.slf4j.LoggerFactory;
@@ -138,6 +138,8 @@ public boolean contains(final Transaction transaction) {
138138
|| nextLayer.contains(transaction);
139139
}
140140

141+
public abstract List<SenderPendingTransactions> getBySender();
142+
141143
@Override
142144
public List<PendingTransaction> getAll() {
143145
final List<PendingTransaction> allNextLayers = nextLayer.getAll();
@@ -548,17 +550,17 @@ public List<Transaction> getAllPriority() {
548550
return priorityTxs;
549551
}
550552

551-
Stream<PendingTransaction> stream(final Address sender) {
552-
return txsBySender.getOrDefault(sender, EMPTY_SENDER_TXS).values().stream();
553-
}
554-
555553
@Override
556-
public List<PendingTransaction> getAllFor(final Address sender) {
557-
return Stream.concat(stream(sender), nextLayer.getAllFor(sender).stream()).toList();
554+
public synchronized List<PendingTransaction> getAllFor(final Address sender) {
555+
final var fromNextLayers = nextLayer.getAllFor(sender);
556+
final var fromThisLayer = txsBySender.getOrDefault(sender, EMPTY_SENDER_TXS).values();
557+
final var concatLayers =
558+
new ArrayList<PendingTransaction>(fromThisLayer.size() + fromNextLayers.size());
559+
concatLayers.addAll(fromThisLayer);
560+
concatLayers.addAll(fromNextLayers);
561+
return unmodifiableList(concatLayers);
558562
}
559563

560-
abstract Stream<PendingTransaction> stream();
561-
562564
@Override
563565
public int count() {
564566
return pendingTransactions.size() + nextLayer.count();

ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/LayeredPendingTransactions.java

Lines changed: 53 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.hyperledger.besu.datatypes.Hash;
2828
import org.hyperledger.besu.ethereum.core.BlockHeader;
2929
import org.hyperledger.besu.ethereum.core.Transaction;
30+
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler;
3031
import org.hyperledger.besu.ethereum.eth.transactions.PendingTransaction;
3132
import org.hyperledger.besu.ethereum.eth.transactions.PendingTransactionAddedListener;
3233
import org.hyperledger.besu.ethereum.eth.transactions.PendingTransactionDroppedListener;
@@ -41,13 +42,10 @@
4142

4243
import java.util.ArrayDeque;
4344
import java.util.ArrayList;
44-
import java.util.HashSet;
4545
import java.util.List;
4646
import java.util.Map;
4747
import java.util.Optional;
4848
import java.util.OptionalLong;
49-
import java.util.Set;
50-
import java.util.concurrent.atomic.AtomicBoolean;
5149
import java.util.stream.Collector;
5250
import java.util.stream.Collectors;
5351

@@ -63,12 +61,15 @@ public class LayeredPendingTransactions implements PendingTransactions {
6361
private static final Marker INVALID_TX_REMOVED = MarkerFactory.getMarker("INVALID_TX_REMOVED");
6462
private final TransactionPoolConfiguration poolConfig;
6563
private final AbstractPrioritizedTransactions prioritizedTransactions;
64+
private final EthScheduler ethScheduler;
6665

6766
public LayeredPendingTransactions(
6867
final TransactionPoolConfiguration poolConfig,
69-
final AbstractPrioritizedTransactions prioritizedTransactions) {
68+
final AbstractPrioritizedTransactions prioritizedTransactions,
69+
final EthScheduler ethScheduler) {
7070
this.poolConfig = poolConfig;
7171
this.prioritizedTransactions = prioritizedTransactions;
72+
this.ethScheduler = ethScheduler;
7273
}
7374

7475
@Override
@@ -311,79 +312,56 @@ public synchronized List<Transaction> getPriorityTransactions() {
311312
}
312313

313314
@Override
314-
// There's a small edge case here we could encounter.
315-
// When we pass an upgrade block that has a new transaction type, we start allowing transactions
316-
// of that new type into our pool.
317-
// If we then reorg to a block lower than the upgrade block height _and_ we create a block, that
318-
// block could end up with transactions of the new type.
319-
// This seems like it would be very rare but worth it to document that we don't handle that case
320-
// right now.
321-
public synchronized void selectTransactions(
322-
final PendingTransactions.TransactionSelector selector) {
315+
public void selectTransactions(final PendingTransactions.TransactionSelector selector) {
323316
final List<PendingTransaction> invalidTransactions = new ArrayList<>();
324-
final Set<Hash> alreadyChecked = new HashSet<>();
325-
final Set<Address> skipSenders = new HashSet<>();
326-
final AtomicBoolean completed = new AtomicBoolean(false);
327-
328-
prioritizedTransactions.stream()
329-
.takeWhile(unused -> !completed.get())
330-
.filter(highPrioPendingTx -> !skipSenders.contains(highPrioPendingTx.getSender()))
331-
.peek(this::logSenderTxs)
332-
.forEach(
333-
highPrioPendingTx ->
334-
prioritizedTransactions.stream(highPrioPendingTx.getSender())
335-
.takeWhile(
336-
candidatePendingTx ->
337-
!skipSenders.contains(candidatePendingTx.getSender())
338-
&& !completed.get())
339-
.filter(
340-
candidatePendingTx ->
341-
!alreadyChecked.contains(candidatePendingTx.getHash())
342-
&& Long.compareUnsigned(
343-
candidatePendingTx.getNonce(), highPrioPendingTx.getNonce())
344-
<= 0)
345-
.forEach(
346-
candidatePendingTx -> {
347-
alreadyChecked.add(candidatePendingTx.getHash());
348-
final var res = selector.evaluateTransaction(candidatePendingTx);
349-
350-
LOG.atTrace()
351-
.setMessage("Selection result {} for transaction {}")
352-
.addArgument(res)
353-
.addArgument(candidatePendingTx::toTraceLog)
354-
.log();
355-
356-
if (res.discard()) {
357-
invalidTransactions.add(candidatePendingTx);
358-
logDiscardedTransaction(candidatePendingTx, res);
359-
}
360-
361-
if (res.stop()) {
362-
completed.set(true);
363-
}
364-
365-
if (!res.selected()) {
366-
// avoid processing other txs from this sender if this one is skipped
367-
// since the following will not be selected due to the nonce gap
368-
skipSenders.add(candidatePendingTx.getSender());
369-
LOG.trace("Skipping tx from sender {}", candidatePendingTx.getSender());
370-
}
371-
}));
372-
373-
invalidTransactions.forEach(
374-
invalidTx -> prioritizedTransactions.remove(invalidTx, INVALIDATED));
375-
}
376317

377-
private void logSenderTxs(final PendingTransaction highPrioPendingTx) {
378-
LOG.atTrace()
379-
.setMessage("highPrioPendingTx {}, senderTxs {}")
380-
.addArgument(highPrioPendingTx::toTraceLog)
381-
.addArgument(
382-
() ->
383-
prioritizedTransactions.stream(highPrioPendingTx.getSender())
384-
.map(PendingTransaction::toTraceLog)
385-
.collect(Collectors.joining(", ")))
386-
.log();
318+
final List<SenderPendingTransactions> candidateTransactions;
319+
synchronized (this) {
320+
// since selecting transactions for block creation is a potential long operation
321+
// we want to avoid to keep the log for all the process, so we just lock to get
322+
// the candidate transactions
323+
candidateTransactions = prioritizedTransactions.getBySender();
324+
}
325+
326+
selection:
327+
for (final var senderTxs : candidateTransactions) {
328+
LOG.trace("highPrioSenderTxs {}", senderTxs);
329+
330+
for (final var candidatePendingTx : senderTxs.pendingTransactions()) {
331+
final var selectionResult = selector.evaluateTransaction(candidatePendingTx);
332+
333+
LOG.atTrace()
334+
.setMessage("Selection result {} for transaction {}")
335+
.addArgument(selectionResult)
336+
.addArgument(candidatePendingTx::toTraceLog)
337+
.log();
338+
339+
if (selectionResult.discard()) {
340+
invalidTransactions.add(candidatePendingTx);
341+
logDiscardedTransaction(candidatePendingTx, selectionResult);
342+
}
343+
344+
if (!selectionResult.selected()) {
345+
// avoid processing other txs from this sender if this one is skipped
346+
// since the following will not be selected due to the nonce gap
347+
LOG.trace("Skipping remaining txs for sender {}", candidatePendingTx.getSender());
348+
break;
349+
}
350+
351+
if (selectionResult.stop()) {
352+
break selection;
353+
}
354+
}
355+
}
356+
357+
ethScheduler.scheduleTxWorkerTask(
358+
() ->
359+
invalidTransactions.forEach(
360+
invalidTx -> {
361+
synchronized (this) {
362+
prioritizedTransactions.remove(invalidTx, INVALIDATED);
363+
}
364+
}));
387365
}
388366

389367
@Override

ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/ReadyTransactions.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,6 @@
3838
import java.util.function.BiFunction;
3939
import java.util.function.Predicate;
4040
import java.util.stream.Collectors;
41-
import java.util.stream.Stream;
4241

4342
public class ReadyTransactions extends AbstractSequentialTransactionsLayer {
4443

@@ -138,10 +137,14 @@ protected boolean promotionFilter(final PendingTransaction pendingTransaction) {
138137
}
139138

140139
@Override
141-
public Stream<PendingTransaction> stream() {
140+
public synchronized List<SenderPendingTransactions> getBySender() {
142141
return orderByMaxFee.descendingSet().stream()
143142
.map(PendingTransaction::getSender)
144-
.flatMap(sender -> txsBySender.get(sender).values().stream());
143+
.map(
144+
sender ->
145+
new SenderPendingTransactions(
146+
sender, List.copyOf(txsBySender.get(sender).values())))
147+
.toList();
145148
}
146149

147150
@Override
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/*
2+
* Copyright contributors to Hyperledger Besu.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
5+
* the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
10+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
11+
* specific language governing permissions and limitations under the License.
12+
*
13+
* SPDX-License-Identifier: Apache-2.0
14+
*/
15+
package org.hyperledger.besu.ethereum.eth.transactions.layered;
16+
17+
import org.hyperledger.besu.datatypes.Address;
18+
import org.hyperledger.besu.ethereum.eth.transactions.PendingTransaction;
19+
20+
import java.util.List;
21+
import java.util.stream.Collectors;
22+
23+
public record SenderPendingTransactions(
24+
Address sender, List<PendingTransaction> pendingTransactions) {
25+
26+
@Override
27+
public String toString() {
28+
return "Sender "
29+
+ sender
30+
+ " has "
31+
+ pendingTransactions.size()
32+
+ " pending transactions "
33+
+ pendingTransactions.stream()
34+
.map(PendingTransaction::toTraceLog)
35+
.collect(Collectors.joining(",", "[", "]"));
36+
}
37+
}

0 commit comments

Comments
 (0)