跳转至

agent

feishu.agent

LlmBackend

Bases: Protocol

大模型后端协议,是自定义模型后端的扩展契约。

实现该协议即可接入 feishu.agent.loop.Agent;内置实现见 feishu.agent.adapters.anthropic.AnthropicBackendfeishu.agent.adapters.openai.OpenAIBackendstream 须返回逐个产出 [feishu.agent.llm.StreamChunk][] 的异步迭代器。该协议标注了 runtime_checkable,可用 isinstance 校验实现是否符合契约。

示例:

Python Console Session
1
2
3
4
5
6
7
>>> class EchoBackend:
...     def stream(self, *, messages, tools=(), system=None, **kwargs):
...         async def gen():
...             yield TextDelta(text="hi")
...         return gen()
>>> isinstance(EchoBackend(), LlmBackend)
True
源代码位于: feishu/agent/llm.py
Python
@runtime_checkable
class LlmBackend(Protocol):
    r"""
    大模型后端协议,是自定义模型后端的扩展契约。

    实现该协议即可接入 [feishu.agent.loop.Agent][];内置实现见 [feishu.agent.adapters.anthropic.AnthropicBackend][]
    与 [feishu.agent.adapters.openai.OpenAIBackend][]。`stream` 须返回逐个产出 [feishu.agent.llm.StreamChunk][]
    的异步迭代器。该协议标注了 `runtime_checkable`,可用 `isinstance` 校验实现是否符合契约。

    Examples:
        >>> class EchoBackend:
        ...     def stream(self, *, messages, tools=(), system=None, **kwargs):
        ...         async def gen():
        ...             yield TextDelta(text="hi")
        ...         return gen()
        >>> isinstance(EchoBackend(), LlmBackend)
        True
    """

    def stream(
        self,
        *,
        messages: Sequence[Message],
        tools: Sequence[ToolSpec] = (),
        system: str | None = None,
        **kwargs: Any,
    ) -> AsyncIterator[StreamChunk]:
        r"""
        以流式方式生成一轮模型响应。

        Args:
            messages: 截至当前轮次的对话历史。
            tools: 本轮可供模型调用的工具声明。
            system: 系统提示词。
            **kwargs: 透传给底层大模型 API 的额外参数。

        Returns:
            逐个产出 [feishu.agent.llm.StreamChunk][] 的异步迭代器。
        """
        ...

stream

Python
stream(*, messages: Sequence[Message], tools: Sequence[ToolSpec] = (), system: str | None = None, **kwargs: Any) -> AsyncIterator[StreamChunk]

以流式方式生成一轮模型响应。

参数:

名称 类型 描述 默认
messages
Sequence[Message]

截至当前轮次的对话历史。

必需
tools
Sequence[ToolSpec]

本轮可供模型调用的工具声明。

()
system
str | None

系统提示词。

None
**kwargs
Any

透传给底层大模型 API 的额外参数。

{}

返回:

类型 描述
AsyncIterator[StreamChunk]

逐个产出 [feishu.agent.llm.StreamChunk][] 的异步迭代器。

源代码位于: feishu/agent/llm.py
Python
def stream(
    self,
    *,
    messages: Sequence[Message],
    tools: Sequence[ToolSpec] = (),
    system: str | None = None,
    **kwargs: Any,
) -> AsyncIterator[StreamChunk]:
    r"""
    以流式方式生成一轮模型响应。

    Args:
        messages: 截至当前轮次的对话历史。
        tools: 本轮可供模型调用的工具声明。
        system: 系统提示词。
        **kwargs: 透传给底层大模型 API 的额外参数。

    Returns:
        逐个产出 [feishu.agent.llm.StreamChunk][] 的异步迭代器。
    """
    ...

Message dataclass

一条对话消息,由角色与若干内容块组成。

roleuserassistanttool 之一;content 是 [feishu.agent.llm.ContentPart][] 列表, 适配器会将其翻译为各家大模型 API 的消息格式。

示例:

Python Console Session
1
2
3
4
5
>>> msg = Message(role="user", content=[TextPart(text="你好")])
>>> msg.role
'user'
>>> msg.content[0].text
'你好'
源代码位于: feishu/agent/llm.py
Python
@dataclass(slots=True)
class Message:
    r"""
    一条对话消息,由角色与若干内容块组成。

    `role` 取 `user`、`assistant` 或 `tool` 之一;`content` 是 [feishu.agent.llm.ContentPart][] 列表,
    适配器会将其翻译为各家大模型 API 的消息格式。

    Examples:
        >>> msg = Message(role="user", content=[TextPart(text="你好")])
        >>> msg.role
        'user'
        >>> msg.content[0].text
        '你好'
    """

    role: Role
    content: list[ContentPart]

MessageStop dataclass

流式响应的终止信号,携带停止原因与可选的用量统计。

示例:

Python Console Session
1
2
3
4
5
>>> stop = MessageStop(stop_reason=StopReason.END_TURN, usage={"output_tokens": 12})
>>> stop.stop_reason
<StopReason.END_TURN: 'end_turn'>
>>> stop.usage
{'output_tokens': 12}
源代码位于: feishu/agent/llm.py
Python
@dataclass(slots=True)
class MessageStop:
    r"""
    流式响应的终止信号,携带停止原因与可选的用量统计。

    Examples:
        >>> stop = MessageStop(stop_reason=StopReason.END_TURN, usage={"output_tokens": 12})
        >>> stop.stop_reason
        <StopReason.END_TURN: 'end_turn'>
        >>> stop.usage
        {'output_tokens': 12}
    """

    stop_reason: StopReason
    usage: dict[str, int] | None = None

StopReason

Bases: str, Enum

归一化后的模型停止原因。

各适配器会将厂商返回的原始停止原因映射到这些枚举值。由于继承自 str,枚举成员可直接与对应的 字符串字面量比较,便于序列化与传输。

示例:

Python Console Session
1
2
3
4
>>> StopReason.TOOL_USE == "tool_use"
True
>>> StopReason("end_turn") is StopReason.END_TURN
True
源代码位于: feishu/agent/llm.py
Python
class StopReason(str, Enum):
    r"""
    归一化后的模型停止原因。

    各适配器会将厂商返回的原始停止原因映射到这些枚举值。由于继承自 `str`,枚举成员可直接与对应的
    字符串字面量比较,便于序列化与传输。

    Examples:
        >>> StopReason.TOOL_USE == "tool_use"
        True
        >>> StopReason("end_turn") is StopReason.END_TURN
        True
    """

    END_TURN = "end_turn"
    TOOL_USE = "tool_use"
    MAX_TOKENS = "max_tokens"
    REFUSAL = "refusal"
    OTHER = "other"

TextDelta dataclass

流式响应中的一个文本增量片段。

示例:

Python Console Session
>>> TextDelta(text="你").text
'你'
源代码位于: feishu/agent/llm.py
Python
@dataclass(slots=True)
class TextDelta:
    r"""
    流式响应中的一个文本增量片段。

    Examples:
        >>> TextDelta(text="你").text
        '你'
    """

    text: str

TextPart dataclass

消息中的一段纯文本内容。

示例:

Python Console Session
>>> TextPart(text="你好").text
'你好'
源代码位于: feishu/agent/llm.py
Python
@dataclass(slots=True)
class TextPart:
    r"""
    消息中的一段纯文本内容。

    Examples:
        >>> TextPart(text="你好").text
        '你好'
    """

    text: str

ToolCall dataclass

由流式片段归并而成的完整工具调用。

feishu.agent.llm.ToolUsePart 不同,此处的 arguments 为完整的 JSON 字符串,由 feishu.agent.loop.Agent 在分发前 json.loads() 解析。

示例:

Python Console Session
1
2
3
>>> call = ToolCall(id="c1", name="weather", arguments='{"city":"上海"}')
>>> call.arguments
'{"city":"上海"}'
源代码位于: feishu/agent/llm.py
Python
@dataclass(slots=True)
class ToolCall:
    r"""
    由流式片段归并而成的完整工具调用。

    与 [feishu.agent.llm.ToolUsePart][] 不同,此处的 `arguments` 为完整的 JSON 字符串,由
    [feishu.agent.loop.Agent][] 在分发前 `json.loads()` 解析。

    Examples:
        >>> call = ToolCall(id="c1", name="weather", arguments='{"city":"上海"}')
        >>> call.arguments
        '{"city":"上海"}'
    """

    id: str
    name: str
    arguments: str  # complete JSON string; the loop json.loads() it

ToolCallDelta dataclass

流式响应中的一个工具调用增量片段。

同一次工具调用的多个片段共享相同的 indexidname 通常仅在首个片段出现,而 arguments 会逐段累积成完整的参数 JSON 字符串。feishu.agent.loop.accumulate_stream 负责按 index 归并这些片段。

示例:

Python Console Session
1
2
3
4
>>> head = ToolCallDelta(index=0, id="c1", name="weather", arguments='{"ci')
>>> tail = ToolCallDelta(index=0, arguments='ty":"上海"}')
>>> head.id, tail.arguments
('c1', 'ty":"上海"}')
源代码位于: feishu/agent/llm.py
Python
@dataclass(slots=True)
class ToolCallDelta:
    r"""
    流式响应中的一个工具调用增量片段。

    同一次工具调用的多个片段共享相同的 `index`;`id` 与 `name` 通常仅在首个片段出现,而 `arguments`
    会逐段累积成完整的参数 JSON 字符串。[feishu.agent.loop.accumulate_stream][] 负责按 `index` 归并这些片段。

    Examples:
        >>> head = ToolCallDelta(index=0, id="c1", name="weather", arguments='{"ci')
        >>> tail = ToolCallDelta(index=0, arguments='ty":"上海"}')
        >>> head.id, tail.arguments
        ('c1', 'ty":"上海"}')
    """

    index: int
    id: str | None = None
    name: str | None = None
    arguments: str = ""

ToolResultPart dataclass

工具执行结果,作为工具消息(role="tool")的内容块回传给模型。

tool_call_id 须与触发执行的 feishu.agent.llm.ToolUsePartid 对应。当工具执行失败时, 将 is_error 置为 True,模型即可据此调整后续行为。

示例:

