AsyncMongoDBSaver#

class langgraph.checkpoint.mongodb.aio.AsyncMongoDBSaver(client: AsyncMongoClient, db_name: str = 'checkpointing_db', checkpoint_collection_name: str = 'checkpoints_aio', writes_collection_name: str = 'checkpoint_writes_aio', ttl: int | None = None, **kwargs: Any)[source]#

A checkpoint saver that stores checkpoints in a MongoDB database asynchronously.

The synchronous MongoDBSaver has extended documentation, but Asynchronous usage is shown below.

Examples

>>> import asyncio
>>> from langgraph.checkpoint.mongodb.aio import AsyncMongoDBSaver
>>> from langgraph.graph import StateGraph
>>> async def main() -> None:
>>>     builder = StateGraph(int)
>>>     builder.add_node("add_one", lambda x: x + 1)
>>>     builder.set_entry_point("add_one")
>>>     builder.set_finish_point("add_one")
>>>     async with AsyncMongoDBSaver.from_conn_string("mongodb://localhost:27017") as memory:
>>>         graph = builder.compile(checkpointer=memory)
>>>         config = {"configurable": {"thread_id": "1"}}
>>>         input = 3
>>>         output = await graph.ainvoke(input, config)
>>>         print(f"{input=}, {output=}")
>>> if __name__ == "__main__":
>>>     asyncio.run(main())
input=3, output=4

Methods

__init__(client[, db_name, ...])

adelete_thread(thread_id)

Delete all checkpoints and writes associated with a specific thread ID asynchronously.

aget(config)

Asynchronously fetch a checkpoint using the given configuration.

aget_tuple(config)

Get a checkpoint tuple from the database asynchronously.

alist(config, *[, filter, before, limit])

List checkpoints from the database asynchronously.

aput(config, checkpoint, metadata, new_versions)

Save a checkpoint to the database asynchronously.

aput_writes(config, writes, task_id[, task_path])

Store intermediate writes linked to a checkpoint asynchronously.

delete_thread(thread_id)

Delete all checkpoints and writes associated with a specific thread ID.

from_conn_string(conn_string[, db_name, ...])

Create asynchronous checkpointer

get(config)

Fetch a checkpoint using the given configuration.

get_next_version(current, channel)

Generate the next version ID for a channel.

get_tuple(config)

Get a checkpoint tuple from the database.

list(config, *[, filter, before, limit])

List checkpoints from the database.

put(config, checkpoint, metadata, new_versions)

Save a checkpoint to the database.

put_writes(config, writes, task_id[, task_path])

Store intermediate writes linked to a checkpoint.

Parameters:
  • client (AsyncMongoClient)

  • db_name (str)

  • checkpoint_collection_name (str)

  • writes_collection_name (str)

  • ttl (Optional[int])

  • kwargs (Any)

__init__(client: AsyncMongoClient, db_name: str = 'checkpointing_db', checkpoint_collection_name: str = 'checkpoints_aio', writes_collection_name: str = 'checkpoint_writes_aio', ttl: int | None = None, **kwargs: Any) None[source]#
Parameters:
  • client (AsyncMongoClient)

  • db_name (str)

  • checkpoint_collection_name (str)

  • writes_collection_name (str)

  • ttl (int | None)

  • kwargs (Any)

Return type:

None

async adelete_thread(thread_id: str) None[source]#

Delete all checkpoints and writes associated with a specific thread ID asynchronously.

Parameters:

thread_id (str) – The thread ID whose checkpoints should be deleted.

Return type:

None

async aget(config: RunnableConfig) Checkpoint | None#

Asynchronously fetch a checkpoint using the given configuration.

Parameters:

config (RunnableConfig) – Configuration specifying which checkpoint to retrieve.

Returns:

The requested checkpoint, or None if not found.

Return type:

Optional[Checkpoint]

async aget_tuple(config: RunnableConfig) CheckpointTuple | None[source]#

Get a checkpoint tuple from the database asynchronously.

This method retrieves a checkpoint tuple from the MongoDB database based on the provided config. If the config contains a “checkpoint_id” key, the checkpoint with the matching thread ID and checkpoint ID is retrieved. Otherwise, the latest checkpoint for the given thread ID is retrieved.

Parameters:

config (RunnableConfig) – The config to use for retrieving the checkpoint.

Returns:

The retrieved checkpoint tuple, or None if no matching checkpoint was found.

Return type:

Optional[CheckpointTuple]

async alist(config: RunnableConfig | None, *, filter: dict[str, Any] | None = None, before: RunnableConfig | None = None, limit: int | None = None) AsyncIterator[CheckpointTuple][source]#

List checkpoints from the database asynchronously.

This method retrieves a list of checkpoint tuples from the MongoDB database based on the provided config. The checkpoints are ordered by checkpoint ID in descending order (newest first).

Parameters:
  • config (Optional[RunnableConfig]) – Base configuration for filtering checkpoints.

  • filter (Optional[dict[str, Any]]) – Additional filtering criteria for metadata.

  • before (Optional[RunnableConfig]) – If provided, only checkpoints before the specified checkpoint ID are returned. Defaults to None.

  • limit (Optional[int]) – Maximum number of checkpoints to return.

Yields:

AsyncIterator[CheckpointTuple] – An asynchronous iterator of matching checkpoint tuples.

Return type:

AsyncIterator[CheckpointTuple]

async aput(config: RunnableConfig, checkpoint: Checkpoint, metadata: CheckpointMetadata, new_versions: dict[str, str | int | float]) RunnableConfig[source]#

