MongoDBSaver#

class langgraph.checkpoint.mongodb.saver.MongoDBSaver(client: MongoClient, db_name: str = 'checkpointing_db', checkpoint_collection_name: str = 'checkpoints', writes_collection_name: str = 'checkpoint_writes', ttl: int | None = None, **kwargs: Any)[source]#

A checkpointer that stores StateGraph checkpoints in a MongoDB database.

The saver adds the compound index shown below to each of its backing collections (checkpoints and pending writes). If a collection already exists and already has indexes, initialization leaves it unchanged:

keys=[("thread_id", 1), ("checkpoint_ns", 1), ("checkpoint_id", -1)],
unique=True,

Parameters:
  • client (MongoClient) – The MongoDB connection.

  • db_name (Optional[str]) – Name of the database.

  • checkpoint_collection_name (Optional[str]) – Name of the collection that stores checkpoints.

  • writes_collection_name (Optional[str]) – Name of the collection that stores intermediate writes.

  • ttl (Optional[int]) – Time to live in seconds. See https://www.mongodb.com/docs/manual/core/index-ttl/.

  • kwargs (Any)
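
The index behaviour described above can be approximated with plain PyMongo calls. This is a minimal sketch, not the saver's actual initialization code: the helper name `ensure_checkpoint_index` is hypothetical, and the rule "skip if any index other than `_id_` exists" is an assumption about what "have indexes already" means. `index_information()` and `create_index()` are standard `pymongo` collection methods.

```python
from typing import Any

# Index specification quoted in the class description above.
CHECKPOINT_INDEX_KEYS = [("thread_id", 1), ("checkpoint_ns", 1), ("checkpoint_id", -1)]


def ensure_checkpoint_index(collection: Any) -> bool:
    """Create the compound index unless the collection already has user indexes.

    `collection` is expected to behave like a pymongo Collection.
    Returns True if an index was created, False if the collection was left alone.
    """
    existing = collection.index_information()  # maps index name -> index details
    # Assumption: the automatic "_id_" index does not count as a pre-existing index.
    user_indexes = [name for name in existing if name != "_id_"]
    if user_indexes:
        return False
    collection.create_index(CHECKPOINT_INDEX_KEYS, unique=True)
    return True
```

With a real client, the helper could be called as `ensure_checkpoint_index(client["checkpointing_db"]["checkpoints"])`.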

Examples

>>> from langgraph.checkpoint.mongodb import MongoDBSaver
>>> from langgraph.graph import StateGraph
>>> from pymongo import MongoClient
>>>
>>> builder = StateGraph(int)
>>> builder.add_node("add_one", lambda x: x + 1)
>>> builder.set_entry_point("add_one")
>>> builder.set_finish_point("add_one")
>>> client = MongoClient("mongodb://localhost:27017")
>>> memory = MongoDBSaver(client)
>>> graph = builder.compile(checkpointer=memory)
>>> config = {"configurable": {"thread_id": "1"}}
>>> graph.get_state(config)
>>> result = graph.invoke(3, config)
>>> graph.get_state(config)
StateSnapshot(values=4, next=(), config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef8b22d-df71-6ddc-8001-7c821b5c45fd'}}, metadata={'source': 'loop', 'writes': {'add_one': 4}, 'step': 1, 'parents': {}}, created_at='2024-10-15T18:25:34.088329+00:00', parent_config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef8b22d-df6f-6eec-8000-20f621dcf3b7'}}, tasks=())

Adding sharding support:

>>> from langgraph.checkpoint.mongodb import MongoDBSaver
>>> from pymongo import MongoClient
>>> client = MongoClient("mongodb://localhost:27017")
>>> memory = MongoDBSaver(client)
>>> client.admin.command('enableSharding', memory.db_name)
>>> shard_key = {'your_shard_key': 1}  # Specify your shard key
>>> client.admin.command('shardCollection', f'{memory.db_name}.{memory.checkpoint_collection_name}', key=shard_key)
>>> client.admin.command('shardCollection', f'{memory.db_name}.{memory.writes_collection_name}', key=shard_key)

Methods

__init__(client[, db_name, ...])

adelete_thread(thread_id)

Delete all checkpoints and writes associated with a specific thread ID.

aget(config)

Asynchronously fetch a checkpoint using the given configuration.

aget_tuple(config)

Asynchronously fetch a checkpoint tuple using the given configuration.

alist(config, *[, filter, before, limit])

Asynchronously list checkpoints that match the given criteria.

aput(config, checkpoint, metadata, new_versions)

Asynchronously store a checkpoint with its configuration and metadata.

aput_writes(config, writes, task_id[, task_path])

Asynchronously store intermediate writes linked to a checkpoint.

close()

Close the resources used by the MongoDBSaver.

delete_thread(thread_id)

Delete all checkpoints and writes associated with a specific thread ID.

from_conn_string([conn_string, db_name, ...])

Context manager to create a MongoDB checkpoint saver.

get(config)

Fetch a checkpoint using the given configuration.

get_next_version(current, channel)

Generate the next version ID for a channel.

get_tuple(config)

Get a checkpoint tuple from the database.

list(config, *[, filter, before, limit])

List checkpoints from the database.

put(config, checkpoint, metadata, new_versions)

Save a checkpoint to the database.

put_writes(config, writes, task_id[, task_path])

Store intermediate writes linked to a checkpoint.

__init__(client: MongoClient, db_name: str = 'checkpointing_db', checkpoint_collection_name: str = 'checkpoints', writes_collection_name: str = 'checkpoint_writes', ttl: int | None = None, **kwargs: Any) None[source]#
Parameters:
  • client (MongoClient)

  • db_name (str)

  • checkpoint_collection_name (str)

  • writes_collection_name (str)

  • ttl (int | None)

  • kwargs (Any)

Return type:

None

async adelete_thread(thread_id: str) None[source]#

Delete all checkpoints and writes associated with a specific thread ID.

Asynchronously wraps the blocking self.delete_thread method.

Parameters:

thread_id (str) – The thread ID whose checkpoints should be deleted.

Return type:

None
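
The phrase "asynchronously wraps the blocking method" can be illustrated with the standard library alone. The sketch below is an assumption about the general pattern, not the library's implementation: `delete_thread_blocking` and `adelete_thread_sketch` are hypothetical names, and a plain dict stands in for the database.

```python
import asyncio
import time


def delete_thread_blocking(store: dict, thread_id: str) -> None:
    """Stand-in for a blocking delete: removes every entry for thread_id."""
    time.sleep(0.01)  # simulate the latency of a synchronous driver call
    store.pop(thread_id, None)


async def adelete_thread_sketch(store: dict, thread_id: str) -> None:
    """Run the blocking call in a worker thread so the event loop stays responsive."""
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(None, delete_thread_blocking, store, thread_id)


async def main() -> dict:
    store = {"1": ["checkpoint-a", "checkpoint-b"], "2": ["checkpoint-c"]}
    await adelete_thread_sketch(store, "1")
    return store


remaining = asyncio.run(main())
```

Only thread "2" is left in `remaining`; other coroutines on the same loop could keep running while the delete is in progress.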

async aget(config: RunnableConfig) Checkpoint | None#

Asynchronously fetch a checkpoint using the given configuration.

Parameters:

config (RunnableConfig) – Configuration specifying which checkpoint to retrieve.

Returns:

The requested checkpoint, or None if not found.

Return type:

Optional[Checkpoint]

async aget_tuple(config: RunnableConfig) CheckpointTuple | None[source]#

Asynchronously fetch a checkpoint tuple using the given configuration.

Asynchronously wraps the blocking self.get_tuple method.

Parameters:

config (RunnableConfig) – Configuration specifying which checkpoint to retrieve.

Returns:

The requested checkpoint tuple, or None if not found.

Return type:

Optional[CheckpointTuple]

async alist(config: RunnableConfig | None, *, filter: dict[str, Any] | None = None, before: RunnableConfig | None = None, limit: int | None = None) AsyncIterator[CheckpointTuple][source]#

Asynchronously list checkpoints that match the given criteria.

Asynchronously wraps the blocking self.list generator.

Runs self.list(…) in a background thread and yields its items asynchronously from an asyncio.Queue. This allows integration of synchronous iterators into async code.

Parameters:
  • config (RunnableConfig | None) – Configuration object passed to self.list.

  • filter (dict[str, Any] | None) – Optional filter dictionary.

  • before (RunnableConfig | None) – Optional parameter to limit results before a given checkpoint.

  • limit (int | None) – Optional maximum number of results to yield.

Yields:

AsyncIterator[CheckpointTuple] – An iterator of checkpoint tuples.

Return type:

AsyncIterator[CheckpointTuple]
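
The background-thread approach described for alist can be sketched with `threading` and `asyncio.Queue`. This is an illustrative sketch under stated assumptions, not the saver's code: `iterate_in_thread` and `blocking_list` are hypothetical names, and errors raised by the synchronous iterator are not forwarded to the consumer here.

```python
import asyncio
import threading
from typing import AsyncIterator, Callable, Iterator, TypeVar

T = TypeVar("T")
_DONE = object()  # sentinel marking the end of the synchronous iterator


async def iterate_in_thread(make_iter: Callable[[], Iterator[T]]) -> AsyncIterator[T]:
    """Yield items from a blocking iterator without blocking the event loop."""
    loop = asyncio.get_running_loop()
    queue: asyncio.Queue = asyncio.Queue()

    def producer() -> None:
        try:
            for item in make_iter():
                # asyncio.Queue is not thread-safe, so schedule the put on the loop thread.
                loop.call_soon_threadsafe(queue.put_nowait, item)
        finally:
            # Always signal completion so the consumer does not wait forever.
            loop.call_soon_threadsafe(queue.put_nowait, _DONE)

    threading.Thread(target=producer, daemon=True).start()
    while True:
        item = await queue.get()
        if item is _DONE:
            break
        yield item


def blocking_list() -> Iterator[str]:
    """Stand-in for a synchronous, newest-first checkpoint listing."""
    yield from ["checkpoint-3", "checkpoint-2", "checkpoint-1"]


async def main() -> list:
    return [item async for item in iterate_in_thread(blocking_list)]


collected = asyncio.run(main())
```

Items arrive in the order the synchronous iterator produced them, because callbacks scheduled with `call_soon_threadsafe` run in FIFO order.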

async aput(config: RunnableConfig, checkpoint: Checkpoint, metadata: CheckpointMetadata, new_versions: dict[str, str | int | float]) RunnableConfig[source]#

Asynchronously store a checkpoint with its configuration and metadata.

Asynchronously wraps the blocking self.put method.

Parameters:
  • config (RunnableConfig) – Configuration for the checkpoint.

  • checkpoint (Checkpoint) – The checkpoint to store.

  • metadata (CheckpointMetadata) – Additional metadata for the checkpoint.

  • new_versions (dict[str, str | int | float]) – New channel versions as of this write.

Returns:

Updated configuration after storing the checkpoint.

Return type:

RunnableConfig

async aput_writes(config: RunnableConfig, writes: Sequence[tuple[str, Any]], task_id: str, task_path: str = '') None[source]#

Asynchronously store intermediate writes linked to a checkpoint.

Asynchronously wraps the blocking self.put_writes method.

Parameters:
  • config (RunnableConfig) – Configuration of the related checkpoint.

  • writes (Sequence[tuple[str, Any]]) – List of writes to store.

  • task_id (str) – Identifier for the task creating the writes.

  • task_path (str) – Path of the task creating the writes.

Return type:

None

close() None[source]#

Close the resources used by the MongoDBSaver.

Return type:

None

delete_thread(thread_id: str) None[source]#

Delete all checkpoints and writes associated with a specific thread ID.

Parameters:

thread_id (str) – The thread ID whose checkpoints should be deleted.

Return type:

None

classmethod from_conn_string(conn_string: str | None = None, db_name: str = 'checkpointing_db', checkpoint_collection_name: str = 'checkpoints', writes_collection_name: str = 'checkpoint_writes', ttl: int | None = None, **kwargs: Any) Iterator[MongoDBSaver][source]#

Context manager to create a MongoDB checkpoint saver.

The saver adds the compound index shown below to each of its backing collections (checkpoints and pending writes). If a collection already exists and already has indexes, initialization leaves it unchanged:

keys=[("thread_id", 1), ("checkpoint_ns", 1), ("checkpoint_id", -1)],
unique=True,

Parameters:
  • conn_string (str | None) – MongoDB connection string. See pymongo.MongoClient.

  • db_name (str) – Database name. Created if it doesn’t exist.

  • checkpoint_collection_name (str) – Name of the checkpoint collection. Created if it doesn’t exist.

  • writes_collection_name (str) – Name of the collection for intermediate writes. Created if it doesn’t exist.

  • ttl (Optional[int]) – Time to live in seconds.

  • kwargs (Any)

Return type:

Iterator[MongoDBSaver]

Yields: A new MongoDBSaver.
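
The `Iterator[MongoDBSaver]` return type matches the usual shape of a generator wrapped by `contextlib.contextmanager`. The sketch below shows that pattern with stand-ins only: `FakeClient`, `FakeSaver`, and `saver_from_conn_string` are hypothetical, and closing the client on exit is an assumption about the intended cleanup, not a statement about the library's behaviour.

```python
from contextlib import contextmanager
from typing import Iterator


class FakeClient:
    """Hypothetical stand-in for a database client that must be closed."""

    def __init__(self, conn_string: str) -> None:
        self.conn_string = conn_string
        self.closed = False

    def close(self) -> None:
        self.closed = True


class FakeSaver:
    """Hypothetical stand-in for a saver that holds a client."""

    def __init__(self, client: FakeClient, db_name: str) -> None:
        self.client = client
        self.db_name = db_name


@contextmanager
def saver_from_conn_string(
    conn_string: str, db_name: str = "checkpointing_db"
) -> Iterator[FakeSaver]:
    client = FakeClient(conn_string)
    try:
        yield FakeSaver(client, db_name)
    finally:
        client.close()  # runs even if the body of the with-block raises


with saver_from_conn_string("mongodb://localhost:27017") as saver:
    open_inside = not saver.client.closed

closed_after = saver.client.closed
```

The saver is usable only inside the `with` block; once the block exits, the underlying client in this sketch is closed.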

get(config: RunnableConfig) Checkpoint | None#

Fetch a checkpoint using the given configuration.

Parameters:

config (RunnableConfig) – Configuration specifying which checkpoint to retrieve.

Returns:

The requested checkpoint, or None if not found.

Return type:

Optional[Checkpoint]

get_next_version(current: V | None, channel: ChannelProtocol) V#

Generate the next version ID for a channel.

Default is to use integer versions, incrementing by 1. If you override, you can use str/int/float versions, as long as they are monotonically increasing.

Parameters:
  • current (Optional[V]) – The current version identifier (int, float, or str).

  • channel (BaseChannel) – The channel being versioned.

Returns:

The next version identifier, which must be increasing.

Return type:

V
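
The versioning contract above can be illustrated with two small functions. This is a sketch, not the library's code: the function names are hypothetical, the `channel` argument is omitted because neither scheme uses it, and starting the integer scheme at 1 is an assumption.

```python
from typing import Optional


def next_int_version(current: Optional[int]) -> int:
    """Integer scheme as described above: increment by 1 (first version assumed to be 1)."""
    return 1 if current is None else current + 1


def next_str_version(current: Optional[str]) -> str:
    """String scheme: zero-padded counter so string comparison matches numeric order."""
    counter = 0 if current is None else int(current)
    return f"{counter + 1:032d}"


first = next_str_version(None)
second = next_str_version(first)
tenth = next_str_version(f"{9:032d}")
```

Zero-padding matters for string versions: without it, "10" would compare as smaller than "9" and the sequence would no longer be monotonically increasing.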

get_tuple(config: RunnableConfig) CheckpointTuple | None[source]#

Get a checkpoint tuple from the database.

This method retrieves a checkpoint tuple from the MongoDB database based on the provided config. If the config contains a “checkpoint_id” key, the checkpoint with the matching thread ID and checkpoint ID is retrieved. Otherwise, the latest checkpoint for the given thread ID is retrieved.

Parameters:

config (RunnableConfig) – The config to use for retrieving the checkpoint.

Returns:

The retrieved checkpoint tuple, or None if no matching checkpoint was found.

Return type:

CheckpointTuple | None

Examples

Basic:

>>> config = {"configurable": {"thread_id": "1"}}
>>> checkpoint_tuple = memory.get_tuple(config)
>>> print(checkpoint_tuple)
CheckpointTuple(...)

With checkpoint ID:

>>> config = {
...     "configurable": {
...         "thread_id": "1",
...         "checkpoint_ns": "",
...         "checkpoint_id": "1ef4f797-8335-6428-8001-8a1503f9b875",
...     }
... }
>>> checkpoint_tuple = memory.get_tuple(config)
>>> print(checkpoint_tuple)
CheckpointTuple(...)
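
The lookup rule for get_tuple (exact checkpoint when "checkpoint_id" is present, latest checkpoint otherwise) can be expressed as a filter plus a sort. The sketch below is an assumption about how such a query could be built, not the saver's actual query: `build_checkpoint_query` is a hypothetical helper, and the field names are taken from the compound index described earlier.

```python
from typing import Any, Dict, List, Tuple


def build_checkpoint_query(
    config: Dict[str, Any],
) -> Tuple[Dict[str, Any], List[Tuple[str, int]]]:
    """Return a (filter, sort) pair in PyMongo style for a checkpoint lookup."""
    configurable = config["configurable"]
    query: Dict[str, Any] = {
        "thread_id": configurable["thread_id"],
        "checkpoint_ns": configurable.get("checkpoint_ns", ""),
    }
    checkpoint_id = configurable.get("checkpoint_id")
    if checkpoint_id is not None:
        # Exact lookup: a specific checkpoint was requested.
        query["checkpoint_id"] = checkpoint_id
    # Newest first; with an exact checkpoint_id at most one document matches anyway.
    sort = [("checkpoint_id", -1)]
    return query, sort


latest_query, latest_sort = build_checkpoint_query({"configurable": {"thread_id": "1"}})
exact_query, _ = build_checkpoint_query(
    {
        "configurable": {
            "thread_id": "1",
            "checkpoint_ns": "",
            "checkpoint_id": "1ef4f797-8335-6428-8001-8a1503f9b875",
        }
    }
)
```

With PyMongo, the pair could be passed as `collection.find(query, sort=sort, limit=1)` to fetch a single document.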

list(config: RunnableConfig | None, *, filter: dict[str, Any] | None = None, before: RunnableConfig | None = None, limit: int | None = None) Iterator[CheckpointTuple][source]#

List checkpoints from the database.

This method retrieves a list of checkpoint tuples from the MongoDB database based on the provided config. The checkpoints are ordered by checkpoint ID in descending order (newest first).

Parameters:
  • config (RunnableConfig) – The config to use for listing the checkpoints.

  • filter (Optional[dict[str, Any]]) – Additional filtering criteria for metadata. Defaults to None.

  • before (Optional[RunnableConfig]) – If provided, only checkpoints before the specified checkpoint ID are returned. Defaults to None.

  • limit (Optional[int]) – The maximum number of checkpoints to return. Defaults to None.

Yields:

Iterator[CheckpointTuple] – An iterator of checkpoint tuples.

Examples

>>> from langgraph.checkpoint.mongodb import MongoDBSaver
>>> with MongoDBSaver.from_conn_string("mongodb://localhost:27017") as memory:
...     # Run a graph, then list the checkpoints
...     config = {"configurable": {"thread_id": "1"}}
...     checkpoints = list(memory.list(config, limit=2))
>>> print(checkpoints)
[CheckpointTuple(...), CheckpointTuple(...)]

Return type:

Iterator[CheckpointTuple]
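
The combined effect of the filter, before, and limit parameters of list can be shown on an in-memory list. This is a sketch of the documented semantics only, not the MongoDB query the saver issues: `list_checkpoints` is a hypothetical function, the documents are made up, and it assumes checkpoint IDs sort lexicographically in creation order.

```python
from typing import Any, Dict, Iterator, List, Optional


def list_checkpoints(
    docs: List[Dict[str, Any]],
    thread_id: str,
    *,
    metadata_filter: Optional[Dict[str, Any]] = None,
    before_id: Optional[str] = None,
    limit: Optional[int] = None,
) -> Iterator[Dict[str, Any]]:
    """Yield matching documents newest first, mimicking the documented ordering."""
    matches = [d for d in docs if d["thread_id"] == thread_id]
    if metadata_filter:
        matches = [
            d for d in matches
            if all(d["metadata"].get(k) == v for k, v in metadata_filter.items())
        ]
    if before_id is not None:
        # Assumption: IDs sort lexicographically in creation order.
        matches = [d for d in matches if d["checkpoint_id"] < before_id]
    matches.sort(key=lambda d: d["checkpoint_id"], reverse=True)
    if limit is not None:
        matches = matches[:limit]
    yield from matches


docs = [
    {"thread_id": "1", "checkpoint_id": "0001", "metadata": {"source": "input"}},
    {"thread_id": "1", "checkpoint_id": "0002", "metadata": {"source": "loop"}},
    {"thread_id": "1", "checkpoint_id": "0003", "metadata": {"source": "loop"}},
    {"thread_id": "2", "checkpoint_id": "0004", "metadata": {"source": "input"}},
]

newest_two = [d["checkpoint_id"] for d in list_checkpoints(docs, "1", limit=2)]
older = [d["checkpoint_id"] for d in list_checkpoints(docs, "1", before_id="0003")]
loops = [
    d["checkpoint_id"]
    for d in list_checkpoints(docs, "1", metadata_filter={"source": "loop"})
]
```

Here `limit` is applied after sorting, so it always keeps the newest matches.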

put(config: RunnableConfig, checkpoint: Checkpoint, metadata: CheckpointMetadata, new_versions: dict[str, str | int | float]) RunnableConfig[source]#

Save a checkpoint to the database.

This method saves a checkpoint to the MongoDB database. The checkpoint is associated with the provided config and its parent config (if any).

Parameters:
  • config (RunnableConfig) – The config to associate with the checkpoint.

  • checkpoint (Checkpoint) – The checkpoint to save.

  • metadata (CheckpointMetadata) – Additional metadata to save with the checkpoint.

  • new_versions (ChannelVersions) – New channel versions as of this write.

Returns:

Updated configuration after storing the checkpoint.

Return type:

RunnableConfig

Examples

>>> from langgraph.checkpoint.mongodb import MongoDBSaver
>>> with MongoDBSaver.from_conn_string("mongodb://localhost:27017") as memory:
...     config = {"configurable": {"thread_id": "1", "checkpoint_ns": ""}}
...     checkpoint = {"ts": "2024-05-04T06:32:42.235444+00:00", "id": "1ef4f797-8335-6428-8001-8a1503f9b875", "data": {"key": "value"}}
...     saved_config = memory.put(config, checkpoint, {"source": "input", "step": 1, "writes": {"key": "value"}}, {})
>>> print(saved_config)
{'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef4f797-8335-6428-8001-8a1503f9b875'}}

put_writes(config: RunnableConfig, writes: Sequence[tuple[str, Any]], task_id: str, task_path: str = '') None[source]#

Store intermediate writes linked to a checkpoint.

This method saves intermediate writes associated with a checkpoint to the MongoDB database.

Parameters:
  • config (RunnableConfig) – Configuration of the related checkpoint.

  • writes (Sequence[tuple[str, Any]]) – List of writes to store, each as (channel, value) pair.

  • task_id (str) – Identifier for the task creating the writes.

  • task_path (str) – Path of the task creating the writes.

Return type:

None