Python Console Session
1
2
3
4
5
6
>>> ok = ToolResultPart(tool_call_id="call_1", content="晴")
>>> ok.is_error
False
>>> err = ToolResultPart(tool_call_id="call_1", content="超时", is_error=True)
>>> err.is_error
True
源代码位于: feishu/agent/llm.py
Python
@dataclass(slots=True)
class ToolResultPart:
    r"""
    工具执行结果,作为工具消息(`role="tool"`)的内容块回传给模型。

    `tool_call_id` 须与触发执行的 [feishu.agent.llm.ToolUsePart][] 的 `id` 对应。当工具执行失败时,
    将 `is_error` 置为 `True`,模型即可据此调整后续行为。

    Examples:
        >>> ok = ToolResultPart(tool_call_id="call_1", content="晴")
        >>> ok.is_error
        False
        >>> err = ToolResultPart(tool_call_id="call_1", content="超时", is_error=True)
        >>> err.is_error
        True
    """

    tool_call_id: str
    content: str
    is_error: bool = False

ToolSpec dataclass

工具的与厂商无关的声明,供模型据此决定是否调用。

input_schema 为描述参数的 JSON Schema。适配器会将其翻译为各家大模型所需的工具格式 (Anthropic 的 input_schema、OpenAI 的 function.parameters)。

示例:

Python Console Session
1
2
3
4
5
6
7
>>> spec = ToolSpec(
...     name="weather",
...     description="查询天气",
...     input_schema={"type": "object", "properties": {"city": {"type": "string"}}},
... )
>>> spec.name
'weather'
源代码位于: feishu/agent/llm.py
Python
@dataclass(slots=True)
class ToolSpec:
    r"""
    工具的与厂商无关的声明,供模型据此决定是否调用。

    `input_schema` 为描述参数的 JSON Schema。适配器会将其翻译为各家大模型所需的工具格式
    (Anthropic 的 `input_schema`、OpenAI 的 `function.parameters`)。

    Examples:
        >>> spec = ToolSpec(
        ...     name="weather",
        ...     description="查询天气",
        ...     input_schema={"type": "object", "properties": {"city": {"type": "string"}}},
        ... )
        >>> spec.name
        'weather'
    """

    name: str
    description: str
    input_schema: dict[str, Any]

ToolUsePart dataclass

模型发起的一次工具调用,作为助手消息的一个内容块。

arguments 为已解析的参数字典;与 feishu.agent.llm.ToolCall 不同,此处的参数已经是 dict, 而非待解析的 JSON 字符串。

示例:

Python Console Session
1
2
3
4
5
>>> part = ToolUsePart(id="call_1", name="weather", arguments={"city": "上海"})
>>> part.name
'weather'
>>> part.arguments
{'city': '上海'}
源代码位于: feishu/agent/llm.py
Python
@dataclass(slots=True)
class ToolUsePart:
    r"""
    模型发起的一次工具调用,作为助手消息的一个内容块。

    `arguments` 为已解析的参数字典;与 [feishu.agent.llm.ToolCall][] 不同,此处的参数已经是 `dict`,
    而非待解析的 JSON 字符串。

    Examples:
        >>> part = ToolUsePart(id="call_1", name="weather", arguments={"city": "上海"})
        >>> part.name
        'weather'
        >>> part.arguments
        {'city': '上海'}
    """

    id: str
    name: str
    arguments: dict[str, Any]

Agent

智能体主循环:驱动大模型与工具协作,自动回复飞书消息。

每收到一条消息,便载入会话历史、调用 feishu.agent.llm.LlmBackend 流式生成响应,并由 feishu.agent.loop.accumulate_stream 归并结果。若模型请求调用工具,则经 feishu.agent.tools.ToolRegistry 分发执行,并将结果回传后继续下一轮,直至产出最终文本或触及 max_iterations 上限。需要审批的工具会先发送审批卡片并挂起本轮,待用户在卡片上批准或拒绝后由 feishu.agent.loop.Agent.handle_card_action 恢复。

feishu.agent.dispatch.register_agent 注册到事件分发器后,即可自动处理消息与卡片回调事件。

参数:

名称 类型 描述 默认

backend

LlmBackend

大模型后端,须实现 feishu.agent.llm.LlmBackend

必需

registry

ToolRegistry

工具注册表 feishu.agent.tools.ToolRegistry

必需

store

SessionStore | None

会话历史存储。默认使用 feishu.agent.session.InMemorySessionStore

None

client

FeishuClient | None

飞书客户端,用于回复消息与发送卡片;为 None 时跳过发送。

None

approvals

PendingApprovalStore | None

挂起审批存储。默认使用 feishu.agent.session.InMemoryPendingApprovalStore

None

max_iterations

int

单轮对话中模型与工具往返的最大次数。默认为 8

8

system

str | None

系统提示词。

None

stream

bool

是否以流式卡片回复。为 True 时经 client.stream_card 输出,否则调用 client.im.reply

False

**backend_kwargs

Any

透传给 feishu.agent.llm.LlmBackend.stream 的额外参数。

{}

引发:

类型 描述
ValueError

max_iterations 小于 1 时抛出。

飞书文档

接收消息

卡片回传交互

示例:

Python Console Session
1
2
3
4
5
6
7
8
9
>>> from feishu.agent import Agent, ToolRegistry
>>> from feishu.agent.adapters.anthropic import AnthropicBackend
>>> agent = Agent(
...     backend=AnthropicBackend(model="claude-sonnet-4-5"),
...     registry=ToolRegistry(),
...     client=client,
...     system="你是一个乐于助人的助手。",
... )
>>> register_agent(dispatcher, agent)
源代码位于: feishu/agent/loop.py
Python
class Agent:
    r"""
    智能体主循环:驱动大模型与工具协作,自动回复飞书消息。

    每收到一条消息,便载入会话历史、调用 [feishu.agent.llm.LlmBackend][] 流式生成响应,并由
    [feishu.agent.loop.accumulate_stream][] 归并结果。若模型请求调用工具,则经
    [feishu.agent.tools.ToolRegistry][] 分发执行,并将结果回传后继续下一轮,直至产出最终文本或触及
    `max_iterations` 上限。需要审批的工具会先发送审批卡片并挂起本轮,待用户在卡片上批准或拒绝后由
    [feishu.agent.loop.Agent.handle_card_action][] 恢复。

    经 [feishu.agent.dispatch.register_agent][] 注册到事件分发器后,即可自动处理消息与卡片回调事件。

    Args:
        backend: 大模型后端,须实现 [feishu.agent.llm.LlmBackend][]。
        registry: 工具注册表 [feishu.agent.tools.ToolRegistry][]。
        store: 会话历史存储。默认使用 [feishu.agent.session.InMemorySessionStore][]。
        client: 飞书客户端,用于回复消息与发送卡片;为 `None` 时跳过发送。
        approvals: 挂起审批存储。默认使用 [feishu.agent.session.InMemoryPendingApprovalStore][]。
        max_iterations: 单轮对话中模型与工具往返的最大次数。默认为 `8`。
        system: 系统提示词。
        stream: 是否以流式卡片回复。为 `True` 时经 `client.stream_card` 输出,否则调用 `client.im.reply`。
        **backend_kwargs: 透传给 [feishu.agent.llm.LlmBackend.stream][] 的额外参数。

    Raises:
        ValueError: `max_iterations` 小于 `1` 时抛出。

    飞书文档:
        [接收消息](https://open.feishu.cn/document/server-docs/im-v1/message/events/receive)

        [卡片回传交互](https://open.feishu.cn/document/uAjLw4CM/ukzMukzMukzM/feishu-cards/card-callback-communication)

    Examples:
        >>> from feishu.agent import Agent, ToolRegistry  # doctest:+SKIP
        >>> from feishu.agent.adapters.anthropic import AnthropicBackend  # doctest:+SKIP
        >>> agent = Agent(  # doctest:+SKIP
        ...     backend=AnthropicBackend(model="claude-sonnet-4-5"),
        ...     registry=ToolRegistry(),
        ...     client=client,
        ...     system="你是一个乐于助人的助手。",
        ... )
        >>> register_agent(dispatcher, agent)  # doctest:+SKIP
    """

    def __init__(
        self,
        *,
        backend: LlmBackend,
        registry: ToolRegistry,
        store: SessionStore | None = None,
        client: FeishuClient | None = None,
        approvals: PendingApprovalStore | None = None,
        max_iterations: int = 8,
        system: str | None = None,
        stream: bool = False,
        **backend_kwargs: Any,
    ) -> None:
        if max_iterations < 1:
            raise ValueError(f"max_iterations must be >= 1, got {max_iterations}")
        self.backend = backend
        self.registry = registry
        self.store: SessionStore = store or InMemorySessionStore()
        self.client = client
        self.approvals: PendingApprovalStore = approvals or InMemoryPendingApprovalStore()
        self.max_iterations = max_iterations
        self.system = system
        self.stream = stream
        self.backend_kwargs = backend_kwargs

    async def run(self, event: Event) -> None:
        r"""
        处理一条飞书消息事件:载入历史、追加用户消息并驱动主循环。

        通常无需直接调用,而是经 [feishu.agent.dispatch.register_agent][] 注册为消息事件的处理函数。

        Args:
            event: 飞书消息事件,须具备 `.body` 属性。

        飞书文档:
            [接收消息](https://open.feishu.cn/document/server-docs/im-v1/message/events/receive)

        Examples:
            >>> await agent.run(event)  # doctest:+SKIP
        """
        session_id = session_id_for(event)
        history = await self.store.get(session_id)
        history.append(user_message_from_event(event))
        await self.store.set(session_id, history)
        await self._loop(event, session_id, history)

    async def _loop(self, event: Event, session_id: str, history: list[Message]) -> None:
        result = None
        for _ in range(self.max_iterations):
            result = await accumulate_stream(
                self.backend.stream(
                    messages=history,
                    tools=self.registry.specs(),
                    system=self.system,
                    **self.backend_kwargs,
                )
            )
            if result.tool_calls and result.stop_reason == StopReason.TOOL_USE:
                assistant = self._assistant_tool_message(result)
                history.append(assistant)
                await self.store.append(session_id, assistant)
                suspended = await self._dispatch_tool_calls(event, session_id, history, result.tool_calls)
                if suspended:
                    return  # approval seam ended the turn
                continue
            assistant = Message(role="assistant", content=[TextPart(text=result.text)])
            history.append(assistant)
            await self.store.append(session_id, assistant)
            await self._finalize(event, result.text)
            return
        # Loop exhausted max_iterations without a final text turn — send a fallback reply.
        logging.getLogger("feishu").warning(
            "Agent loop reached max_iterations=%s without completing the request; sending fallback reply.",
            self.max_iterations,
        )
        fallback = (
            result.text
            if result and result.text
            else "[Reached the maximum number of steps without completing the request.]"
        )
        await self._finalize(event, fallback)

    def _assistant_tool_message(self, result: StreamResult) -> Message:
        content: list = []
        if result.text:
            content.append(TextPart(text=result.text))
        for call in result.tool_calls:
            content.append(ToolUsePart(id=call.id, name=call.name, arguments=_loads(call.arguments)))
        return Message(role="assistant", content=content)

    async def _dispatch_tool_calls(
        self, event: Event, session_id: str, history: list[Message], tool_calls: list[ToolCall]
    ) -> bool:
        for call in tool_calls:
            tool = self.registry.get(call.name)
            if tool.requires_approval:
                await self._request_approval(event, session_id, call)
                return True  # suspend the turn
            result = await self.registry.dispatch(call.name, _loads(call.arguments))
            tool_msg = Message(role="tool", content=[ToolResultPart(tool_call_id=call.id, content=_stringify(result))])
            history.append(tool_msg)
            await self.store.append(session_id, tool_msg)
        return False

    async def _request_approval(self, event: Event, session_id: str, call: ToolCall) -> None:
        approval_id = uuid4().hex
        await self.approvals.put(
            PendingApproval(
                approval_id=approval_id,
                session_id=session_id,
                tool_call_id=call.id,
                tool_name=call.name,
                arguments=_loads(call.arguments),
            )
        )
        message = event.body.get("message") or {}
        chat_id = message.get("chat_id")
        card = self._approval_card(call.name, _loads(call.arguments), approval_id)
        if self.client is not None and chat_id:
            await self.client.im.send(chat_id, card, msg_type="interactive", receive_id_type="chat_id")

    async def handle_card_action(self, event: Event) -> dict[str, Any]:
        r"""
        处理审批卡片的回传交互,恢复或终止此前挂起的对话。

        从卡片回传值中读取 `__approval__` 与 `decision`。决策无效时不消费挂起审批,用户可重试;批准则执行
        对应工具并恢复主循环,拒绝则向模型回传一条错误工具结果再恢复。无论恢复过程是否抛错,都会同步返回
        包含 `toast` 与更新后 `card` 的飞书响应。

        通常无需直接调用,而是经 [feishu.agent.dispatch.register_agent][] 注册为卡片回调事件的处理函数。

        Args:
            event: 飞书卡片回调事件,须具备 `.body` 属性。

        Returns:
            供飞书更新卡片的同步响应字典,含 `toast`(及在处理审批时的更新后 `card`)。

        飞书文档:
            [卡片回传交互](https://open.feishu.cn/document/uAjLw4CM/ukzMukzMukzM/feishu-cards/card-callback-communication)

        Examples:
            >>> await agent.handle_card_action(event)  # doctest:+SKIP
            {'toast': {'type': 'success', 'content': 'Approved'}, 'card': {...}}
        """
        value = _action_value(event)
        approval_id = value.get("__approval__")
        if not approval_id:
            return {"toast": {"type": "info", "content": "no pending approval"}}
        # Validate the decision BEFORE consuming the approval from the store.
        # A bogus/unrecognised decision must not destroy the PendingApproval so
        # the user can retry with a valid decision.
        decision = value.get("decision")
        if decision not in ("approve", "reject"):
            return {"toast": {"type": "info", "content": "invalid decision"}}
        approval = await self.approvals.pop(approval_id)
        if approval is None:
            return {"toast": {"type": "info", "content": "no pending approval"}}
        history = await self.store.get(approval.session_id)
        decided_card = self._decided_card(approval.tool_name, decision)
        toast_content = "Approved" if decision == "approve" else "Rejected"
        toast_type = "success" if decision == "approve" else "info"
        try:
            if decision == "approve":
                result = await self.registry.dispatch(approval.tool_name, approval.arguments)
                tool_msg = Message(
                    role="tool",
                    content=[ToolResultPart(tool_call_id=approval.tool_call_id, content=_stringify(result))],
                )
            else:
                tool_msg = Message(
                    role="tool",
                    content=[
                        ToolResultPart(
                            tool_call_id=approval.tool_call_id, content="User rejected this action.", is_error=True
                        )
                    ],
                )
            history.append(tool_msg)
            await self.store.append(approval.session_id, tool_msg)
            await self._loop(event, approval.session_id, history)
        except Exception:
            logging.getLogger("feishu").exception(
                "handle_card_action: error resuming agent after %s of %s (approval=%s)",
                decision,
                approval.tool_name,
                approval_id,
            )
        return {
            "toast": {"type": toast_type, "content": toast_content},
            "card": decided_card,
        }

    def _approval_card(self, tool_name: str, arguments: dict[str, Any], approval_id: str) -> dict[str, Any]:
        from ..cards.builder import Card

        return (
            Card()
            .header(f"Approve {tool_name}?", template="orange")
            .markdown(f"The agent wants to run **{tool_name}** with:\n```json\n{json.dumps(arguments, indent=2)}\n```")
            .button("Approve", value={"__approval__": approval_id, "decision": "approve"}, type="primary")
            .button("Reject", value={"__approval__": approval_id, "decision": "reject"}, type="danger")
            .to_dict()
        )

    def _decided_card(self, tool_name: str, decision: str) -> dict[str, Any]:
        from ..cards.builder import Card

        verb = "approved" if decision == "approve" else "rejected"
        return (
            Card()
            .header(f"{tool_name} {verb}", template="green" if decision == "approve" else "grey")
            .markdown(f"Action **{tool_name}** was {verb}.")
            .to_dict()
        )

    async def _finalize(self, event: Event, text: str) -> None:
        message = event.body.get("message") or {}
        message_id = message.get("message_id")
        if self.stream and self.client is not None:
            await self._finalize_stream(event, text)
            return
        if self.client is not None and message_id:
            await self.client.im.reply(message_id, text, msg_type="text")

    async def _finalize_stream(self, event: Event, text: str) -> None:
        message = event.body.get("message") or {}
        message_id = message.get("message_id")
        # Mirror the non-stream _finalize: reply in-thread to the inbound message; skip if absent.
        if not message_id:
            return

        async def _one_token() -> AsyncIterator[str]:
            yield text

        await self.client.stream_card(_one_token(), reply_to_message_id=message_id)  # type: ignore[union-attr]

run async

Python
run(event: Event) -> None

处理一条飞书消息事件:载入历史、追加用户消息并驱动主循环。

通常无需直接调用,而是经 feishu.agent.dispatch.register_agent 注册为消息事件的处理函数。

参数:

名称 类型 描述 默认
event
Event

飞书消息事件,须具备 .body 属性。

必需
飞书文档

接收消息

示例:

Python Console Session
>>> await agent.run(event)
源代码位于: feishu/agent/loop.py
Python
async def run(self, event: Event) -> None:
    r"""
    处理一条飞书消息事件:载入历史、追加用户消息并驱动主循环。

    通常无需直接调用,而是经 [feishu.agent.dispatch.register_agent][] 注册为消息事件的处理函数。

    Args:
        event: 飞书消息事件,须具备 `.body` 属性。

    飞书文档:
        [接收消息](https://open.feishu.cn/document/server-docs/im-v1/message/events/receive)

    Examples:
        >>> await agent.run(event)  # doctest:+SKIP
    """
    session_id = session_id_for(event)
    history = await self.store.get(session_id)
    history.append(user_message_from_event(event))
    await self.store.set(session_id, history)
    await self._loop(event, session_id, history)

handle_card_action async

Python
handle_card_action(event: Event) -> dict[str, Any]

处理审批卡片的回传交互,恢复或终止此前挂起的对话。

从卡片回传值中读取 __approval__decision。决策无效时不消费挂起审批,用户可重试;批准则执行 对应工具并恢复主循环,拒绝则向模型回传一条错误工具结果再恢复。无论恢复过程是否抛错,都会同步返回 包含 toast 与更新后 card 的飞书响应。

通常无需直接调用,而是经 feishu.agent.dispatch.register_agent 注册为卡片回调事件的处理函数。

参数:

名称 类型 描述 默认
event
Event

飞书卡片回调事件,须具备 .body 属性。

必需

返回:

类型 描述
dict[str, Any]

供飞书更新卡片的同步响应字典,含 toast(及在处理审批时的更新后 card)。

飞书文档

卡片回传交互

示例:

Python Console Session
>>> await agent.handle_card_action(event)
{'toast': {'type': 'success', 'content': 'Approved'}, 'card': {...}}
源代码位于: feishu/agent/loop.py
Python
async def handle_card_action(self, event: Event) -> dict[str, Any]:
    r"""
    处理审批卡片的回传交互,恢复或终止此前挂起的对话。

    从卡片回传值中读取 `__approval__` 与 `decision`。决策无效时不消费挂起审批,用户可重试;批准则执行
    对应工具并恢复主循环,拒绝则向模型回传一条错误工具结果再恢复。无论恢复过程是否抛错,都会同步返回
    包含 `toast` 与更新后 `card` 的飞书响应。

    通常无需直接调用,而是经 [feishu.agent.dispatch.register_agent][] 注册为卡片回调事件的处理函数。

    Args:
        event: 飞书卡片回调事件,须具备 `.body` 属性。

    Returns:
        供飞书更新卡片的同步响应字典,含 `toast`(及在处理审批时的更新后 `card`)。

    飞书文档:
        [卡片回传交互](https://open.feishu.cn/document/uAjLw4CM/ukzMukzMukzM/feishu-cards/card-callback-communication)

    Examples:
        >>> await agent.handle_card_action(event)  # doctest:+SKIP
        {'toast': {'type': 'success', 'content': 'Approved'}, 'card': {...}}
    """
    value = _action_value(event)
    approval_id = value.get("__approval__")
    if not approval_id:
        return {"toast": {"type": "info", "content": "no pending approval"}}
    # Validate the decision BEFORE consuming the approval from the store.
    # A bogus/unrecognised decision must not destroy the PendingApproval so
    # the user can retry with a valid decision.
    decision = value.get("decision")
    if decision not in ("approve", "reject"):
        return {"toast": {"type": "info", "content": "invalid decision"}}
    approval = await self.approvals.pop(approval_id)
    if approval is None:
        return {"toast": {"type": "info", "content": "no pending approval"}}
    history = await self.store.get(approval.session_id)
    decided_card = self._decided_card(approval.tool_name, decision)
    toast_content = "Approved" if decision == "approve" else "Rejected"
    toast_type = "success" if decision == "approve" else "info"
    try:
        if decision == "approve":
            result = await self.registry.dispatch(approval.tool_name, approval.arguments)
            tool_msg = Message(
                role="tool",
                content=[ToolResultPart(tool_call_id=approval.tool_call_id, content=_stringify(result))],
            )
        else:
            tool_msg = Message(
                role="tool",
                content=[
                    ToolResultPart(
                        tool_call_id=approval.tool_call_id, content="User rejected this action.", is_error=True
                    )
                ],
            )
        history.append(tool_msg)
        await self.store.append(approval.session_id, tool_msg)
        await self._loop(event, approval.session_id, history)
    except Exception:
        logging.getLogger("feishu").exception(
            "handle_card_action: error resuming agent after %s of %s (approval=%s)",
            decision,
            approval.tool_name,
            approval_id,
        )
    return {
        "toast": {"type": toast_type, "content": toast_content},
        "card": decided_card,
    }