Save a checkpoint to the database asynchronously.

This method saves a checkpoint to the MongoDB database. The checkpoint is associated with the provided config and its parent config (if any).

Parameters:
  • config (RunnableConfig) – The config to associate with the checkpoint.

  • checkpoint (Checkpoint) – The checkpoint to save.

  • metadata (CheckpointMetadata) – Additional metadata to save with the checkpoint.

  • new_versions (ChannelVersions) – New channel versions as of this write.

Returns:

Updated configuration after storing the checkpoint.

Return type:

RunnableConfig

async aput_writes(config: RunnableConfig, writes: Sequence[tuple[str, Any]], task_id: str, task_path: str = '') None[source]#

Store intermediate writes linked to a checkpoint asynchronously.

This method saves intermediate writes associated with a checkpoint to the database.

Parameters:
  • config (RunnableConfig) – Configuration of the related checkpoint.

  • writes (Sequence[tuple[str, Any]]) – List of writes to store, each as (channel, value) pair.

  • task_id (str) – Identifier for the task creating the writes.

  • task_path (str) – Path of the task creating the writes.

Return type:

None

delete_thread(thread_id: str) None[source]#

Delete all checkpoints and writes associated with a specific thread ID.

Parameters:

thread_id (str) – The thread ID whose checkpoints should be deleted.

Return type:

None

classmethod from_conn_string(conn_string: str, db_name: str = 'checkpointing_db', checkpoint_collection_name: str = 'checkpoints_aio', writes_collection_name: str = 'checkpoint_writes_aio', ttl: int | None = None, **kwargs: Any) AsyncIterator[AsyncMongoDBSaver][source]#

Create asynchronous checkpointer

This includes creation of collections and indexes if they don’t exist

Parameters:
  • conn_string (str)

  • db_name (str)

  • checkpoint_collection_name (str)

  • writes_collection_name (str)

  • ttl (int | None)

  • kwargs (Any)

Return type:

AsyncIterator[AsyncMongoDBSaver]

get(config: RunnableConfig) Checkpoint | None#

Fetch a checkpoint using the given configuration.

Parameters:

config (RunnableConfig) – Configuration specifying which checkpoint to retrieve.

Returns:

The requested checkpoint, or None if not found.

Return type:

Optional[Checkpoint]

get_next_version(current: V | None, channel: ChannelProtocol) V#

Generate the next version ID for a channel.

Default is to use integer versions, incrementing by 1. If you override, you can use str/int/float versions, as long as they are monotonically increasing.

Parameters:
  • current (Optional[V]) – The current version identifier (int, float, or str).

  • channel (BaseChannel) – The channel being versioned.

Returns:

The next version identifier, which must be increasing.

Return type:

V

get_tuple(config: RunnableConfig) CheckpointTuple | None[source]#

Get a checkpoint tuple from the database.

This method retrieves a checkpoint tuple from the MongoDB database based on the provided config. If the config contains a “checkpoint_id” key, the checkpoint with the matching thread ID and “checkpoint_id” is retrieved. Otherwise, the latest checkpoint for the given thread ID is retrieved.

Parameters:

config (RunnableConfig) – The config to use for retrieving the checkpoint.

Returns:

The retrieved checkpoint tuple, or None if no matching checkpoint was found.

Return type:

Optional[CheckpointTuple]

list(config: RunnableConfig | None, *, filter: dict[str, Any] | None = None, before: RunnableConfig | None = None, limit: int | None = None) Iterator[CheckpointTuple][source]#

List checkpoints from the database.

This method retrieves a list of checkpoint tuples from the MongoDB database

based on the provided config. The checkpoints are ordered by checkpoint ID in descending order (newest first).

Parameters:
  • config (Optional[RunnableConfig]) – Base configuration for filtering checkpoints.

  • filter (Optional[dict[str, Any]]) – Additional filtering criteria for metadata.

  • before (Optional[RunnableConfig]) – If provided, only checkpoints before the specified checkpoint ID are returned. Defaults to None.

  • limit (Optional[int]) – Maximum number of checkpoints to return.

Yields:

Iterator[CheckpointTuple] – An iterator of matching checkpoint tuples.

Return type:

Iterator[CheckpointTuple]

put(config: RunnableConfig, checkpoint: Checkpoint, metadata: CheckpointMetadata, new_versions: dict[str, str | int | float]) RunnableConfig[source]#

Save a checkpoint to the database.

This method saves a checkpoint to the MongoDB database. The checkpoint is associated with the provided config and its parent config (if any).

Parameters:
  • config (RunnableConfig) – The config to associate with the checkpoint.

  • checkpoint (Checkpoint) – The checkpoint to save.

  • metadata (CheckpointMetadata) – Additional metadata to save with the checkpoint.

  • new_versions (ChannelVersions) – New channel versions as of this write.

Returns:

Updated configuration after storing the checkpoint.

Return type:

RunnableConfig

put_writes(config: RunnableConfig, writes: Sequence[tuple[str, Any]], task_id: str, task_path: str = '') None[source]#

Store intermediate writes linked to a checkpoint.

This method saves intermediate writes associated with a checkpoint to the database.

Parameters:
  • config (RunnableConfig) – Configuration of the related checkpoint.

  • writes (Sequence[tuple[str, Any]]) – List of writes to store, each as (channel, value) pair.

  • task_id (str) – Identifier for the task creating the writes.

  • task_path (str)

Return type:

None