Update static functions in pregel/index.ts to match corresponding Python implementation#125
Conversation
…unit test for _prepareNextTasks().
…aneously. These changes need to be made together.
| return newChannels; | ||
| } | ||
|
|
||
| export async function createCheckpoint<ValueType>( |
There was a problem hiding this comment.
This function needs to be non-async in order for _localRead() to be patched in RunnableConfig and eventually called so it doesn't return a Promise.
Need to double check if this is ok.
There was a problem hiding this comment.
yep I think this doesn't have to be async
|
|
||
| // Reassign nextCheckpoint to checkpoint because the subsequent implementation | ||
| // relies on side effects applied to checkpoint. Example: _applyWrites(). | ||
| checkpoint = nextCheckpoint as Checkpoint; |
There was a problem hiding this comment.
This is temporary. When stream() is reimplemented, I think this code will be removed.
| // A copy of the checkpoint is created because `checkpoint` is defined with `let`. | ||
| // `checkpoint` can be mutated during loop execution and when used in a function, | ||
| // may cause unintended consequences. | ||
| const checkpointCopy = copyCheckpoint(checkpoint); |
There was a problem hiding this comment.
This might be temporary. When stream() is reimplemented, this code might be removed.
| yield stepOutput; | ||
|
|
||
| if (typeof outputKeys !== "string") { | ||
| _applyWritesFromView(checkpoint, channels, stepOutput); |
There was a problem hiding this comment.
Removing this had no effect (i.e. all tests passed). Need to double check if I need to migrate it.
There was a problem hiding this comment.
ah this is an api that used to exist in py but we removed, so good to remove here too
| const newCheckpoint = createCheckpoint(checkpoint, channels); | ||
|
|
||
| // create a new copy of channels | ||
| const newChannels = Object.entries(channels).reduce( |
There was a problem hiding this comment.
Double check: This implementation is ported from a Python implementation that uses a context manager. What's the equivalent/preferred implementation in JS/TS?
There was a problem hiding this comment.
this is fine
| triggers: this.triggers, | ||
| mapper: this.mapper, | ||
| writers: [...this.writers, coerceable as ChannelWrite], | ||
| bound: this.bound.pipe(coerceable), |
There was a problem hiding this comment.
This was a regression from a previous PR. It was only possible to do this once _prepareNextTasks() was updated.
| @@ -1,4 +1,8 @@ | |||
| import { Runnable, RunnableConfig } from "@langchain/core/runnables"; | |||
| import { | |||
| mergeConfigs, | |||
There was a problem hiding this comment.
mergeConfigs() was exported in @langchain/core 0.1.61.
langgraph/src/utils.ts
Outdated
| @@ -48,9 +52,13 @@ export class RunnableCallable extends Runnable { | |||
|
|
|||
| // TODO: mergeConfigs() from @langchain/core is not exported | |||
There was a problem hiding this comment.
remove comment
| "author": "LangChain", | ||
| "license": "MIT", | ||
| "resolutions": { | ||
| "@langchain/core": "0.1.51" |
There was a problem hiding this comment.
why do we need this in both package json files?
There was a problem hiding this comment.
Not 100% sure, but my assumption is because the @langchain/core dependencies are also needed in the examples directory, which is at the same level as langgraph.
Summary
This PR updates static functions in
pregel/index.tsto the match the corresponding Python implementation. These functions are used in the corestream()andinvoke()methods of thePregelclass.Implementation
_shouldInterrupt()(currently unused)._applyWrites()to match Python implementation._applyWritesFromView(). Removing the function did not break any tests. Not sure if this needs to be migrated._prepareNextTasks()to match Python implementation. Note: The side effect from this function is removed and the calling code is updated to ensure that all existing tests pass._localRead()(replaced old implementation ofread).ChannelReadandChannelWriteto extend fromRunnableCallable(this was a regression).@langchain/coreand updateRunnableCallable.invoke()to callmergeConfigs()(this was a regression).streamChannelsListgetter.Next Steps
_default(),stream(), andinvoke()inPregelclass.