StreamResult dataclass

一轮流式响应归并后的完整结果。

feishu.agent.loop.accumulate_stream 将逐个 [feishu.agent.llm.StreamChunk][] 归并而成: text 为拼接后的全部文本,tool_calls 为重组完成的工具调用列表,stop_reason 为归一化的停止原因, usage 为可选的用量统计。

示例:

Python Console Session
1
2
3
4
5
>>> result = StreamResult(text="你好", tool_calls=[], stop_reason=StopReason.END_TURN)
>>> result.text
'你好'
>>> result.tool_calls
[]
源代码位于: feishu/agent/loop.py
Python
@dataclass
class StreamResult:
    r"""
    一轮流式响应归并后的完整结果。

    由 [feishu.agent.loop.accumulate_stream][] 将逐个 [feishu.agent.llm.StreamChunk][] 归并而成:
    `text` 为拼接后的全部文本,`tool_calls` 为重组完成的工具调用列表,`stop_reason` 为归一化的停止原因,
    `usage` 为可选的用量统计。

    Examples:
        >>> result = StreamResult(text="你好", tool_calls=[], stop_reason=StopReason.END_TURN)
        >>> result.text
        '你好'
        >>> result.tool_calls
        []
    """

    text: str
    tool_calls: list[ToolCall]
    stop_reason: StopReason
    usage: dict[str, int] | None = None

InMemoryPendingApprovalStore

基于内存的 feishu.agent.session.PendingApprovalStore 实现。

将挂起审批保存在进程内字典中,写操作以锁保护因而并发安全。每个审批仅可被取出一次,取出即移除,可天然 防止重复执行。仅适用于单进程场景;生产环境请自行实现 feishu.agent.session.PendingApprovalStore

示例:

Python Console Session
>>> import asyncio
>>> store = InMemoryPendingApprovalStore()
>>> approval = PendingApproval(
...     approval_id="ap_1",
...     session_id="oc_1",
...     tool_call_id="c1",
...     tool_name="deploy",
...     arguments={"env": "prod"},
... )
>>> async def demo():
...     await store.put(approval)
...     first = await store.pop("ap_1")
...     second = await store.pop("ap_1")
...     return first.tool_name, second
>>> asyncio.run(demo())
('deploy', None)
源代码位于: feishu/agent/session.py
Python
class InMemoryPendingApprovalStore:
    r"""
    基于内存的 [feishu.agent.session.PendingApprovalStore][] 实现。

    将挂起审批保存在进程内字典中,写操作以锁保护因而并发安全。每个审批仅可被取出一次,取出即移除,可天然
    防止重复执行。仅适用于单进程场景;生产环境请自行实现 [feishu.agent.session.PendingApprovalStore][]。

    Examples:
        >>> import asyncio
        >>> store = InMemoryPendingApprovalStore()
        >>> approval = PendingApproval(
        ...     approval_id="ap_1",
        ...     session_id="oc_1",
        ...     tool_call_id="c1",
        ...     tool_name="deploy",
        ...     arguments={"env": "prod"},
        ... )
        >>> async def demo():
        ...     await store.put(approval)
        ...     first = await store.pop("ap_1")
        ...     second = await store.pop("ap_1")
        ...     return first.tool_name, second
        >>> asyncio.run(demo())
        ('deploy', None)
    """

    def __init__(self) -> None:
        self._store: dict[str, PendingApproval] = {}
        self._lock = asyncio.Lock()

    async def put(self, approval: PendingApproval) -> None:
        r"""
        保存一次挂起的审批。

        Args:
            approval: 待保存的 [feishu.agent.session.PendingApproval][]。
        """
        async with self._lock:
            self._store[approval.approval_id] = approval

    async def pop(self, approval_id: str) -> PendingApproval | None:
        r"""
        按 `approval_id` 取出并移除一次挂起的审批。

        Args:
            approval_id: 审批标识。

        Returns:
            对应的 [feishu.agent.session.PendingApproval][];不存在或已被取出时返回 `None`。
        """
        async with self._lock:
            return self._store.pop(approval_id, None)

put async

Python
put(approval: PendingApproval) -> None

保存一次挂起的审批。

参数:

名称 类型 描述 默认
approval
PendingApproval 必需
源代码位于: feishu/agent/session.py
Python
async def put(self, approval: PendingApproval) -> None:
    r"""
    保存一次挂起的审批。

    Args:
        approval: 待保存的 [feishu.agent.session.PendingApproval][]。
    """
    async with self._lock:
        self._store[approval.approval_id] = approval

pop async

Python
pop(approval_id: str) -> PendingApproval | None

approval_id 取出并移除一次挂起的审批。

参数:

名称 类型 描述 默认
approval_id
str

审批标识。

必需

返回:

类型 描述
PendingApproval | None

对应的 feishu.agent.session.PendingApproval;不存在或已被取出时返回 None

源代码位于: feishu/agent/session.py
Python
async def pop(self, approval_id: str) -> PendingApproval | None:
    r"""
    按 `approval_id` 取出并移除一次挂起的审批。

    Args:
        approval_id: 审批标识。

    Returns:
        对应的 [feishu.agent.session.PendingApproval][];不存在或已被取出时返回 `None`。
    """
    async with self._lock:
        return self._store.pop(approval_id, None)

InMemorySessionStore

基于内存的 feishu.agent.session.SessionStore 实现。

将各会话历史保存在进程内字典中,写操作以锁保护因而并发安全。仅适用于单进程、可接受重启即丢失历史的 场景;生产环境请自行实现 feishu.agent.session.SessionStore 接入持久化后端。

示例:

Python Console Session
1
2
3
4
5
6
7
8
9
>>> import asyncio
>>> from feishu.agent.llm import Message, TextPart
>>> store = InMemorySessionStore()
>>> async def demo():
...     await store.append("oc_1", Message(role="user", content=[TextPart(text="你好")]))
...     history = await store.get("oc_1")
...     return history[0].content[0].text
>>> asyncio.run(demo())
'你好'
源代码位于: feishu/agent/session.py
Python
class InMemorySessionStore:
    r"""
    基于内存的 [feishu.agent.session.SessionStore][] 实现。

    将各会话历史保存在进程内字典中,写操作以锁保护因而并发安全。仅适用于单进程、可接受重启即丢失历史的
    场景;生产环境请自行实现 [feishu.agent.session.SessionStore][] 接入持久化后端。

    Examples:
        >>> import asyncio
        >>> from feishu.agent.llm import Message, TextPart
        >>> store = InMemorySessionStore()
        >>> async def demo():
        ...     await store.append("oc_1", Message(role="user", content=[TextPart(text="你好")]))
        ...     history = await store.get("oc_1")
        ...     return history[0].content[0].text
        >>> asyncio.run(demo())
        '你好'
    """

    def __init__(self) -> None:
        self._store: dict[str, list[Message]] = {}
        self._lock = asyncio.Lock()

    async def get(self, session_id: str) -> list[Message]:
        r"""
        读取指定会话的全部历史消息。

        Args:
            session_id: 会话标识。

        Returns:
            历史消息的副本;会话不存在时返回空列表。对返回列表的修改不会影响内部状态。
        """
        return list(self._store.get(session_id, []))

    async def append(self, session_id: str, *messages: Message) -> None:
        r"""
        向指定会话追加一条或多条消息。

        Args:
            session_id: 会话标识。
            *messages: 待追加的消息。
        """
        async with self._lock:
            bucket = self._store.get(session_id)
            if bucket is None:  # double-checked read after acquiring the lock
                bucket = self._store.setdefault(session_id, [])
            bucket.extend(messages)

    async def set(self, session_id: str, messages: list[Message]) -> None:
        r"""
        以给定的消息列表整体替换指定会话的历史。

        Args:
            session_id: 会话标识。
            messages: 用于替换的消息列表,将被拷贝保存。
        """
        async with self._lock:
            self._store[session_id] = list(messages)

get async

Python
get(session_id: str) -> list[Message]

读取指定会话的全部历史消息。

参数:

名称 类型 描述 默认
session_id
str

会话标识。

必需

返回:

类型 描述
list[Message]

历史消息的副本;会话不存在时返回空列表。对返回列表的修改不会影响内部状态。

源代码位于: feishu/agent/session.py
Python
async def get(self, session_id: str) -> list[Message]:
    r"""
    读取指定会话的全部历史消息。

    Args:
        session_id: 会话标识。

    Returns:
        历史消息的副本;会话不存在时返回空列表。对返回列表的修改不会影响内部状态。
    """
    return list(self._store.get(session_id, []))

append async

Python
append(session_id: str, *messages: Message) -> None

向指定会话追加一条或多条消息。

参数:

名称 类型 描述 默认
session_id
str

会话标识。

必需
*messages
Message

待追加的消息。

()
源代码位于: feishu/agent/session.py
Python
async def append(self, session_id: str, *messages: Message) -> None:
    r"""
    向指定会话追加一条或多条消息。

    Args:
        session_id: 会话标识。
        *messages: 待追加的消息。
    """
    async with self._lock:
        bucket = self._store.get(session_id)
        if bucket is None:  # double-checked read after acquiring the lock
            bucket = self._store.setdefault(session_id, [])
        bucket.extend(messages)

set async

Python
set(session_id: str, messages: list[Message]) -> None

以给定的消息列表整体替换指定会话的历史。

参数:

名称 类型 描述 默认
session_id
str

会话标识。

必需
messages
list[Message]

用于替换的消息列表,将被拷贝保存。

