跳转至

streaming

feishu.agent.streaming

LLM 流式响应到文本增量的通用适配器。

stream_text async

Python
stream_text(provider_stream: Any) -> AsyncIterator[str]

将任意 LLM 提供商的流式响应转换为纯文本增量的异步迭代器。

本函数是 feishu.client.FeishuClient.stream_card 的配套适配器:将大模型流式 响应(同步或异步)归一化为逐个字符串 token,可直接传入 stream_cardtokens 参数,从而把 LLM 输出实时推送至飞书消息卡片。

适配策略:

  • Anthropic 上下文管理器AsyncMessageStreamManager,带 __aenter__ 但无 __aiter__):进入上下文后迭代原始 SSE 事件,经 [feishu.agent.adapters.anthropic._translate_events][] 归一化。

  • OpenAI 异步流AsyncStream[ChatCompletionChunk] 或 chunk 带 choices 属性):直接异步迭代,经 [feishu.agent.adapters.openai._translate_chunks][] 归一化。

  • OpenAI 同步流Stream[ChatCompletionChunk],同步可迭代,首个元素有 choices 属性):通过 loop.run_in_executor 包装后同上处理。

  • 归一化 StreamChunk 序列(已经 feishu.agent.llm.LlmBackend 适配器处理 的异步迭代器,首个元素为 feishu.agent.llm.TextDelta):直接消费,仅产出 TextDelta.text,跳过工具调用增量与停止信号。

  • Anthropic 原始事件异步/同步流(默认 fallback):经 Anthropic 适配器归一化。

同步/异步均兼容。工具调用增量(feishu.agent.llm.ToolCallDelta)与停止信号 (feishu.agent.llm.MessageStop)均被静默跳过,只有文本内容被产出。

参数:

名称 类型 描述 默认

provider_stream

Any

LLM 提供商返回的流式对象,支持以下类型:

  • openai.AsyncStream[ChatCompletionChunk]
  • openai.Stream[ChatCompletionChunk](同步)
  • anthropic.AsyncMessageStreamManagerasync with 风格)
  • 产出原始 Anthropic SSE 事件的异步/同步可迭代对象
  • 产出 [feishu.agent.llm.StreamChunk][] 的异步迭代器 (即 feishu.agent.llm.LlmBackend.stream 的返回值)
必需

返回:

类型 描述
AsyncIterator[str]

逐个产出文本增量字符串(str)的异步迭代器,不含工具调用增量或停止信号。

引发:

类型 描述
TypeError

provider_stream 既非异步可迭代也非同步可迭代时抛出。

飞书文档

CardKit 流式更新

示例:

Python Console Session
>>> import asyncio
>>> from feishu.agent.llm import TextDelta, MessageStop, StopReason
>>> async def _fake_normalized():
...     yield TextDelta(text="Hello")
...     yield TextDelta(text=" world")
...     yield MessageStop(stop_reason=StopReason.END_TURN)
>>> async def _run():
...     return [t async for t in stream_text(_fake_normalized())]
>>> asyncio.run(_run())
['Hello', ' world']
源代码位于: feishu/agent/streaming.py
Python
async def stream_text(provider_stream: Any) -> AsyncIterator[str]:
    r"""
    将任意 LLM 提供商的流式响应转换为纯文本增量的异步迭代器。

    本函数是 [feishu.client.FeishuClient.stream_card][] 的配套适配器:将大模型流式
    响应(同步或异步)归一化为逐个字符串 token,可直接传入 ``stream_card`` 的
    ``tokens`` 参数,从而把 LLM 输出实时推送至飞书消息卡片。

    适配策略:

    - **Anthropic 上下文管理器**(``AsyncMessageStreamManager``,带 ``__aenter__``
      但无 ``__aiter__``):进入上下文后迭代原始 SSE 事件,经
      [feishu.agent.adapters.anthropic._translate_events][] 归一化。

    - **OpenAI 异步流**(``AsyncStream[ChatCompletionChunk]`` 或 chunk 带
      ``choices`` 属性):直接异步迭代,经
      [feishu.agent.adapters.openai._translate_chunks][] 归一化。

    - **OpenAI 同步流**(``Stream[ChatCompletionChunk]``,同步可迭代,首个元素有
      ``choices`` 属性):通过 ``loop.run_in_executor`` 包装后同上处理。

    - **归一化 StreamChunk 序列**(已经 [feishu.agent.llm.LlmBackend][] 适配器处理
      的异步迭代器,首个元素为 [feishu.agent.llm.TextDelta][]):直接消费,仅产出
      ``TextDelta.text``,跳过工具调用增量与停止信号。

    - **Anthropic 原始事件异步/同步流**(默认 fallback):经 Anthropic 适配器归一化。

    同步/异步均兼容。工具调用增量([feishu.agent.llm.ToolCallDelta][])与停止信号
    ([feishu.agent.llm.MessageStop][])均被静默跳过,只有文本内容被产出。

    Args:
        provider_stream: LLM 提供商返回的流式对象,支持以下类型:

            - ``openai.AsyncStream[ChatCompletionChunk]``
            - ``openai.Stream[ChatCompletionChunk]``(同步)
            - ``anthropic.AsyncMessageStreamManager``(``async with`` 风格)
            - 产出原始 Anthropic SSE 事件的异步/同步可迭代对象
            - 产出 [feishu.agent.llm.StreamChunk][] 的异步迭代器
              (即 [feishu.agent.llm.LlmBackend.stream][] 的返回值)

    Returns:
        逐个产出文本增量字符串(``str``)的异步迭代器,不含工具调用增量或停止信号。

    Raises:
        TypeError: 当 ``provider_stream`` 既非异步可迭代也非同步可迭代时抛出。

    飞书文档:
        [CardKit 流式更新](https://open.feishu.cn/document/server-docs/im-v1/message-card/card-kit)

    Examples:
        >>> import asyncio
        >>> from feishu.agent.llm import TextDelta, MessageStop, StopReason
        >>> async def _fake_normalized():
        ...     yield TextDelta(text="Hello")
        ...     yield TextDelta(text=" world")
        ...     yield MessageStop(stop_reason=StopReason.END_TURN)
        >>> async def _run():
        ...     return [t async for t in stream_text(_fake_normalized())]
        >>> asyncio.run(_run())
        ['Hello', ' world']
    """
    # ------------------------------------------------------------------
    # Case 1: Anthropic async-context-manager (e.g. AsyncMessageStreamManager).
    # Has __aenter__ but is NOT directly async-iterable.
    # ------------------------------------------------------------------
    is_async_iterable = hasattr(provider_stream, "__aiter__")
    is_context_manager = hasattr(provider_stream, "__aenter__")

    if is_context_manager and not is_async_iterable:
        async with provider_stream as raw_stream:
            async for text in _yield_text_from_anthropic(raw_stream):
                yield text
        return

    # ------------------------------------------------------------------
    # Case 2: Async iterable — detect provider by peeking at first chunk.
    # ------------------------------------------------------------------
    if is_async_iterable:
        async for text in _stream_text_async(provider_stream):
            yield text
        return

    # ------------------------------------------------------------------
    # Case 3: Synchronous iterable — wrap with run_in_executor, then detect.
    # ------------------------------------------------------------------
    if hasattr(provider_stream, "__iter__"):
        async for text in _stream_text_sync(provider_stream):
            yield text
        return

    raise TypeError(
        f"stream_text: unsupported provider_stream type {type(provider_stream)!r}; "
        "expected an async/sync iterable of LLM stream chunks."
    )