跳转至

streaming

feishu.streaming

stream_card async

Python
stream_card(client: FeishuClient, tokens: AsyncIterator[str], *, receive_id: str | None = None, receive_id_type: str | None = None, reply_to_message_id: str | None = None, element_id: str = 'md', debounce_s: float = 0.25, header: str | None = None, template: str | None = None, _now: Callable[[], float] = _now, _new_uuid: Callable[[], str] = _new_uuid) -> str

驱动 CardKit v1 流式生命周期,并返回创建出的 card_id

先创建并发送一张初始流式卡片,随后将逐 token 累积的**完整文本**流式写入 element_id 指定的 markdown 元素(带去抖限流、序号单调递增、每次写入以 uuid 标识), 最后**始终**执行收尾(streaming_mode=False)。即使 token 生产者抛出异常也会收尾: 卡片的流式状态在 10 分钟后会自动关闭,若不主动收尾,泄漏的开放流会白白浪费卡片实体。

参数:

名称 类型 描述 默认

client

FeishuClient

飞书客户端,须提供 async request(method, path, *, params, json) 方法。

必需

tokens

AsyncIterator[str]

逐段产出文本的异步迭代器,通常为 LLM 的流式输出。每段会被追加到累积文本上。

必需

receive_id

str | None

消息接收者 ID,其类型由 receive_id_type 指定。发新消息时必填,且与 reply_to_message_id 二者只能取其一。

None

receive_id_type

str | None

接收者 ID 类型,如 open_iduser_idchat_id 等;为空时按 receive_id 前缀自动推断(与 IM 发送族一致),仅发新消息时适用。

None

reply_to_message_id

str | None

以回复形式发送时的目标消息 message_idom_ 开头)。提供时初始卡片 经回复接口发出(在原消息所在会话内成串显示),此时 receive_id 应留空。

None

element_id

str

流式写入目标 markdown 元素的 ID,默认为 md

'md'

debounce_s

float

去抖间隔(秒),默认为 0.25(约 4 次/秒,远低于飞书 10 次/秒/卡片的上限)。

0.25

header

str | None

卡片标题文本。为 None 时不渲染标题。

None

template

str | None

标题栏配色模板(如 bluegreen),仅在提供 header 时生效。

None

_now

Callable[[], float]

单调时钟函数,仅供测试注入以使去抖确定化,生产调用请勿传入。

_now

_new_uuid

Callable[[], str]

uuid 生成函数,仅供测试注入,生产调用请勿传入。

_new_uuid

返回:

类型 描述
str

创建出的卡片实体 card_id

飞书文档

创建卡片实体

流式更新文本

发送消息

示例:

