MongoDBAtlasFullTextSearchRetriever#

class langchain_mongodb.retrievers.full_text_search.MongoDBAtlasFullTextSearchRetriever[source]#

Bases: BaseRetriever

Retriever performs full-text searches using Lucene’s standard (BM25) analyzer.

Note

MongoDBAtlasFullTextSearchRetriever implements the standard Runnable Interface. 🏃

The Runnable Interface has additional methods that are available on runnables, such as with_types, with_retry, assign, bind, get_graph, and more.

param collection: Collection [Required]#

MongoDB Collection on an Atlas cluster

param filter: Dict[str, Any] | None = None#

(Optional) MQL match expression that compares an indexed field

param include_scores: bool = True#

If True, include scores that provide a measure of relative relevance

param k: int | None = None#

Number of documents to return. Default is no limit

param metadata: dict[str, Any] | None = None#

Optional metadata associated with the retriever.

This metadata will be associated with each call to this retriever, and passed as arguments to the handlers defined in callbacks.

You can use these to, e.g., identify a specific instance of a retriever with its use case.

param num_docs_to_rerank: int | None = None#

Candidates passed to the reranker. Defaults to k. Max 1000.

param rerank_model: str | None = None#

Voyage AI reranking model (e.g. ‘rerank-2.5’). Uses latest model if omitted.

param rerank_path: str | List[str] | None = None#

Field or list of fields to rerank on. Enables $rerank when set.

param search_field: str | List[str] [Required]#

Collection field that contains the text to be searched. It must be indexed

param search_index_name: str [Required]#

Atlas Search Index name

param tags: list[str] | None = None#

Optional list of tags associated with the retriever.

These tags will be associated with each call to this retriever, and passed as arguments to the handlers defined in callbacks.

You can use these to, e.g., identify a specific instance of a retriever with its use case.
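To make the roles of search_index_name, search_field, filter, include_scores, and k more concrete, the sketch below assembles a MongoDB aggregation pipeline of the kind a full-text retriever could run. It is an illustration under assumptions, not the library's actual implementation: the helper name build_text_search_pipeline is hypothetical, and the stage order, the "score" field name, and the sample index and field names are choices made for this example.

```python
from typing import Any, Dict, List, Optional, Union


def build_text_search_pipeline(
    query: str,
    search_field: Union[str, List[str]],
    index_name: str,
    k: Optional[int] = None,
    filter: Optional[Dict[str, Any]] = None,
    include_scores: bool = True,
) -> List[Dict[str, Any]]:
    """Hypothetical helper: assemble an Atlas Search aggregation pipeline."""
    pipeline: List[Dict[str, Any]] = [
        # Full-text query against the indexed field(s).
        {"$search": {"index": index_name, "text": {"query": query, "path": search_field}}}
    ]
    if filter:
        # Narrow the hits with an MQL match expression.
        pipeline.append({"$match": filter})
    if include_scores:
        # Expose the relevance score computed by the search stage.
        pipeline.append({"$set": {"score": {"$meta": "searchScore"}}})
    if k is not None:
        # No $limit stage means "no limit", mirroring k=None.
        pipeline.append({"$limit": k})
    return pipeline


pipeline = build_text_search_pipeline(
    "mongodb atlas",
    search_field="text",        # assumed field name
    index_name="text_index",    # assumed index name
    k=3,
    filter={"year": {"$gte": 2020}},
)
```

A list like this could be passed to pymongo's Collection.aggregate; the retriever itself hides that step behind invoke.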

async abatch(inputs: list[Input], config: RunnableConfig | list[RunnableConfig] | None = None, *, return_exceptions: bool = False, **kwargs: Any | None) list[Output]#

Default implementation runs ainvoke in parallel using asyncio.gather.

The default implementation of batch works well for IO bound runnables.

Subclasses must override this method if they can batch more efficiently; e.g., if the underlying Runnable uses an API which supports a batch mode.

Parameters:
  • inputs (list[Input]) – A list of inputs to the Runnable.

  • config (RunnableConfig | list[RunnableConfig] | None) –

    A config to use when invoking the Runnable.

    The config supports standard keys like ‘tags’, ‘metadata’ for tracing purposes, ‘max_concurrency’ for controlling how much work to do in parallel, and other keys.

    Please refer to RunnableConfig for more details.

  • return_exceptions (bool) – Whether to return exceptions instead of raising them.

  • **kwargs (Any | None) – Additional keyword arguments to pass to the Runnable.

Returns:

A list of outputs from the Runnable.

Return type:

list[Output]
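The pattern described above (run ainvoke for every input with asyncio.gather, optionally capped by max_concurrency) can be sketched with the standard library alone. This is a simplified illustration, not the library's code: fake_ainvoke stands in for the retriever's ainvoke, and abatch_sketch is a hypothetical name.

```python
import asyncio
from typing import List, Optional


async def fake_ainvoke(query: str) -> List[str]:
    """Stand-in for retriever.ainvoke (assumption for this sketch)."""
    await asyncio.sleep(0.01)
    return [f"doc for {query}"]


async def abatch_sketch(
    inputs: List[str],
    max_concurrency: Optional[int] = None,
    return_exceptions: bool = False,
) -> list:
    # A semaphore caps how many calls are in flight at once.
    semaphore = asyncio.Semaphore(max_concurrency) if max_concurrency else None

    async def run_one(query: str) -> List[str]:
        if semaphore is None:
            return await fake_ainvoke(query)
        async with semaphore:
            return await fake_ainvoke(query)

    # gather returns results in input order, regardless of completion order.
    return await asyncio.gather(
        *(run_one(q) for q in inputs), return_exceptions=return_exceptions
    )


results = asyncio.run(abatch_sketch(["a", "b", "c"], max_concurrency=2))
```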

async abatch_as_completed(inputs: Sequence[Input], config: RunnableConfig | Sequence[RunnableConfig] | None = None, *, return_exceptions: bool = False, **kwargs: Any | None) AsyncIterator[tuple[int, Output | Exception]]#

Run ainvoke in parallel on a list of inputs.

Yields results as they complete.

Parameters:
  • inputs (Sequence[Input]) – A list of inputs to the Runnable.

  • config (RunnableConfig | Sequence[RunnableConfig] | None) –

    A config to use when invoking the Runnable.

    The config supports standard keys like ‘tags’, ‘metadata’ for tracing purposes, ‘max_concurrency’ for controlling how much work to do in parallel, and other keys.

    Please refer to RunnableConfig for more details.

  • return_exceptions (bool) – Whether to return exceptions instead of raising them.

  • **kwargs (Any | None) – Additional keyword arguments to pass to the Runnable.

Yields:

A tuple of the index of the input and the output from the Runnable.

Return type:

AsyncIterator[tuple[int, Output | Exception]]
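Because results arrive in completion order, each one is paired with the index of its input. A minimal sketch of that idea, assuming a stand-in coroutine fake_ainvoke and a hypothetical generator name abatch_as_completed_sketch:

```python
import asyncio
from typing import AsyncIterator, List, Tuple


async def fake_ainvoke(query: str) -> str:
    """Stand-in for ainvoke; longer queries take longer (assumption)."""
    await asyncio.sleep(0.02 * len(query))
    return query.upper()


async def abatch_as_completed_sketch(
    inputs: List[str],
) -> AsyncIterator[Tuple[int, str]]:
    async def run_one(index: int, query: str) -> Tuple[int, str]:
        # Carry the input index along so the caller can match results to inputs.
        return index, await fake_ainvoke(query)

    tasks = [run_one(i, q) for i, q in enumerate(inputs)]
    for next_done in asyncio.as_completed(tasks):
        yield await next_done


async def main() -> List[Tuple[int, str]]:
    return [pair async for pair in abatch_as_completed_sketch(["ccc", "a", "bb"])]


completed = asyncio.run(main())
```

The order of completed depends on timing, so callers should rely on the index in each tuple rather than on position.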

async ainvoke(input: str, config: RunnableConfig | None = None, **kwargs: Any) list[Document]#

Asynchronously invoke the retriever to get relevant documents.

Main entry point for asynchronous retriever invocations.

Parameters:
  • input (str) – The query string.

  • config (RunnableConfig | None) – Configuration for the retriever.

  • **kwargs (Any) – Additional arguments to pass to the retriever.

Returns:

List of relevant documents.

Return type:

list[Document]

Examples: `await retriever.ainvoke("query")`

async astream(input: Input, config: RunnableConfig | None = None, **kwargs: Any | None) AsyncIterator[Output]#

Default implementation of astream, which calls ainvoke.

Subclasses must override this method if they support streaming output.

Parameters:
  • input (Input) – The input to the Runnable.

  • config (RunnableConfig | None) – The config to use for the Runnable.

  • **kwargs (Any | None) – Additional keyword arguments to pass to the Runnable.

Yields:

The output of the Runnable.

Return type:

AsyncIterator[Output]

astream_events(input: Any, config: RunnableConfig | None = None, *, version: Literal['v1', 'v2', 'v3'] = 'v2', include_names: Sequence[str] | None = None, include_types: Sequence[str] | None = None, include_tags: Sequence[str] | None = None, exclude_names: Sequence[str] | None = None, exclude_types: Sequence[str] | None = None, exclude_tags: Sequence[str] | None = None, **kwargs: Any) AsyncIterator[StreamEvent] | Awaitable[Any]#

Generate a stream of events.

Use to create an iterator over StreamEvent objects that provide real-time information about the progress of the Runnable, including StreamEvent objects from intermediate results.

A StreamEvent is a dictionary with the following schema:

  • event: Event names are of the format: on_[runnable_type]_(start|stream|end).

  • name: The name of the Runnable that generated the event.

  • run_id: Randomly generated ID associated with the given execution of the Runnable that emitted the event. A child Runnable that gets invoked as part of the execution of a parent Runnable is assigned its own unique ID.

  • parent_ids: The IDs of the parent runnables that generated the event. The root Runnable will have an empty list. The order of the parent IDs is from the root to the immediate parent. Only available for v2 version of the API. The v1 version of the API will return an empty list.

  • tags: The tags of the Runnable that generated the event.

  • metadata: The metadata of the Runnable that generated the event.

  • data: The data associated with the event. The contents of this field depend on the type of event. See the table below for more details.
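The event naming format on_[runnable_type]_(start|stream|end) is regular enough to split mechanically, which can be handy when routing events by type. The sketch below is an illustration only: EVENT_NAME and split_event_name are hypothetical names, and the pattern covers just the standard events listed in this section.

```python
import re
from typing import Tuple

# on_[runnable_type]_(start|stream|end)
EVENT_NAME = re.compile(r"^on_(?P<runnable_type>[a-z_]+)_(?P<phase>start|stream|end)$")


def split_event_name(event: str) -> Tuple[str, str]:
    """Return (runnable_type, phase) for a standard event name."""
    match = EVENT_NAME.match(event)
    if match is None:
        raise ValueError(f"not a standard event name: {event!r}")
    return match.group("runnable_type"), match.group("phase")


parsed = [
    split_event_name(name)
    for name in ("on_retriever_start", "on_chat_model_stream", "on_chain_end")
]
```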

Below is a table that illustrates some events that might be emitted by various chains. Metadata fields have been omitted from the table for brevity. Chain definitions have been included after the table.

Note

This reference table is for the v2 version of the schema.

| event | name | chunk | input | output |
| --- | --- | --- | --- | --- |
| on_chat_model_start | '[model name]' | | {"messages": [[SystemMessage, HumanMessage]]} | |
| on_chat_model_stream | '[model name]' | AIMessageChunk(content="hello") | | |
| on_chat_model_end | '[model name]' | | {"messages": [[SystemMessage, HumanMessage]]} | AIMessageChunk(content="hello world") |
| on_llm_start | '[model name]' | | {'input': 'hello'} | |
| on_llm_stream | '[model name]' | 'Hello' | | |
| on_llm_end | '[model name]' | | 'Hello human!' | |
| on_chain_start | 'format_docs' | | | |
| on_chain_stream | 'format_docs' | 'hello world!, goodbye world!' | | |
| on_chain_end | 'format_docs' | | [Document(...)] | 'hello world!, goodbye world!' |
| on_tool_start | 'some_tool' | | {"x": 1, "y": "2"} | |
| on_tool_end | 'some_tool' | | | {"x": 1, "y": "2"} |
| on_retriever_start | '[retriever name]' | | {"query": "hello"} | |
| on_retriever_end | '[retriever name]' | | {"query": "hello"} | [Document(...), ..] |
| on_prompt_start | '[template_name]' | | {"question": "hello"} | |
| on_prompt_end | '[template_name]' | | {"question": "hello"} | ChatPromptValue(messages: [SystemMessage, ...]) |

In addition to the standard events, users can also dispatch custom events (see example below).

Custom events are surfaced only in the v2 and v3 versions of the API, not in v1.

A custom event has the following format:

| Attribute | Type | Description |
| --- | --- | --- |
| name | str | A user defined name for the event. |
| data | Any | The data associated with the event. This can be anything, though we suggest making it JSON serializable. |

Here are declarations associated with the standard events shown above:

format_docs:

```python
from langchain_core.documents import Document
from langchain_core.runnables import RunnableLambda


def format_docs(docs: list[Document]) -> str:
    """Format the docs."""
    return ", ".join([doc.page_content for doc in docs])


format_docs = RunnableLambda(format_docs)
```

some_tool:

```python
from langchain_core.tools import tool


@tool
def some_tool(x: int, y: str) -> dict:
    """Some_tool."""
    return {"x": x, "y": y}
```

prompt:

```python
from langchain_core.prompts import ChatPromptTemplate

template = ChatPromptTemplate.from_messages(
    [
        ("system", "You are Cat Agent 007"),
        ("human", "{question}"),
    ]
).with_config({"run_name": "my_template", "tags": ["my_template"]})
```

Example

```python
from langchain_core.runnables import RunnableLambda


async def reverse(s: str) -> str:
    return s[::-1]


chain = RunnableLambda(func=reverse)

# Run inside an async context (e.g. a notebook cell or an async function).
events = [
    event async for event in chain.astream_events("hello", version="v2")
]

# Will produce the following events
# (run_id and parent_ids have been omitted for brevity):
[
    {
        "data": {"input": "hello"},
        "event": "on_chain_start",
        "metadata": {},
        "name": "reverse",
        "tags": [],
    },
    {
        "data": {"chunk": "olleh"},
        "event": "on_chain_stream",
        "metadata": {},
        "name": "reverse",
        "tags": [],
    },
    {
        "data": {"output": "olleh"},
        "event": "on_chain_end",
        "metadata": {},
        "name": "reverse",
        "tags": [],
    },
]
```

Dispatch custom event:

```python
import asyncio

from langchain_core.callbacks.manager import (
    adispatch_custom_event,
)
from langchain_core.runnables import RunnableConfig, RunnableLambda


async def slow_thing(some_input: str, config: RunnableConfig) -> str:
    """Do something that takes a long time."""
    await asyncio.sleep(1)  # Placeholder for some slow operation
    await adispatch_custom_event(
        "progress_event",
        {"message": "Finished step 1 of 3"},
        config=config,  # Must be included for python < 3.10
    )
    await asyncio.sleep(1)  # Placeholder for some slow operation
    await adispatch_custom_event(
        "progress_event",
        {"message": "Finished step 2 of 3"},
        config=config,  # Must be included for python < 3.10
    )
    await asyncio.sleep(1)  # Placeholder for some slow operation
    return "Done"


slow_thing = RunnableLambda(slow_thing)

# Run inside an async context (e.g. a notebook cell or an async function).
async for event in slow_thing.astream_events("some_input", version="v2"):
    print(event)
```

Parameters:
  • input (Any) – The input to the Runnable.

  • config (RunnableConfig | None) – The config to use for the Runnable.

  • version (Literal['v1', 'v2', 'v3']) –

    The version of the schema to use. One of ‘v1’, ‘v2’, or ‘v3’.

    Most callers should use ‘v2’ (the default), which yields StreamEvent dicts and supports custom events.

    ‘v3’ selects the typed, content-block-centric streaming protocol and is only supported on Runnable subclasses that implement it (currently BaseChatModel and langgraph.CompiledGraph); on a generic Runnable it raises NotImplementedError. The ‘v3’ API is in beta and may change. See the subclass override (e.g. BaseChatModel.astream_events) for the v3 return shape.

    ‘v1’ is retained for backwards compatibility and will be deprecated in 0.4.0. Custom events are only surfaced in ‘v2’ / ‘v3’.

  • include_names (Sequence[str] | None) – Only include events from Runnable objects with matching names.

  • include_types (Sequence[str] | None) – Only include events from Runnable objects with matching types.

  • include_tags (Sequence[str] | None) – Only include events from Runnable objects with matching tags.

  • exclude_names (Sequence[str] | None) – Exclude events from Runnable objects with matching names.

  • exclude_types (Sequence[str] | None) – Exclude events from Runnable objects with matching types.

  • exclude_tags (Sequence[str] | None) – Exclude events from Runnable objects with matching tags.

  • **kwargs (Any) – Additional keyword arguments to pass to the Runnable.

Yields:

An async stream of StreamEvent.

Raises:

NotImplementedError – If the version is not ‘v1’, ‘v2’, or ‘v3’, or if ‘v3’ is requested on a Runnable that does not implement the v3 streaming protocol.

Return type:

AsyncIterator[StreamEvent] | Awaitable[Any]
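The include_* and exclude_* parameters act as filters over the emitted events. The sketch below shows one plausible reading of how such filters combine: an event passes if it matches any include filter (or if none is given) and matches no exclude filter. This is an assumption made for illustration; keep_event is a hypothetical helper, and the library's actual matching rules may differ in detail.

```python
from typing import Any, Dict, Optional, Sequence


def keep_event(
    event: Dict[str, Any],
    *,
    include_names: Optional[Sequence[str]] = None,
    include_types: Optional[Sequence[str]] = None,
    include_tags: Optional[Sequence[str]] = None,
    exclude_names: Optional[Sequence[str]] = None,
    exclude_types: Optional[Sequence[str]] = None,
    exclude_tags: Optional[Sequence[str]] = None,
) -> bool:
    """Hypothetical filter over StreamEvent-like dicts."""
    name = event["name"]
    tags = event.get("tags", [])
    # "on_retriever_end" -> "retriever"
    runnable_type = event["event"][len("on_"):].rsplit("_", 1)[0]

    has_include = any(
        f is not None for f in (include_names, include_types, include_tags)
    )
    keep = not has_include  # with no include filters, everything starts as kept
    if include_names is not None:
        keep = keep or name in include_names
    if include_types is not None:
        keep = keep or runnable_type in include_types
    if include_tags is not None:
        keep = keep or any(tag in include_tags for tag in tags)
    if exclude_names is not None:
        keep = keep and name not in exclude_names
    if exclude_types is not None:
        keep = keep and runnable_type not in exclude_types
    if exclude_tags is not None:
        keep = keep and not any(tag in exclude_tags for tag in tags)
    return keep


events = [
    {"event": "on_retriever_start", "name": "docs", "tags": ["search"]},
    {"event": "on_chain_start", "name": "format_docs", "tags": []},
    {"event": "on_retriever_end", "name": "docs", "tags": ["search", "debug"]},
]
kept = [
    e["event"]
    for e in events
    if keep_event(e, include_types=["retriever"], exclude_tags=["debug"])
]
```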

batch(inputs: list[Input], config: RunnableConfig | list[RunnableConfig] | None = None, *, return_exceptions: bool = False, **kwargs: Any | None) list[Output]#

Default implementation runs invoke in parallel using a thread pool executor.

The default implementation of batch works well for IO bound runnables.

Subclasses must override this method if they can batch more efficiently; e.g., if the underlying Runnable uses an API which supports a batch mode.

Parameters:
  • inputs (list[Input]) – A list of inputs to the Runnable.

  • config (RunnableConfig | list[RunnableConfig] | None) –

    A config to use when invoking the Runnable. The config supports standard keys like ‘tags’, ‘metadata’ for tracing purposes, ‘max_concurrency’ for controlling how much work to do in parallel, and other keys.

    Please refer to RunnableConfig for more details.

  • return_exceptions (bool) – Whether to return exceptions instead of raising them.

  • **kwargs (Any | None) – Additional keyword arguments to pass to the Runnable.

Returns:

A list of outputs from the Runnable.

Return type:

list[Output]
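As a rough picture of the thread-pool approach and of what return_exceptions changes, consider the following standard-library sketch. It is not the library's implementation: fake_invoke stands in for the retriever's invoke, and batch_sketch is a hypothetical name.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import List, Optional


def fake_invoke(query: str) -> List[str]:
    """Stand-in for retriever.invoke (assumption for this sketch)."""
    if not query:
        raise ValueError("empty query")
    return [f"doc for {query}"]


def batch_sketch(
    inputs: List[str],
    max_concurrency: Optional[int] = None,
    return_exceptions: bool = False,
) -> list:
    def run_one(query: str):
        try:
            return fake_invoke(query)
        except Exception as exc:
            if return_exceptions:
                # Hand the exception back in the slot of the failed input.
                return exc
            raise

    # Threads suit IO-bound work such as database round trips.
    with ThreadPoolExecutor(max_workers=max_concurrency) as pool:
        # map preserves input order in its results.
        return list(pool.map(run_one, inputs))


outputs = batch_sketch(["a", "", "b"], return_exceptions=True)
```

With return_exceptions=False, the same call would raise the ValueError from the empty query instead of returning it.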

batch_as_completed(inputs: Sequence[Input], config: RunnableConfig | Sequence[RunnableConfig] | None = None, *, return_exceptions: bool = False, **kwargs: Any | None) Iterator[tuple[int, Output | Exception]]#

Run invoke in parallel on a list of inputs.

Yields results as they complete.

Parameters:
  • inputs (Sequence[Input]) – A list of inputs to the Runnable.

  • config (RunnableConfig | Sequence[RunnableConfig] | None) –

    A config to use when invoking the Runnable.

    The config supports standard keys like ‘tags’, ‘metadata’ for tracing purposes, ‘max_concurrency’ for controlling how much work to do in parallel, and other keys.

    Please refer to RunnableConfig for more details.

  • return_exceptions (bool) – Whether to return exceptions instead of raising them.

  • **kwargs (Any | None) – Additional keyword arguments to pass to the Runnable.

Yields:

Tuples of the index of the input and the output from the Runnable.

Return type:

Iterator[tuple[int, Output | Exception]]

bind(**kwargs: Any) Runnable[Input, Output]#

Bind arguments to a Runnable, returning a new Runnable.

Useful when a Runnable in a chain requires an argument that is not in the output of the previous Runnable or included in the user input.

Parameters:

**kwargs (Any) – The arguments to bind to the Runnable.

Returns:

A new Runnable with the arguments bound.

Return type:

Runnable[Input, Output]

Example

```python
from langchain_ollama import ChatOllama
from langchain_core.output_parsers import StrOutputParser

model = ChatOllama(model="llama3.1")

# Without bind
chain = model | StrOutputParser()

chain.invoke("Repeat quoted words exactly: 'One two three four five.'")
# Output is 'One two three four five.'

# With bind
chain = model.bind(stop=["three"]) | StrOutputParser()

chain.invoke("Repeat quoted words exactly: 'One two three four five.'")
# Output is 'One two'
```

close() None[source]#

Close the resources used by the MongoDBAtlasFullTextSearchRetriever.

Return type:

None

configurable_alternatives(which: ConfigurableField, *, default_key: str = 'default', prefix_keys: bool = False, **kwargs: Runnable[Input, Output] | Callable[[], Runnable[Input, Output]]) RunnableSerializable#

Configure alternatives for Runnable objects that can be set at runtime.

Parameters:
  • which (ConfigurableField) – The ConfigurableField instance that will be used to select the alternative.

  • default_key (str) – The default key to use if no alternative is selected.

  • prefix_keys (bool) – Whether to prefix the keys with the ConfigurableField id.

  • **kwargs (Runnable[Input, Output] | Callable[[], Runnable[Input, Output]]) – A dictionary of keys to Runnable instances or callables that return Runnable instances.

Returns:

A new Runnable with the alternatives configured.

Return type:

RunnableSerializable

Example

```python
from langchain_anthropic import ChatAnthropic
from langchain_core.runnables.utils import ConfigurableField
from langchain_openai import ChatOpenAI

model = ChatAnthropic(
    model_name="claude-sonnet-4-5-20250929"
).configurable_alternatives(
    ConfigurableField(id="llm"),
    default_key="anthropic",
    openai=ChatOpenAI(),
)

# uses the default model ChatAnthropic
print(model.invoke("which organization created you?").content)

# uses ChatOpenAI
print(
    model.with_config(configurable={"llm": "openai"})
    .invoke("which organization created you?")
    .content
)
```

configurable_fields(**kwargs: ConfigurableField | ConfigurableFieldSingleOption | ConfigurableFieldMultiOption) RunnableSerializable#

Configure particular Runnable fields at runtime.

Parameters:

**kwargs (ConfigurableField | ConfigurableFieldSingleOption | ConfigurableFieldMultiOption) – A dictionary of ConfigurableField instances to configure.

Raises:

ValueError – If a configuration key is not found in the Runnable.

Returns:

A new Runnable with the fields configured.

Return type:

RunnableSerializable

Example

```python
from langchain_core.runnables import ConfigurableField
from langchain_openai import ChatOpenAI

model = ChatOpenAI(max_tokens=20).configurable_fields(
    max_tokens=ConfigurableField(
        id="output_token_number",
        name="Max tokens in the output",
        description="The maximum number of tokens in the output",
    )
)

# max_tokens = 20
print(
    "max_tokens_20: ", model.invoke("tell me something about chess").content
)

# max_tokens = 200
print(
    "max_tokens_200: ",
    model.with_config(configurable={"output_token_number": 200})
    .invoke("tell me something about chess")
    .content,
)
```

invoke(input: str, config: RunnableConfig | None = None, **kwargs: Any) list[Document]#

Invoke the retriever to get relevant documents.

Main entry point for synchronous retriever invocations.

Parameters:
  • input (str) – The query string.

  • config (RunnableConfig | None) – Configuration for the retriever.

  • **kwargs (Any) – Additional arguments to pass to the retriever.

Returns:

List of relevant documents.

Return type:

list[Document]

Examples: `retriever.invoke("query")`

stream(input: Input, config: RunnableConfig | None = None, **kwargs: Any | None) Iterator[Output]#

Default implementation of stream, which calls invoke.

Subclasses must override this method if they support streaming output.

Parameters:
  • input (Input) – The input to the Runnable.

  • config (RunnableConfig | None) – The config to use for the Runnable.

  • **kwargs (Any | None) – Additional keyword arguments to pass to the Runnable.

Yields:

The output of the Runnable.

Return type:

Iterator[Output]
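Since the default stream simply calls invoke, a runnable that does not override it yields its whole result as a single chunk. The sketch below illustrates that behavior with a stand-in fake_invoke; it assumes, without confirmation from this page, that the retriever relies on the default and therefore yields one chunk containing the full document list.

```python
from typing import Iterator, List


def fake_invoke(query: str) -> List[str]:
    """Stand-in for retriever.invoke (assumption for this sketch)."""
    return [f"doc 1 for {query}", f"doc 2 for {query}"]


def stream_sketch(query: str) -> Iterator[List[str]]:
    # Default behavior: one chunk, which is the complete invoke() result.
    yield fake_invoke(query)


chunks = list(stream_sketch("query"))
```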

stream_events(input: Any, config: RunnableConfig | None = None, *, version: Literal['v1', 'v2', 'v3'] = 'v2', include_names: Sequence[str] | None = None, include_types: Sequence[str] | None = None, include_tags: Sequence[str] | None = None, exclude_names: Sequence[str] | None = None, exclude_types: Sequence[str] | None = None, exclude_tags: Sequence[str] | None = None, **kwargs: Any) Iterator[StreamEvent] | Iterator[Any]#

Generate a stream of events synchronously.

Synchronous counterpart to astream_events. For version=’v3’, subclasses that implement the v3 streaming protocol (BaseChatModel, CompiledGraph) override this method. All other versions and base-class calls raise NotImplementedError.

Parameters:
  • input (Any) – The input to the Runnable.

  • config (RunnableConfig | None) – The config to use for the Runnable.

  • version (Literal['v1', 'v2', 'v3']) – The version of the schema to use. ‘v3’ requires a subclass that implements the v3 streaming protocol. ‘v1’ and ‘v2’ are not supported on the sync path.

  • include_names (Sequence[str] | None) – Only include events from Runnable objects with matching names.

  • include_types (Sequence[str] | None) – Only include events from Runnable objects with matching types.

  • include_tags (Sequence[str] | None) – Only include events from Runnable objects with matching tags.

  • exclude_names (Sequence[str] | None) – Exclude events from Runnable objects with matching names.

  • exclude_types (Sequence[str] | None) – Exclude events from Runnable objects with matching types.

  • exclude_tags (Sequence[str] | None) – Exclude events from Runnable objects with matching tags.

  • **kwargs (Any) – Additional keyword arguments to pass to the Runnable.

Raises:

NotImplementedError – Always. Subclasses override this method for supported versions.

Return type:

Iterator[StreamEvent] | Iterator[Any]

with_alisteners(*, on_start: AsyncListener | None = None, on_end: AsyncListener | None = None, on_error: AsyncListener | None = None) Runnable[Input, Output]#

Bind async lifecycle listeners to a Runnable.

Returns a new Runnable.

The Run object contains information about the run, including its id, type, input, output, error, start_time, end_time, and any tags or metadata added to the run.

Parameters:
  • on_start (AsyncListener | None) – Called asynchronously before the Runnable starts running, with the Run object.

  • on_end (AsyncListener | None) – Called asynchronously after the Runnable finishes running, with the Run object.

  • on_error (AsyncListener | None) – Called asynchronously if the Runnable throws an error, with the Run object.

Returns:

A new Runnable with the listeners bound.

Return type:

Runnable[Input, Output]

Example

```python
from langchain_core.runnables import RunnableLambda, Runnable
from datetime import datetime, timezone
import time
import asyncio


def format_t(timestamp: float) -> str:
    return datetime.fromtimestamp(timestamp, tz=timezone.utc).isoformat()


async def test_runnable(time_to_sleep: int):
    print(f"Runnable[{time_to_sleep}s]: starts at {format_t(time.time())}")
    await asyncio.sleep(time_to_sleep)
    print(f"Runnable[{time_to_sleep}s]: ends at {format_t(time.time())}")


async def fn_start(run_obj: Runnable):
    print(f"on start callback starts at {format_t(time.time())}")
    await asyncio.sleep(3)
    print(f"on start callback ends at {format_t(time.time())}")


async def fn_end(run_obj: Runnable):
    print(f"on end callback starts at {format_t(time.time())}")
    await asyncio.sleep(2)
    print(f"on end callback ends at {format_t(time.time())}")


runnable = RunnableLambda(test_runnable).with_alisteners(
    on_start=fn_start, on_end=fn_end
)


async def concurrent_runs():
    await asyncio.gather(runnable.ainvoke(2), runnable.ainvoke(3))


asyncio.run(concurrent_runs())
# Result:
# on start callback starts at 2025-03-01T07:05:22.875378+00:00
# on start callback starts at 2025-03-01T07:05:22.875495+00:00
# on start callback ends at 2025-03-01T07:05:25.878862+00:00
# on start callback ends at 2025-03-01T07:05:25.878947+00:00
# Runnable[2s]: starts at 2025-03-01T07:05:25.879392+00:00
# Runnable[3s]: starts at 2025-03-01T07:05:25.879804+00:00
# Runnable[2s]: ends at 2025-03-01T07:05:27.881998+00:00
# on end callback starts at 2025-03-01T07:05:27.882360+00:00
# Runnable[3s]: ends at 2025-03-01T07:05:28.881737+00:00
# on end callback starts at 2025-03-01T07:05:28.882428+00:00
# on end callback ends at 2025-03-01T07:05:29.883893+00:00
# on end callback ends at 2025-03-01T07:05:30.884831+00:00
```

with_config(config: RunnableConfig | None = None, **kwargs: Any) Runnable[Input, Output]#

Bind config to a Runnable, returning a new Runnable.

Parameters:
  • config (RunnableConfig | None) – The config to bind to the Runnable.

  • **kwargs (Any) – Additional keyword arguments to pass to the Runnable.

Returns:

A new Runnable with the config bound.

Return type:

Runnable[Input, Output]

with_fallbacks(fallbacks: Sequence[Runnable[Input, Output]], *, exceptions_to_handle: tuple[type[BaseException], ...] = (<class 'Exception'>,), exception_key: str | None = None) RunnableWithFallbacksT[Input, Output]#

Add fallbacks to a Runnable, returning a new Runnable.

The new Runnable will try the original Runnable, and then each fallback in order, upon failures.

Parameters:
  • fallbacks (Sequence[Runnable[Input, Output]]) – A sequence of runnables to try if the original Runnable fails.

  • exceptions_to_handle (tuple[type[BaseException], ...]) – A tuple of exception types to handle.

  • exception_key (str | None) –

    If string is specified then handled exceptions will be passed to fallbacks as part of the input under the specified key.

    If None, exceptions will not be passed to fallbacks.

    If used, the base Runnable and its fallbacks must accept a dictionary as input.

Returns:

A new Runnable that will try the original Runnable, and then each fallback in order, upon failures.

Return type:

RunnableWithFallbacksT[Input, Output]

Example

```python
from typing import Iterator

from langchain_core.runnables import RunnableGenerator


def _generate_immediate_error(input: Iterator) -> Iterator[str]:
    raise ValueError()
    yield ""


def _generate(input: Iterator) -> Iterator[str]:
    yield from "foo bar"


runnable = RunnableGenerator(_generate_immediate_error).with_fallbacks(
    [RunnableGenerator(_generate)]
)
print("".join(runnable.stream({})))  # foo bar
```
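The role of exceptions_to_handle and exception_key can be seen in a plain-Python sketch of the fallback loop. This is a simplified illustration, not the library's code: run_with_fallbacks is a hypothetical helper, the runnables are ordinary callables, and re-raising the first error when everything fails is a choice made for this sketch.

```python
from typing import Any, Callable, Dict, Optional, Sequence, Tuple, Type


def run_with_fallbacks(
    primary: Callable[[Dict[str, Any]], Any],
    fallbacks: Sequence[Callable[[Dict[str, Any]], Any]],
    value: Dict[str, Any],
    *,
    exceptions_to_handle: Tuple[Type[BaseException], ...] = (Exception,),
    exception_key: Optional[str] = None,
) -> Any:
    first_error: Optional[BaseException] = None
    last_error: Optional[BaseException] = None
    for runnable in [primary, *fallbacks]:
        call_input = value
        if exception_key is not None and last_error is not None:
            # Pass the previous failure along, which is why inputs must be dicts.
            call_input = {**value, exception_key: last_error}
        try:
            return runnable(call_input)
        except exceptions_to_handle as exc:
            first_error = first_error or exc
            last_error = exc
    assert first_error is not None
    raise first_error


def flaky(x: Dict[str, Any]) -> str:
    raise ValueError("primary failed")


def backup(x: Dict[str, Any]) -> str:
    return f"fallback saw: {x['error']}"


result = run_with_fallbacks(flaky, [backup], {"q": "hi"}, exception_key="error")
```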

with_listeners(*, on_start: Callable[[Run], None] | Callable[[Run, RunnableConfig], None] | None = None, on_end: Callable[[Run], None] | Callable[[Run, RunnableConfig], None] | None = None, on_error: Callable[[Run], None] | Callable[[Run, RunnableConfig], None] | None = None) Runnable[Input, Output]#

Bind lifecycle listeners to a Runnable, returning a new Runnable.

The Run object contains information about the run, including its id, type, input, output, error, start_time, end_time, and any tags or metadata added to the run.

Parameters:
  • on_start (Callable[[Run], None] | Callable[[Run, RunnableConfig], None] | None) – Called before the Runnable starts running, with the Run object.

  • on_end (Callable[[Run], None] | Callable[[Run, RunnableConfig], None] | None) – Called after the Runnable finishes running, with the Run object.

  • on_error (Callable[[Run], None] | Callable[[Run, RunnableConfig], None] | None) – Called if the Runnable throws an error, with the Run object.

Returns:

A new Runnable with the listeners bound.

Return type:

Runnable[Input, Output]

Example

```python
from langchain_core.runnables import RunnableLambda
from langchain_core.tracers.schemas import Run

import time


def test_runnable(time_to_sleep: int):
    time.sleep(time_to_sleep)


def fn_start(run_obj: Run):
    print("start_time:", run_obj.start_time)


def fn_end(run_obj: Run):
    print("end_time:", run_obj.end_time)


chain = RunnableLambda(test_runnable).with_listeners(
    on_start=fn_start, on_end=fn_end
)
chain.invoke(2)
```

with_retry(*, retry_if_exception_type: tuple[type[BaseException], ...] = (<class 'Exception'>,), wait_exponential_jitter: bool = True, exponential_jitter_params: ExponentialJitterParams | None = None, stop_after_attempt: int = 3) Runnable[Input, Output]#

Create a new Runnable that retries the original Runnable on exceptions.

Parameters:
  • retry_if_exception_type (tuple[type[BaseException], ...]) – A tuple of exception types to retry on.

  • wait_exponential_jitter (bool) – Whether to add jitter to the wait time between retries.

  • stop_after_attempt (int) – The maximum number of attempts to make before giving up.

  • exponential_jitter_params (ExponentialJitterParams | None) – Parameters for tenacity.wait_exponential_jitter. Namely: initial, max, exp_base, and jitter (all float values).

Returns:

A new Runnable that retries the original Runnable on exceptions.

Return type:

Runnable[Input, Output]

Example

```python
from langchain_core.runnables import RunnableLambda

count = 0


def _lambda(x: int) -> None:
    global count
    count = count + 1
    if x == 1:
        raise ValueError("x is 1")
    else:
        pass


runnable = RunnableLambda(_lambda)
try:
    runnable.with_retry(
        stop_after_attempt=2,
        retry_if_exception_type=(ValueError,),
    ).invoke(1)
except ValueError:
    pass

assert count == 2
```
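To give a feel for how stop_after_attempt and the exponential jitter parameters (initial, max, exp_base, jitter) interact, here is a standard-library sketch of a retry loop with exponential backoff. It is an approximation for illustration and does not call tenacity: backoff_delay and call_with_retry are hypothetical names, max_delay plays the role of max, and the default values are assumptions of this sketch.

```python
import random
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")


def backoff_delay(
    attempt: int,
    *,
    initial: float = 1.0,
    max_delay: float = 60.0,
    exp_base: float = 2.0,
    jitter: float = 1.0,
) -> float:
    """Wait time after failed attempt number `attempt` (1-based)."""
    delay = initial * exp_base ** (attempt - 1) + random.uniform(0, jitter)
    return min(delay, max_delay)


def call_with_retry(
    fn: Callable[[], T],
    *,
    retry_if_exception_type: Tuple[Type[BaseException], ...] = (Exception,),
    stop_after_attempt: int = 3,
    sleep: Callable[[float], None] = time.sleep,
    **delay_params: float,
) -> T:
    for attempt in range(1, stop_after_attempt + 1):
        try:
            return fn()
        except retry_if_exception_type:
            if attempt == stop_after_attempt:
                raise  # attempts exhausted: surface the last error
            sleep(backoff_delay(attempt, **delay_params))
    raise ValueError("stop_after_attempt must be at least 1")


calls = []
delays = []


def flaky() -> str:
    calls.append(1)
    if len(calls) < 3:
        raise ValueError("transient")
    return "ok"


# Record delays instead of sleeping; jitter=0 keeps them deterministic.
outcome = call_with_retry(
    flaky,
    retry_if_exception_type=(ValueError,),
    stop_after_attempt=3,
    sleep=delays.append,
    jitter=0.0,
)
```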

with_types(*, input_type: type[Input] | None = None, output_type: type[Output] | None = None) Runnable[Input, Output]#

Bind input and output types to a Runnable, returning a new Runnable.

Parameters:
  • input_type (type[Input] | None) – The input type to bind to the Runnable.

  • output_type (type[Output] | None) – The output type to bind to the Runnable.

Returns:

A new Runnable with the types bound.

Return type:

Runnable[Input, Output]

top_k: Annotated[int | None, FieldInfo(annotation=NoneType, required=True, deprecated='top_k is deprecated, use "k" instead')]#

Deprecated: use k instead. This attribute is a read-only data descriptor that emits a runtime deprecation warning before the deprecated field is accessed.

msg#

The deprecation message to be emitted.

wrapped_property#

The property instance if the deprecated field is a computed field, or None.

field_name#

The name of the field being deprecated.
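The general idea of a read-only data descriptor that warns on access can be sketched with the standard library. This is not the descriptor used by the library: DeprecatedAlias and RetrieverSettings are hypothetical names, and returning the value of k from top_k is an assumption made for illustration.

```python
import warnings
from typing import Any, Optional


class DeprecatedAlias:
    """Read-only data descriptor: warn on access, then return the new field."""

    def __init__(self, new_name: str, msg: str) -> None:
        self.new_name = new_name
        self.msg = msg          # deprecation message to emit
        self.field_name = ""    # filled in by __set_name__

    def __set_name__(self, owner: type, name: str) -> None:
        self.field_name = name

    def __get__(self, obj: Any, objtype: Optional[type] = None) -> Any:
        if obj is None:
            return self
        warnings.warn(self.msg, DeprecationWarning, stacklevel=2)
        return getattr(obj, self.new_name)

    def __set__(self, obj: Any, value: Any) -> None:
        # Defining __set__ makes this a data descriptor and keeps it read-only.
        raise AttributeError(f"{self.field_name} is read-only")


class RetrieverSettings:
    top_k = DeprecatedAlias("k", 'top_k is deprecated, use "k" instead')

    def __init__(self, k: Optional[int] = None) -> None:
        self.k = k


with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    value = RetrieverSettings(k=5).top_k
```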