Source code for langchain_mongodb.index

"""Search Index Commands"""

import logging
from time import monotonic, sleep
from typing import Any, Callable, Dict, List, Optional

from pymongo.collection import Collection
from pymongo.operations import SearchIndexModel

logger = logging.getLogger(__file__)


def _vector_search_index_definition(
    dimensions: int,
    path: str,
    similarity: str,
    filters: Optional[List[str]] = None,
    **kwargs: Any,
) -> Dict[str, Any]:
    # https://www.mongodb.com/docs/atlas/atlas-vector-search/vector-search-type/
    fields = [
        {
            "numDimensions": dimensions,
            "path": path,
            "similarity": similarity,
            "type": "vector",
        },
    ]
    if filters:
        for field in filters:
            fields.append({"type": "filter", "path": field})
    definition = {"fields": fields}
    definition.update(kwargs)
    return definition


[docs] def create_vector_search_index( collection: Collection, index_name: str, dimensions: int, path: str, similarity: str, filters: Optional[List[str]] = None, *, wait_until_complete: Optional[float] = None, **kwargs: Any, ) -> None: """Experimental Utility function to create a vector search index Args: collection (Collection): MongoDB Collection index_name (str): Name of Index dimensions (int): Number of dimensions in embedding path (str): field with vector embedding similarity (str): The similarity score used for the index filters (List[str]): Fields/paths to index to allow filtering in $vectorSearch wait_until_complete (Optional[float]): If provided, number of seconds to wait until search index is ready. kwargs: Keyword arguments supplying any additional options to SearchIndexModel. """ logger.info("Creating Search Index %s on %s", index_name, collection.name) if collection.name not in collection.database.list_collection_names(): collection.database.create_collection(collection.name) result = collection.create_search_index( SearchIndexModel( definition=_vector_search_index_definition( dimensions=dimensions, path=path, similarity=similarity, filters=filters, **kwargs, ), name=index_name, type="vectorSearch", ) ) if wait_until_complete: _wait_for_predicate( predicate=lambda: _is_index_ready(collection, index_name), err=f"{index_name=} did not complete in {wait_until_complete}!", timeout=wait_until_complete, ) logger.info(result)
[docs] def drop_vector_search_index( collection: Collection, index_name: str, *, wait_until_complete: Optional[float] = None, ) -> None: """Drop a created vector search index Args: collection (Collection): MongoDB Collection with index to be dropped index_name (str): Name of the MongoDB index wait_until_complete (Optional[float]): If provided, number of seconds to wait until search index is ready. """ logger.info( "Dropping Search Index %s from Collection: %s", index_name, collection.name ) collection.drop_search_index(index_name) if wait_until_complete: _wait_for_predicate( predicate=lambda: len(list(collection.list_search_indexes())) == 0, err=f"Index {index_name} did not drop in {wait_until_complete}!", timeout=wait_until_complete, ) logger.info("Vector Search index %s.%s dropped", collection.name, index_name)
[docs] def update_vector_search_index( collection: Collection, index_name: str, dimensions: int, path: str, similarity: str, filters: Optional[List[str]] = None, *, wait_until_complete: Optional[float] = None, **kwargs: Any, ) -> None: """Update a search index. Replace the existing index definition with the provided definition. Args: collection (Collection): MongoDB Collection index_name (str): Name of Index dimensions (int): Number of dimensions in embedding path (str): field with vector embedding similarity (str): The similarity score used for the index. filters (List[str]): Fields/paths to index to allow filtering in $vectorSearch wait_until_complete (Optional[float]): If provided, number of seconds to wait until search index is ready. kwargs: Keyword arguments supplying any additional options to SearchIndexModel. """ logger.info( "Updating Search Index %s from Collection: %s", index_name, collection.name ) collection.update_search_index( name=index_name, definition=_vector_search_index_definition( dimensions=dimensions, path=path, similarity=similarity, filters=filters, **kwargs, ), ) if wait_until_complete: _wait_for_predicate( predicate=lambda: _is_index_ready(collection, index_name), err=f"Index {index_name} update did not complete in {wait_until_complete}!", timeout=wait_until_complete, ) logger.info("Update succeeded")
def _is_index_ready(collection: Collection, index_name: str) -> bool: """Check for the index name in the list of available search indexes to see if the specified index is of status READY Args: collection (Collection): MongoDB Collection to for the search indexes index_name (str): Vector Search Index name Returns: bool : True if the index is present and READY false otherwise """ for index in collection.list_search_indexes(index_name): if index["status"] == "READY": return True return False def _wait_for_predicate( predicate: Callable, err: str, timeout: float = 120, interval: float = 0.5 ) -> None: """Generic to block until the predicate returns true Args: predicate (Callable[, bool]): A function that returns a boolean value err (str): Error message to raise if nothing occurs timeout (float, optional): Wait time for predicate. Defaults to TIMEOUT. interval (float, optional): Interval to check predicate. Defaults to DELAY. Raises: TimeoutError: _description_ """ start = monotonic() while not predicate(): if monotonic() - start > timeout: raise TimeoutError(err) sleep(interval)
[docs] def create_fulltext_search_index( collection: Collection, index_name: str, field: str, *, wait_until_complete: Optional[float] = None, **kwargs: Any, ) -> None: """Experimental Utility function to create an Atlas Search index Args: collection (Collection): MongoDB Collection index_name (str): Name of Index field (str): Field to index wait_until_complete (Optional[float]): If provided, number of seconds to wait until search index is ready kwargs: Keyword arguments supplying any additional options to SearchIndexModel. """ logger.info("Creating Search Index %s on %s", index_name, collection.name) if collection.name not in collection.database.list_collection_names(): collection.database.create_collection(collection.name) definition = { "mappings": {"dynamic": False, "fields": {field: [{"type": "string"}]}} } result = collection.create_search_index( SearchIndexModel( definition=definition, name=index_name, type="search", **kwargs, ) ) if wait_until_complete: _wait_for_predicate( predicate=lambda: _is_index_ready(collection, index_name), err=f"{index_name=} did not complete in {wait_until_complete}!", timeout=wait_until_complete, ) logger.info(result)