Skip to content

Commit 828f6e5

Browse files
committed
cluster: starting work on sticky stream
1 parent a601fcd commit 828f6e5

File tree

3 files changed

+92
-65
lines changed

3 files changed

+92
-65
lines changed

src/packages/backend/conat/test/supercluster/supercluster.test.ts

Lines changed: 20 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,9 @@ import {
1515
} from "@cocalc/backend/conat/test/setup";
1616
import {
1717
superclusterLink,
18-
superclusterStream,
18+
superclusterStreams,
1919
superclusterService,
20-
trimSuperclusterStream,
20+
trimSuperclusterStreams,
2121
} from "@cocalc/conat/core/supercluster";
2222
import { isEqual } from "lodash";
2323

@@ -42,16 +42,16 @@ describe("create a supercluster enabled socketio server and test that the stream
4242
({ server, client } = await createCluster());
4343
});
4444

45-
let stream;
45+
let streams;
4646
it("get the interest stream via our client. There MUST be at least two persist subjects in there, since they were needed to even create the interest stream.", async () => {
47-
stream = await superclusterStream({
47+
streams = await superclusterStreams({
4848
client,
4949
clusterName: server.options.clusterName,
5050
});
5151
const service = superclusterService(server.options.clusterName);
5252
await wait({
5353
until: () => {
54-
const v = stream.getAll();
54+
const v = streams.interest.getAll();
5555
expect(service).toContain(server.options.clusterName);
5656
const persistUpdates = v.filter((update) =>
5757
update.subject.startsWith(service),
@@ -68,25 +68,25 @@ describe("create a supercluster enabled socketio server and test that the stream
6868
it("subscribe and see update appear in the stream; close sub and see delete appear", async () => {
6969
const sub = await client.subscribe("foo");
7070
while (true) {
71-
const v = stream.getAll().filter((x) => x.subject == "foo");
71+
const v = streams.interest.getAll().filter((x) => x.subject == "foo");
7272
if (v.length == 1) {
7373
expect(v[0]).toEqual(
7474
expect.objectContaining({ op: "add", subject: "foo" }),
7575
);
7676
break;
7777
}
78-
await once(stream, "change");
78+
await once(streams.interest, "change");
7979
}
8080
sub.close();
8181
while (true) {
82-
const v = stream.getAll().filter((x) => x.subject == "foo");
82+
const v = streams.interest.getAll().filter((x) => x.subject == "foo");
8383
if (v.length == 2) {
8484
expect(v[1]).toEqual(
8585
expect.objectContaining({ op: "delete", subject: "foo" }),
8686
);
8787
break;
8888
}
89-
await once(stream, "change");
89+
await once(streams.interest, "change");
9090
}
9191
});
9292

@@ -341,15 +341,15 @@ describe("test trimming the interest stream", () => {
341341
let server, client;
342342
it("create a supercluster server", async () => {
343343
({ server, client } = await createCluster());
344-
await wait({ until: () => server.superclusterStream != null });
344+
await wait({ until: () => server.superclusterStreams != null });
345345
});
346346

347347
let sub;
348348
it("subscribes and verifies that trimming does nothing", async () => {
349349
sub = await client.sub("389");
350-
const seqs = await trimSuperclusterStream(
351-
server.superclusterStream,
352-
server.interest,
350+
const seqs = await trimSuperclusterStreams(
351+
server.superclusterStreams,
352+
server,
353353
0,
354354
);
355355
expect(seqs).toEqual([]);
@@ -358,9 +358,9 @@ describe("test trimming the interest stream", () => {
358358
it("unsubscribes and verifies that trimming with a 5s maxAge does nothing", async () => {
359359
sub.close();
360360
await delay(100);
361-
const seqs = await trimSuperclusterStream(
362-
server.superclusterStream,
363-
server.interest,
361+
const seqs = await trimSuperclusterStreams(
362+
server.superclusterStreams,
363+
server,
364364
5000,
365365
);
366366
expect(seqs).toEqual([]);
@@ -372,17 +372,17 @@ describe("test trimming the interest stream", () => {
372372
let seqs;
373373
await wait({
374374
until: async () => {
375-
seqs = await trimSuperclusterStream(
376-
server.superclusterStream,
377-
server.interest,
375+
seqs = await trimSuperclusterStreams(
376+
server.superclusterStreams,
377+
server,
378378
0,
379379
);
380380
return seqs.length >= 2;
381381
},
382382
});
383383
expect(seqs.length).toBe(2);
384384
await delay(1);
385-
for (const update of server.superclusterStream.getAll()) {
385+
for (const update of server.superclusterStreams.interest.getAll()) {
386386
if (update.subject == "389" && update.op == "add") {
387387
throw Error("adding 389 should have been removed");
388388
}

src/packages/conat/core/server.ts

Lines changed: 21 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -83,12 +83,12 @@ import ConsistentHash from "consistent-hash";
8383
import { is_array } from "@cocalc/util/misc";
8484
import { UsageMonitor } from "@cocalc/conat/monitor/usage";
8585
import { once, until } from "@cocalc/util/async-utils";
86-
import type { DStream } from "@cocalc/conat/sync/dstream";
8786
import {
8887
superclusterLink,
8988
type SuperclusterLink,
90-
superclusterStream,
91-
trimSuperclusterStream,
89+
superclusterStreams,
90+
type ClusterStreams,
91+
trimSuperclusterStreams,
9292
createSuperclusterPersistServer,
9393
} from "./supercluster";
9494
import { type ConatSocketServer } from "@cocalc/conat/socket";
@@ -202,7 +202,7 @@ export class ConatServer {
202202
// which is the socket.io room to send the messages to.
203203
private sticky: { [pattern: string]: { [subject: string]: string } } = {};
204204

205-
private superclusterStream?: DStream<InterestUpdate>;
205+
private superclusterStreams?: ClusterStreams;
206206
private superclusterLinks: { [clusterName: string]: SuperclusterLink } = {};
207207
private superclusterPersistServer?: ConatSocketServer;
208208
private clusterName?: string;
@@ -348,8 +348,12 @@ export class ConatServer {
348348
}
349349
this.state = "closed";
350350

351-
this.superclusterStream?.close();
352-
delete this.superclusterStream;
351+
if (this.superclusterStreams != null) {
352+
for (const name in this.superclusterStreams) {
353+
this.superclusterStreams[name].close();
354+
}
355+
delete this.superclusterStreams;
356+
}
353357
for (const name in this.superclusterLinks) {
354358
this.superclusterLinks[name].close();
355359
delete this.superclusterLinks[name];
@@ -475,8 +479,8 @@ export class ConatServer {
475479
private queuedSuperclusterUpdates: InterestUpdate[] = [];
476480
private updateSuperclusterStream = (update: InterestUpdate) => {
477481
if (!this.clusterName) return;
478-
if (this.superclusterStream !== undefined) {
479-
this.superclusterStream.publish(update);
482+
if (this.superclusterStreams !== undefined) {
483+
this.superclusterStreams.interest.publish(update);
480484
this.trimSuperclusterStream();
481485
} else {
482486
this.queuedSuperclusterUpdates.push(update);
@@ -490,12 +494,13 @@ export class ConatServer {
490494
private trimSuperclusterStream = throttle(
491495
async () => {
492496
if (
493-
this.superclusterStream !== undefined &&
494-
this.interest !== undefined
497+
this.superclusterStreams !== undefined &&
498+
this.interest !== undefined &&
499+
this.sticky !== undefined
495500
) {
496-
await trimSuperclusterStream(
497-
this.superclusterStream,
498-
this.interest,
501+
await trimSuperclusterStreams(
502+
this.superclusterStreams,
503+
{ interest: this.interest, sticky: this.sticky },
499504
5 * 60000,
500505
);
501506
}
@@ -942,20 +947,15 @@ export class ConatServer {
942947
client,
943948
clusterName: this.clusterName,
944949
});
945-
this.log("creating interest stream");
946-
const stream = await superclusterStream({
950+
this.log("creating cluster streams");
951+
this.superclusterStreams = await superclusterStreams({
947952
client,
948953
clusterName: this.clusterName,
949954
});
950-
this.log("initializing interest stream");
951-
if (stream.length > 0) {
952-
await stream.delete({ all: true });
953-
}
954-
this.superclusterStream = stream;
955955
// add in everything so far in interest (TODO)
956956
if (this.queuedSuperclusterUpdates.length > 0) {
957957
for (const update0 of this.queuedSuperclusterUpdates) {
958-
this.superclusterStream.publish(update0);
958+
this.superclusterStreams.interest.publish(update0);
959959
}
960960
this.queuedSuperclusterUpdates.length = 0;
961961
}

src/packages/conat/core/supercluster.ts

Lines changed: 51 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import {
99
randomChoice,
1010
updateInterest,
1111
type InterestUpdate,
12+
type StickyUpdate,
1213
} from "@cocalc/conat/core/server";
1314
import type { DStream } from "@cocalc/conat/sync/dstream";
1415
import { once } from "@cocalc/util/async-utils";
@@ -32,7 +33,7 @@ export async function superclusterLink({
3233
export class SuperclusterLink {
3334
private interest: Patterns<{ [queue: string]: Set<string> }> = new Patterns();
3435
private sticky: { [pattern: string]: { [subject: string]: string } } = {};
35-
private stream: DStream<InterestUpdate>;
36+
private streams: ClusterStreams;
3637
private state: "init" | "ready" | "closed" = "init";
3738

3839
constructor(
@@ -48,25 +49,33 @@ export class SuperclusterLink {
4849
}
4950

5051
init = async () => {
51-
this.stream = await superclusterStream({
52+
this.streams = await superclusterStreams({
5253
client: this.client,
5354
clusterName: this.clusterName,
5455
});
55-
for (const update of this.stream.getAll()) {
56+
for (const update of this.streams.interest.getAll()) {
5657
updateInterest(update, this.interest, this.sticky);
5758
}
58-
this.stream.on("change", this.handleUpdate);
59+
this.streams.interest.on("change", this.handleInterestUpdate);
5960
this.state = "ready";
6061
};
6162

62-
handleUpdate = (update) => {
63+
handleInterestUpdate = (update) => {
6364
updateInterest(update, this.interest, this.sticky);
6465
};
6566

6667
close = () => {
68+
if (this.state == "closed") {
69+
return;
70+
}
6771
this.state = "closed";
68-
this.stream?.removeListener("change", this.handleUpdate);
69-
this.stream?.close();
72+
if (this.streams != null) {
73+
this.streams.interest.removeListener("change", this.handleInterestUpdate);
74+
this.streams.interest.close();
75+
this.streams.sticky.close();
76+
// @ts-ignore
77+
delete this.streams;
78+
}
7079
};
7180

7281
publish = ({ subject, data }) => {
@@ -164,56 +173,74 @@ export async function createSuperclusterPersistServer({
164173
return await createPersistServer({ client, service });
165174
}
166175

167-
export async function superclusterStream({
176+
export interface ClusterStreams {
177+
interest: DStream<InterestUpdate>;
178+
sticky: DStream<StickyUpdate>;
179+
}
180+
181+
export async function superclusterStreams({
168182
client,
169183
clusterName,
170184
}: {
171185
client: Client;
172186
clusterName: string;
173-
}): Promise<DStream<InterestUpdate>> {
187+
}): Promise<ClusterStreams> {
174188
logger.debug("superclusterStream: ", { clusterName });
175189
if (!clusterName) {
176190
throw Error("clusterName must be set");
177191
}
178-
const stream = await client.sync.dstream<InterestUpdate>({
179-
name: superclusterStreamNames(clusterName).interest,
192+
const names = superclusterStreamNames(clusterName);
193+
const opts = {
180194
service: superclusterService(clusterName),
181195
noCache: true,
196+
ephemeral: true,
197+
};
198+
const interest = await client.sync.dstream<InterestUpdate>({
199+
name: names.interest,
200+
...opts,
201+
});
202+
const sticky = await client.sync.dstream<StickyUpdate>({
203+
name: names.sticky,
204+
...opts,
182205
});
183-
logger.debug("superclusterStream: GOT IT", { clusterName });
184-
return stream;
206+
logger.debug("superclusterStreams: got them", { clusterName });
207+
return { interest, sticky };
185208
}
186209

187210
// Periodically delete not-necessary updates from the interest stream
188-
export async function trimSuperclusterStream(
189-
stream: DStream<InterestUpdate>,
190-
interest: Patterns<{ [queue: string]: Set<string> }>,
211+
export async function trimSuperclusterStreams(
212+
streams: ClusterStreams,
213+
data: {
214+
interest: Patterns<{ [queue: string]: Set<string> }>;
215+
sticky: { [pattern: string]: { [subject: string]: string } };
216+
},
191217
// don't delete anything that isn't at lest minAge ms old.
192218
minAge: number,
193219
): Promise<number[]> {
194-
// we simply iterate over the stream checking for subjects
220+
const { interest } = streams;
221+
// we iterate over the interest stream checking for subjects
195222
// with no current interest at all; in such cases it is safe
196223
// to purge them entirely from the stream.
197224
const seqs: number[] = [];
198225
const now = Date.now();
199-
for (let n = 0; n < stream.length; n++) {
200-
const time = stream.time(n);
226+
for (let n = 0; n < interest.length; n++) {
227+
const time = interest.time(n);
201228
if (time == null || now - time.valueOf() <= minAge) {
202229
break;
203230
}
204-
const update = stream.get(n) as InterestUpdate;
205-
if (!interest.hasPattern(update.subject)) {
206-
const seq = stream.seq(n);
231+
const update = interest.get(n) as InterestUpdate;
232+
if (!data.interest.hasPattern(update.subject)) {
233+
const seq = interest.seq(n);
207234
if (seq != null) {
208235
seqs.push(seq);
209236
}
210237
}
211238
}
212239
if (seqs.length > 0) {
213-
// [ ] todo -- add to stream.delete a version where it takes an array of sequence numbers
240+
// [ ] todo -- add to interest.delete a version where it takes an array of sequence numbers
214241
logger.debug("trimSuperclusterStream: trimming", { seqs });
215242
for (const seq of seqs) {
216-
await stream.delete({ seq });
243+
await interest.delete({ seq });
217244
}
218245
}
219246
return seqs;

0 commit comments

Comments
 (0)