InfoMongoDBDatabaseTool#

class langchain_mongodb.agent_toolkit.tool.InfoMongoDBDatabaseTool[source]#

Bases: BaseMongoDBDatabaseTool, BaseTool

Tool for getting metadata about a MongoDB database.

Note

InfoMongoDBDatabaseTool implements the standard Runnable Interface. 🏃

The Runnable Interface has additional methods that are available on runnables, such as with_types, with_retry, assign, bind, get_graph, and more.

param args_schema: Type[BaseModel] = <class 'langchain_mongodb.agent_toolkit.tool._InfoMongoDBDatabaseToolInput'>#

Pydantic model class to validate and parse the tool’s input arguments.

Args schema should be either:

  • A subclass of pydantic.BaseModel.

  • A subclass of pydantic.v1.BaseModel, if accessing the v1 namespace in pydantic 2.

  • A JSON schema dict.

param callbacks: Callbacks = None#

Callbacks to be called during tool execution.

param db: MongoDBDatabase [Required]#
param description: str = 'Get the schema and sample documents for the specified MongoDB collections.'#

Used to tell the model how/when/why to use the tool.

You can provide few-shot examples as a part of the description.

param extras: dict[str, Any] | None = None#

Optional provider-specific extra fields for the tool.

This is used to pass provider-specific configuration that doesn’t fit into standard tool fields.

Example

Anthropic-specific fields like [cache_control](https://docs.langchain.com/oss/python/integrations/chat/anthropic#prompt-caching), [defer_loading](https://docs.langchain.com/oss/python/integrations/chat/anthropic#tool-search), or input_examples.

```python
from langchain_core.tools import tool


@tool(extras={"defer_loading": True, "cache_control": {"type": "ephemeral"}})
def my_tool(x: str) -> str:
    """Return the input unchanged."""
    return x
```

param handle_tool_error: bool | str | Callable[[ToolException], str] | None = False#

Handle the content of the ToolException thrown.
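The union type above allows three ways of turning a raised exception into tool output. The sketch below illustrates one plausible resolution order using only the standard library. `ToolError` and `resolve_error_content` are hypothetical stand-ins and are not part of langchain_core, and the fallback message is an assumption.

```python
from typing import Callable, Union


class ToolError(Exception):
    """Hypothetical stand-in for ToolException."""


Handler = Union[bool, str, Callable[[ToolError], str], None]


def resolve_error_content(error: ToolError, handler: Handler) -> str:
    """Turn an error into tool output, or re-raise when handling is disabled."""
    if not handler:
        # False or None: no handling, so the caller sees the exception.
        raise error
    if isinstance(handler, bool):
        # True: reuse the exception's own message (fallback text is an assumption).
        return str(error.args[0]) if error.args else "Tool execution error"
    if isinstance(handler, str):
        # A fixed string replaces whatever the exception said.
        return handler
    # A callable builds the message from the exception.
    return handler(error)


err = ToolError("collection 'orders' not found")
print(resolve_error_content(err, True))
print(resolve_error_content(err, "Lookup failed; check the collection name."))
print(resolve_error_content(err, lambda e: f"Error: {e}"))
```

With the default of False, the exception propagates to the caller instead of being converted to a string.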

param handle_validation_error: bool | str | Callable[[ValidationError | ValidationErrorV1], str] | None = False#

Handle the content of the ValidationError thrown.

param metadata: dict[str, Any] | None = None#

Optional metadata associated with the tool.

This metadata will be associated with each call to this tool, and passed as arguments to the handlers defined in callbacks.

You can use these to, e.g., identify a specific instance of a tool with its use case.

param response_format: Literal['content', 'content_and_artifact'] = 'content'#

The tool response format.

If ‘content’ then the output of the tool is interpreted as the contents of a ToolMessage. If ‘content_and_artifact’ then the output is expected to be a two-tuple corresponding to the (content, artifact) of a ToolMessage.
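As a rough illustration of the two formats, the sketch below separates a tool's raw return value into content and artifact. `split_tool_output` is a hypothetical helper written for this example, not a function provided by the library.

```python
from typing import Any, Optional, Tuple


def split_tool_output(output: Any, response_format: str) -> Tuple[Any, Optional[Any]]:
    """Return (content, artifact) according to the declared response format."""
    if response_format == "content_and_artifact":
        # The tool is expected to return exactly a (content, artifact) pair.
        if not (isinstance(output, tuple) and len(output) == 2):
            raise ValueError("expected a (content, artifact) two-tuple")
        content, artifact = output
        return content, artifact
    # 'content': the whole return value is the message content.
    return output, None


print(split_tool_output("2 collections found", "content"))
print(split_tool_output(("2 collections found", ["users", "orders"]), "content_and_artifact"))
```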

param return_direct: bool = False#

Whether to return the tool’s output directly.

Setting this to True means that after the tool is called, the AgentExecutor will stop looping.

param tags: list[str] | None = None#

Optional list of tags associated with the tool.

These tags will be associated with each call to this tool, and passed as arguments to the handlers defined in callbacks.

You can use these to, e.g., identify a specific instance of a tool with its use case.

param verbose: bool = False#

Whether to log the tool’s progress.

async abatch(inputs: list[Input], config: RunnableConfig | list[RunnableConfig] | None = None, *, return_exceptions: bool = False, **kwargs: Any | None) list[Output]#

Default implementation runs ainvoke in parallel using asyncio.gather.

The default implementation of batch works well for IO bound runnables.

Subclasses must override this method if they can batch more efficiently; e.g., if the underlying Runnable uses an API which supports a batch mode.

Parameters:
  • inputs (list[Input]) – A list of inputs to the Runnable.

  • config (RunnableConfig | list[RunnableConfig] | None) –

    A config to use when invoking the Runnable.

    The config supports standard keys like ‘tags’, ‘metadata’ for tracing purposes, ‘max_concurrency’ for controlling how much work to do in parallel, and other keys.

    Please refer to RunnableConfig for more details.

  • return_exceptions (bool) – Whether to return exceptions instead of raising them.

  • **kwargs (Any | None) – Additional keyword arguments to pass to the Runnable.

Returns:

A list of outputs from the Runnable.

Return type:

list[Output]
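The gather-based strategy described above can be sketched with the standard library alone. `gather_batch` and `describe` are hypothetical names used for illustration; the sketch does not model `config` or `max_concurrency`.

```python
import asyncio
from typing import Any, Awaitable, Callable, List


async def gather_batch(
    ainvoke: Callable[[Any], Awaitable[Any]],
    inputs: List[Any],
    return_exceptions: bool = False,
) -> List[Any]:
    """Run one coroutine per input concurrently; results keep input order."""
    return await asyncio.gather(
        *(ainvoke(item) for item in inputs),
        return_exceptions=return_exceptions,
    )


async def describe(name: str) -> str:
    """Hypothetical stand-in for a tool's ainvoke."""
    if not name:
        raise ValueError("empty collection name")
    await asyncio.sleep(0.01)  # simulate I/O
    return f"schema for {name}"


results = asyncio.run(
    gather_batch(describe, ["users", "", "orders"], return_exceptions=True)
)
print(results)
```

With `return_exceptions=True`, the failing input produces an exception object in its slot rather than aborting the whole batch.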

async abatch_as_completed(inputs: Sequence[Input], config: RunnableConfig | Sequence[RunnableConfig] | None = None, *, return_exceptions: bool = False, **kwargs: Any | None) AsyncIterator[tuple[int, Output | Exception]]#

Run ainvoke in parallel on a list of inputs.

Yields results as they complete.

Parameters:
  • inputs (Sequence[Input]) – A list of inputs to the Runnable.

  • config (RunnableConfig | Sequence[RunnableConfig] | None) –

    A config to use when invoking the Runnable.

    The config supports standard keys like ‘tags’, ‘metadata’ for tracing purposes, ‘max_concurrency’ for controlling how much work to do in parallel, and other keys.

    Please refer to RunnableConfig for more details.

  • return_exceptions (bool) – Whether to return exceptions instead of raising them.

  • **kwargs (Any | None) – Additional keyword arguments to pass to the Runnable.

Yields:

A tuple of the index of the input and the output from the Runnable.

Return type:

AsyncIterator[tuple[int, Output | Exception]]
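The difference from abatch is that results arrive in completion order, tagged with the index of the input that produced them. A minimal standard-library sketch of that pattern follows; `as_completed_batch` and `slow_echo` are hypothetical names, and exception handling is omitted for brevity.

```python
import asyncio
from typing import Any, AsyncIterator, Awaitable, Callable, Sequence, Tuple


async def as_completed_batch(
    ainvoke: Callable[[Any], Awaitable[Any]],
    inputs: Sequence[Any],
) -> AsyncIterator[Tuple[int, Any]]:
    """Yield (input index, output) pairs as each call finishes."""

    async def run_one(index: int, item: Any) -> Tuple[int, Any]:
        return index, await ainvoke(item)

    tasks = [asyncio.ensure_future(run_one(i, item)) for i, item in enumerate(inputs)]
    for finished in asyncio.as_completed(tasks):
        yield await finished


async def slow_echo(delay: float) -> float:
    """Hypothetical workload that finishes after `delay` seconds."""
    await asyncio.sleep(delay)
    return delay


async def main() -> list:
    return [pair async for pair in as_completed_batch(slow_echo, [0.2, 0.01, 0.1])]


completed = asyncio.run(main())
print(completed)
```

The index in each pair lets the caller match an output back to its input even though the order differs from the input order.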

async ainvoke(input: str | dict | ToolCall, config: RunnableConfig | None = None, **kwargs: Any) Any#

Transform a single input into an output.

Parameters:
  • input (str | dict | ToolCall) – The input to the Runnable.

  • config (RunnableConfig | None) –

    A config to use when invoking the Runnable.

    The config supports standard keys like ‘tags’, ‘metadata’ for tracing purposes, ‘max_concurrency’ for controlling how much work to do in parallel, and other keys.

    Please refer to RunnableConfig for more details.

  • kwargs (Any)

Returns:

The output of the Runnable.

Return type:

Any

async arun(tool_input: str | dict, verbose: bool | None = None, start_color: str | None = 'green', color: str | None = 'green', callbacks: Callbacks = None, *, tags: list[str] | None = None, metadata: dict[str, Any] | None = None, run_name: str | None = None, run_id: uuid.UUID | None = None, config: RunnableConfig | None = None, tool_call_id: str | None = None, **kwargs: Any) Any#

Run the tool asynchronously.

Parameters:
  • tool_input (str | dict) – The input to the tool.

  • verbose (bool | None) – Whether to log the tool’s progress.

  • start_color (str | None) – The color to use when starting the tool.

  • color (str | None) – The color to use when ending the tool.

  • callbacks (Callbacks) – Callbacks to be called during tool execution.

  • tags (list[str] | None) – Optional list of tags associated with the tool.

  • metadata (dict[str, Any] | None) – Optional metadata associated with the tool.

  • run_name (str | None) – The name of the run.

  • run_id (uuid.UUID | None) – The id of the run.

  • config (RunnableConfig | None) – The configuration for the tool.

  • tool_call_id (str | None) – The id of the tool call.

  • **kwargs (Any) – Keyword arguments to be passed to tool callbacks

Returns:

The output of the tool.

Raises:

ToolException – If an error occurs during tool execution.

Return type:

Any

async astream(input: Input, config: RunnableConfig | None = None, **kwargs: Any | None) AsyncIterator[Output]#

Default implementation of astream, which calls ainvoke.

Subclasses must override this method if they support streaming output.

Parameters:
  • input (Input) – The input to the Runnable.

  • config (RunnableConfig | None) – The config to use for the Runnable.

  • **kwargs (Any | None) – Additional keyword arguments to pass to the Runnable.

Yields:

The output of the Runnable.

Return type:

AsyncIterator[Output]

astream_events(input: Any, config: RunnableConfig | None = None, *, version: Literal['v1', 'v2', 'v3'] = 'v2', include_names: Sequence[str] | None = None, include_types: Sequence[str] | None = None, include_tags: Sequence[str] | None = None, exclude_names: Sequence[str] | None = None, exclude_types: Sequence[str] | None = None, exclude_tags: Sequence[str] | None = None, **kwargs: Any) AsyncIterator[StreamEvent] | Awaitable[Any]#

Generate a stream of events.

Use this to create an iterator over StreamEvent objects that provide real-time information about the progress of the Runnable, including StreamEvent objects from intermediate results.

A StreamEvent is a dictionary with the following schema:

  • event: Event names are of the format on_[runnable_type]_(start|stream|end).

  • name: The name of the Runnable that generated the event.

  • run_id: Randomly generated ID associated with the given execution of the Runnable that emitted the event. A child Runnable that gets invoked as part of the execution of a parent Runnable is assigned its own unique ID.

  • parent_ids: The IDs of the parent runnables that generated the event. The root Runnable will have an empty list. The order of the parent IDs is from the root to the immediate parent. Only available for v2 version of the API. The v1 version of the API will return an empty list.

  • tags: The tags of the Runnable that generated the event.

  • metadata: The metadata of the Runnable that generated the event.

  • data: The data associated with the event. The contents of this field depend on the type of event. See the table below for more details.
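To make the schema above concrete, here is a small sketch that mirrors the listed fields as a TypedDict and splits an event name into its runnable type and phase. `StreamEventSketch` and `split_event_name` are hypothetical names for this example, and the field types are assumptions based on the descriptions above.

```python
from typing import Any, Dict, List, Tuple, TypedDict


class StreamEventSketch(TypedDict):
    """Hypothetical mirror of the fields listed above."""

    event: str
    name: str
    run_id: str
    parent_ids: List[str]
    tags: List[str]
    metadata: Dict[str, Any]
    data: Dict[str, Any]


def split_event_name(event: str) -> Tuple[str, str]:
    """Split 'on_chat_model_stream' into ('chat_model', 'stream')."""
    body = event[len("on_"):] if event.startswith("on_") else event
    runnable_type, _, phase = body.rpartition("_")
    return runnable_type, phase


sample: StreamEventSketch = {
    "event": "on_tool_start",
    "name": "some_tool",
    "run_id": "00000000-0000-0000-0000-000000000000",  # placeholder ID
    "parent_ids": [],
    "tags": [],
    "metadata": {},
    "data": {"input": {"x": 1, "y": "2"}},
}
print(split_event_name(sample["event"]))
```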

Below is a table that illustrates some events that might be emitted by various chains. Metadata fields have been omitted from the table for brevity. Chain definitions have been included after the table.

Note

This reference table is for the v2 version of the schema.

| event | name | chunk | input | output |
| --- | --- | --- | --- | --- |
| on_chat_model_start | '[model name]' | | {"messages": [[SystemMessage, HumanMessage]]} | |
| on_chat_model_stream | '[model name]' | AIMessageChunk(content="hello") | | |
| on_chat_model_end | '[model name]' | | {"messages": [[SystemMessage, HumanMessage]]} | AIMessageChunk(content="hello world") |
| on_llm_start | '[model name]' | | {'input': 'hello'} | |
| on_llm_stream | '[model name]' | 'Hello' | | |
| on_llm_end | '[model name]' | | 'Hello human!' | |
| on_chain_start | 'format_docs' | | | |
| on_chain_stream | 'format_docs' | 'hello world!, goodbye world!' | | |
| on_chain_end | 'format_docs' | | [Document(...)] | 'hello world!, goodbye world!' |
| on_tool_start | 'some_tool' | | {"x": 1, "y": "2"} | |
| on_tool_end | 'some_tool' | | | {"x": 1, "y": "2"} |
| on_retriever_start | '[retriever name]' | | {"query": "hello"} | |
| on_retriever_end | '[retriever name]' | | {"query": "hello"} | [Document(...), ...] |
| on_prompt_start | '[template_name]' | | {"question": "hello"} | |
| on_prompt_end | '[template_name]' | | {"question": "hello"} | ChatPromptValue(messages: [SystemMessage, ...]) |

In addition to the standard events, users can also dispatch custom events (see example below).

Custom events are surfaced only in the v2 and v3 versions of the API; the v1 version does not emit them.

A custom event has the following format:

| Attribute | Type | Description |
| --- | --- | --- |
| name | str | A user defined name for the event. |
| data | Any | The data associated with the event. This can be anything, though we suggest making it JSON serializable. |

Here are declarations associated with the standard events shown above:

format_docs:

```python
from langchain_core.documents import Document
from langchain_core.runnables import RunnableLambda


def format_docs(docs: list[Document]) -> str:
    '''Format the docs.'''
    return ", ".join([doc.page_content for doc in docs])


format_docs = RunnableLambda(format_docs)
```

some_tool:

```python
from langchain_core.tools import tool


@tool
def some_tool(x: int, y: str) -> dict:
    '''Some_tool.'''
    return {"x": x, "y": y}
```

prompt:

```python
from langchain_core.prompts import ChatPromptTemplate

template = ChatPromptTemplate.from_messages(
    [
        ("system", "You are Cat Agent 007"),
        ("human", "{question}"),
    ]
).with_config({"run_name": "my_template", "tags": ["my_template"]})
```

Example

```python
from langchain_core.runnables import RunnableLambda


async def reverse(s: str) -> str:
    return s[::-1]


chain = RunnableLambda(func=reverse)

events = [
    event async for event in chain.astream_events("hello", version="v2")
]

# Will produce the following events
# (run_id and parent_ids have been omitted for brevity):
[
    {
        "data": {"input": "hello"},
        "event": "on_chain_start",
        "metadata": {},
        "name": "reverse",
        "tags": [],
    },
    {
        "data": {"chunk": "olleh"},
        "event": "on_chain_stream",
        "metadata": {},
        "name": "reverse",
        "tags": [],
    },
    {
        "data": {"output": "olleh"},
        "event": "on_chain_end",
        "metadata": {},
        "name": "reverse",
        "tags": [],
    },
]
```

Dispatch custom event:

```python
from langchain_core.callbacks.manager import (
    adispatch_custom_event,
)
from langchain_core.runnables import RunnableLambda, RunnableConfig
import asyncio


async def slow_thing(some_input: str, config: RunnableConfig) -> str:
    """Do something that takes a long time."""
    await asyncio.sleep(1)  # Placeholder for some slow operation
    await adispatch_custom_event(
        "progress_event",
        {"message": "Finished step 1 of 3"},
        config=config,  # Must be included for python < 3.10
    )
    await asyncio.sleep(1)  # Placeholder for some slow operation
    await adispatch_custom_event(
        "progress_event",
        {"message": "Finished step 2 of 3"},
        config=config,  # Must be included for python < 3.10
    )
    await asyncio.sleep(1)  # Placeholder for some slow operation
    return "Done"


slow_thing = RunnableLambda(slow_thing)

async for event in slow_thing.astream_events("some_input", version="v2"):
    print(event)
```

Parameters:
  • input (Any) – The input to the Runnable.

  • config (RunnableConfig | None) – The config to use for the Runnable.

  • version (Literal['v1', 'v2', 'v3']) –

    The version of the schema to use. One of ‘v1’, ‘v2’, or ‘v3’.

    Most callers should use ‘v2’ (the default), which yields StreamEvent dicts and supports custom events.

    ‘v3’ selects the typed, content-block-centric streaming protocol and is only supported on Runnable subclasses that implement it (currently BaseChatModel and langgraph.CompiledGraph); on a generic Runnable it raises NotImplementedError. The ‘v3’ API is in beta and may change. See the subclass override (e.g. BaseChatModel.astream_events) for the v3 return shape.

    ‘v1’ is retained for backwards compatibility and will be deprecated in 0.4.0. Custom events are only surfaced in ‘v2’ / ‘v3’.

  • include_names (Sequence[str] | None) – Only include events from Runnable objects with matching names.

  • include_types (Sequence[str] | None) – Only include events from Runnable objects with matching types.

  • include_tags (Sequence[str] | None) – Only include events from Runnable objects with matching tags.

  • exclude_names (Sequence[str] | None) – Exclude events from Runnable objects with matching names.

  • exclude_types (Sequence[str] | None) – Exclude events from Runnable objects with matching types.

  • exclude_tags (Sequence[str] | None) – Exclude events from Runnable objects with matching tags.

  • **kwargs (Any) – Additional keyword arguments to pass to the Runnable.

Yields:

An async stream of StreamEvent.

Raises:

NotImplementedError – If the version is not ‘v1’, ‘v2’, or ‘v3’, or if ‘v3’ is requested on a Runnable that does not implement the v3 streaming protocol.

Return type:

AsyncIterator[StreamEvent] | Awaitable[Any]
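The include_* and exclude_* parameters of astream_events act as a filter over emitted events. The sketch below shows one way such a filter could combine them. The combination rule (any matching include filter admits an event, and any matching exclude filter then removes it) is an assumption for illustration, and `keep_event` is a hypothetical helper rather than a library function.

```python
from typing import Any, Dict, Optional, Sequence


def keep_event(
    event: Dict[str, Any],
    runnable_type: str,
    *,
    include_names: Optional[Sequence[str]] = None,
    include_types: Optional[Sequence[str]] = None,
    include_tags: Optional[Sequence[str]] = None,
    exclude_names: Optional[Sequence[str]] = None,
    exclude_types: Optional[Sequence[str]] = None,
    exclude_tags: Optional[Sequence[str]] = None,
) -> bool:
    """Decide whether an event passes the include/exclude filters."""
    tags = event.get("tags") or []
    if include_names is None and include_types is None and include_tags is None:
        keep = True  # no include filters: everything is admitted
    else:
        keep = (
            (include_names is not None and event["name"] in include_names)
            or (include_types is not None and runnable_type in include_types)
            or (include_tags is not None and any(t in include_tags for t in tags))
        )
    # Exclusions are applied last, so they win over inclusions.
    if exclude_names is not None and event["name"] in exclude_names:
        keep = False
    if exclude_types is not None and runnable_type in exclude_types:
        keep = False
    if exclude_tags is not None and any(t in exclude_tags for t in tags):
        keep = False
    return keep


candidates = [
    ({"name": "some_tool", "tags": ["db"]}, "tool"),
    ({"name": "format_docs", "tags": ["db"]}, "chain"),
    ({"name": "my_template", "tags": ["my_template"]}, "prompt"),
]
kept = [
    event["name"]
    for event, kind in candidates
    if keep_event(event, kind, include_tags=["db"], exclude_types=["chain"])
]
print(kept)
```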

batch(inputs: list[Input], config: RunnableConfig | list[RunnableConfig] | None = None, *, return_exceptions: bool = False, **kwargs: Any | None) list[Output]#

Default implementation runs invoke in parallel using a thread pool executor.

The default implementation of batch works well for IO bound runnables.

Subclasses must override this method if they can batch more efficiently; e.g., if the underlying Runnable uses an API which supports a batch mode.

Parameters:
  • inputs (list[Input]) – A list of inputs to the Runnable.

  • config (RunnableConfig | list[RunnableConfig] | None) –

    A config to use when invoking the Runnable. The config supports standard keys like ‘tags’, ‘metadata’ for tracing purposes, ‘max_concurrency’ for controlling how much work to do in parallel, and other keys.

    Please refer to RunnableConfig for more details.

  • return_exceptions (bool) – Whether to return exceptions instead of raising them.

  • **kwargs (Any | None) – Additional keyword arguments to pass to the Runnable.

Returns:

A list of outputs from the Runnable.

Return type:

list[Output]
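A thread pool suits I/O-bound calls because threads waiting on the network or a database do not block one another. The following standard-library sketch shows the general shape of such a batch; `threaded_batch` and `flaky_len` are hypothetical names, and mapping `max_concurrency` onto the pool size is an assumption.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, List, Optional


def threaded_batch(
    invoke: Callable[[Any], Any],
    inputs: List[Any],
    max_concurrency: Optional[int] = None,
    return_exceptions: bool = False,
) -> List[Any]:
    """Call invoke on every input in worker threads; results keep input order."""

    def call(item: Any) -> Any:
        try:
            return invoke(item)
        except Exception as exc:
            if return_exceptions:
                return exc  # hand the exception back as a result
            raise

    with ThreadPoolExecutor(max_workers=max_concurrency) as pool:
        return list(pool.map(call, inputs))


def flaky_len(name: str) -> int:
    """Hypothetical workload that rejects empty names."""
    if not name:
        raise ValueError("empty collection name")
    return len(name)


outputs = threaded_batch(
    flaky_len, ["users", "", "orders"], max_concurrency=2, return_exceptions=True
)
print(outputs)
```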

batch_as_completed(inputs: Sequence[Input], config: RunnableConfig | Sequence[RunnableConfig] | None = None, *, return_exceptions: bool = False, **kwargs: Any | None) Iterator[tuple[int, Output | Exception]]#

Run invoke in parallel on a list of inputs.

Yields results as they complete.

Parameters:
  • inputs (Sequence[Input]) – A list of inputs to the Runnable.

  • config (RunnableConfig | Sequence[RunnableConfig] | None) –

    A config to use when invoking the Runnable.

    The config supports standard keys like ‘tags’, ‘metadata’ for tracing purposes, ‘max_concurrency’ for controlling how much work to do in parallel, and other keys.

    Please refer to RunnableConfig for more details.

  • return_exceptions (bool) – Whether to return exceptions instead of raising them.

  • **kwargs (Any | None) – Additional keyword arguments to pass to the Runnable.

Yields:

Tuples of the index of the input and the output from the Runnable.

Return type:

Iterator[tuple[int, Output | Exception]]

bind(**kwargs: Any) Runnable[Input, Output]#

Bind arguments to a Runnable, returning a new Runnable.

Useful when a Runnable in a chain requires an argument that is not in the output of the previous Runnable or included in the user input.

Parameters:

**kwargs (Any) – The arguments to bind to the Runnable.

Returns:

A new Runnable with the arguments bound.

Return type:

Runnable[Input, Output]

Example

```python
from langchain_ollama import ChatOllama
from langchain_core.output_parsers import StrOutputParser

model = ChatOllama(model="llama3.1")

# Without bind
chain = model | StrOutputParser()

chain.invoke("Repeat quoted words exactly: 'One two three four five.'")
# Output is 'One two three four five.'

# With bind
chain = model.bind(stop=["three"]) | StrOutputParser()

chain.invoke("Repeat quoted words exactly: 'One two three four five.'")
# Output is 'One two'
```

configurable_alternatives(which: ConfigurableField, *, default_key: str = 'default', prefix_keys: bool = False, **kwargs: Runnable[Input, Output] | Callable[[], Runnable[Input, Output]]) RunnableSerializable#

Configure alternatives for Runnable objects that can be set at runtime.

Parameters:
  • which (ConfigurableField) – The ConfigurableField instance that will be used to select the alternative.

  • default_key (str) – The default key to use if no alternative is selected.

  • prefix_keys (bool) – Whether to prefix the keys with the ConfigurableField id.

  • **kwargs (Runnable[Input, Output] | Callable[[], Runnable[Input, Output]]) – A dictionary of keys to Runnable instances or callables that return Runnable instances.

Returns:

A new Runnable with the alternatives configured.

Return type:

RunnableSerializable

Example

```python
from langchain_anthropic import ChatAnthropic
from langchain_core.runnables.utils import ConfigurableField
from langchain_openai import ChatOpenAI

model = ChatAnthropic(
    model_name="claude-sonnet-4-5-20250929"
).configurable_alternatives(
    ConfigurableField(id="llm"),
    default_key="anthropic",
    openai=ChatOpenAI(),
)

# uses the default model ChatAnthropic
print(model.invoke("which organization created you?").content)

# uses ChatOpenAI
print(
    model.with_config(configurable={"llm": "openai"})
    .invoke("which organization created you?")
    .content
)
```

configurable_fields(**kwargs: ConfigurableField | ConfigurableFieldSingleOption | ConfigurableFieldMultiOption) RunnableSerializable#

Configure particular Runnable fields at runtime.

Parameters:

**kwargs (ConfigurableField | ConfigurableFieldSingleOption | ConfigurableFieldMultiOption) – A dictionary of ConfigurableField instances to configure.

Raises:

ValueError – If a configuration key is not found in the Runnable.

Returns:

A new Runnable with the fields configured.

Return type:

RunnableSerializable

Example

```python
from langchain_core.runnables import ConfigurableField
from langchain_openai import ChatOpenAI

model = ChatOpenAI(max_tokens=20).configurable_fields(
    max_tokens=ConfigurableField(
        id="output_token_number",
        name="Max tokens in the output",
        description="The maximum number of tokens in the output",
    )
)

# max_tokens = 20
print(
    "max_tokens_20: ", model.invoke("tell me something about chess").content
)

# max_tokens = 200
print(
    "max_tokens_200: ",
    model.with_config(configurable={"output_token_number": 200})
    .invoke("tell me something about chess")
    .content,
)
```

invoke(input: str | dict | ToolCall, config: RunnableConfig | None = None, **kwargs: Any) Any#

Transform a single input into an output.

Parameters:
  • input (str | dict | ToolCall) – The input to the Runnable.

  • config (RunnableConfig | None) –

    A config to use when invoking the Runnable.

    The config supports standard keys like ‘tags’, ‘metadata’ for tracing purposes, ‘max_concurrency’ for controlling how much work to do in parallel, and other keys.

    Please refer to RunnableConfig for more details.

  • kwargs (Any)

Returns:

The output of the Runnable.

Return type:

Any

run(tool_input: str | dict[str, Any], verbose: bool | None = None, start_color: str | None = 'green', color: str | None = 'green', callbacks: Callbacks = None, *, tags: list[str] | None = None, metadata: dict[str, Any] | None = None, run_name: str | None = None, run_id: uuid.UUID | None = None, config: RunnableConfig | None = None, tool_call_id: str | None = None, **kwargs: Any) Any#

Run the tool.

Parameters:
  • tool_input (str | dict[str, Any]) – The input to the tool.

  • verbose (bool | None) – Whether to log the tool’s progress.

  • start_color (str | None) – The color to use when starting the tool.

  • color (str | None) – The color to use when ending the tool.

  • callbacks (Callbacks) – Callbacks to be called during tool execution.

  • tags (list[str] | None) – Optional list of tags associated with the tool.

  • metadata (dict[str, Any] | None) – Optional metadata associated with the tool.

  • run_name (str | None) – The name of the run.

  • run_id (uuid.UUID | None) – The id of the run.

  • config (RunnableConfig | None) – The configuration for the tool.

  • tool_call_id (str | None) – The id of the tool call.

  • **kwargs (Any) – Keyword arguments to be passed to tool callbacks (event handler)

Returns:

The output of the tool.

Raises:

ToolException – If an error occurs during tool execution.

Return type:

Any

stream(input: Input, config: RunnableConfig | None = None, **kwargs: Any | None) Iterator[Output]#

Default implementation of stream, which calls invoke.

Subclasses must override this method if they support streaming output.

Parameters:
  • input (Input) – The input to the Runnable.

  • config (RunnableConfig | None) – The config to use for the Runnable.

  • **kwargs (Any | None) – Additional keyword arguments to pass to the Runnable.

Yields:

The output of the Runnable.

Return type:

Iterator[Output]
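In practice, a default stream that simply calls invoke yields the complete result as a single chunk. A minimal sketch of that fallback, using the hypothetical name `default_stream`:

```python
from typing import Any, Callable, Iterator


def default_stream(invoke: Callable[[Any], Any], value: Any) -> Iterator[Any]:
    """Fallback streaming: compute the full result, then yield it once."""
    yield invoke(value)


chunks = list(default_stream(str.upper, "users"))
print(chunks)
```

Callers can therefore iterate over the stream uniformly, whether or not the underlying Runnable produces incremental output.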

stream_events(input: Any, config: RunnableConfig | None = None, *, version: Literal['v1', 'v2', 'v3'] = 'v2', include_names: Sequence[str] | None = None, include_types: Sequence[str] | None = None, include_tags: Sequence[str] | None = None, exclude_names: Sequence[str] | None = None, exclude_types: Sequence[str] | None = None, exclude_tags: Sequence[str] | None = None, **kwargs: Any) Iterator[StreamEvent] | Iterator[Any]#

Generate a stream of events synchronously.

Synchronous counterpart to astream_events. For version=’v3’, subclasses that implement the v3 streaming protocol (BaseChatModel, CompiledGraph) override this method. All other versions and base-class calls raise NotImplementedError.

Parameters:
  • input (Any) – The input to the Runnable.

  • config (RunnableConfig | None) – The config to use for the Runnable.

  • version (Literal['v1', 'v2', 'v3']) – The version of the schema to use. ‘v3’ requires a subclass that implements the v3 streaming protocol. ‘v1’ and ‘v2’ are not supported on the sync path.

  • include_names (Sequence[str] | None) – Only include events from Runnable objects with matching names.

  • include_types (Sequence[str] | None) – Only include events from Runnable objects with matching types.

  • include_tags (Sequence[str] | None) – Only include events from Runnable objects with matching tags.

  • exclude_names (Sequence[str] | None) – Exclude events from Runnable objects with matching names.

  • exclude_types (Sequence[str] | None) – Exclude events from Runnable objects with matching types.

  • exclude_tags (Sequence[str] | None) – Exclude events from Runnable objects with matching tags.

  • **kwargs (Any) – Additional keyword arguments to pass to the Runnable.

Raises:

NotImplementedError – Always. Subclasses override this method for supported versions.

Return type:

Iterator[StreamEvent] | Iterator[Any]

with_alisteners(*, on_start: AsyncListener | None = None, on_end: AsyncListener | None = None, on_error: AsyncListener | None = None) Runnable[Input, Output]#

Bind async lifecycle listeners to a Runnable.

Returns a new Runnable.

The Run object contains information about the run, including its id, type, input, output, error, start_time, end_time, and any tags or metadata added to the run.

Parameters:
  • on_start (AsyncListener | None) – Called asynchronously before the Runnable starts running, with the Run object.

  • on_end (AsyncListener | None) – Called asynchronously after the Runnable finishes running, with the Run object.

  • on_error (AsyncListener | None) – Called asynchronously if the Runnable throws an error, with the Run object.

Returns:

A new Runnable with the listeners bound.

Return type:

Runnable[Input, Output]

Example

```python
from langchain_core.runnables import RunnableLambda
from langchain_core.tracers.schemas import Run
from datetime import datetime, timezone
import time
import asyncio


def format_t(timestamp: float) -> str:
    return datetime.fromtimestamp(timestamp, tz=timezone.utc).isoformat()


async def test_runnable(time_to_sleep: int):
    print(f"Runnable[{time_to_sleep}s]: starts at {format_t(time.time())}")
    await asyncio.sleep(time_to_sleep)
    print(f"Runnable[{time_to_sleep}s]: ends at {format_t(time.time())}")


# Listeners receive the Run object described above.
async def fn_start(run_obj: Run):
    print(f"on start callback starts at {format_t(time.time())}")
    await asyncio.sleep(3)
    print(f"on start callback ends at {format_t(time.time())}")


async def fn_end(run_obj: Run):
    print(f"on end callback starts at {format_t(time.time())}")
    await asyncio.sleep(2)
    print(f"on end callback ends at {format_t(time.time())}")


runnable = RunnableLambda(test_runnable).with_alisteners(
    on_start=fn_start, on_end=fn_end
)


async def concurrent_runs():
    await asyncio.gather(runnable.ainvoke(2), runnable.ainvoke(3))


asyncio.run(concurrent_runs())
# Result:
# on start callback starts at 2025-03-01T07:05:22.875378+00:00
# on start callback starts at 2025-03-01T07:05:22.875495+00:00
# on start callback ends at 2025-03-01T07:05:25.878862+00:00
# on start callback ends at 2025-03-01T07:05:25.878947+00:00
# Runnable[2s]: starts at 2025-03-01T07:05:25.879392+00:00
# Runnable[3s]: starts at 2025-03-01T07:05:25.879804+00:00
# Runnable[2s]: ends at 2025-03-01T07:05:27.881998+00:00
# on end callback starts at 2025-03-01T07:05:27.882360+00:00
# Runnable[3s]: ends at 2025-03-01T07:05:28.881737+00:00
# on end callback starts at 2025-03-01T07:05:28.882428+00:00
# on end callback ends at 2025-03-01T07:05:29.883893+00:00
# on end callback ends at 2025-03-01T07:05:30.884831+00:00
```

with_config(config: RunnableConfig | None = None, **kwargs: Any) Runnable[Input, Output]#

Bind config to a Runnable, returning a new Runnable.

Parameters:
  • config (RunnableConfig | None) – The config to bind to the Runnable.

  • **kwargs (Any) – Additional keyword arguments to pass to the Runnable.

Returns:

A new Runnable with the config bound.

Return type:

Runnable[Input, Output]

with_fallbacks(fallbacks: Sequence[Runnable[Input, Output]], *, exceptions_to_handle: tuple[type[BaseException], ...] = (<class 'Exception'>,), exception_key: str | None = None) RunnableWithFallbacksT[Input, Output]#

Add fallbacks to a Runnable, returning a new Runnable.

The new Runnable will try the original Runnable, and then each fallback in order, upon failures.

Parameters:
  • fallbacks (Sequence[Runnable[Input, Output]]) – A sequence of runnables to try if the original Runnable fails.

  • exceptions_to_handle (tuple[type[BaseException], ...]) – A tuple of exception types to handle.

  • exception_key (str | None) –

    If string is specified then handled exceptions will be passed to fallbacks as part of the input under the specified key.

    If None, exceptions will not be passed to fallbacks.

    If used, the base Runnable and its fallbacks must accept a dictionary as input.

Returns:

A new Runnable that will try the original Runnable, and then each fallback in order, upon failures.

Return type:

RunnableWithFallbacksT[Input, Output]

Example

```python
from typing import Iterator

from langchain_core.runnables import RunnableGenerator


def _generate_immediate_error(input: Iterator) -> Iterator[str]:
    raise ValueError()
    yield ""


def _generate(input: Iterator) -> Iterator[str]:
    yield from "foo bar"


runnable = RunnableGenerator(_generate_immediate_error).with_fallbacks(
    [RunnableGenerator(_generate)]
)
print("".join(runnable.stream({})))  # foo bar
```

with_listeners(*, on_start: Callable[[Run], None] | Callable[[Run, RunnableConfig], None] | None = None, on_end: Callable[[Run], None] | Callable[[Run, RunnableConfig], None] | None = None, on_error: Callable[[Run], None] | Callable[[Run, RunnableConfig], None] | None = None) Runnable[Input, Output]#

Bind lifecycle listeners to a Runnable, returning a new Runnable.

The Run object contains information about the run, including its id, type, input, output, error, start_time, end_time, and any tags or metadata added to the run.

Parameters:
  • on_start (Callable[[Run], None] | Callable[[Run, RunnableConfig], None] | None) – Called before the Runnable starts running, with the Run object.

  • on_end (Callable[[Run], None] | Callable[[Run, RunnableConfig], None] | None) – Called after the Runnable finishes running, with the Run object.

  • on_error (Callable[[Run], None] | Callable[[Run, RunnableConfig], None] | None) – Called if the Runnable throws an error, with the Run object.

Returns:

A new Runnable with the listeners bound.

Return type:

Runnable[Input, Output]

Example

```python
from langchain_core.runnables import RunnableLambda
from langchain_core.tracers.schemas import Run

import time


def test_runnable(time_to_sleep: int):
    time.sleep(time_to_sleep)


def fn_start(run_obj: Run):
    print("start_time:", run_obj.start_time)


def fn_end(run_obj: Run):
    print("end_time:", run_obj.end_time)


chain = RunnableLambda(test_runnable).with_listeners(
    on_start=fn_start, on_end=fn_end
)
chain.invoke(2)
```

with_retry(*, retry_if_exception_type: tuple[type[BaseException], ...] = (<class 'Exception'>,), wait_exponential_jitter: bool = True, exponential_jitter_params: ExponentialJitterParams | None = None, stop_after_attempt: int = 3) Runnable[Input, Output]#

Create a new Runnable that retries the original Runnable on exceptions.

Parameters:
  • retry_if_exception_type (tuple[type[BaseException], ...]) – A tuple of exception types to retry on.

  • wait_exponential_jitter (bool) – Whether to add jitter to the wait time between retries.

  • stop_after_attempt (int) – The maximum number of attempts to make before giving up.

  • exponential_jitter_params (ExponentialJitterParams | None) – Parameters for tenacity.wait_exponential_jitter. Namely: initial, max, exp_base, and jitter (all float values).

Returns:

A new Runnable that retries the original Runnable on exceptions.

Return type:

Runnable[Input, Output]

Example

```python
from langchain_core.runnables import RunnableLambda

count = 0


def _lambda(x: int) -> None:
    global count
    count = count + 1
    if x == 1:
        raise ValueError("x is 1")
    else:
        pass


runnable = RunnableLambda(_lambda)
try:
    runnable.with_retry(
        stop_after_attempt=2,
        retry_if_exception_type=(ValueError,),
    ).invoke(1)
except ValueError:
    pass

assert count == 2
```
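The exponential_jitter_params fields (initial, max, exp_base, jitter) control how long to wait between attempts. As a rough sketch of exponential backoff with jitter, the function below computes a wait time per attempt. `jittered_wait` is a hypothetical helper, `max_wait` stands in for the `max` field to avoid shadowing the builtin, and the default values are illustrative assumptions rather than the library's defaults.

```python
import random
from typing import Optional


def jittered_wait(
    attempt: int,
    *,
    initial: float = 1.0,
    max_wait: float = 60.0,
    exp_base: float = 2.0,
    jitter: float = 1.0,
    rng: Optional[random.Random] = None,
) -> float:
    """Seconds to wait before retrying after the given attempt (1-based)."""
    rng = rng or random.Random()
    # The base delay grows geometrically with the attempt number.
    backoff = initial * exp_base ** (attempt - 1)
    # A random offset in [0, jitter] spreads out simultaneous retries.
    return max(0.0, min(backoff + rng.uniform(0.0, jitter), max_wait))


for attempt in (1, 2, 3):
    print(attempt, jittered_wait(attempt, jitter=0.0))
```

With jitter disabled, the wait doubles on each attempt until it reaches the cap; with jitter enabled, concurrent clients are less likely to retry at the same instant.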

with_types(*, input_type: type[Input] | None = None, output_type: type[Output] | None = None) Runnable[Input, Output]#

Bind input and output types to a Runnable, returning a new Runnable.

Parameters:
  • input_type (type[Input] | None) – The input type to bind to the Runnable.

  • output_type (type[Output] | None) – The output type to bind to the Runnable.

Returns:

A new Runnable with the types bound.

Return type:

Runnable[Input, Output]

property args: dict#

Get the tool’s input arguments schema.

Returns:

dict containing the tool’s argument properties.

property is_single_input: bool#

Check if the tool accepts only a single input argument.

Returns:

True if the tool has only one input argument, False otherwise.

property tool_call_schema: type[BaseModel] | dict[str, Any]#

Get the schema for tool calls, excluding injected arguments.

Returns:

The schema that should be used for tool calls from language models.