Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 34 additions & 5 deletions langgraph/src/pregel/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -418,7 +418,7 @@ export class Pregel<
: null;
let checkpoint = saved ? saved.checkpoint : emptyCheckpoint();
let checkpointConfig = saved ? saved.config : config;
const start = (saved?.metadata?.step ?? -2) + 1;
let start = (saved?.metadata?.step ?? -2) + 1;
// create channels from checkpoint
const channels = emptyChannels(this.channels, checkpoint);
// map inputs to channel updates
Expand All @@ -428,10 +428,39 @@ export class Pregel<
inputPendingWrites.push(value);
}
}

_applyWrites(checkpoint, channels, inputPendingWrites);

// TODO checkpoint inputs
if (inputPendingWrites.length) {
// discard any unfinished tasks from previous checkpoint
const discarded = _prepareNextTasks(
checkpoint,
processes,
channels,
true
);
checkpoint = discarded[0]; // eslint-disable-line prefer-destructuring
// apply input writes
_applyWrites(checkpoint, channels, inputPendingWrites);
// save input checkpoint
if (this.checkpointer) {
checkpoint = createCheckpoint(checkpoint, channels);
bg.push(
this.checkpointer.put(checkpointConfig, checkpoint, {
source: "input",
step: start,
writes: Object.fromEntries(inputPendingWrites),
})
);
checkpointConfig = {
configurable: {
...checkpointConfig.configurable,
threadTs: checkpoint.ts,
},
};
}
// increment start to 0
start += 1;
} else {
// TODO mark INTERRUPT as seen
}

// Similarly to Bulk Synchronous Parallel / Pregel model
// computation proceeds in steps, while there are channel updates
Expand Down