Skip to content

Commit a53e0c6

Browse files
authored
Update last seen cluster state in commit phase (#16215)
* Update last seen cluster state on apply commit Signed-off-by: Sooraj Sinha <[email protected]>
1 parent 6c17119 commit a53e0c6

File tree

4 files changed

+16
-5
lines changed

4 files changed

+16
-5
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
6262
- Enable coordinator search.request_stats_enabled by default ([#16290](https://github.com/opensearch-project/OpenSearch/pull/16290))
6363
- Code cleanup: Remove ApproximateIndexOrDocValuesQuery ([#16273](https://github.com/opensearch-project/OpenSearch/pull/16273))
6464
- Optimise clone operation for incremental full cluster snapshots ([#16296](https://github.com/opensearch-project/OpenSearch/pull/16296))
65+
- Update last seen cluster state in the commit phase ([#16215](https://github.com/opensearch-project/OpenSearch/pull/16215))
6566

6667
### Deprecated
6768

server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,7 @@
105105
import java.util.Set;
106106
import java.util.concurrent.atomic.AtomicBoolean;
107107
import java.util.function.BiConsumer;
108+
import java.util.function.Consumer;
108109
import java.util.function.Supplier;
109110
import java.util.stream.Collectors;
110111
import java.util.stream.Stream;
@@ -383,14 +384,19 @@ void onFollowerCheckRequest(FollowerCheckRequest followerCheckRequest) {
383384
}
384385
}
385386

386-
private void handleApplyCommit(ApplyCommitRequest applyCommitRequest, ActionListener<Void> applyListener) {
387+
private void handleApplyCommit(
388+
ApplyCommitRequest applyCommitRequest,
389+
Consumer<ClusterState> updateLastSeen,
390+
ActionListener<Void> applyListener
391+
) {
387392
synchronized (mutex) {
388393
logger.trace("handleApplyCommit: applying commit {}", applyCommitRequest);
389394

390395
coordinationState.get().handleCommit(applyCommitRequest);
391396
final ClusterState committedState = hideStateIfNotRecovered(coordinationState.get().getLastAcceptedState());
392397
applierState = mode == Mode.CANDIDATE ? clusterStateWithNoClusterManagerBlock(committedState) : committedState;
393398
clusterApplier.setPreCommitState(applierState);
399+
updateLastSeen.accept(coordinationState.get().getLastAcceptedState());
394400

395401
if (applyCommitRequest.getSourceNode().equals(getLocalNode())) {
396402
// cluster-manager node applies the committed state at the end of the publication process, not here.

server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import org.opensearch.cluster.coordination.PersistedStateRegistry.PersistedStateType;
4444
import org.opensearch.cluster.node.DiscoveryNode;
4545
import org.opensearch.cluster.node.DiscoveryNodes;
46+
import org.opensearch.common.TriConsumer;
4647
import org.opensearch.core.action.ActionListener;
4748
import org.opensearch.core.common.bytes.BytesReference;
4849
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
@@ -65,7 +66,6 @@
6566
import java.util.concurrent.atomic.AtomicBoolean;
6667
import java.util.concurrent.atomic.AtomicLong;
6768
import java.util.concurrent.atomic.AtomicReference;
68-
import java.util.function.BiConsumer;
6969
import java.util.function.Consumer;
7070
import java.util.function.Function;
7171

@@ -110,7 +110,7 @@ public PublicationTransportHandler(
110110
TransportService transportService,
111111
NamedWriteableRegistry namedWriteableRegistry,
112112
Function<PublishRequest, PublishWithJoinResponse> handlePublishRequest,
113-
BiConsumer<ApplyCommitRequest, ActionListener<Void>> handleApplyCommit,
113+
TriConsumer<ApplyCommitRequest, Consumer<ClusterState>, ActionListener<Void>> handleApplyCommit,
114114
RemoteClusterStateService remoteClusterStateService
115115
) {
116116
this.transportService = transportService;
@@ -142,7 +142,7 @@ public PublicationTransportHandler(
142142
false,
143143
false,
144144
ApplyCommitRequest::new,
145-
(request, channel, task) -> handleApplyCommit.accept(request, transportCommitCallback(channel))
145+
(request, channel, task) -> handleApplyCommit.apply(request, this::updateLastSeen, transportCommitCallback(channel))
146146
);
147147
}
148148

@@ -377,6 +377,10 @@ private boolean validateRemotePublicationConfiguredOnAllNodes(DiscoveryNodes dis
377377
return true;
378378
}
379379

380+
private void updateLastSeen(final ClusterState clusterState) {
381+
lastSeenClusterState.set(clusterState);
382+
}
383+
380384
// package private for testing
381385
void setCurrentPublishRequestToSelf(PublishRequest publishRequest) {
382386
this.currentPublishRequestToSelf.set(publishRequest);

server/src/test/java/org/opensearch/cluster/coordination/PublicationTransportHandlerTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -466,7 +466,7 @@ private PublicationTransportHandler getPublicationTransportHandler(
466466
transportService,
467467
writableRegistry(),
468468
handlePublishRequest,
469-
(pu, l) -> {},
469+
(pu, uc, l) -> {},
470470
remoteClusterStateService
471471
);
472472
transportService.start();

0 commit comments

Comments
 (0)