跳转至

client

feishu.ws.client

WsClient

飞书长连接(WebSocket)事件客户端。

作为 Webhook 接收器(create_event_app 等)的替代方案: 无需公网回调地址,应用主动与飞书建立一条持久 WebSocket 连接,事件经该连接推送, 处理结果通过 ACK 帧回传,对标 Slack 的 Socket Mode。

连接生命周期由 start 驱动:握手 -> 建连 -> 收发循环, 断线后按 ClientConfig 自动重连。事件解析与分发完全复用 EventDispatcher,因此 Webhook 与长连接两种接入 方式可共用同一套处理函数;分发结果会被编码进 ACK,供卡片回调等场景返回 {toast, card}

为便于测试,HTTP 客户端与 websocket 连接器均可注入,默认实现仅在真正需要时才创建/导入。

参数:

名称 类型 描述 默认

app_id

str

应用 App ID。

必需

app_secret

str

应用 App Secret。

必需

dispatcher

EventDispatcher

事件分发器。

必需

region

str

区域标识,feishulark,默认 feishu

'feishu'

base_url

str | None

自定义基础地址,传入时优先于 region

None

auto_reconnect

bool

断线后是否自动重连,默认 True

True

logger

Logger | None

自定义日志器,缺省使用名为 feishu 的日志器。

None

http_client

AsyncClient | None

注入的 httpx.AsyncClient,用于握手;为 None 时每次握手临时创建并关闭。

None

connect

Connect | None

注入的 websocket 连接器;为 None 时懒加载 websockets

None

引发:

类型 描述
ValueError

app_idapp_secret 为空时抛出。

飞书文档

长连接模式

示例:

Python Console Session
1
2
3
4
5
6
7
8
9
>>> from feishu.events.dispatcher import EventDispatcher
>>> dispatcher = EventDispatcher()
>>> @dispatcher.on("im.message.receive_v1")
... async def on_message(event):
...     print(event.event_id)
...
>>> ws = WsClient("cli_app", "secret", dispatcher)
>>> import asyncio
>>> asyncio.run(ws.start())
源代码位于: feishu/ws/client.py
Python
class WsClient:
    r"""
    飞书长连接(WebSocket)事件客户端。

    作为 Webhook 接收器([create_event_app][feishu.events.receiver.create_event_app] 等)的替代方案:
    无需公网回调地址,应用主动与飞书建立一条持久 WebSocket 连接,事件经该连接推送,
    处理结果通过 ACK 帧回传,对标 Slack 的 Socket Mode。

    连接生命周期由 [start][feishu.ws.client.WsClient.start] 驱动:握手 -> 建连 -> 收发循环,
    断线后按 [ClientConfig][feishu.ws.model.ClientConfig] 自动重连。事件解析与分发完全复用
    [EventDispatcher][feishu.events.dispatcher.EventDispatcher],因此 Webhook 与长连接两种接入
    方式可共用同一套处理函数;分发结果会被编码进 ACK,供卡片回调等场景返回 `{toast, card}`。

    为便于测试,HTTP 客户端与 websocket 连接器均可注入,默认实现仅在真正需要时才创建/导入。

    Args:
        app_id: 应用 App ID。
        app_secret: 应用 App Secret。
        dispatcher: 事件分发器。
        region: 区域标识,`feishu` 或 `lark`,默认 `feishu`。
        base_url: 自定义基础地址,传入时优先于 `region`。
        auto_reconnect: 断线后是否自动重连,默认 `True`。
        logger: 自定义日志器,缺省使用名为 `feishu` 的日志器。
        http_client: 注入的 `httpx.AsyncClient`,用于握手;为 `None` 时每次握手临时创建并关闭。
        connect: 注入的 websocket 连接器;为 `None` 时懒加载 `websockets`。

    Raises:
        ValueError: 当 `app_id` 或 `app_secret` 为空时抛出。

    飞书文档:
        [长连接模式](https://open.feishu.cn/document/uAjLw4CM/ukTMukTMukTM/reference/im-v1/message/events/long-connection-mode)

    Examples:
        >>> from feishu.events.dispatcher import EventDispatcher
        >>> dispatcher = EventDispatcher()
        >>> @dispatcher.on("im.message.receive_v1")
        ... async def on_message(event):
        ...     print(event.event_id)
        ...
        >>> ws = WsClient("cli_app", "secret", dispatcher)
        >>> import asyncio
        >>> asyncio.run(ws.start())  # doctest: +SKIP
    """

    def __init__(
        self,
        app_id: str,
        app_secret: str,
        dispatcher: EventDispatcher,
        *,
        region: str = "feishu",
        base_url: str | None = None,
        auto_reconnect: bool = True,
        logger: logging.Logger | None = None,
        http_client: httpx.AsyncClient | None = None,
        connect: Connect | None = None,
    ) -> None:
        if not app_id:
            raise ValueError("app_id must not be empty")
        if not app_secret:
            raise ValueError("app_secret must not be empty")
        self._app_id = app_id
        self._app_secret = app_secret
        self._dispatcher = dispatcher
        self._base_url = resolve_base_url(region, base_url)
        self._auto_reconnect = auto_reconnect
        self.logger = logger or logging.getLogger("feishu")
        self._http_client = http_client
        self._connect: Connect = connect or _default_connect

        # 握手后填充:service 帧字段取自 wss URL 的 service_id 查询参数。
        self._service_id = 0
        self._ping_interval = ClientConfig().ping_interval
        # 分片重组缓冲:message_id -> {seq: chunk}。首版不做 TTL 淘汰;
        # 若上游漏发某个分片,对应条目会一直驻留,后续可加超时清理。
        self._fragments: dict[str, dict[int, bytes]] = {}
        # 运行控制。
        self._stopped = False
        self._websocket: Any = None

    async def _handshake(self) -> tuple[str, ClientConfig]:
        r"""
        执行握手,换取 wss 连接地址与客户端配置。

        向 `{base_url}/callback/ws/endpoint` POST 应用凭据(注意该端点不在 Open API 前缀下,
        因此使用裸 httpx 而非 SDK 传输层)。成功后从返回的 wss URL 中解析出 `service_id`
        并记录为后续出站帧的 `service` 字段。

        Returns:
            `(wss_url, client_config)` 二元组。

        Raises:
            FeishuServerError: 当握手返回 5xx 时抛出(可重试,由 [start][feishu.ws.client.WsClient.start] 退避重连)。
            FeishuError: 当响应 `code` 非 0 时抛出(如鉴权/配置错误,不可重试)。
        """
        client = self._http_client or httpx.AsyncClient()
        try:
            resp = await client.post(
                f"{self._base_url}{_ENDPOINT_PATH}",
                headers={"locale": "zh"},
                json={"AppID": self._app_id, "AppSecret": self._app_secret},
            )
            # 5xx is a transient server-side failure -> surface as FeishuServerError so start() retries.
            if resp.status_code >= 500:
                raise FeishuServerError(resp.status_code, f"handshake failed: HTTP {resp.status_code}")
            payload = resp.json()
        finally:
            if self._http_client is None:
                await client.aclose()

        code = payload.get("code", -1)
        if code != 0:
            raise FeishuError(code, payload.get("msg", ""), raw=payload)

        data = payload.get("data") or {}
        url = data["URL"]
        self._service_id = _parse_service_id(url)
        config = client_config_from_dict(data.get("ClientConfig") or {})
        self._ping_interval = config.ping_interval
        return url, config

    def _ping_frame(self) -> Frame:
        r"""构造一个心跳(ping)控制帧。"""
        return Frame(
            seq_id=0,
            log_id=0,
            service=self._service_id,
            method=FRAME_TYPE_CONTROL,
            headers=[Header("type", "ping")],
        )

    async def _ping_loop(self, websocket: Any, send_lock: asyncio.Lock) -> None:
        r"""后台任务:按 `ping_interval` 周期发送心跳控制帧,直至被取消或连接断开。

        连接在心跳发送期间断开会使 `websocket.send` 抛出异常;此处安静退出(`_serve` 会感知断开并触发
        重连),避免该后台任务的异常无人取回而触发 asyncio 告警。取消(CancelledError)正常向上传播。
        """
        try:
            while True:
                await asyncio.sleep(self._ping_interval)
                async with send_lock:
                    await websocket.send(encode_frame(self._ping_frame()))
        except Exception:  # noqa: BLE001 - a drop mid-ping is expected; exit quietly, don't leak the task exc
            self.logger.debug("ws ping loop stopped", exc_info=True)

    def _handle_control(self, frame: Frame) -> None:
        r"""处理控制帧(心跳回复):若回复携带 ClientConfig 则刷新心跳间隔。"""
        if frame.payload:
            # 心跳回复无有效 ClientConfig 时保持原间隔。
            with suppress(ValueError, KeyError):
                self._ping_interval = client_config_from_dict(json.loads(frame.payload.decode("utf-8"))).ping_interval

    async def _build_ack_for(self, frame: Frame, payload: bytes) -> Frame:
        r"""解析完整载荷、交由分发器处理,并把分发结果编码进 ACK 帧返回。"""
        event = Event.from_payload(json.loads(payload.decode("utf-8")))
        result = await self._dispatcher.dispatch(event)
        return _build_ack(frame, result)

    def _reassemble(self, frame: Frame) -> bytes | None:
        r"""
        按 `sum`/`seq` 头重组分片帧。

        `sum <= 1` 时帧自身即完整载荷,直接返回其 `payload`。否则按 `message_id` 缓存各 `seq`
        分片,集齐 `sum` 个后按序拼接并清理缓冲返回;尚未集齐时返回 `None`。

        Args:
            frame: 数据帧。

        Returns:
            完整的载荷字节;分片尚未集齐时返回 `None`。
        """
        total = int(frame.header("sum") or "1")
        payload = frame.payload or b""
        if total <= 1:
            return payload

        message_id = frame.header("message_id") or ""
        seq = int(frame.header("seq") or "0")
        chunks = self._fragments.get(message_id)
        if chunks is None:
            if len(self._fragments) >= _MAX_PARTIAL_MESSAGES:
                # 丢弃最旧的未完成分片(dict 按插入序),将无界增长收敛为有界。
                self._fragments.pop(next(iter(self._fragments)), None)
            chunks = self._fragments[message_id] = {}
        chunks[seq] = payload
        if len(chunks) < total:
            return None

        ordered = b"".join(chunks[i] for i in range(total))
        del self._fragments[message_id]
        return ordered

    async def _serve(self, websocket: Any) -> None:
        r"""
        在已建立的连接上收发,直至连接关闭。

        启动心跳后台任务并循环接收、解码帧。控制帧(心跳)与分片重组在收发循环内同步处理;
        每条完整消息的分发与 ACK 回送则派生独立任务并发执行,从而避免某个耗时的处理函数阻塞
        后续帧的接收(所有发送经 `send_lock` 串行化以保证帧不交错)。连接关闭(websockets 抛出
        `ConnectionClosed`)时退出循环,并在 `finally` 中取消心跳、等待在途分发任务收尾。

        Args:
            websocket: 已连接的 websocket 对象。
        """
        import websockets

        self._websocket = websocket
        send_lock = asyncio.Lock()
        ping_task = asyncio.ensure_future(self._ping_loop(websocket, send_lock))
        pending: set[asyncio.Task[None]] = set()
        try:
            while True:
                try:
                    raw = await websocket.recv()
                except websockets.ConnectionClosed:
                    break
                if isinstance(raw, str):
                    raw = raw.encode("utf-8")
                frame = decode_frame(raw)
                if frame.method == FRAME_TYPE_CONTROL:
                    self._handle_control(frame)
                    continue
                if frame.method != FRAME_TYPE_DATA:
                    continue
                # 分片重组在循环内同步完成(避免并发任务竞争分片缓冲);只有完整消息的分发与
                # ACK 回送会派生独立任务,确保慢处理函数不阻塞后续帧的接收。
                payload = self._reassemble(frame)
                if payload is None:
                    continue
                task = asyncio.ensure_future(self._send_ack(websocket, frame, payload, send_lock))
                pending.add(task)
                task.add_done_callback(pending.discard)
        finally:
            ping_task.cancel()
            # 连接关闭后等待在途分发任务收尾;其 ACK 可能因连接已关闭而发送失败,将被忽略。
            if pending:
                await asyncio.gather(*pending, return_exceptions=True)
            self._websocket = None

    async def _send_ack(self, websocket: Any, frame: Frame, payload: bytes, send_lock: asyncio.Lock) -> None:
        r"""
        分发完整消息并在 `send_lock` 保护下回送 ACK(作为独立任务并发执行)。

        作为脱离收发循环的独立任务运行,其异常不会冒泡到 `_serve`;因此在此捕获并记录
        (载荷解析失败、处理函数异常或连接已关闭导致的发送失败),避免在连接存活期间被静默吞掉。
        """
        try:
            ack = await self._build_ack_for(frame, payload)
            async with send_lock:
                await websocket.send(encode_frame(ack))
        except Exception:  # noqa: BLE001 - a per-message task failure must be logged, not silently dropped
            self.logger.exception("ws ack/dispatch failed for frame seq_id=%s", frame.seq_id)

    async def start(self) -> None:
        r"""
        启动长连接并阻塞运行,直至 [aclose][feishu.ws.client.WsClient.aclose] 被调用。

        循环执行「握手 -> 建连 -> 收发」;连接断开后,若开启了自动重连且仍有重连次数,
        则等待 `reconnect_interval` 秒后重试(`reconnect_count == -1` 表示无限重连)。
        重连次数耗尽则停止。握手本身的瞬时失败(网络错误 / 5xx)同样按此退避重试,并计入重连预算;
        鉴权 / 配置等不可重试错误则直接抛出。

        Examples:
            >>> ws = WsClient("cli_app", "secret", EventDispatcher())
            >>> import asyncio
            >>> asyncio.run(ws.start())  # doctest: +SKIP
        """
        # Transient connect/serve failures are retried on the reconnect budget below, not escaped.
        # websockets is optional; include its base exception only when importable so its connect /
        # WS-upgrade / abnormal-close errors count too (ImportError from a missing dep still propagates).
        transient: tuple[type[BaseException], ...] = (OSError, asyncio.TimeoutError)
        try:
            import websockets

            transient += (websockets.WebSocketException,)
        except ImportError:
            pass

        attempts = 0
        config = ClientConfig()  # defaults until the first successful handshake (drives early backoff)
        while not self._stopped:
            try:
                url, config = await self._handshake()
            except (httpx.RequestError, FeishuServerError) as exc:
                # Transient handshake failure (network blip / 5xx): back off and retry rather than
                # aborting the whole connection loop. Non-transient errors (auth/config) propagate.
                if self._stopped or not self._auto_reconnect:
                    raise
                if config.reconnect_count != -1 and attempts >= config.reconnect_count:
                    self.logger.warning("ws handshake retries exhausted (%d)", config.reconnect_count)
                    raise
                attempts += 1
                self.logger.warning("ws handshake failed (%s); retrying", exc)
                await asyncio.sleep(self._reconnect_delay(config))
                continue
            try:
                async with self._connect(url) as websocket:
                    await self._serve(websocket)
            except transient as exc:
                # Transient failure opening/serving the socket after a good handshake (DNS blip,
                # refused, WS-upgrade error, abnormal close). Retry on the same reconnect budget
                # below rather than escaping start(); non-transient errors (bugs/auth) still propagate.
                if self._stopped or not self._auto_reconnect:
                    raise
                self.logger.warning("ws connection failed (%s); retrying", exc)

            if self._stopped or not self._auto_reconnect:
                return
            if config.reconnect_count != -1 and attempts >= config.reconnect_count:
                self.logger.warning("ws reconnect attempts exhausted (%d)", config.reconnect_count)
                return
            attempts += 1
            await asyncio.sleep(self._reconnect_delay(config))

    def _reconnect_delay(self, config: ClientConfig) -> float:
        r"""重连等待时长:在 `reconnect_interval` 之上叠加 `[0, reconnect_nonce)` 的随机抖动,避免雪崩式重连。"""
        return config.reconnect_interval + random.uniform(0, config.reconnect_nonce)

    async def aclose(self) -> None:
        r"""
        请求停止:置位停止标志使 [start][feishu.ws.client.WsClient.start] 的循环退出,并关闭活动连接。
        """
        self._stopped = True
        websocket = self._websocket
        if websocket is not None:
            await websocket.close()

