AsyncMongoDBSaver#
- class langgraph.checkpoint.mongodb.aio.AsyncMongoDBSaver(client: AsyncMongoClient, db_name: str = 'checkpointing_db', checkpoint_collection_name: str = 'checkpoints_aio', writes_collection_name: str = 'checkpoint_writes_aio', ttl: int | None = None, **kwargs: Any)[source]#
A checkpoint saver that stores checkpoints in a MongoDB database asynchronously.
The synchronous MongoDBSaver has extended documentation, but Asynchronous usage is shown below.
Examples
>>> import asyncio >>> from langgraph.checkpoint.mongodb.aio import AsyncMongoDBSaver >>> from langgraph.graph import StateGraph
>>> async def main() -> None: >>> builder = StateGraph(int) >>> builder.add_node("add_one", lambda x: x + 1) >>> builder.set_entry_point("add_one") >>> builder.set_finish_point("add_one") >>> async with AsyncMongoDBSaver.from_conn_string("mongodb://localhost:27017") as memory: >>> graph = builder.compile(checkpointer=memory) >>> config = {"configurable": {"thread_id": "1"}} >>> input = 3 >>> output = await graph.ainvoke(input, config) >>> print(f"{input=}, {output=}")
>>> if __name__ == "__main__": >>> asyncio.run(main()) input=3, output=4
Methods
__init__(client[, db_name, ...])adelete_thread(thread_id)Delete all checkpoints and writes associated with a specific thread ID asynchronously.
aget(config)Asynchronously fetch a checkpoint using the given configuration.
aget_tuple(config)Get a checkpoint tuple from the database asynchronously.
alist(config, *[, filter, before, limit])List checkpoints from the database asynchronously.
aput(config, checkpoint, metadata, new_versions)Save a checkpoint to the database asynchronously.
aput_writes(config, writes, task_id[, task_path])Store intermediate writes linked to a checkpoint asynchronously.
delete_thread(thread_id)Delete all checkpoints and writes associated with a specific thread ID.
from_conn_string(conn_string[, db_name, ...])Create asynchronous checkpointer
get(config)Fetch a checkpoint using the given configuration.
get_next_version(current, channel)Generate the next version ID for a channel.
get_tuple(config)Get a checkpoint tuple from the database.
list(config, *[, filter, before, limit])List checkpoints from the database.
put(config, checkpoint, metadata, new_versions)Save a checkpoint to the database.
put_writes(config, writes, task_id[, task_path])Store intermediate writes linked to a checkpoint.
- Parameters:
client (AsyncMongoClient)
db_name (str)
checkpoint_collection_name (str)
writes_collection_name (str)
ttl (Optional[int])
kwargs (Any)
- __init__(client: AsyncMongoClient, db_name: str = 'checkpointing_db', checkpoint_collection_name: str = 'checkpoints_aio', writes_collection_name: str = 'checkpoint_writes_aio', ttl: int | None = None, **kwargs: Any) None[source]#
- Parameters:
client (AsyncMongoClient)
db_name (str)
checkpoint_collection_name (str)
writes_collection_name (str)
ttl (int | None)
kwargs (Any)
- Return type:
None
- async adelete_thread(thread_id: str) None[source]#
Delete all checkpoints and writes associated with a specific thread ID asynchronously.
- Parameters:
thread_id (str) – The thread ID whose checkpoints should be deleted.
- Return type:
None
- async aget(config: RunnableConfig) Checkpoint | None#
Asynchronously fetch a checkpoint using the given configuration.
- Parameters:
config (RunnableConfig) – Configuration specifying which checkpoint to retrieve.
- Returns:
The requested checkpoint, or None if not found.
- Return type:
Optional[Checkpoint]
- async aget_tuple(config: RunnableConfig) CheckpointTuple | None[source]#
Get a checkpoint tuple from the database asynchronously.
This method retrieves a checkpoint tuple from the MongoDB database based on the provided config. If the config contains a “checkpoint_id” key, the checkpoint with the matching thread ID and checkpoint ID is retrieved. Otherwise, the latest checkpoint for the given thread ID is retrieved.
- Parameters:
config (RunnableConfig) – The config to use for retrieving the checkpoint.
- Returns:
The retrieved checkpoint tuple, or None if no matching checkpoint was found.
- Return type:
Optional[CheckpointTuple]
- async alist(config: RunnableConfig | None, *, filter: dict[str, Any] | None = None, before: RunnableConfig | None = None, limit: int | None = None) AsyncIterator[CheckpointTuple][source]#
List checkpoints from the database asynchronously.
This method retrieves a list of checkpoint tuples from the MongoDB database based on the provided config. The checkpoints are ordered by checkpoint ID in descending order (newest first).
- Parameters:
config (Optional[RunnableConfig]) – Base configuration for filtering checkpoints.
filter (Optional[dict[str, Any]]) – Additional filtering criteria for metadata.
before (Optional[RunnableConfig]) – If provided, only checkpoints before the specified checkpoint ID are returned. Defaults to None.
limit (Optional[int]) – Maximum number of checkpoints to return.
- Yields:
AsyncIterator[CheckpointTuple] – An asynchronous iterator of matching checkpoint tuples.
- Return type:
AsyncIterator[CheckpointTuple]
- async aput(config: RunnableConfig, checkpoint: Checkpoint, metadata: CheckpointMetadata, new_versions: dict[str, str | int | float]) RunnableConfig[source]#
Save a checkpoint to the database asynchronously.
This method saves a checkpoint to the MongoDB database. The checkpoint is associated with the provided config and its parent config (if any).
- Parameters:
config (RunnableConfig) – The config to associate with the checkpoint.
checkpoint (Checkpoint) – The checkpoint to save.
metadata (CheckpointMetadata) – Additional metadata to save with the checkpoint.
new_versions (ChannelVersions) – New channel versions as of this write.
- Returns:
Updated configuration after storing the checkpoint.
- Return type:
RunnableConfig
- async aput_writes(config: RunnableConfig, writes: Sequence[tuple[str, Any]], task_id: str, task_path: str = '') None[source]#
Store intermediate writes linked to a checkpoint asynchronously.
This method saves intermediate writes associated with a checkpoint to the database.
- Parameters:
config (RunnableConfig) – Configuration of the related checkpoint.
writes (Sequence[tuple[str, Any]]) – List of writes to store, each as (channel, value) pair.
task_id (str) – Identifier for the task creating the writes.
task_path (str) – Path of the task creating the writes.
- Return type:
None
- delete_thread(thread_id: str) None[source]#
Delete all checkpoints and writes associated with a specific thread ID.
- Parameters:
thread_id (str) – The thread ID whose checkpoints should be deleted.
- Return type:
None
- classmethod from_conn_string(conn_string: str, db_name: str = 'checkpointing_db', checkpoint_collection_name: str = 'checkpoints_aio', writes_collection_name: str = 'checkpoint_writes_aio', ttl: int | None = None, **kwargs: Any) AsyncIterator[AsyncMongoDBSaver][source]#
Create asynchronous checkpointer
This includes creation of collections and indexes if they don’t exist
- Parameters:
conn_string (str)
db_name (str)
checkpoint_collection_name (str)
writes_collection_name (str)
ttl (int | None)
kwargs (Any)
- Return type:
AsyncIterator[AsyncMongoDBSaver]
- get(config: RunnableConfig) Checkpoint | None#
Fetch a checkpoint using the given configuration.
- Parameters:
config (RunnableConfig) – Configuration specifying which checkpoint to retrieve.
- Returns:
The requested checkpoint, or None if not found.
- Return type:
Optional[Checkpoint]
- get_next_version(current: V | None, channel: ChannelProtocol) V#
Generate the next version ID for a channel.
Default is to use integer versions, incrementing by 1. If you override, you can use str/int/float versions, as long as they are monotonically increasing.
- Parameters:
current (Optional[V]) – The current version identifier (int, float, or str).
channel (BaseChannel) – The channel being versioned.
- Returns:
The next version identifier, which must be increasing.
- Return type:
V
- get_tuple(config: RunnableConfig) CheckpointTuple | None[source]#
Get a checkpoint tuple from the database.
This method retrieves a checkpoint tuple from the MongoDB database based on the provided config. If the config contains a “checkpoint_id” key, the checkpoint with the matching thread ID and “checkpoint_id” is retrieved. Otherwise, the latest checkpoint for the given thread ID is retrieved.
- Parameters:
config (RunnableConfig) – The config to use for retrieving the checkpoint.
- Returns:
The retrieved checkpoint tuple, or None if no matching checkpoint was found.
- Return type:
Optional[CheckpointTuple]
- list(config: RunnableConfig | None, *, filter: dict[str, Any] | None = None, before: RunnableConfig | None = None, limit: int | None = None) Iterator[CheckpointTuple][source]#
List checkpoints from the database.
- This method retrieves a list of checkpoint tuples from the MongoDB database
based on the provided config. The checkpoints are ordered by checkpoint ID in descending order (newest first).
- Parameters:
config (Optional[RunnableConfig]) – Base configuration for filtering checkpoints.
filter (Optional[dict[str, Any]]) – Additional filtering criteria for metadata.
before (Optional[RunnableConfig]) – If provided, only checkpoints before the specified checkpoint ID are returned. Defaults to None.
limit (Optional[int]) – Maximum number of checkpoints to return.
- Yields:
Iterator[CheckpointTuple] – An iterator of matching checkpoint tuples.
- Return type:
Iterator[CheckpointTuple]
- put(config: RunnableConfig, checkpoint: Checkpoint, metadata: CheckpointMetadata, new_versions: dict[str, str | int | float]) RunnableConfig[source]#
Save a checkpoint to the database.
This method saves a checkpoint to the MongoDB database. The checkpoint is associated with the provided config and its parent config (if any).
- Parameters:
config (RunnableConfig) – The config to associate with the checkpoint.
checkpoint (Checkpoint) – The checkpoint to save.
metadata (CheckpointMetadata) – Additional metadata to save with the checkpoint.
new_versions (ChannelVersions) – New channel versions as of this write.
- Returns:
Updated configuration after storing the checkpoint.
- Return type:
RunnableConfig
- put_writes(config: RunnableConfig, writes: Sequence[tuple[str, Any]], task_id: str, task_path: str = '') None[source]#
Store intermediate writes linked to a checkpoint.
This method saves intermediate writes associated with a checkpoint to the database.
- Parameters:
config (RunnableConfig) – Configuration of the related checkpoint.
writes (Sequence[tuple[str, Any]]) – List of writes to store, each as (channel, value) pair.
task_id (str) – Identifier for the task creating the writes.
task_path (str)
- Return type:
None