Skip to content

Commit f1dbacc

Browse files
committed
make waitForInterest cancellable and use this when waiting for interest across a supercluster
1 parent 43a1733 commit f1dbacc

File tree

3 files changed

+51
-10
lines changed

3 files changed

+51
-10
lines changed

src/packages/backend/conat/test/supercluster/supercluster.test.ts

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ import {
1010
after,
1111
initConatServer,
1212
once,
13-
//delay,
13+
delay,
1414
wait,
1515
} from "@cocalc/backend/conat/test/setup";
1616
import { server as createPersistServer } from "@cocalc/backend/conat/persist";
@@ -176,7 +176,7 @@ describe("create a supercluster enabled socketio server and test that the stream
176176
});
177177
});
178178

179-
describe("create a supercluster with two distinct servers and send a message from one client to another via a link, and also use request/reply", () => {
179+
describe.only("create a supercluster with two distinct servers and send a message from one client to another via a link, and also use request/reply", () => {
180180
let server1, server2, client1, client2;
181181
it("create two distinct servers with supercluster support enabled", async () => {
182182
({ server: server1, client: client1 } = await createCluster({
@@ -198,6 +198,19 @@ describe("create a supercluster with two distinct servers and send a message fro
198198
});
199199
});
200200

201+
it("tests that server-side waitForInterest can be aborted", async () => {
202+
const controller = new AbortController();
203+
const w = server2.waitForInterest(
204+
"no-interest",
205+
90000,
206+
client2.conn.id,
207+
controller.signal,
208+
);
209+
await delay(15);
210+
controller.abort();
211+
expect(await w).toBe(false);
212+
});
213+
201214
const N =
202215
"114381625757888867669235779976146612010218296721242362562561842935706935245733897830597123563958705058989075147599290026879543541";
203216

src/packages/conat/core/server.ts

Lines changed: 29 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -971,13 +971,15 @@ export class ConatServer {
971971
subject: string,
972972
timeout: number,
973973
socketId: string,
974+
signal?: AbortSignal,
974975
): Promise<boolean> => {
975976
const links = Object.values(this.superclusterLinks);
976977
if (links.length == 0) {
977978
return await this.waitForInterestInThisCluster(
978979
subject,
979980
timeout,
980981
socketId,
982+
signal,
981983
);
982984
} else {
983985
const v: any[] = [];
@@ -993,15 +995,23 @@ export class ConatServer {
993995
}
994996
return false;
995997
};
998+
const controller = new AbortController();
999+
const signal2 = controller.signal;
9961000
v.push(
9971001
nothrow(
998-
this.waitForInterestInThisCluster(subject, timeout, socketId),
1002+
this.waitForInterestInThisCluster(
1003+
subject,
1004+
timeout,
1005+
socketId,
1006+
signal2,
1007+
),
9991008
),
10001009
);
10011010
for (const link of links) {
1002-
v.push(nothrow(link.waitForInterest(subject, timeout)));
1011+
v.push(nothrow(link.waitForInterest(subject, timeout, signal2)));
10031012
}
10041013
if (!timeout) {
1014+
// with timeout=0 they all immediately answer (so no need to worry about abort/pormise)
10051015
const w = await Promise.all(v);
10061016
for (const x of w) {
10071017
if (x) {
@@ -1010,8 +1020,13 @@ export class ConatServer {
10101020
}
10111021
return false;
10121022
}
1023+
1024+
signal?.addEventListener("abort", () => {
1025+
controller.abort();
1026+
});
10131027
const w = await Promise.race(v);
1014-
// [ ] todo: cancel all the others.
1028+
// cancel all the others.
1029+
controller.abort();
10151030
return w;
10161031
} finally {
10171032
done = true;
@@ -1023,6 +1038,7 @@ export class ConatServer {
10231038
subject: string,
10241039
timeout: number,
10251040
socketId: string,
1041+
signal?: AbortSignal,
10261042
) => {
10271043
const matches = this.interest.matches(subject);
10281044
if (matches.length > 0 || !timeout) {
@@ -1035,12 +1051,20 @@ export class ConatServer {
10351051
timeout = MAX_INTEREST_TIMEOUT;
10361052
}
10371053
const start = Date.now();
1038-
while (this.state != "closed" && this.sockets[socketId]) {
1054+
while (
1055+
this.state != "closed" &&
1056+
this.sockets[socketId] &&
1057+
!signal?.aborted
1058+
) {
10391059
if (Date.now() - start >= timeout) {
10401060
throw Error("timeout");
10411061
}
10421062
await once(this.interest, "change");
1043-
if ((this.state as any) == "closed" || !this.sockets[socketId]) {
1063+
if (
1064+
(this.state as any) == "closed" ||
1065+
!this.sockets[socketId] ||
1066+
signal?.aborted
1067+
) {
10441068
return false;
10451069
}
10461070
const matches = this.interest.matches(subject);

src/packages/conat/core/supercluster.ts

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,11 @@ export class SuperclusterLink {
112112
return randomChoice(targets);
113113
};
114114

115-
waitForInterest = async (subject: string, timeout: number) => {
115+
waitForInterest = async (
116+
subject: string,
117+
timeout: number,
118+
signal?: AbortSignal,
119+
) => {
116120
const matches = this.interest.matches(subject);
117121

118122
if (matches.length > 0 || !timeout) {
@@ -122,12 +126,12 @@ export class SuperclusterLink {
122126
return matches.length > 0;
123127
}
124128
const start = Date.now();
125-
while (this.state != "closed") {
129+
while (this.state != "closed" && !signal?.aborted) {
126130
if (Date.now() - start >= timeout) {
127131
throw Error("timeout");
128132
}
129133
await once(this.interest, "change");
130-
if ((this.state as any) == "closed") {
134+
if ((this.state as any) == "closed" || signal?.aborted) {
131135
return false;
132136
}
133137
// todo: implement this.interest.hasMatch that just checks if there is at least one match

0 commit comments

Comments
 (0)