必需
源代码位于: feishu/agent/session.py
Python
async def set(self, session_id: str, messages: list[Message]) -> None:
    r"""
    以给定的消息列表整体替换指定会话的历史。

    Args:
        session_id: 会话标识。
        messages: 用于替换的消息列表,将被拷贝保存。
    """
    async with self._lock:
        self._store[session_id] = list(messages)

PendingApproval dataclass

一次挂起的工具审批,记录恢复对话所需的全部上下文。

当工具的 requires_approvalTrue 时,feishu.agent.loop.Agent 会创建该记录并发送审批卡片; 用户在卡片上批准或拒绝后,依据其中保存的会话与工具调用信息恢复本轮对话。

示例:

Python Console Session
1
2
3
4
5
6
7
8
9
>>> approval = PendingApproval(
...     approval_id="ap_1",
...     session_id="oc_1",
...     tool_call_id="c1",
...     tool_name="deploy",
...     arguments={"env": "prod"},
... )
>>> approval.tool_name
'deploy'
源代码位于: feishu/agent/session.py
Python
@dataclass
class PendingApproval:
    r"""
    一次挂起的工具审批,记录恢复对话所需的全部上下文。

    当工具的 `requires_approval` 为 `True` 时,[feishu.agent.loop.Agent][] 会创建该记录并发送审批卡片;
    用户在卡片上批准或拒绝后,依据其中保存的会话与工具调用信息恢复本轮对话。

    Examples:
        >>> approval = PendingApproval(
        ...     approval_id="ap_1",
        ...     session_id="oc_1",
        ...     tool_call_id="c1",
        ...     tool_name="deploy",
        ...     arguments={"env": "prod"},
        ... )
        >>> approval.tool_name
        'deploy'
    """

    approval_id: str
    session_id: str
    tool_call_id: str
    tool_name: str
    arguments: dict[str, Any]

PendingApprovalStore

Bases: Protocol

挂起审批存储协议,是自定义审批持久化后端的扩展契约。

feishu.agent.loop.Agent 通过该协议保存与取回挂起的 feishu.agent.session.PendingApproval; 内置实现为 feishu.agent.session.InMemoryPendingApprovalStore。该协议标注了 runtime_checkable, 可用 isinstance 校验实现是否符合契约。

示例:

Python Console Session
>>> isinstance(InMemoryPendingApprovalStore(), PendingApprovalStore)
True
源代码位于: feishu/agent/session.py
Python
@runtime_checkable
class PendingApprovalStore(Protocol):
    r"""
    挂起审批存储协议,是自定义审批持久化后端的扩展契约。

    [feishu.agent.loop.Agent][] 通过该协议保存与取回挂起的 [feishu.agent.session.PendingApproval][];
    内置实现为 [feishu.agent.session.InMemoryPendingApprovalStore][]。该协议标注了 `runtime_checkable`,
    可用 `isinstance` 校验实现是否符合契约。

    Examples:
        >>> isinstance(InMemoryPendingApprovalStore(), PendingApprovalStore)
        True
    """

    async def put(self, approval: PendingApproval) -> None:
        r"""保存一次挂起的审批。"""
        ...

    async def pop(self, approval_id: str) -> PendingApproval | None:
        r"""按 `approval_id` 取出并移除一次挂起的审批,不存在时返回 `None`。"""
        ...

put async

Python
put(approval: PendingApproval) -> None

保存一次挂起的审批。

源代码位于: feishu/agent/session.py
Python
async def put(self, approval: PendingApproval) -> None:
    r"""保存一次挂起的审批。"""
    ...

pop async

Python
pop(approval_id: str) -> PendingApproval | None

approval_id 取出并移除一次挂起的审批,不存在时返回 None

源代码位于: feishu/agent/session.py
Python
async def pop(self, approval_id: str) -> PendingApproval | None:
    r"""按 `approval_id` 取出并移除一次挂起的审批,不存在时返回 `None`。"""
    ...

SessionStore

Bases: Protocol

会话历史存储协议,是自定义持久化后端的扩展契约。

feishu.agent.loop.Agent 通过该协议读写各会话的对话历史;内置实现为 feishu.agent.session.InMemorySessionStore,可自行实现该协议接入数据库等持久化后端。该协议标注了 runtime_checkable,可用 isinstance 校验实现是否符合契约。

示例:

Python Console Session
>>> isinstance(InMemorySessionStore(), SessionStore)
True
源代码位于: feishu/agent/session.py
Python
@runtime_checkable
class SessionStore(Protocol):
    r"""
    会话历史存储协议,是自定义持久化后端的扩展契约。

    [feishu.agent.loop.Agent][] 通过该协议读写各会话的对话历史;内置实现为
    [feishu.agent.session.InMemorySessionStore][],可自行实现该协议接入数据库等持久化后端。该协议标注了
    `runtime_checkable`,可用 `isinstance` 校验实现是否符合契约。

    Examples:
        >>> isinstance(InMemorySessionStore(), SessionStore)
        True
    """

    async def get(self, session_id: str) -> list[Message]:
        r"""读取指定会话的全部历史消息。"""
        ...

    async def append(self, session_id: str, *messages: Message) -> None:
        r"""向指定会话追加一条或多条消息。"""
        ...

    async def set(self, session_id: str, messages: list[Message]) -> None:
        r"""以给定的消息列表整体替换指定会话的历史。"""
        ...

get async

Python
get(session_id: str) -> list[Message]

读取指定会话的全部历史消息。

源代码位于: feishu/agent/session.py
Python
async def get(self, session_id: str) -> list[Message]:
    r"""读取指定会话的全部历史消息。"""
    ...

append async

Python
append(session_id: str, *messages: Message) -> None

向指定会话追加一条或多条消息。

源代码位于: feishu/agent/session.py
Python
async def append(self, session_id: str, *messages: Message) -> None:
    r"""向指定会话追加一条或多条消息。"""
    ...

set async

Python
set(session_id: str, messages: list[Message]) -> None

以给定的消息列表整体替换指定会话的历史。

源代码位于: feishu/agent/session.py
Python
async def set(self, session_id: str, messages: list[Message]) -> None:
    r"""以给定的消息列表整体替换指定会话的历史。"""
    ...

Tool dataclass

一个已注册的工具:名称、描述、参数 Schema、处理函数及是否需要审批。

handler 既可为同步函数也可为协程函数;同步函数在分发时会被放到工作线程中执行,避免阻塞事件循环。 当 requires_approvalTrue 时,feishu.agent.loop.Agent 会先发送审批卡片并挂起本轮对话, 待用户批准后再执行。

示例:

Python Console Session
>>> async def weather(city):
...     return f"{city}:晴"
>>> tool = Tool(
...     name="weather",
...     description="查询天气",
...     input_schema={"type": "object", "properties": {"city": {"type": "string"}}},
...     handler=weather,
... )
>>> tool.requires_approval
False
源代码位于: feishu/agent/tools.py
Python
@dataclass
class Tool:
    r"""
    一个已注册的工具:名称、描述、参数 Schema、处理函数及是否需要审批。

    `handler` 既可为同步函数也可为协程函数;同步函数在分发时会被放到工作线程中执行,避免阻塞事件循环。
    当 `requires_approval` 为 `True` 时,[feishu.agent.loop.Agent][] 会先发送审批卡片并挂起本轮对话,
    待用户批准后再执行。

    Examples:
        >>> async def weather(city):
        ...     return f"{city}:晴"
        >>> tool = Tool(
        ...     name="weather",
        ...     description="查询天气",
        ...     input_schema={"type": "object", "properties": {"city": {"type": "string"}}},
        ...     handler=weather,
        ... )
        >>> tool.requires_approval
        False
    """

    name: str
    description: str
    input_schema: dict[str, Any]
    handler: Callable[..., Awaitable[Any] | Any]
    requires_approval: bool = False

ToolRegistry

工具注册表,负责工具的注册、声明导出与分发执行。

既支持装饰器形式注册,也支持直接传入处理函数;通过 feishu.agent.tools.ToolRegistry.specs 将 已注册工具导出为 feishu.agent.llm.ToolSpec 列表交给模型,再由 feishu.agent.tools.ToolRegistry.dispatch 校验参数并执行对应处理函数。

示例:

Python Console Session
>>> import asyncio
>>> reg = ToolRegistry()
>>> schema = {"type": "object"}
>>> async def weather(city):
...     return f"{city}:晴"
>>> _ = reg.register("weather", weather, input_schema=schema, description="天气")
>>> reg.specs()
[ToolSpec(name='weather', description='天气', input_schema={'type': 'object'})]
>>> asyncio.run(reg.dispatch("weather", {"city": "上海"}))
'上海:晴'
源代码位于: feishu/agent/tools.py
Python
class ToolRegistry:
    r"""
    工具注册表,负责工具的注册、声明导出与分发执行。

    既支持装饰器形式注册,也支持直接传入处理函数;通过 [feishu.agent.tools.ToolRegistry.specs][] 将
    已注册工具导出为 [feishu.agent.llm.ToolSpec][] 列表交给模型,再由
    [feishu.agent.tools.ToolRegistry.dispatch][] 校验参数并执行对应处理函数。

    Examples:
        >>> import asyncio
        >>> reg = ToolRegistry()
        >>> schema = {"type": "object"}
        >>> async def weather(city):
        ...     return f"{city}:晴"
        >>> _ = reg.register("weather", weather, input_schema=schema, description="天气")
        >>> reg.specs()
        [ToolSpec(name='weather', description='天气', input_schema={'type': 'object'})]
        >>> asyncio.run(reg.dispatch("weather", {"city": "上海"}))
        '上海:晴'
    """

    def __init__(self) -> None:
        self._tools: dict[str, Tool] = {}

    def register(
        self,
        name: str | None = None,
        handler: Callable[..., Any] | None = None,
        *,
        input_schema: dict[str, Any],
        description: str,
        requires_approval: bool = False,
    ) -> Callable[..., Any] | None:
        r"""
        注册一个工具,支持装饰器与直接调用两种形式。

        直接传入 `handler` 时立即注册并原样返回该处理函数;省略 `handler` 时返回一个装饰器,可直接装饰处理
        函数。未显式指定 `name` 时取处理函数的 `__name__` 作为工具名。

        Args:
            name: 工具名称。省略时取处理函数的 `__name__`。
            handler: 工具处理函数,可为同步函数或协程函数。省略时本方法返回装饰器。
            input_schema: 描述工具参数的 JSON Schema。
            description: 工具描述,供模型理解其用途。
            requires_approval: 是否在执行前要求用户审批。默认为 `False`。

        Returns:
            直接调用形式下原样返回 `handler`;装饰器形式下返回用于装饰处理函数的装饰器。

        Raises:
            ValueError: 既未提供 `name` 又无法从处理函数推断出名称时抛出。

        Examples:
            >>> reg = ToolRegistry()
            >>> schema = {"type": "object", "properties": {}}
            >>> @reg.register("ping", input_schema=schema, description="心跳")
            ... async def ping():
            ...     return "pong"
            >>> reg.get("ping").name
            'ping'
        """

        def _add(fn: Callable[..., Any]) -> Callable[..., Any]:
            tool_name = name or getattr(fn, "__name__", None)
            if not tool_name:
                raise ValueError("tool name is required")
            self._tools[tool_name] = Tool(
                name=tool_name,
                description=description,
                input_schema=input_schema,
                handler=fn,
                requires_approval=requires_approval,
            )
            return fn

        if handler is not None:
            return _add(handler)
        return _add  # decorator form

    def specs(self) -> list[ToolSpec]:
        r"""
        将所有已注册工具导出为 [feishu.agent.llm.ToolSpec][] 列表。

        Returns:
            工具声明列表,可直接作为 `tools` 参数传给 [feishu.agent.llm.LlmBackend.stream][]。

        Examples:
            >>> reg = ToolRegistry()
            >>> async def ping():
            ...     return "pong"
            >>> _ = reg.register("ping", ping, input_schema={"type": "object"}, description="心跳")
            >>> reg.specs()
            [ToolSpec(name='ping', description='心跳', input_schema={'type': 'object'})]
        """
        return [
            ToolSpec(name=t.name, description=t.description, input_schema=t.input_schema) for t in self._tools.values()
        ]

    def get(self, name: str) -> Tool:
        r"""
        按名称获取已注册的工具。

        Args:
            name: 工具名称。

        Returns:
            对应的 [feishu.agent.tools.Tool][]。

        Raises:
            KeyError: 工具未注册时抛出。
        """
        return self._tools[name]

    async def dispatch(self, name: str, arguments: dict[str, Any]) -> Any:
        r"""
        校验参数并执行指定工具,返回其结果。

        先依据工具的 `input_schema` 校验 `arguments`,再调用对应处理函数。协程处理函数会被 `await`;
        同步处理函数则放到工作线程中执行,避免阻塞事件循环。

        Args:
            name: 工具名称。
            arguments: 已解析为字典的工具参数。

        Returns:
            工具处理函数的返回值。

        Raises:
            KeyError: 工具未注册时抛出。
            ToolValidationError: 参数未通过 `input_schema` 校验时抛出。

        Examples:
            >>> import asyncio
            >>> reg = ToolRegistry()
            >>> schema = {"type": "object", "properties": {"city": {"type": "string"}}, "required": ["city"]}
            >>> async def weather(city):
            ...     return f"{city}:晴"
            >>> _ = reg.register("weather", weather, input_schema=schema, description="查询天气")
            >>> asyncio.run(reg.dispatch("weather", {"city": "北京"}))
            '北京:晴'
        """
        tool = self._tools[name]  # raises KeyError if unknown
        _validate(name, tool.input_schema, arguments)
        if inspect.iscoroutinefunction(tool.handler) or inspect.iscoroutinefunction(type(tool.handler).__call__):
            return await tool.handler(**arguments)
        result = await asyncio.to_thread(tool.handler, **arguments)
        if inspect.isawaitable(result):
            return await result
        return result

register

Python
register(name: str | None = None, handler: Callable[..., Any] | None = None, *, input_schema: dict[str, Any], description: str, requires_approval: bool = False) -> Callable[..., Any] | None

注册一个工具,支持装饰器与直接调用两种形式。

直接传入 handler 时立即注册并原样返回该处理函数;省略 handler 时返回一个装饰器,可直接装饰处理 函数。未显式指定 name 时取处理函数的 __name__ 作为工具名。

参数:

名称 类型 描述 默认
name
str | None

工具名称。省略时取处理函数的 __name__

None
handler
Callable[..., Any] | None

工具处理函数,可为同步函数或协程函数。省略时本方法返回装饰器。

None
input_schema
dict[str, Any]

描述工具参数的 JSON Schema。

必需
description
str

工具描述,供模型理解其用途。

必需
requires_approval
bool

是否在执行前要求用户审批。默认为 False

False

返回:

类型 描述
Callable[..., Any] | None

直接调用形式下原样返回 handler;装饰器形式下返回用于装饰处理函数的装饰器。

引发:

类型 描述
ValueError

既未提供 name 又无法从处理函数推断出名称时抛出。

示例:

Python Console Session
1
2
3
4
5
6
7
>>> reg = ToolRegistry()
>>> schema = {"type": "object", "properties": {}}
>>> @reg.register("ping", input_schema=schema, description="心跳")
... async def ping():
...     return "pong"
>>> reg.get("ping").name
'ping'
源代码位于: feishu/agent/tools.py
Python
def register(
    self,
    name: str | None = None,
    handler: Callable[..., Any] | None = None,
    *,
    input_schema: dict[str, Any],
    description: str,
    requires_approval: bool = False,
) -> Callable[..., Any] | None:
    r"""
    注册一个工具,支持装饰器与直接调用两种形式。

    直接传入 `handler` 时立即注册并原样返回该处理函数;省略 `handler` 时返回一个装饰器,可直接装饰处理
    函数。未显式指定 `name` 时取处理函数的 `__name__` 作为工具名。

    Args:
        name: 工具名称。省略时取处理函数的 `__name__`。
        handler: 工具处理函数,可为同步函数或协程函数。省略时本方法返回装饰器。
        input_schema: 描述工具参数的 JSON Schema。
        description: 工具描述,供模型理解其用途。
        requires_approval: 是否在执行前要求用户审批。默认为 `False`。

    Returns:
        直接调用形式下原样返回 `handler`;装饰器形式下返回用于装饰处理函数的装饰器。

    Raises:
        ValueError: 既未提供 `name` 又无法从处理函数推断出名称时抛出。

    Examples:
        >>> reg = ToolRegistry()
        >>> schema = {"type": "object", "properties": {}}
        >>> @reg.register("ping", input_schema=schema, description="心跳")
        ... async def ping():
        ...     return "pong"
        >>> reg.get("ping").name
        'ping'
    """

    def _add(fn: Callable[..., Any]) -> Callable[..., Any]:
        tool_name = name or getattr(fn, "__name__", None)
        if not tool_name:
            raise ValueError("tool name is required")
        self._tools[tool_name] = Tool(
            name=tool_name,
            description=description,
            input_schema=input_schema,
            handler=fn,
            requires_approval=requires_approval,
        )
        return fn

    if handler is not None:
        return _add(handler)
    return _add  # decorator form

specs

Python
specs() -> list[ToolSpec]

将所有已注册工具导出为 feishu.agent.llm.ToolSpec 列表。

返回:

类型 描述
list[ToolSpec]

工具声明列表,可直接作为 tools 参数传给 feishu.agent.llm.LlmBackend.stream

示例:

Python Console Session
1
2
3
4
5
6
>>> reg = ToolRegistry()
>>> async def ping():
...     return "pong"
>>> _ = reg.register("ping", ping, input_schema={"type": "object"}, description="心跳")
>>> reg.specs()
[ToolSpec(name='ping', description='心跳', input_schema={'type': 'object'})]
源代码位于: feishu/agent/tools.py
Python
def specs(self) -> list[ToolSpec]:
    r"""
    将所有已注册工具导出为 [feishu.agent.llm.ToolSpec][] 列表。

    Returns:
        工具声明列表,可直接作为 `tools` 参数传给 [feishu.agent.llm.LlmBackend.stream][]。

    Examples:
        >>> reg = ToolRegistry()
        >>> async def ping():
        ...     return "pong"
        >>> _ = reg.register("ping", ping, input_schema={"type": "object"}, description="心跳")
        >>> reg.specs()
        [ToolSpec(name='ping', description='心跳', input_schema={'type': 'object'})]
    """
    return [
        ToolSpec(name=t.name, description=t.description, input_schema=t.input_schema) for t in self._tools.values()
    ]

get

Python
get(name: str) -> Tool

按名称获取已注册的工具。

参数:

名称 类型 描述 默认
name
str

工具名称。

必需

返回:

类型 描述
Tool

引发:

类型 描述
KeyError

工具未注册时抛出。

源代码位于: feishu/agent/tools.py
Python
def get(self, name: str) -> Tool:
    r"""
    按名称获取已注册的工具。

    Args:
        name: 工具名称。

    Returns:
        对应的 [feishu.agent.tools.Tool][]。

    Raises:
        KeyError: 工具未注册时抛出。
    """
    return self._tools[name]

dispatch async

Python
dispatch(name: str, arguments: dict[str, Any]) -> Any

校验参数并执行指定工具,返回其结果。

先依据工具的 input_schema 校验 arguments,再调用对应处理函数。协程处理函数会被 await; 同步处理函数则放到工作线程中执行,避免阻塞事件循环。

参数:

名称 类型 描述 默认
name
str

工具名称。

必需
arguments
dict[str, Any]

已解析为字典的工具参数。

必需

返回:

类型 描述
Any

工具处理函数的返回值。

引发:

类型 描述
KeyError

工具未注册时抛出。

ToolValidationError

参数未通过 input_schema 校验时抛出。

示例:

