将任意 LLM 提供商的流式响应转换为纯文本增量的异步迭代器。
本函数是 feishu.client.FeishuClient.stream_card 的配套适配器:将大模型流式
响应(同步或异步)归一化为逐个字符串 token,可直接传入 stream_card 的
tokens 参数,从而把 LLM 输出实时推送至飞书消息卡片。
适配策略:
-
Anthropic 上下文管理器(AsyncMessageStreamManager,带 __aenter__
但无 __aiter__):进入上下文后迭代原始 SSE 事件,经
[feishu.agent.adapters.anthropic._translate_events][] 归一化。
-
OpenAI 异步流(AsyncStream[ChatCompletionChunk] 或 chunk 带
choices 属性):直接异步迭代,经
[feishu.agent.adapters.openai._translate_chunks][] 归一化。
-
OpenAI 同步流(Stream[ChatCompletionChunk],同步可迭代,首个元素有
choices 属性):通过 loop.run_in_executor 包装后同上处理。
-
归一化 StreamChunk 序列(已经 feishu.agent.llm.LlmBackend 适配器处理
的异步迭代器,首个元素为 feishu.agent.llm.TextDelta):直接消费,仅产出
TextDelta.text,跳过工具调用增量与停止信号。
-
Anthropic 原始事件异步/同步流(默认 fallback):经 Anthropic 适配器归一化。
同步/异步均兼容。工具调用增量(feishu.agent.llm.ToolCallDelta)与停止信号
(feishu.agent.llm.MessageStop)均被静默跳过,只有文本内容被产出。
参数:
| 名称 |
类型 |
描述 |
默认 |
provider_stream
|
Any
|
LLM 提供商返回的流式对象,支持以下类型:
openai.AsyncStream[ChatCompletionChunk]
openai.Stream[ChatCompletionChunk](同步)
anthropic.AsyncMessageStreamManager(async with 风格)
- 产出原始 Anthropic SSE 事件的异步/同步可迭代对象
- 产出 [feishu.agent.llm.StreamChunk][] 的异步迭代器
(即 feishu.agent.llm.LlmBackend.stream 的返回值)
|
必需
|
返回:
引发:
| 类型 |
描述 |
TypeError
|
当 provider_stream 既非异步可迭代也非同步可迭代时抛出。
|
飞书文档
CardKit 流式更新
示例:
| Python Console Session |
|---|
| >>> import asyncio
>>> from feishu.agent.llm import TextDelta, MessageStop, StopReason
>>> async def _fake_normalized():
... yield TextDelta(text="Hello")
... yield TextDelta(text=" world")
... yield MessageStop(stop_reason=StopReason.END_TURN)
>>> async def _run():
... return [t async for t in stream_text(_fake_normalized())]
>>> asyncio.run(_run())
['Hello', ' world']
|
源代码位于: feishu/agent/streaming.py
| Python |
|---|
| async def stream_text(provider_stream: Any) -> AsyncIterator[str]:
r"""
将任意 LLM 提供商的流式响应转换为纯文本增量的异步迭代器。
本函数是 [feishu.client.FeishuClient.stream_card][] 的配套适配器:将大模型流式
响应(同步或异步)归一化为逐个字符串 token,可直接传入 ``stream_card`` 的
``tokens`` 参数,从而把 LLM 输出实时推送至飞书消息卡片。
适配策略:
- **Anthropic 上下文管理器**(``AsyncMessageStreamManager``,带 ``__aenter__``
但无 ``__aiter__``):进入上下文后迭代原始 SSE 事件,经
[feishu.agent.adapters.anthropic._translate_events][] 归一化。
- **OpenAI 异步流**(``AsyncStream[ChatCompletionChunk]`` 或 chunk 带
``choices`` 属性):直接异步迭代,经
[feishu.agent.adapters.openai._translate_chunks][] 归一化。
- **OpenAI 同步流**(``Stream[ChatCompletionChunk]``,同步可迭代,首个元素有
``choices`` 属性):通过 ``loop.run_in_executor`` 包装后同上处理。
- **归一化 StreamChunk 序列**(已经 [feishu.agent.llm.LlmBackend][] 适配器处理
的异步迭代器,首个元素为 [feishu.agent.llm.TextDelta][]):直接消费,仅产出
``TextDelta.text``,跳过工具调用增量与停止信号。
- **Anthropic 原始事件异步/同步流**(默认 fallback):经 Anthropic 适配器归一化。
同步/异步均兼容。工具调用增量([feishu.agent.llm.ToolCallDelta][])与停止信号
([feishu.agent.llm.MessageStop][])均被静默跳过,只有文本内容被产出。
Args:
provider_stream: LLM 提供商返回的流式对象,支持以下类型:
- ``openai.AsyncStream[ChatCompletionChunk]``
- ``openai.Stream[ChatCompletionChunk]``(同步)
- ``anthropic.AsyncMessageStreamManager``(``async with`` 风格)
- 产出原始 Anthropic SSE 事件的异步/同步可迭代对象
- 产出 [feishu.agent.llm.StreamChunk][] 的异步迭代器
(即 [feishu.agent.llm.LlmBackend.stream][] 的返回值)
Returns:
逐个产出文本增量字符串(``str``)的异步迭代器,不含工具调用增量或停止信号。
Raises:
TypeError: 当 ``provider_stream`` 既非异步可迭代也非同步可迭代时抛出。
飞书文档:
[CardKit 流式更新](https://open.feishu.cn/document/server-docs/im-v1/message-card/card-kit)
Examples:
>>> import asyncio
>>> from feishu.agent.llm import TextDelta, MessageStop, StopReason
>>> async def _fake_normalized():
... yield TextDelta(text="Hello")
... yield TextDelta(text=" world")
... yield MessageStop(stop_reason=StopReason.END_TURN)
>>> async def _run():
... return [t async for t in stream_text(_fake_normalized())]
>>> asyncio.run(_run())
['Hello', ' world']
"""
# ------------------------------------------------------------------
# Case 1: Anthropic async-context-manager (e.g. AsyncMessageStreamManager).
# Has __aenter__ but is NOT directly async-iterable.
# ------------------------------------------------------------------
is_async_iterable = hasattr(provider_stream, "__aiter__")
is_context_manager = hasattr(provider_stream, "__aenter__")
if is_context_manager and not is_async_iterable:
async with provider_stream as raw_stream:
async for text in _yield_text_from_anthropic(raw_stream):
yield text
return
# ------------------------------------------------------------------
# Case 2: Async iterable — detect provider by peeking at first chunk.
# ------------------------------------------------------------------
if is_async_iterable:
async for text in _stream_text_async(provider_stream):
yield text
return
# ------------------------------------------------------------------
# Case 3: Synchronous iterable — wrap with run_in_executor, then detect.
# ------------------------------------------------------------------
if hasattr(provider_stream, "__iter__"):
async for text in _stream_text_sync(provider_stream):
yield text
return
raise TypeError(
f"stream_text: unsupported provider_stream type {type(provider_stream)!r}; "
"expected an async/sync iterable of LLM stream chunks."
)
|