|
30 | 30 | import java.util.Collections; |
31 | 31 | import java.util.List; |
32 | 32 | import java.util.Optional; |
| 33 | +import java.util.concurrent.locks.ReentrantLock; |
33 | 34 | import java.util.function.Supplier; |
34 | 35 |
|
35 | 36 | import com.google.common.base.Suppliers; |
@@ -70,6 +71,8 @@ public class NodeRecordManager { |
70 | 71 | private final Supplier<List<Bytes>> forkIdSupplier; |
71 | 72 | private final NatService natService; |
72 | 73 |
|
| 74 | + private final ReentrantLock lock = new ReentrantLock(); |
| 75 | + |
73 | 76 | private Optional<DiscoveryPeerV4> localNode = Optional.empty(); |
74 | 77 | private HostEndpoint primaryEndpoint; |
75 | 78 | private Optional<HostEndpoint> ipv6Endpoint = Optional.empty(); |
@@ -166,50 +169,122 @@ public void initializeLocalNode(final HostEndpoint primary, final Optional<HostE |
166 | 169 | * |
167 | 170 | * @throws IllegalStateException if the local node has not been initialized |
168 | 171 | */ |
169 | | - public void updateNodeRecord() { |
170 | | - final NodeRecordFactory factory = NodeRecordFactory.DEFAULT; |
171 | | - |
172 | | - final Optional<NodeRecord> existingRecord = |
173 | | - variablesStorage.getLocalEnrSeqno().map(factory::fromBytes); |
174 | | - |
175 | | - final Bytes ipAddressBytes = |
176 | | - Bytes.of(InetAddresses.forString(primaryEndpoint.host()).getAddress()); |
177 | | - |
178 | | - final int discoveryPort = primaryEndpoint.discoveryPort(); |
179 | | - final int listeningPort = primaryEndpoint.tcpPort(); |
180 | | - final List<Bytes> forkId = forkIdSupplier.get(); |
| 172 | + /** |
| 173 | + * Updates the stored discovery port after the OS assigns an actual port for an ephemeral (port 0) |
| 174 | + * bind, then persists the updated ENR to disk. |
| 175 | + * |
| 176 | + * <p>Called from the {@code localNodeRecordListener} in {@code PeerDiscoveryAgentV5} when the |
| 177 | + * discovery library resolves a bound port via {@code onBoundPortResolved}. At that point the |
| 178 | + * library's {@code LocalNodeRecordStore} already holds the updated record; this method ensures |
| 179 | + * Besu's own {@link NodeRecordManager} state and the on-disk ENR are kept consistent. |
| 180 | + * |
| 181 | + * <p>This is a no-op if the currently stored discovery port is already non-zero. |
| 182 | + * |
| 183 | + * @param resolvedPort the actual OS-assigned port |
| 184 | + * @param isIpv6 {@code true} to update the IPv6 endpoint, {@code false} for IPv4 |
| 185 | + */ |
| 186 | + public void onDiscoveryPortResolved(final int resolvedPort, final boolean isIpv6) { |
| 187 | + lock.lock(); |
| 188 | + try { |
| 189 | + if (isIpv6) { |
| 190 | + if (ipv6Endpoint.map(ep -> ep.discoveryPort() != 0).orElse(false)) { |
| 191 | + return; |
| 192 | + } |
| 193 | + ipv6Endpoint = |
| 194 | + ipv6Endpoint.map(ep -> new HostEndpoint(ep.host(), resolvedPort, ep.tcpPort())); |
| 195 | + } else { |
| 196 | + if (primaryEndpoint.discoveryPort() != 0) { |
| 197 | + return; |
| 198 | + } |
| 199 | + primaryEndpoint = |
| 200 | + new HostEndpoint(primaryEndpoint.host(), resolvedPort, primaryEndpoint.tcpPort()); |
| 201 | + } |
| 202 | + updateNodeRecord(); |
| 203 | + } finally { |
| 204 | + lock.unlock(); |
| 205 | + } |
| 206 | + } |
181 | 207 |
|
182 | | - final boolean primaryIsIpv4 = ipAddressBytes.size() == 4; |
| 208 | + /** |
| 209 | + * Updates the stored discovery endpoints with the actual OS-assigned ports after an ephemeral |
| 210 | + * (port 0) bind. |
| 211 | + * |
| 212 | + * <p>Each argument is only applied when present and only if the currently stored port is 0. The |
| 213 | + * caller is responsible for invoking {@link #updateNodeRecord()} afterwards to persist the |
| 214 | + * change. |
| 215 | + * |
| 216 | + * @param resolvedIpv4Port the actual OS-assigned IPv4 UDP port, or empty if unchanged |
| 217 | + * @param resolvedIpv6Port the actual OS-assigned IPv6 UDP port, or empty if unchanged |
| 218 | + */ |
| 219 | + public void onDiscoveryPortResolved( |
| 220 | + final Optional<Integer> resolvedIpv4Port, final Optional<Integer> resolvedIpv6Port) { |
| 221 | + lock.lock(); |
| 222 | + try { |
| 223 | + if (resolvedIpv4Port.isPresent() && primaryEndpoint.discoveryPort() == 0) { |
| 224 | + primaryEndpoint = |
| 225 | + new HostEndpoint( |
| 226 | + primaryEndpoint.host(), resolvedIpv4Port.get(), primaryEndpoint.tcpPort()); |
| 227 | + } |
| 228 | + if (resolvedIpv6Port.isPresent() |
| 229 | + && ipv6Endpoint.map(ep -> ep.discoveryPort() == 0).orElse(false)) { |
| 230 | + ipv6Endpoint = |
| 231 | + ipv6Endpoint.map( |
| 232 | + ep -> new HostEndpoint(ep.host(), resolvedIpv6Port.get(), ep.tcpPort())); |
| 233 | + } |
| 234 | + } finally { |
| 235 | + lock.unlock(); |
| 236 | + } |
| 237 | + } |
183 | 238 |
|
184 | | - final Optional<Bytes> ipv6AddressBytes = |
185 | | - ipv6Endpoint.map(ep -> Bytes.of(InetAddresses.forString(ep.host()).getAddress())); |
186 | | - |
187 | | - // Reuse the existing ENR if all relevant fields are unchanged. |
188 | | - final NodeRecord nodeRecord = |
189 | | - existingRecord |
190 | | - .filter( |
191 | | - record -> |
192 | | - nodeId.equals(record.get(EnrField.PKEY_SECP256K1)) |
193 | | - && (primaryIsIpv4 |
194 | | - ? primaryIpv4AddressMatches( |
195 | | - record, ipAddressBytes, discoveryPort, listeningPort) |
196 | | - : primaryIpv6AddressMatches( |
197 | | - record, ipAddressBytes, discoveryPort, listeningPort)) |
198 | | - && forkId.equals(record.get(FORK_ID_ENR_FIELD)) |
199 | | - && (!primaryIsIpv4 || ipv6FieldsMatch(record, ipv6AddressBytes))) |
200 | | - // Otherwise, create a new ENR with an incremented sequence number, |
201 | | - // sign it with the local node key, and persist it to disk. |
202 | | - .orElseGet( |
203 | | - () -> |
204 | | - createAndPersistNodeRecord( |
205 | | - factory, |
206 | | - existingRecord, |
207 | | - ipAddressBytes, |
208 | | - discoveryPort, |
209 | | - listeningPort, |
210 | | - forkId)); |
211 | | - |
212 | | - localNode.get().setNodeRecord(nodeRecord); |
| 239 | + public void updateNodeRecord() { |
| 240 | + lock.lock(); |
| 241 | + try { |
| 242 | + final NodeRecordFactory factory = NodeRecordFactory.DEFAULT; |
| 243 | + |
| 244 | + final Optional<NodeRecord> existingRecord = |
| 245 | + variablesStorage.getLocalEnrSeqno().map(factory::fromBytes); |
| 246 | + |
| 247 | + final Bytes ipAddressBytes = |
| 248 | + Bytes.of(InetAddresses.forString(primaryEndpoint.host()).getAddress()); |
| 249 | + |
| 250 | + final int discoveryPort = primaryEndpoint.discoveryPort(); |
| 251 | + final int listeningPort = primaryEndpoint.tcpPort(); |
| 252 | + final List<Bytes> forkId = forkIdSupplier.get(); |
| 253 | + |
| 254 | + final boolean primaryIsIpv4 = ipAddressBytes.size() == 4; |
| 255 | + |
| 256 | + final Optional<Bytes> ipv6AddressBytes = |
| 257 | + ipv6Endpoint.map(ep -> Bytes.of(InetAddresses.forString(ep.host()).getAddress())); |
| 258 | + |
| 259 | + // Reuse the existing ENR if all relevant fields are unchanged. |
| 260 | + final NodeRecord nodeRecord = |
| 261 | + existingRecord |
| 262 | + .filter( |
| 263 | + record -> |
| 264 | + nodeId.equals(record.get(EnrField.PKEY_SECP256K1)) |
| 265 | + && (primaryIsIpv4 |
| 266 | + ? primaryIpv4AddressMatches( |
| 267 | + record, ipAddressBytes, discoveryPort, listeningPort) |
| 268 | + : primaryIpv6AddressMatches( |
| 269 | + record, ipAddressBytes, discoveryPort, listeningPort)) |
| 270 | + && forkId.equals(record.get(FORK_ID_ENR_FIELD)) |
| 271 | + && (!primaryIsIpv4 || ipv6FieldsMatch(record, ipv6AddressBytes))) |
| 272 | + // Otherwise, create a new ENR with an incremented sequence number, |
| 273 | + // sign it with the local node key, and persist it to disk. |
| 274 | + .orElseGet( |
| 275 | + () -> |
| 276 | + createAndPersistNodeRecord( |
| 277 | + factory, |
| 278 | + existingRecord, |
| 279 | + ipAddressBytes, |
| 280 | + discoveryPort, |
| 281 | + listeningPort, |
| 282 | + forkId)); |
| 283 | + |
| 284 | + localNode.get().setNodeRecord(nodeRecord); |
| 285 | + } finally { |
| 286 | + lock.unlock(); |
| 287 | + } |
213 | 288 | } |
214 | 289 |
|
215 | 290 | private boolean primaryIpv4AddressMatches( |
|
0 commit comments