Python Console Session
1
2
3
4
5
6
7
8
>>> import asyncio
>>> reg = ToolRegistry()
>>> schema = {"type": "object", "properties": {"city": {"type": "string"}}, "required": ["city"]}
>>> async def weather(city):
...     return f"{city}:晴"
>>> _ = reg.register("weather", weather, input_schema=schema, description="查询天气")
>>> asyncio.run(reg.dispatch("weather", {"city": "北京"}))
'北京:晴'
源代码位于: feishu/agent/tools.py
Python
async def dispatch(self, name: str, arguments: dict[str, Any]) -> Any:
    r"""
    校验参数并执行指定工具,返回其结果。

    先依据工具的 `input_schema` 校验 `arguments`,再调用对应处理函数。协程处理函数会被 `await`;
    同步处理函数则放到工作线程中执行,避免阻塞事件循环。

    Args:
        name: 工具名称。
        arguments: 已解析为字典的工具参数。

    Returns:
        工具处理函数的返回值。

    Raises:
        KeyError: 工具未注册时抛出。
        ToolValidationError: 参数未通过 `input_schema` 校验时抛出。

    Examples:
        >>> import asyncio
        >>> reg = ToolRegistry()
        >>> schema = {"type": "object", "properties": {"city": {"type": "string"}}, "required": ["city"]}
        >>> async def weather(city):
        ...     return f"{city}:晴"
        >>> _ = reg.register("weather", weather, input_schema=schema, description="查询天气")
        >>> asyncio.run(reg.dispatch("weather", {"city": "北京"}))
        '北京:晴'
    """
    tool = self._tools[name]  # raises KeyError if unknown
    _validate(name, tool.input_schema, arguments)
    if inspect.iscoroutinefunction(tool.handler) or inspect.iscoroutinefunction(type(tool.handler).__call__):
        return await tool.handler(**arguments)
    result = await asyncio.to_thread(tool.handler, **arguments)
    if inspect.isawaitable(result):
        return await result
    return result

ToolValidationError

Bases: ValueError

工具参数校验失败时抛出。

feishu.agent.tools.ToolRegistry.dispatch 收到的参数不是对象、缺少必填字段,或在 additionalPropertiesFalse 时出现多余字段,即抛出该异常。

示例:

Python Console Session
1
2
3
4
>>> raise ToolValidationError("missing required argument")
Traceback (most recent call last):
    ...
feishu.agent.tools.ToolValidationError: missing required argument
源代码位于: feishu/agent/tools.py
Python
class ToolValidationError(ValueError):
    r"""
    工具参数校验失败时抛出。

    当 [feishu.agent.tools.ToolRegistry.dispatch][] 收到的参数不是对象、缺少必填字段,或在
    `additionalProperties` 为 `False` 时出现多余字段,即抛出该异常。

    Examples:
        >>> raise ToolValidationError("missing required argument")
        Traceback (most recent call last):
            ...
        feishu.agent.tools.ToolValidationError: missing required argument
    """

register_agent

Python
register_agent(dispatcher: EventDispatcher, agent: Agent, *, message_event: str = 'im.message.receive_v1', card_event: str = 'card.action.trigger') -> None

将智能体的消息处理与卡片回调挂载到事件分发器上。

feishu.agent.loop.Agent.run 注册为消息事件的处理函数,把 feishu.agent.loop.Agent.handle_card_action 注册为卡片回调事件的处理函数。dispatcher 须提供与 feishu.events.dispatcher.EventDispatcher 一致的 on(event_type) 装饰器接口。

参数:

名称 类型 描述 默认

dispatcher

EventDispatcher

事件分发器,须提供 on(event_type) 装饰器接口。

必需

agent

Agent

已构造的 feishu.agent.loop.Agent

必需

message_event

str

消息事件类型。默认为 im.message.receive_v1

'im.message.receive_v1'

card_event

str

卡片回调事件类型。默认为 card.action.trigger

'card.action.trigger'
飞书文档

接收消息

卡片回传交互

示例:

Python Console Session
>>> register_agent(dispatcher, agent)
源代码位于: feishu/agent/dispatch.py
Python
def register_agent(
    dispatcher: EventDispatcher,
    agent: Agent,
    *,
    message_event: str = "im.message.receive_v1",
    card_event: str = "card.action.trigger",
) -> None:
    r"""
    将智能体的消息处理与卡片回调挂载到事件分发器上。

    把 [feishu.agent.loop.Agent.run][] 注册为消息事件的处理函数,把
    [feishu.agent.loop.Agent.handle_card_action][] 注册为卡片回调事件的处理函数。`dispatcher` 须提供与
    [feishu.events.dispatcher.EventDispatcher][] 一致的 `on(event_type)` 装饰器接口。

    Args:
        dispatcher: 事件分发器,须提供 `on(event_type)` 装饰器接口。
        agent: 已构造的 [feishu.agent.loop.Agent][]。
        message_event: 消息事件类型。默认为 `im.message.receive_v1`。
        card_event: 卡片回调事件类型。默认为 `card.action.trigger`。

    飞书文档:
        [接收消息](https://open.feishu.cn/document/server-docs/im-v1/message/events/receive)

        [卡片回传交互](https://open.feishu.cn/document/uAjLw4CM/ukzMukzMukzM/feishu-cards/card-callback-communication)

    Examples:
        >>> register_agent(dispatcher, agent)  # doctest:+SKIP
    """
    dispatcher.on(message_event)(agent.run)
    dispatcher.on(card_event)(agent.handle_card_action)

accumulate_stream async

Python
accumulate_stream(chunks: AsyncIterator[StreamChunk]) -> StreamResult

将一轮流式响应的增量片段归并为一个 feishu.agent.loop.StreamResult

文本片段按序拼接;工具调用片段按 index 归并,逐段累积出完整的参数 JSON 字符串,并产出有序的 feishu.agent.llm.ToolCall 列表;停止原因与用量统计取自 feishu.agent.llm.MessageStop

参数:

名称 类型 描述 默认

chunks

AsyncIterator[StreamChunk]

逐个产出 [feishu.agent.llm.StreamChunk][] 的异步迭代器,通常来自 feishu.agent.llm.LlmBackend.stream

必需

返回:

类型 描述
StreamResult

示例:

Python Console Session
>>> import asyncio
>>> async def chunks():
...     yield TextDelta(text="晴")
...     yield ToolCallDelta(index=0, id="c1", name="weather", arguments='{"city":"上海"}')
...     yield MessageStop(stop_reason=StopReason.TOOL_USE)
>>> result = asyncio.run(accumulate_stream(chunks()))
>>> result.text
'晴'
>>> result.tool_calls
[ToolCall(id='c1', name='weather', arguments='{"city":"上海"}')]
>>> result.stop_reason
<StopReason.TOOL_USE: 'tool_use'>
源代码位于: feishu/agent/loop.py
Python
async def accumulate_stream(chunks: AsyncIterator[StreamChunk]) -> StreamResult:
    r"""
    将一轮流式响应的增量片段归并为一个 [feishu.agent.loop.StreamResult][]。

    文本片段按序拼接;工具调用片段按 `index` 归并,逐段累积出完整的参数 JSON 字符串,并产出有序的
    [feishu.agent.llm.ToolCall][] 列表;停止原因与用量统计取自 [feishu.agent.llm.MessageStop][]。

    Args:
        chunks: 逐个产出 [feishu.agent.llm.StreamChunk][] 的异步迭代器,通常来自
            [feishu.agent.llm.LlmBackend.stream][]。

    Returns:
        归并后的 [feishu.agent.loop.StreamResult][]。

    Examples:
        >>> import asyncio
        >>> async def chunks():
        ...     yield TextDelta(text="晴")
        ...     yield ToolCallDelta(index=0, id="c1", name="weather", arguments='{"city":"上海"}')
        ...     yield MessageStop(stop_reason=StopReason.TOOL_USE)
        >>> result = asyncio.run(accumulate_stream(chunks()))
        >>> result.text
        '晴'
        >>> result.tool_calls
        [ToolCall(id='c1', name='weather', arguments='{"city":"上海"}')]
        >>> result.stop_reason
        <StopReason.TOOL_USE: 'tool_use'>
    """
    text_parts: list[str] = []
    by_index: dict[int, _Accum] = {}
    stop_reason = StopReason.OTHER
    usage: dict[str, int] | None = None
    async for chunk in chunks:
        if isinstance(chunk, TextDelta):
            text_parts.append(chunk.text)
        elif isinstance(chunk, ToolCallDelta):
            acc = by_index.setdefault(chunk.index, _Accum())
            if acc.id is None and chunk.id is not None:
                acc.id = chunk.id
            if acc.name is None and chunk.name is not None:
                acc.name = chunk.name
            acc.arguments += chunk.arguments
        elif isinstance(chunk, MessageStop):
            stop_reason = chunk.stop_reason
            usage = chunk.usage
    tool_calls = [
        ToolCall(id=acc.id or "", name=acc.name or "", arguments=acc.arguments) for _, acc in sorted(by_index.items())
    ]
    return StreamResult(text="".join(text_parts), tool_calls=tool_calls, stop_reason=stop_reason, usage=usage)

session_id_for

Python
session_id_for(event: Event) -> str

从消息事件推导会话标识,用于隔离不同会话的对话历史。

优先使用 chat_id;当消息属于话题(thread)回复时,附加 root_id 以将同一话题归为独立会话; 若事件中没有 chat_id,则回退为 message_id

参数:

名称 类型 描述 默认

event

Event

飞书消息事件,须具备 .body 属性。

必需

返回:

类型 描述
str

会话标识字符串。

飞书文档

接收消息

示例:

Python Console Session
>>> from types import SimpleNamespace
>>> ev = SimpleNamespace(body={"message": {"chat_id": "oc_1", "message_id": "om_1"}})
>>> session_id_for(ev)
'oc_1'
>>> thread = SimpleNamespace(body={"message": {"chat_id": "oc_1", "root_id": "om_root"}})
>>> session_id_for(thread)
'oc_1:om_root'
>>> dm = SimpleNamespace(body={"message": {"message_id": "om_9"}})
>>> session_id_for(dm)
'om_9'
源代码位于: feishu/agent/loop.py
Python
def session_id_for(event: Event) -> str:
    r"""
    从消息事件推导会话标识,用于隔离不同会话的对话历史。

    优先使用 `chat_id`;当消息属于话题(thread)回复时,附加 `root_id` 以将同一话题归为独立会话;
    若事件中没有 `chat_id`,则回退为 `message_id`。

    Args:
        event: 飞书消息事件,须具备 `.body` 属性。

    Returns:
        会话标识字符串。

    飞书文档:
        [接收消息](https://open.feishu.cn/document/server-docs/im-v1/message/events/receive)

    Examples:
        >>> from types import SimpleNamespace
        >>> ev = SimpleNamespace(body={"message": {"chat_id": "oc_1", "message_id": "om_1"}})
        >>> session_id_for(ev)
        'oc_1'
        >>> thread = SimpleNamespace(body={"message": {"chat_id": "oc_1", "root_id": "om_root"}})
        >>> session_id_for(thread)
        'oc_1:om_root'
        >>> dm = SimpleNamespace(body={"message": {"message_id": "om_9"}})
        >>> session_id_for(dm)
        'om_9'
    """
    message = event.body.get("message") or {}
    chat_id = message.get("chat_id")
    root_id = message.get("root_id")
    if chat_id:
        return f"{chat_id}:{root_id}" if root_id else chat_id
    return message.get("message_id") or ""

