MongoDBSaver#
- class langgraph.checkpoint.mongodb.saver.MongoDBSaver(client: MongoClient, db_name: str = 'checkpointing_db', checkpoint_collection_name: str = 'checkpoints', writes_collection_name: str = 'checkpoint_writes', ttl: int | None = None, **kwargs: Any)[source]#
A checkpointer that stores StateGraph checkpoints in a MongoDB database.
A compound index as shown below will be added to each of the collections backing the saver (checkpoints, pending writes). If the collections pre-exist, and have indexes already, nothing will be done during initialization:
keys=[("thread_id", 1), ("checkpoint_ns", 1), ("checkpoint_id", -1)], unique=True,
- Parameters:
client (MongoClient) – The MongoDB connection.
db_name (Optional[str]) – Database name
checkpoint_collection_name (Optional[str]) – Name of Collection of Checkpoints
writes_collection_name (Optional[str]) – Name of Collection of intermediate writes.
ttl (Optional[int]) – Time to live in seconds. See https://www.mongodb.com/docs/manual/core/index-ttl/.
kwargs (Any)
Examples
>>> from langgraph.checkpoint.mongodb import MongoDBSaver >>> from langgraph.graph import StateGraph >>> from pymongo import MongoClient >>> >>> builder = StateGraph(int) >>> builder.add_node("add_one", lambda x: x + 1) >>> builder.set_entry_point("add_one") >>> builder.set_finish_point("add_one") >>> client = MongoClient("mongodb://localhost:27017") >>> memory = MongoDBSaver(client) >>> graph = builder.compile(checkpointer=memory) >>> config = {"configurable": {"thread_id": "1"}} >>> graph.get_state(config) >>> result = graph.invoke(3, config) >>> graph.get_state(config) StateSnapshot(values=4, next=(), config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef8b22d-df71-6ddc-8001-7c821b5c45fd'}}, metadata={'source': 'loop', 'writes': {'add_one': 4}, 'step': 1, 'parents': {}}, created_at='2024-10-15T18:25:34.088329+00:00', parent_config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef8b22d-df6f-6eec-8000-20f621dcf3b7'}}, tasks=())
Adding sharding support:
>>> from langgraph.checkpoint.mongodb import MongoDBSaver >>> from pymongo import MongoClient >>> memory = MongoDBSaver(client) >>> client = MongoClient("mongodb://localhost:27017") >>> client.admin.command('enableSharding', memory.db_name) >>> shard_key = {'your_shard_key': 1} # Specify your shard key >>> client.admin.command('shardCollection', f'{memory.db_name}.{memory.checkpoint_collection_name}', key=shard_key) >>> client.admin.command('shardCollection', f'{memory.db_name}.{memory.writes_collection_name}', key=shard_key)
Methods
__init__(client[, db_name, ...])adelete_thread(thread_id)Delete all checkpoints and writes associated with a specific thread ID.
aget(config)Asynchronously fetch a checkpoint using the given configuration.
aget_tuple(config)Asynchronously fetch a checkpoint tuple using the given configuration.
alist(config, *[, filter, before, limit])Asynchronously list checkpoints that match the given criteria.
aput(config, checkpoint, metadata, new_versions)Asynchronously store a checkpoint with its configuration and metadata.
aput_writes(config, writes, task_id[, task_path])Asynchronously store intermediate writes linked to a checkpoint.
close()Close the resources used by the MongoDBSaver.
delete_thread(thread_id)Delete all checkpoints and writes associated with a specific thread ID.
from_conn_string([conn_string, db_name, ...])Context manager to create a MongoDB checkpoint saver.
get(config)Fetch a checkpoint using the given configuration.
get_next_version(current, channel)Generate the next version ID for a channel.
get_tuple(config)Get a checkpoint tuple from the database.
list(config, *[, filter, before, limit])List checkpoints from the database.
put(config, checkpoint, metadata, new_versions)Save a checkpoint to the database.
put_writes(config, writes, task_id[, task_path])Store intermediate writes linked to a checkpoint.
- __init__(client: MongoClient, db_name: str = 'checkpointing_db', checkpoint_collection_name: str = 'checkpoints', writes_collection_name: str = 'checkpoint_writes', ttl: int | None = None, **kwargs: Any) None[source]#
- Parameters:
client (MongoClient)
db_name (str)
checkpoint_collection_name (str)
writes_collection_name (str)
ttl (int | None)
kwargs (Any)
- Return type:
None
- async adelete_thread(thread_id: str) None[source]#
Delete all checkpoints and writes associated with a specific thread ID.
Asynchronously wraps the blocking self.delete_thread method.
- Parameters:
thread_id (str) – The thread ID whose checkpoints should be deleted.
- Return type:
None
- async aget(config: RunnableConfig) Checkpoint | None#
Asynchronously fetch a checkpoint using the given configuration.
- Parameters:
config (RunnableConfig) – Configuration specifying which checkpoint to retrieve.
- Returns:
The requested checkpoint, or None if not found.
- Return type:
Optional[Checkpoint]
- async aget_tuple(config: RunnableConfig) CheckpointTuple | None[source]#
Asynchronously fetch a checkpoint tuple using the given configuration.
Asynchronously wraps the blocking self.get_tuple method.
- Parameters:
config (RunnableConfig) – Configuration specifying which checkpoint to retrieve.
- Returns:
The requested checkpoint tuple, or None if not found.
- Return type:
Optional[CheckpointTuple]
- async alist(config: RunnableConfig | None, *, filter: dict[str, Any] | None = None, before: RunnableConfig | None = None, limit: int | None = None) AsyncIterator[CheckpointTuple][source]#
Asynchronously list checkpoints that match the given criteria.
Asynchronously wraps the blocking self.list generator.
Runs self.list(…) in a background thread and yields its items asynchronously from an asyncio.Queue. This allows integration of synchronous iterators into async code.
- Parameters:
config (RunnableConfig | None) – Configuration object passed to self.list.
filter (dict[str, Any] | None) – Optional filter dictionary.
before (RunnableConfig | None) – Optional parameter to limit results before a given checkpoint.
limit (int | None) – Optional maximum number of results to yield.
- Yields:
AsyncIterator[CheckpointTuple] – An iterator of checkpoint tuples.
- Return type:
AsyncIterator[CheckpointTuple]
- async aput(config: RunnableConfig, checkpoint: Checkpoint, metadata: CheckpointMetadata, new_versions: dict[str, str | int | float]) RunnableConfig[source]#
Asynchronously store a checkpoint with its configuration and metadata.
Asynchronously wraps the blocking self.put method.
- Parameters:
config (RunnableConfig) – Configuration for the checkpoint.
checkpoint (Checkpoint) – The checkpoint to store.
metadata (CheckpointMetadata) – Additional metadata for the checkpoint.
new_versions (dict[str, str | int | float]) – New channel versions as of this write.
- Returns:
Updated configuration after storing the checkpoint.
- Return type:
RunnableConfig
- async aput_writes(config: RunnableConfig, writes: Sequence[tuple[str, Any]], task_id: str, task_path: str = '') None[source]#
Asynchronously store intermediate writes linked to a checkpoint.
Asynchronously wraps the blocking self.put_writes method.
- Parameters:
config (RunnableConfig) – Configuration of the related checkpoint.
writes (Sequence[tuple[str, Any]]) – List of writes to store.
task_id (str) – Identifier for the task creating the writes.
task_path (str) – Path of the task creating the writes.
- Return type:
None
- delete_thread(thread_id: str) None[source]#
Delete all checkpoints and writes associated with a specific thread ID.
- Parameters:
thread_id (str) – The thread ID whose checkpoints should be deleted.
- Return type:
None
- classmethod from_conn_string(conn_string: str | None = None, db_name: str = 'checkpointing_db', checkpoint_collection_name: str = 'checkpoints', writes_collection_name: str = 'checkpoint_writes', ttl: int | None = None, **kwargs: Any) Iterator[MongoDBSaver][source]#
Context manager to create a MongoDB checkpoint saver.
A compound index as shown below will be added to each of the collections backing the saver (checkpoints, pending writes). If the collections pre-exist, and have indexes already, nothing will be done during initialization:
keys=[(“thread_id”, 1), (“checkpoint_ns”, 1), (“checkpoint_id”, -1)], unique=True
- Parameters:
conn_string (str | None) – MongoDB connection string. See [class:~pymongo.MongoClient].
db_name (str) – Database name. It will be created if it doesn’t exist.
checkpoint_collection_name (str) – Checkpoint Collection name. Created if it doesn’t exist.
writes_collection_name (str) – Collection name of intermediate writes. Created if it doesn’t exist.
ttl (Optional[int]) – Time to live in seconds.
kwargs (Any)
- Return type:
Iterator[MongoDBSaver]
Yields: A new MongoDBSaver.
- get(config: RunnableConfig) Checkpoint | None#
Fetch a checkpoint using the given configuration.
- Parameters:
config (RunnableConfig) – Configuration specifying which checkpoint to retrieve.
- Returns:
The requested checkpoint, or None if not found.
- Return type:
Optional[Checkpoint]
- get_next_version(current: V | None, channel: ChannelProtocol) V#
Generate the next version ID for a channel.
Default is to use integer versions, incrementing by 1. If you override, you can use str/int/float versions, as long as they are monotonically increasing.
- Parameters:
current (Optional[V]) – The current version identifier (int, float, or str).
channel (BaseChannel) – The channel being versioned.
- Returns:
The next version identifier, which must be increasing.
- Return type:
V
- get_tuple(config: RunnableConfig) CheckpointTuple | None[source]#
Get a checkpoint tuple from the database.
This method retrieves a checkpoint tuple from the MongoDB database based on the provided config. If the config contains a “checkpoint_id” key, the checkpoint with the matching thread ID and checkpoint ID is retrieved. Otherwise, the latest checkpoint for the given thread ID is retrieved.
- Args:
config (RunnableConfig): The config to use for retrieving the checkpoint.
- Returns:
Optional[CheckpointTuple]: The retrieved checkpoint tuple, or None if no matching checkpoint was found.
Examples
Basic: >>> config = {“configurable”: {“thread_id”: “1”}} >>> checkpoint_tuple = memory.get_tuple(config) >>> print(checkpoint_tuple) CheckpointTuple(…)
With checkpoint ID: >>> config = { … “configurable”: { … “thread_id”: “1”, … “checkpoint_ns”: “”, … “checkpoint_id”: “1ef4f797-8335-6428-8001-8a1503f9b875”, … } … } >>> checkpoint_tuple = memory.get_tuple(config) >>> print(checkpoint_tuple) CheckpointTuple(…)
- Parameters:
config (RunnableConfig)
- Return type:
CheckpointTuple | None
- list(config: RunnableConfig | None, *, filter: dict[str, Any] | None = None, before: RunnableConfig | None = None, limit: int | None = None) Iterator[CheckpointTuple][source]#
List checkpoints from the database.
This method retrieves a list of checkpoint tuples from the MongoDB database based on the provided config. The checkpoints are ordered by checkpoint ID in descending order (newest first).
- Parameters:
config (RunnableConfig) – The config to use for listing the checkpoints.
filter (Optional[dict[str, Any]]) – Additional filtering criteria for metadata. Defaults to None.
before (Optional[RunnableConfig]) – If provided, only checkpoints before the specified checkpoint ID are returned. Defaults to None.
limit (Optional[int]) – The maximum number of checkpoints to return. Defaults to None.
- Yields:
Iterator[CheckpointTuple] – An iterator of checkpoint tuples.
Examples: >>> from langgraph.checkpoint.mongodb import MongoDBSaver >>> with MongoDBSaver.from_conn_string(“mongodb://localhost:27017”) as memory: … # Run a graph, then list the checkpoints >>> config = {“configurable”: {“thread_id”: “1”}} >>> checkpoints = list(memory.list(config, limit=2)) >>> print(checkpoints) [CheckpointTuple(…), CheckpointTuple(…)]
- Return type:
Iterator[CheckpointTuple]
- put(config: RunnableConfig, checkpoint: Checkpoint, metadata: CheckpointMetadata, new_versions: dict[str, str | int | float]) RunnableConfig[source]#
Save a checkpoint to the database.
This method saves a checkpoint to the MongoDB database. The checkpoint is associated with the provided config and its parent config (if any).
- Parameters:
config (RunnableConfig) – The config to associate with the checkpoint.
checkpoint (Checkpoint) – The checkpoint to save.
metadata (CheckpointMetadata) – Additional metadata to save with the checkpoint.
new_versions (ChannelVersions) – New channel versions as of this write.
- Returns:
Updated configuration after storing the checkpoint.
- Return type:
RunnableConfig
Examples
>>> from langgraph.checkpoint.mongodb import MongoDBSaver >>> with MongoDBSaver.from_conn_string("mongodb://localhost:27017") as memory: >>> config = {"configurable": {"thread_id": "1", "checkpoint_ns": ""}} >>> checkpoint = {"ts": "2024-05-04T06:32:42.235444+00:00", "id": "1ef4f797-8335-6428-8001-8a1503f9b875", "data": {"key": "value"}} >>> saved_config = memory.put(config, checkpoint, {"source": "input", "step": 1, "writes": {"key": "value"}}, {}) >>> print(saved_config) {'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef4f797-8335-6428-8001-8a1503f9b875'}}
- put_writes(config: RunnableConfig, writes: Sequence[tuple[str, Any]], task_id: str, task_path: str = '') None[source]#
Store intermediate writes linked to a checkpoint.
This method saves intermediate writes associated with a checkpoint to the MongoDB database.
- Parameters:
config (RunnableConfig) – Configuration of the related checkpoint.
writes (Sequence[tuple[str, Any]]) – List of writes to store, each as (channel, value) pair.
task_id (str) – Identifier for the task creating the writes.
task_path (str) – Path of the task creating the writes.
- Return type:
None