Python Console Session
1
2
3
4
5
6
7
8
>>> async def tokens():
...     for tok in ["你好", ",", "世界"]:
...         yield tok
>>> card_id = await client.stream_card(
...     tokens(), receive_id="ou_xxx", receive_id_type="open_id"
... )
>>> card_id
'card_42'
源代码位于: feishu/streaming/cardkit.py
Python
async def stream_card(
    client: FeishuClient,
    tokens: AsyncIterator[str],
    *,
    receive_id: str | None = None,
    receive_id_type: str | None = None,
    reply_to_message_id: str | None = None,
    element_id: str = "md",
    debounce_s: float = 0.25,
    header: str | None = None,
    template: str | None = None,
    _now: Callable[[], float] = _now,
    _new_uuid: Callable[[], str] = _new_uuid,
) -> str:
    r"""驱动 CardKit v1 流式生命周期,并返回创建出的 ``card_id``。

    先创建并发送一张初始流式卡片,随后将逐 token 累积的**完整文本**流式写入
    ``element_id`` 指定的 markdown 元素(带去抖限流、序号单调递增、每次写入以 uuid 标识),
    最后**始终**执行收尾(``streaming_mode=False``)。即使 token 生产者抛出异常也会收尾:
    卡片的流式状态在 10 分钟后会自动关闭,若不主动收尾,泄漏的开放流会白白浪费卡片实体。

    Args:
        client: 飞书客户端,须提供 ``async request(method, path, *, params, json)`` 方法。
        tokens: 逐段产出文本的异步迭代器,通常为 LLM 的流式输出。每段会被追加到累积文本上。
        receive_id: 消息接收者 ID,其类型由 ``receive_id_type`` 指定。发新消息时必填,且与
            ``reply_to_message_id`` 二者只能取其一。
        receive_id_type: 接收者 ID 类型,如 ``open_id``、``user_id``、``chat_id`` 等;为空时按 ``receive_id``
            前缀自动推断(与 IM 发送族一致),仅发新消息时适用。
        reply_to_message_id: 以回复形式发送时的目标消息 ``message_id``(``om_`` 开头)。提供时初始卡片
            经回复接口发出(在原消息所在会话内成串显示),此时 ``receive_id`` 应留空。
        element_id: 流式写入目标 markdown 元素的 ID,默认为 ``md``。
        debounce_s: 去抖间隔(秒),默认为 ``0.25``(约 4 次/秒,远低于飞书 10 次/秒/卡片的上限)。
        header: 卡片标题文本。为 ``None`` 时不渲染标题。
        template: 标题栏配色模板(如 ``blue``、``green``),仅在提供 ``header`` 时生效。
        _now: 单调时钟函数,仅供测试注入以使去抖确定化,生产调用请勿传入。
        _new_uuid: uuid 生成函数,仅供测试注入,生产调用请勿传入。

    Returns:
        创建出的卡片实体 ``card_id``。

    飞书文档:
        [创建卡片实体](https://open.feishu.cn/document/uAjLw4CM/ukzMukzMukzM/feishu-cards/cardkit-v1/card/create)

        [流式更新文本](https://open.feishu.cn/document/uAjLw4CM/ukzMukzMukzM/feishu-cards/streaming-updates-overview)

        [发送消息](https://open.feishu.cn/document/server-docs/im-v1/message/create)

    Examples:
        >>> async def tokens():
        ...     for tok in ["你好", ",", "世界"]:
        ...         yield tok
        >>> card_id = await client.stream_card(  # doctest: +SKIP
        ...     tokens(), receive_id="ou_xxx", receive_id_type="open_id"
        ... )
        >>> card_id  # doctest: +SKIP
        'card_42'
    """
    if (receive_id is None) == (reply_to_message_id is None):
        raise ValueError("stream_card requires exactly one of receive_id or reply_to_message_id")
    counter = _SequenceCounter()

    # 1) Create the card entity.
    card_json = _build_streaming_card(element_id=element_id, header=header, template=template)
    create = await client.request(
        "POST",
        spec.CREATE_CARD_PATH,
        json={spec.CREATE_CARD_TYPE_FIELD: spec.CREATE_CARD_TYPE, spec.CREATE_CARD_DATA_FIELD: json.dumps(card_json)},
    )
    card_id = create["data"]["card_id"]

    # 2) Send the interactive message referencing the entity — fresh, or in reply position.
    content = json.dumps({"type": spec.SEND_CARD_CONTENT_TYPE, "data": {"card_id": card_id}})
    if reply_to_message_id is not None:
        await client.request(
            "POST",
            spec.reply_message_path(reply_to_message_id),
            json={"msg_type": spec.SEND_MSG_TYPE, "content": content},
        )
    else:
        assert receive_id is not None  # guaranteed by the receive_id / reply_to_message_id XOR check above
        rid_type = receive_id_type or infer_receive_id_type(receive_id)
        await client.request(
            "POST",
            spec.SEND_MESSAGE_PATH,
            params={"receive_id_type": rid_type},
            json={"receive_id": receive_id, "msg_type": spec.SEND_MSG_TYPE, "content": content},
        )

    # 3) Stream cumulative text. Finalize is mandatory -> try/finally.
    flusher = _Flusher(
        client,
        card_id=card_id,
        element_id=element_id,
        counter=counter,
        debounce_s=debounce_s,
        now=_now,
        new_uuid=_new_uuid,
    )
    accumulated = ""
    try:
        async for tok in tokens:
            accumulated += tok
            await flusher.write(accumulated)  # debounced; may skip
    finally:
        # One mandatory final flush of whatever we accumulated, then finalize.
        if accumulated:
            await flusher.write(accumulated, force=True)
        await client.request(
            "PATCH",
            spec.settings_path(card_id),
            json={
                spec.SETTINGS_FIELD: json.dumps({"config": {spec.STREAMING_MODE_KEY: False}}),
                spec.SEQUENCE_FIELD: await counter.next(),
            },
        )
    return card_id