user_message_from_event

Python
user_message_from_event(event: Event) -> Message

将飞书消息事件转换为一条用户角色的 feishu.agent.llm.Message

文本提取委托给 feishu.im.inbound.message_text,因此除纯文本外还支持富文本(post)消息, 并会依据消息的 mentions 数组将 @_user_N 提及占位符解析为 @<姓名>;未被解析的开头占位符 (例如事件未携带 mentions 时)会被去除。当无法解析出任何文本时(如图片等非文本消息),退回使用 原始 content 作为文本。

参数:

名称 类型 描述 默认

event

Event

飞书消息事件,须具备 .body 属性。

必需

返回:

类型 描述
Message

角色为 userfeishu.agent.llm.Message

引发:

类型 描述
ValueError

事件体中不存在 message 对象时抛出。

飞书文档

接收消息

示例:

Python Console Session
1
2
3
4
5
6
7
8
9
>>> import json
>>> from types import SimpleNamespace
>>> body = {"message": {"message_type": "text", "content": json.dumps({"text": "@_user_1 你好"})}}
>>> ev = SimpleNamespace(body=body)
>>> msg = user_message_from_event(ev)
>>> msg.role
'user'
>>> msg.content[0].text
'你好'
源代码位于: feishu/agent/loop.py
Python
def user_message_from_event(event: Event) -> Message:
    r"""
    将飞书消息事件转换为一条用户角色的 [feishu.agent.llm.Message][]。

    文本提取委托给 [feishu.im.inbound.message_text][],因此除纯文本外还支持富文本(`post`)消息,
    并会依据消息的 `mentions` 数组将 `@_user_N` 提及占位符解析为 `@<姓名>`;未被解析的开头占位符
    (例如事件未携带 `mentions` 时)会被去除。当无法解析出任何文本时(如图片等非文本消息),退回使用
    原始 `content` 作为文本。

    Args:
        event: 飞书消息事件,须具备 `.body` 属性。

    Returns:
        角色为 `user` 的 [feishu.agent.llm.Message][]。

    Raises:
        ValueError: 事件体中不存在 `message` 对象时抛出。

    飞书文档:
        [接收消息](https://open.feishu.cn/document/server-docs/im-v1/message/events/receive)

    Examples:
        >>> import json
        >>> from types import SimpleNamespace
        >>> body = {"message": {"message_type": "text", "content": json.dumps({"text": "@_user_1 你好"})}}
        >>> ev = SimpleNamespace(body=body)
        >>> msg = user_message_from_event(ev)
        >>> msg.role
        'user'
        >>> msg.content[0].text
        '你好'
    """
    from ..im.inbound import message_text

    message = event.body.get("message")
    if not message:
        raise ValueError("event body has no 'message' object")
    try:
        text = message_text(message)
    except (ValueError, TypeError):
        text = ""
    if text:
        text = _MENTION_RE.sub("", text).strip()
    else:
        text = message.get("content") or ""
    return Message(role="user", content=[TextPart(text=text)])

stream_text async

Python
stream_text(provider_stream: Any) -> AsyncIterator[str]

将任意 LLM 提供商的流式响应转换为纯文本增量的异步迭代器。

本函数是 feishu.client.FeishuClient.stream_card 的配套适配器:将大模型流式 响应(同步或异步)归一化为逐个字符串 token,可直接传入 stream_cardtokens 参数,从而把 LLM 输出实时推送至飞书消息卡片。

适配策略:

  • Anthropic 上下文管理器AsyncMessageStreamManager,带 __aenter__ 但无 __aiter__):进入上下文后迭代原始 SSE 事件,经 [feishu.agent.adapters.anthropic._translate_events][] 归一化。

  • OpenAI 异步流AsyncStream[ChatCompletionChunk] 或 chunk 带 choices 属性):直接异步迭代,经 [feishu.agent.adapters.openai._translate_chunks][] 归一化。

  • OpenAI 同步流Stream[ChatCompletionChunk],同步可迭代,首个元素有 choices 属性):通过 loop.run_in_executor 包装后同上处理。

  • 归一化 StreamChunk 序列(已经 feishu.agent.llm.LlmBackend 适配器处理 的异步迭代器,首个元素为 feishu.agent.llm.TextDelta):直接消费,仅产出 TextDelta.text,跳过工具调用增量与停止信号。

  • Anthropic 原始事件异步/同步流(默认 fallback):经 Anthropic 适配器归一化。

同步/异步均兼容。工具调用增量(feishu.agent.llm.ToolCallDelta)与停止信号 (feishu.agent.llm.MessageStop)均被静默跳过,只有文本内容被产出。

参数:

名称 类型 描述 默认

provider_stream

Any

LLM 提供商返回的流式对象,支持以下类型:

  • openai.AsyncStream[ChatCompletionChunk]
  • openai.Stream[ChatCompletionChunk](同步)
  • anthropic.AsyncMessageStreamManagerasync with 风格)
  • 产出原始 Anthropic SSE 事件的异步/同步可迭代对象
  • 产出 [feishu.agent.llm.StreamChunk][] 的异步迭代器 (即 feishu.agent.llm.LlmBackend.stream 的返回值)
必需

返回:

类型 描述
AsyncIterator[str]

逐个产出文本增量字符串(str)的异步迭代器,不含工具调用增量或停止信号。

引发:

类型 描述
TypeError

provider_stream 既非异步可迭代也非同步可迭代时抛出。

飞书文档

CardKit 流式更新

示例:

Python Console Session
>>> import asyncio
>>> from feishu.agent.llm import TextDelta, MessageStop, StopReason
>>> async def _fake_normalized():
...     yield TextDelta(text="Hello")
...     yield TextDelta(text=" world")
...     yield MessageStop(stop_reason=StopReason.END_TURN)
>>> async def _run():
...     return [t async for t in stream_text(_fake_normalized())]
>>> asyncio.run(_run())
['Hello', ' world']
源代码位于: feishu/agent/streaming.py
Python
async def stream_text(provider_stream: Any) -> AsyncIterator[str]:
    r"""
    将任意 LLM 提供商的流式响应转换为纯文本增量的异步迭代器。

    本函数是 [feishu.client.FeishuClient.stream_card][] 的配套适配器:将大模型流式
    响应(同步或异步)归一化为逐个字符串 token,可直接传入 ``stream_card`` 的
    ``tokens`` 参数,从而把 LLM 输出实时推送至飞书消息卡片。

    适配策略:

    - **Anthropic 上下文管理器**(``AsyncMessageStreamManager``,带 ``__aenter__``
      但无 ``__aiter__``):进入上下文后迭代原始 SSE 事件,经
      [feishu.agent.adapters.anthropic._translate_events][] 归一化。

    - **OpenAI 异步流**(``AsyncStream[ChatCompletionChunk]`` 或 chunk 带
      ``choices`` 属性):直接异步迭代,经
      [feishu.agent.adapters.openai._translate_chunks][] 归一化。

    - **OpenAI 同步流**(``Stream[ChatCompletionChunk]``,同步可迭代,首个元素有
      ``choices`` 属性):通过 ``loop.run_in_executor`` 包装后同上处理。

    - **归一化 StreamChunk 序列**(已经 [feishu.agent.llm.LlmBackend][] 适配器处理
      的异步迭代器,首个元素为 [feishu.agent.llm.TextDelta][]):直接消费,仅产出
      ``TextDelta.text``,跳过工具调用增量与停止信号。

    - **Anthropic 原始事件异步/同步流**(默认 fallback):经 Anthropic 适配器归一化。

    同步/异步均兼容。工具调用增量([feishu.agent.llm.ToolCallDelta][])与停止信号
    ([feishu.agent.llm.MessageStop][])均被静默跳过,只有文本内容被产出。

    Args:
        provider_stream: LLM 提供商返回的流式对象,支持以下类型:

            - ``openai.AsyncStream[ChatCompletionChunk]``
            - ``openai.Stream[ChatCompletionChunk]``(同步)
            - ``anthropic.AsyncMessageStreamManager``(``async with`` 风格)
            - 产出原始 Anthropic SSE 事件的异步/同步可迭代对象
            - 产出 [feishu.agent.llm.StreamChunk][] 的异步迭代器
              (即 [feishu.agent.llm.LlmBackend.stream][] 的返回值)

    Returns:
        逐个产出文本增量字符串(``str``)的异步迭代器,不含工具调用增量或停止信号。

    Raises:
        TypeError: 当 ``provider_stream`` 既非异步可迭代也非同步可迭代时抛出。

    飞书文档:
        [CardKit 流式更新](https://open.feishu.cn/document/server-docs/im-v1/message-card/card-kit)

    Examples:
        >>> import asyncio
        >>> from feishu.agent.llm import TextDelta, MessageStop, StopReason
        >>> async def _fake_normalized():
        ...     yield TextDelta(text="Hello")
        ...     yield TextDelta(text=" world")
        ...     yield MessageStop(stop_reason=StopReason.END_TURN)
        >>> async def _run():
        ...     return [t async for t in stream_text(_fake_normalized())]
        >>> asyncio.run(_run())
        ['Hello', ' world']
    """
    # ------------------------------------------------------------------
    # Case 1: Anthropic async-context-manager (e.g. AsyncMessageStreamManager).
    # Has __aenter__ but is NOT directly async-iterable.
    # ------------------------------------------------------------------
    is_async_iterable = hasattr(provider_stream, "__aiter__")
    is_context_manager = hasattr(provider_stream, "__aenter__")

    if is_context_manager and not is_async_iterable:
        async with provider_stream as raw_stream:
            async for text in _yield_text_from_anthropic(raw_stream):
                yield text
        return

    # ------------------------------------------------------------------
    # Case 2: Async iterable — detect provider by peeking at first chunk.
    # ------------------------------------------------------------------
    if is_async_iterable:
        async for text in _stream_text_async(provider_stream):
            yield text
        return

    # ------------------------------------------------------------------
    # Case 3: Synchronous iterable — wrap with run_in_executor, then detect.
    # ------------------------------------------------------------------
    if hasattr(provider_stream, "__iter__"):
        async for text in _stream_text_sync(provider_stream):
            yield text
        return

    raise TypeError(
        f"stream_text: unsupported provider_stream type {type(provider_stream)!r}; "
        "expected an async/sync iterable of LLM stream chunks."
    )