start async

Python
start() -> None

启动长连接并阻塞运行,直至 aclose 被调用。

循环执行「握手 -> 建连 -> 收发」;连接断开后,若开启了自动重连且仍有重连次数, 则等待 reconnect_interval 秒后重试(reconnect_count == -1 表示无限重连)。 重连次数耗尽则停止。握手本身的瞬时失败(网络错误 / 5xx)同样按此退避重试,并计入重连预算; 鉴权 / 配置等不可重试错误则直接抛出。

示例:

Python Console Session
1
2
3
>>> ws = WsClient("cli_app", "secret", EventDispatcher())
>>> import asyncio
>>> asyncio.run(ws.start())
源代码位于: feishu/ws/client.py
Python
async def start(self) -> None:
    r"""
    启动长连接并阻塞运行,直至 [aclose][feishu.ws.client.WsClient.aclose] 被调用。

    循环执行「握手 -> 建连 -> 收发」;连接断开后,若开启了自动重连且仍有重连次数,
    则等待 `reconnect_interval` 秒后重试(`reconnect_count == -1` 表示无限重连)。
    重连次数耗尽则停止。握手本身的瞬时失败(网络错误 / 5xx)同样按此退避重试,并计入重连预算;
    鉴权 / 配置等不可重试错误则直接抛出。

    Examples:
        >>> ws = WsClient("cli_app", "secret", EventDispatcher())
        >>> import asyncio
        >>> asyncio.run(ws.start())  # doctest: +SKIP
    """
    # Transient connect/serve failures are retried on the reconnect budget below, not escaped.
    # websockets is optional; include its base exception only when importable so its connect /
    # WS-upgrade / abnormal-close errors count too (ImportError from a missing dep still propagates).
    transient: tuple[type[BaseException], ...] = (OSError, asyncio.TimeoutError)
    try:
        import websockets

        transient += (websockets.WebSocketException,)
    except ImportError:
        pass

    attempts = 0
    config = ClientConfig()  # defaults until the first successful handshake (drives early backoff)
    while not self._stopped:
        try:
            url, config = await self._handshake()
        except (httpx.RequestError, FeishuServerError) as exc:
            # Transient handshake failure (network blip / 5xx): back off and retry rather than
            # aborting the whole connection loop. Non-transient errors (auth/config) propagate.
            if self._stopped or not self._auto_reconnect:
                raise
            if config.reconnect_count != -1 and attempts >= config.reconnect_count:
                self.logger.warning("ws handshake retries exhausted (%d)", config.reconnect_count)
                raise
            attempts += 1
            self.logger.warning("ws handshake failed (%s); retrying", exc)
            await asyncio.sleep(self._reconnect_delay(config))
            continue
        try:
            async with self._connect(url) as websocket:
                await self._serve(websocket)
        except transient as exc:
            # Transient failure opening/serving the socket after a good handshake (DNS blip,
            # refused, WS-upgrade error, abnormal close). Retry on the same reconnect budget
            # below rather than escaping start(); non-transient errors (bugs/auth) still propagate.
            if self._stopped or not self._auto_reconnect:
                raise
            self.logger.warning("ws connection failed (%s); retrying", exc)

        if self._stopped or not self._auto_reconnect:
            return
        if config.reconnect_count != -1 and attempts >= config.reconnect_count:
            self.logger.warning("ws reconnect attempts exhausted (%d)", config.reconnect_count)
            return
        attempts += 1
        await asyncio.sleep(self._reconnect_delay(config))

aclose async

Python
aclose() -> None

请求停止:置位停止标志使 start 的循环退出,并关闭活动连接。

源代码位于: feishu/ws/client.py
Python
async def aclose(self) -> None:
    r"""
    请求停止:置位停止标志使 [start][feishu.ws.client.WsClient.start] 的循环退出,并关闭活动连接。
    """
    self._stopped = True
    websocket = self._websocket
    if websocket is not None:
        await websocket.close()