Skip to content
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion langgraph/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@
"author": "LangChain",
"license": "MIT",
"dependencies": {
"@langchain/core": "^0.1.51"
"@langchain/core": "^0.1.51",
"better-sqlite3": "^9.5.0"
},
"devDependencies": {
"@jest/globals": "^29.5.0",
Expand All @@ -47,6 +48,7 @@
"@swc/core": "^1.3.90",
"@swc/jest": "^0.2.29",
"@tsconfig/recommended": "^1.0.3",
"@types/better-sqlite3": "^7.6.9",
"@typescript-eslint/eslint-plugin": "^6.12.0",
"@typescript-eslint/parser": "^6.12.0",
"dotenv": "^16.3.1",
Expand Down
20 changes: 1 addition & 19 deletions langgraph/src/channels/base.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Checkpoint } from "../checkpoint/index.js";
import { Checkpoint, deepCopy } from "../checkpoint/index.js";

export abstract class BaseChannel<
ValueType = unknown,
Expand Down Expand Up @@ -63,24 +63,6 @@ export class InvalidUpdateError extends Error {
}
}

// eslint-disable-next-line @typescript-eslint/no-explicit-any
export function deepCopy(obj: any): any {
if (typeof obj !== "object" || obj === null) {
return obj;
}

// eslint-disable-next-line @typescript-eslint/no-explicit-any
const newObj: any = Array.isArray(obj) ? [] : {};

for (const key in obj) {
if (Object.prototype.hasOwnProperty.call(obj, key)) {
newObj[key] = deepCopy(obj[key]);
}
}

return newObj;
}

export function emptyChannels(
channels: Record<string, BaseChannel>,
checkpoint: Checkpoint
Expand Down
81 changes: 76 additions & 5 deletions langgraph/src/checkpoint/base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,24 @@ export interface Checkpoint {
versionsSeen: Record<string, Record<string, number>>;
}

// eslint-disable-next-line @typescript-eslint/no-explicit-any
export function deepCopy(obj: any): any {
if (typeof obj !== "object" || obj === null) {
return obj;
}

// eslint-disable-next-line @typescript-eslint/no-explicit-any
const newObj: any = Array.isArray(obj) ? [] : {};

for (const key in obj) {
if (Object.prototype.hasOwnProperty.call(obj, key)) {
newObj[key] = deepCopy(obj[key]);
}
}

return newObj;
}

export function emptyCheckpoint(): Checkpoint {
return {
v: 1,
Expand All @@ -56,7 +74,7 @@ export function copyCheckpoint(checkpoint: Checkpoint): Checkpoint {
ts: checkpoint.ts,
channelValues: { ...checkpoint.channelValues },
channelVersions: { ...checkpoint.channelVersions },
versionsSeen: { ...checkpoint.versionsSeen },
versionsSeen: deepCopy(checkpoint.versionsSeen),
};
}

Expand All @@ -65,14 +83,67 @@ export const enum CheckpointAt {
END_OF_RUN = "end_of_run",
}

export interface CheckpointTuple {
config: RunnableConfig;
checkpoint: Checkpoint;
parentConfig?: RunnableConfig;
}

const CheckpointThreadId: ConfigurableFieldSpec = {
id: "threadId",
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nuno Campos (@nfcampos), general question: The JS implementation serializes threadId and threadTs in camelCase (as opposed to snake_case in Python). Is there any value in making them both one way or the other?

Hypothetical scenario: Python graph checkpoints state, pauses execution, and JS graph (or some other runtime) resumes execution.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yep would be nice to use same keys

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll make the changes in this PR.

annotation: typeof "",
name: "Thread ID",
description: null,
default: "",
isShared: true,
dependencies: null,
};

const CheckpointThreadTs: ConfigurableFieldSpec = {
id: "threadTs",
annotation: typeof "",
name: "Thread Timestamp",
description:
"Pass to fetch a past checkpoint. If None, fetches the latest checkpoint.",
default: null,
isShared: true,
dependencies: null,
};

export interface SerializerProtocol {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
dumps(obj: any): any;
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure what the right input/output types are for dumps() and loads().

Copy link
Copy Markdown
Contributor

@nfcampos Nuno Campos (nfcampos) Apr 22, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think in JS the most common would be unknown -> string -> unknown

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let me know if you like this approach better: 348572c (commit)

I implemented generics on the BaseCheckpointSaver class and the SerializerProtocol interface. This ensures that a type incompatible serializer cannot be passed to the checkpoint saver constructor. For example, the SqliteSaver requires the checkpoint to be serialized to a string. A serializer that doesn't implement this correctly will raise an error. It's also nice to help developers implement dumps() and loads() with the correct types.

That being said, I'm not sure how much value this provides.

// eslint-disable-next-line @typescript-eslint/no-explicit-any
loads(data: any): any;
}

export abstract class BaseCheckpointSaver {
at: CheckpointAt = CheckpointAt.END_OF_RUN;
at: CheckpointAt = CheckpointAt.END_OF_STEP;

serde: SerializerProtocol;

constructor(serde?: SerializerProtocol, at?: CheckpointAt) {
this.serde = serde || this.serde;
this.at = at || this.at;
}

get configSpecs(): Array<ConfigurableFieldSpec> {
return [];
return [CheckpointThreadId, CheckpointThreadTs];
}

abstract get(config: RunnableConfig): Checkpoint | undefined;
async get(config: RunnableConfig): Promise<Checkpoint | undefined> {
const value = await this.getTuple(config);
return value ? value.checkpoint : undefined;
}

abstract getTuple(
config: RunnableConfig
): Promise<CheckpointTuple | undefined>;

abstract list(config: RunnableConfig): AsyncGenerator<CheckpointTuple>;

abstract put(config: RunnableConfig, checkpoint: Checkpoint): void;
abstract put(
config: RunnableConfig,
checkpoint: Checkpoint
): Promise<RunnableConfig>;
}
1 change: 1 addition & 0 deletions langgraph/src/checkpoint/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ export {
type ConfigurableFieldSpec,
type Checkpoint,
type CheckpointAt,
deepCopy,
emptyCheckpoint,
BaseCheckpointSaver,
} from "./base.js";
108 changes: 87 additions & 21 deletions langgraph/src/checkpoint/memory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,33 +3,96 @@ import {
BaseCheckpointSaver,
Checkpoint,
CheckpointAt,
ConfigurableFieldSpec,
CheckpointTuple,
copyCheckpoint,
SerializerProtocol,
} from "./base.js";

export class NoopSerializer implements SerializerProtocol {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
dumps(obj: any): any {
return obj;
}

// eslint-disable-next-line @typescript-eslint/no-explicit-any
loads(data: any): any {
return data;
}
}

export class MemorySaver extends BaseCheckpointSaver {
storage: Record<string, Checkpoint> = {};

get configSpecs(): ConfigurableFieldSpec[] {
return [
{
id: "threadId",
name: "Thread ID",
annotation: null,
description: null,
default: null,
isShared: true,
dependencies: null,
},
];
serde = new NoopSerializer();

storage: Record<string, Record<string, Checkpoint>>;

constructor(serde?: SerializerProtocol, at?: CheckpointAt) {
super(serde, at);
this.storage = {};
}

async getTuple(config: RunnableConfig): Promise<CheckpointTuple | undefined> {
const threadId = config.configurable?.threadId;
const threadTs = config.configurable?.threadTs;
const checkpoints = this.storage[threadId];

if (threadTs) {
const checkpoint = checkpoints[threadTs];
if (checkpoint) {
return {
config,
checkpoint: this.serde.loads(checkpoint),
};
}
} else {
if (checkpoints) {
const maxThreadTs = Object.keys(checkpoints).sort((a, b) =>
b.localeCompare(a)
)[0];
return {
config: { configurable: { threadId, threadTs: maxThreadTs } },
checkpoint: this.serde.loads(checkpoints[maxThreadTs.toString()]),
};
}
}

return undefined;
}

get(config: RunnableConfig): Checkpoint | undefined {
return this.storage[config.configurable?.threadId];
async *list(config: RunnableConfig): AsyncGenerator<CheckpointTuple> {
const threadId = config.configurable?.threadId;
const checkpoints = this.storage[threadId] ?? {};

// sort in desc order
for (const [threadTs, checkpoint] of Object.entries(checkpoints).sort(
(a, b) => b[0].localeCompare(a[0])
)) {
yield {
config: { configurable: { threadId, threadTs } },
checkpoint: this.serde.loads(checkpoint),
};
}
}

put(config: RunnableConfig, checkpoint: Checkpoint): void {
this.storage[config.configurable?.threadId] = checkpoint;
async put(
config: RunnableConfig,
checkpoint: Checkpoint
): Promise<RunnableConfig> {
const threadId = config.configurable?.threadId;

if (this.storage[threadId]) {
this.storage[threadId][checkpoint.ts] = this.serde.dumps(checkpoint);
} else {
this.storage[threadId] = {
[checkpoint.ts]: this.serde.dumps(checkpoint),
};
}

return {
configurable: {
threadId,
threadTs: checkpoint.ts,
},
};
}
}

Expand All @@ -42,13 +105,16 @@ export class MemorySaverAssertImmutable extends MemorySaver {
this.at = CheckpointAt.END_OF_STEP;
}

put(config: RunnableConfig, checkpoint: Checkpoint): void {
async put(
config: RunnableConfig,
checkpoint: Checkpoint
): Promise<RunnableConfig> {
const threadId = config.configurable?.threadId;
if (!this.storageForCopies[threadId]) {
this.storageForCopies[threadId] = {};
}
// assert checkpoint hasn't been modified since last written
const saved = super.get(config);
const saved = await super.get(config);
if (saved) {
const savedTs = saved.ts;
if (this.storageForCopies[threadId][savedTs]) {
Expand Down
Loading