跳转至

events

feishu.events

EventDispatcher

将飞书事件按类型分发给已注册的异步处理函数。

通过 on 装饰器按事件类型注册处理函数, 使用 "*" 注册可匹配所有事件的兜底处理函数。 dispatch 会先运行精确匹配的处理函数, 再运行兜底处理函数。

单个处理函数抛出的异常会被捕获并记录,不会中断其余处理函数(含 "*" 兜底);可通过 on_error 注册全局异常处理函数集中上报。

若构造时传入 SeenStore,分发前会基于 Event.event_id 去重,避免飞书重试导致重复处理。

参数:

名称 类型 描述 默认

seen_store

SeenStore | None

事件去重存储;为 None 时不做去重。

None

logger

Logger | None

处理函数异常的日志器;缺省使用名为 feishu 的日志器。

None
飞书文档

订阅事件

示例:

Python Console Session
>>> import asyncio
>>> from feishu.events.envelope import Event
>>> dispatcher = EventDispatcher()
>>> seen = []
>>> @dispatcher.on("im.message.receive_v1")
... async def handle(event):
...     seen.append(event.event_id)
...
>>> event = Event.from_payload(
...     {"schema": "2.0", "header": {"event_type": "im.message.receive_v1", "event_id": "evt_42"}, "event": {}}
... )
>>> asyncio.run(dispatcher.dispatch(event))
>>> seen
['evt_42']
源代码位于: feishu/events/dispatcher.py
Python
class EventDispatcher:
    r"""
    将飞书事件按类型分发给已注册的异步处理函数。

    通过 [on][feishu.events.dispatcher.EventDispatcher.on] 装饰器按事件类型注册处理函数,
    使用 `"*"` 注册可匹配所有事件的兜底处理函数。
    [dispatch][feishu.events.dispatcher.EventDispatcher.dispatch] 会先运行精确匹配的处理函数,
    再运行兜底处理函数。

    单个处理函数抛出的异常会被捕获并记录,不会中断其余处理函数(含 `"*"` 兜底);可通过
    [on_error][feishu.events.dispatcher.EventDispatcher.on_error] 注册全局异常处理函数集中上报。

    若构造时传入 [SeenStore][feishu.events.idempotency.SeenStore],分发前会基于
    [Event.event_id][feishu.events.envelope.Event.event_id] 去重,避免飞书重试导致重复处理。

    Args:
        seen_store: 事件去重存储;为 `None` 时不做去重。
        logger: 处理函数异常的日志器;缺省使用名为 `feishu` 的日志器。

    飞书文档:
        [订阅事件](https://open.feishu.cn/document/server-docs/event-subscription-guide/overview)

    Examples:
        >>> import asyncio
        >>> from feishu.events.envelope import Event
        >>> dispatcher = EventDispatcher()
        >>> seen = []
        >>> @dispatcher.on("im.message.receive_v1")
        ... async def handle(event):
        ...     seen.append(event.event_id)
        ...
        >>> event = Event.from_payload(
        ...     {"schema": "2.0", "header": {"event_type": "im.message.receive_v1", "event_id": "evt_42"}, "event": {}}
        ... )
        >>> asyncio.run(dispatcher.dispatch(event))
        >>> seen
        ['evt_42']
    """

    def __init__(self, *, seen_store: SeenStore | None = None, logger: logging.Logger | None = None):
        self._handlers: dict[str, list[Handler]] = {}
        self._seen_store = seen_store
        self._logger = logger or logging.getLogger("feishu")
        self._error_handler: ErrorHandler | None = None

    def on(self, event_type: str) -> Callable[[Handler], Handler]:
        r"""
        注册指定事件类型的处理函数(装饰器)。

        同一类型可注册多个处理函数,按注册顺序执行。使用 `"*"` 注册兜底处理函数,
        它会在所有事件的精确匹配处理函数之后执行。

        Args:
            event_type: 事件类型,如 `im.message.receive_v1`;`"*"` 表示匹配全部。

        Returns:
            接收处理函数并将其注册的装饰器;原函数会被原样返回。

        Examples:
            >>> dispatcher = EventDispatcher()
            >>> @dispatcher.on("*")
            ... async def fallback(event):
            ...     return None
            ...
            >>> fallback.__name__
            'fallback'
        """

        def register(handler: Handler) -> Handler:
            self._handlers.setdefault(event_type, []).append(handler)
            return handler

        return register

    def on_error(self, handler: ErrorHandler) -> ErrorHandler:
        r"""
        注册全局异常处理函数(仿 Slack Bolt 的 `@app.error`)。

        当任一事件处理函数抛出异常时,框架会先记录日志,再调用此处理函数用于集中上报或返回兜底结果。
        处理函数接收异常与对应事件;若其返回非 `None` 值且本次分发尚无其他结果,则作为返回值
        (例如卡片回调可借此返回错误 toast)。后注册的会覆盖先注册的。

        Args:
            handler: 形如 `async def(exc: Exception, event: Event) -> dict | None` 的异常处理函数。

        Returns:
            原处理函数(便于作装饰器使用)。

        Examples:
            >>> dispatcher = EventDispatcher()
            >>> @dispatcher.on_error
            ... async def report(exc, event):
            ...     return None
            ...
            >>> report.__name__
            'report'
        """
        self._error_handler = handler
        return handler

    async def dispatch(self, event: Event) -> dict | None:
        r"""
        将事件分发给匹配的处理函数并返回结果。

        先执行与 [event.event_type][feishu.events.envelope.Event.event_type] 精确匹配的处理函数,
        再执行以 `"*"` 注册的兜底处理函数。若配置了去重存储且事件已处理过,则直接返回 `None`,
        不执行任何处理函数。

        单个处理函数抛出的异常会被捕获、记录,并交由可选的全局异常处理函数处理,随后继续执行其余
        处理函数,确保一个出错的处理函数不会中断其他处理函数或丢失卡片回调的 ACK。

        返回值取第一个非 `None` 的处理函数结果,通常用于卡片回调返回 `{"toast": ..., "card": ...}`。

        Args:
            event: 待分发的事件。

        Returns:
            首个非 `None` 的处理函数返回值;若已被去重或所有处理函数均返回 `None`,则返回 `None`。

        Examples:
            >>> import asyncio
            >>> from feishu.events.envelope import Event
            >>> dispatcher = EventDispatcher()
            >>> @dispatcher.on("card.action.trigger")
            ... async def on_card(event):
            ...     return {"toast": {"type": "success", "content": "ok"}}
            ...
            >>> event = Event.from_payload(
            ...     {"schema": "2.0", "header": {"event_type": "card.action.trigger", "event_id": "c1"}, "event": {}}
            ... )
            >>> asyncio.run(dispatcher.dispatch(event))
            {'toast': {'type': 'success', 'content': 'ok'}}
        """
        if self._seen_store is not None and not await claim(self._seen_store, event.event_id):
            return None

        handlers = list(self._handlers.get(event.event_type, ()))
        handlers += list(self._handlers.get("*", ()))
        result: dict[str, Any] | None = None
        for handler in handlers:
            try:
                value = await handler(event)
            except Exception as exc:  # noqa: BLE001 - isolate: one failing handler must not abort the rest
                value = await self._on_handler_error(exc, event)
            if value is not None and result is None:
                result = cast(dict, value)
        return result

    async def _on_handler_error(self, exc: Exception, event: Event) -> object:
        r"""记录处理函数异常;若注册了全局异常处理函数则调用它,并可由其返回兜底结果。"""
        self._logger.error("event handler failed for %s (%s)", event.event_type, event.event_id, exc_info=exc)
        if self._error_handler is None:
            return None
        try:
            return await self._error_handler(exc, event)
        except Exception:  # noqa: BLE001 - the error handler itself must never break dispatch
            self._logger.exception("event error-handler failed for %s", event.event_type)
            return None

on

Python
on(event_type: str) -> Callable[[Handler], Handler]

注册指定事件类型的处理函数(装饰器)。

同一类型可注册多个处理函数,按注册顺序执行。使用 "*" 注册兜底处理函数, 它会在所有事件的精确匹配处理函数之后执行。

参数:

名称 类型 描述 默认
event_type
str

事件类型,如 im.message.receive_v1"*" 表示匹配全部。

必需

返回:

类型 描述
Callable[[Handler], Handler]

接收处理函数并将其注册的装饰器;原函数会被原样返回。

示例:

Python Console Session
1
2
3
4
5
6
7
>>> dispatcher = EventDispatcher()
>>> @dispatcher.on("*")
... async def fallback(event):
...     return None
...
>>> fallback.__name__
'fallback'
源代码位于: feishu/events/dispatcher.py
Python
def on(self, event_type: str) -> Callable[[Handler], Handler]:
    r"""
    注册指定事件类型的处理函数(装饰器)。

    同一类型可注册多个处理函数,按注册顺序执行。使用 `"*"` 注册兜底处理函数,
    它会在所有事件的精确匹配处理函数之后执行。

    Args:
        event_type: 事件类型,如 `im.message.receive_v1`;`"*"` 表示匹配全部。

    Returns:
        接收处理函数并将其注册的装饰器;原函数会被原样返回。

    Examples:
        >>> dispatcher = EventDispatcher()
        >>> @dispatcher.on("*")
        ... async def fallback(event):
        ...     return None
        ...
        >>> fallback.__name__
        'fallback'
    """

    def register(handler: Handler) -> Handler:
        self._handlers.setdefault(event_type, []).append(handler)
        return handler

    return register

on_error

Python
on_error(handler: ErrorHandler) -> ErrorHandler

注册全局异常处理函数(仿 Slack Bolt 的 @app.error)。

当任一事件处理函数抛出异常时,框架会先记录日志,再调用此处理函数用于集中上报或返回兜底结果。 处理函数接收异常与对应事件;若其返回非 None 值且本次分发尚无其他结果,则作为返回值 (例如卡片回调可借此返回错误 toast)。后注册的会覆盖先注册的。

参数:

名称 类型 描述 默认
handler
ErrorHandler

形如 async def(exc: Exception, event: Event) -> dict | None 的异常处理函数。

必需

返回:

类型 描述
ErrorHandler

原处理函数(便于作装饰器使用)。

示例:

Python Console Session
1
2
3
4
5
6
7
>>> dispatcher = EventDispatcher()
>>> @dispatcher.on_error
... async def report(exc, event):
...     return None
...
>>> report.__name__
'report'
源代码位于: feishu/events/dispatcher.py
Python
def on_error(self, handler: ErrorHandler) -> ErrorHandler:
    r"""
    注册全局异常处理函数(仿 Slack Bolt 的 `@app.error`)。

    当任一事件处理函数抛出异常时,框架会先记录日志,再调用此处理函数用于集中上报或返回兜底结果。
    处理函数接收异常与对应事件;若其返回非 `None` 值且本次分发尚无其他结果,则作为返回值
    (例如卡片回调可借此返回错误 toast)。后注册的会覆盖先注册的。

    Args:
        handler: 形如 `async def(exc: Exception, event: Event) -> dict | None` 的异常处理函数。

    Returns:
        原处理函数(便于作装饰器使用)。

    Examples:
        >>> dispatcher = EventDispatcher()
        >>> @dispatcher.on_error
        ... async def report(exc, event):
        ...     return None
        ...
        >>> report.__name__
        'report'
    """
    self._error_handler = handler
    return handler

dispatch async

Python
dispatch(event: Event) -> dict | None

将事件分发给匹配的处理函数并返回结果。

先执行与 event.event_type 精确匹配的处理函数, 再执行以 "*" 注册的兜底处理函数。若配置了去重存储且事件已处理过,则直接返回 None, 不执行任何处理函数。

单个处理函数抛出的异常会被捕获、记录,并交由可选的全局异常处理函数处理,随后继续执行其余 处理函数,确保一个出错的处理函数不会中断其他处理函数或丢失卡片回调的 ACK。

返回值取第一个非 None 的处理函数结果,通常用于卡片回调返回 {"toast": ..., "card": ...}

参数:

名称 类型 描述 默认
event
Event

待分发的事件。

必需

返回:

类型 描述
dict | None

首个非 None 的处理函数返回值;若已被去重或所有处理函数均返回 None,则返回 None

示例:

Python Console Session
>>> import asyncio
>>> from feishu.events.envelope import Event
>>> dispatcher = EventDispatcher()
>>> @dispatcher.on("card.action.trigger")
... async def on_card(event):
...     return {"toast": {"type": "success", "content": "ok"}}
...
>>> event = Event.from_payload(
...     {"schema": "2.0", "header": {"event_type": "card.action.trigger", "event_id": "c1"}, "event": {}}
... )
>>> asyncio.run(dispatcher.dispatch(event))
{'toast': {'type': 'success', 'content': 'ok'}}
源代码位于: feishu/events/dispatcher.py
Python
async def dispatch(self, event: Event) -> dict | None:
    r"""
    将事件分发给匹配的处理函数并返回结果。

    先执行与 [event.event_type][feishu.events.envelope.Event.event_type] 精确匹配的处理函数,
    再执行以 `"*"` 注册的兜底处理函数。若配置了去重存储且事件已处理过,则直接返回 `None`,
    不执行任何处理函数。

    单个处理函数抛出的异常会被捕获、记录,并交由可选的全局异常处理函数处理,随后继续执行其余
    处理函数,确保一个出错的处理函数不会中断其他处理函数或丢失卡片回调的 ACK。

    返回值取第一个非 `None` 的处理函数结果,通常用于卡片回调返回 `{"toast": ..., "card": ...}`。

    Args:
        event: 待分发的事件。

    Returns:
        首个非 `None` 的处理函数返回值;若已被去重或所有处理函数均返回 `None`,则返回 `None`。

    Examples:
        >>> import asyncio
        >>> from feishu.events.envelope import Event
        >>> dispatcher = EventDispatcher()
        >>> @dispatcher.on("card.action.trigger")
        ... async def on_card(event):
        ...     return {"toast": {"type": "success", "content": "ok"}}
        ...
        >>> event = Event.from_payload(
        ...     {"schema": "2.0", "header": {"event_type": "card.action.trigger", "event_id": "c1"}, "event": {}}
        ... )
        >>> asyncio.run(dispatcher.dispatch(event))
        {'toast': {'type': 'success', 'content': 'ok'}}
    """
    if self._seen_store is not None and not await claim(self._seen_store, event.event_id):
        return None

    handlers = list(self._handlers.get(event.event_type, ()))
    handlers += list(self._handlers.get("*", ()))
    result: dict[str, Any] | None = None
    for handler in handlers:
        try:
            value = await handler(event)
        except Exception as exc:  # noqa: BLE001 - isolate: one failing handler must not abort the rest
            value = await self._on_handler_error(exc, event)
        if value is not None and result is None:
            result = cast(dict, value)
    return result

Event

飞书事件载荷的统一视图,同时兼容 1.0 与 2.0 两种事件格式。

飞书早期(1.0)事件将类型、uuidtstoken 放在顶层,事件正文在 event 中; 新版(2.0)事件则将这些元信息收敛到 header 字段。Event 屏蔽了二者差异, 使调用方无需关心具体 schema 即可读取 event_typeevent_id 等字段。

通常无需直接构造,应使用 from_payload 由原始载荷推断。

飞书文档

事件格式

示例:

Python Console Session
1
2
3
4
5
6
7
>>> ev = Event.from_payload(
...     {"schema": "2.0", "header": {"event_type": "im.message.receive_v1", "event_id": "evt_2"}, "event": {}}
... )
>>> ev.schema_version
'2.0'
>>> ev.event_type
'im.message.receive_v1'
源代码位于: feishu/events/envelope.py
Python
class Event:
    r"""
    飞书事件载荷的统一视图,同时兼容 1.0 与 2.0 两种事件格式。

    飞书早期(1.0)事件将类型、`uuid`、`ts`、`token` 放在顶层,事件正文在 `event` 中;
    新版(2.0)事件则将这些元信息收敛到 `header` 字段。`Event` 屏蔽了二者差异,
    使调用方无需关心具体 schema 即可读取 [event_type][feishu.events.envelope.Event.event_type]、
    [event_id][feishu.events.envelope.Event.event_id] 等字段。

    通常无需直接构造,应使用 [from_payload][feishu.events.envelope.Event.from_payload] 由原始载荷推断。

    飞书文档:
        [事件格式](https://open.feishu.cn/document/server-docs/event-subscription-guide/overview)

    Examples:
        >>> ev = Event.from_payload(
        ...     {"schema": "2.0", "header": {"event_type": "im.message.receive_v1", "event_id": "evt_2"}, "event": {}}
        ... )
        >>> ev.schema_version
        '2.0'
        >>> ev.event_type
        'im.message.receive_v1'
    """

    __slots__ = ("_raw", "_schema_version")

    def __init__(self, raw: NestedDict, schema_version: str):
        self._raw = raw
        self._schema_version = schema_version

    @classmethod
    def from_payload(cls, payload: dict[str, Any]) -> Event:
        r"""
        从原始事件载荷构造 [Event][feishu.events.envelope.Event] 并自动推断 schema 版本。

        当载荷含有 `"schema": "2.0"` 或顶层存在 `header` 字段时判定为 2.0,否则视为 1.0。
        传入的 `dict` 会被包装为 `NestedDict` 以支持点号取值。

        Args:
            payload: 解密、解析后的事件载荷字典。

        Returns:
            统一封装后的事件对象。

        Examples:
            >>> Event.from_payload({"uuid": "u_1", "event": {"type": "message"}}).schema_version
            '1.0'
            >>> Event.from_payload({"header": {"event_type": "card.action.trigger"}, "event": {}}).schema_version
            '2.0'
        """
        raw = payload if isinstance(payload, NestedDict) else NestedDict(payload)
        is_2_0 = raw.get("schema") == "2.0" or "header" in raw
        return cls(raw, "2.0" if is_2_0 else "1.0")

    @property
    def schema_version(self) -> str:
        r"""
        事件的 schema 版本,`"1.0"` 或 `"2.0"`。
        """
        return self._schema_version

    @property
    def raw(self) -> NestedDict:
        r"""
        未经处理的原始事件载荷,便于访问本类未封装的字段。
        """
        return self._raw

    @property
    def _header(self) -> NestedDict:
        header = self._raw.get("header")
        return header if isinstance(header, NestedDict) else NestedDict(header or {})

    @property
    def _event(self) -> NestedDict:
        event = self._raw.get("event")
        return event if isinstance(event, NestedDict) else NestedDict(event or {})

    @property
    def event_type(self) -> str:
        r"""
        事件类型,例如 `im.message.receive_v1`。

        2.0 取自 `header.event_type`,1.0 取自 `event.type`;缺失时返回空字符串。
        """
        if self._schema_version == "2.0":
            return self._header.get("event_type", "")
        return self._event.get("type", "")

    @property
    def event_id(self) -> str:
        r"""
        事件唯一标识,用于去重。

        2.0 取自 `header.event_id`,1.0 取自顶层 `uuid`。始终返回 `str`,缺失时返回空字符串
        而非 `None`,因此调用方无需进行 `None` 判空。
        """
        if self._schema_version == "2.0":
            return self._header.get("event_id", "")
        return self._raw.get("uuid", "")

    @property
    def create_time(self) -> str | None:
        r"""
        事件产生时间戳(毫秒,字符串形式)。

        2.0 取自 `header.create_time`,1.0 取自顶层 `ts`;缺失时返回 `None`。
        """
        if self._schema_version == "2.0":
            return self._header.get("create_time")
        return self._raw.get("ts")

    @property
    def tenant_key(self) -> str | None:
        r"""
        租户标识,仅 2.0 事件可用;1.0 事件恒为 `None`。
        """
        return self._header.get("tenant_key") if self._schema_version == "2.0" else None

    @property
    def app_id(self) -> str | None:
        r"""
        触发事件的应用 ID,仅 2.0 事件可用;1.0 事件恒为 `None`。
        """
        return self._header.get("app_id") if self._schema_version == "2.0" else None

    @property
    def token(self) -> str | None:
        r"""
        事件头中的校验 Token(Verification Token)。

        用于事件来源校验,并非更新卡片所需的凭证。2.0 取自 `header.token`,1.0 取自顶层 `token`。
        """
        if self._schema_version == "2.0":
            return self._header.get("token")
        return self._raw.get("token")

    @property
    def body(self) -> NestedDict:
        r"""
        事件正文,即载荷中的 `event` 字段。

        返回值始终是可安全索引的 `NestedDict`:即使原始载荷缺少 `event` 字段,也会返回空字典,
        调用方可直接 `.get()` 而不会触发 `KeyError`。

        Examples:
            >>> ev = Event.from_payload({"schema": "2.0", "header": {"event_type": "x"}})
            >>> ev.body.get("anything") is None
            True
        """
        return self._event

schema_version property

Python
schema_version: str

事件的 schema 版本,"1.0""2.0"

raw property

Python
raw: NestedDict

未经处理的原始事件载荷,便于访问本类未封装的字段。

event_type property

Python
event_type: str

事件类型,例如 im.message.receive_v1

2.0 取自 header.event_type,1.0 取自 event.type;缺失时返回空字符串。

event_id property

Python
event_id: str

事件唯一标识,用于去重。

2.0 取自 header.event_id,1.0 取自顶层 uuid。始终返回 str,缺失时返回空字符串 而非 None,因此调用方无需进行 None 判空。

create_time property

Python
create_time: str | None

事件产生时间戳(毫秒,字符串形式)。

2.0 取自 header.create_time,1.0 取自顶层 ts;缺失时返回 None

tenant_key property

Python
tenant_key: str | None

租户标识,仅 2.0 事件可用;1.0 事件恒为 None

app_id property

Python
app_id: str | None

触发事件的应用 ID,仅 2.0 事件可用;1.0 事件恒为 None

token property

Python
token: str | None

事件头中的校验 Token(Verification Token)。

用于事件来源校验,并非更新卡片所需的凭证。2.0 取自 header.token,1.0 取自顶层 token

body property

Python
body: NestedDict

事件正文,即载荷中的 event 字段。

返回值始终是可安全索引的 NestedDict:即使原始载荷缺少 event 字段,也会返回空字典, 调用方可直接 .get() 而不会触发 KeyError

示例:

Python Console Session
1
2
3
>>> ev = Event.from_payload({"schema": "2.0", "header": {"event_type": "x"}})
>>> ev.body.get("anything") is None
True

from_payload classmethod

Python
from_payload(payload: dict[str, Any]) -> Event

从原始事件载荷构造 Event 并自动推断 schema 版本。

当载荷含有 "schema": "2.0" 或顶层存在 header 字段时判定为 2.0,否则视为 1.0。 传入的 dict 会被包装为 NestedDict 以支持点号取值。

参数:

名称 类型 描述 默认
payload
dict[str, Any]

解密、解析后的事件载荷字典。

必需

返回:

类型 描述
Event

统一封装后的事件对象。

示例:

Python Console Session
1
2
3
4
>>> Event.from_payload({"uuid": "u_1", "event": {"type": "message"}}).schema_version
'1.0'
>>> Event.from_payload({"header": {"event_type": "card.action.trigger"}, "event": {}}).schema_version
'2.0'
源代码位于: feishu/events/envelope.py
Python
@classmethod
def from_payload(cls, payload: dict[str, Any]) -> Event:
    r"""
    从原始事件载荷构造 [Event][feishu.events.envelope.Event] 并自动推断 schema 版本。

    当载荷含有 `"schema": "2.0"` 或顶层存在 `header` 字段时判定为 2.0,否则视为 1.0。
    传入的 `dict` 会被包装为 `NestedDict` 以支持点号取值。

    Args:
        payload: 解密、解析后的事件载荷字典。

    Returns:
        统一封装后的事件对象。

    Examples:
        >>> Event.from_payload({"uuid": "u_1", "event": {"type": "message"}}).schema_version
        '1.0'
        >>> Event.from_payload({"header": {"event_type": "card.action.trigger"}, "event": {}}).schema_version
        '2.0'
    """
    raw = payload if isinstance(payload, NestedDict) else NestedDict(payload)
    is_2_0 = raw.get("schema") == "2.0" or "header" in raw
    return cls(raw, "2.0" if is_2_0 else "1.0")

InMemorySeenStore

基于进程内存、带 TTL 的 SeenStore 实现。

将已处理的 event_id 连同过期时间存入字典,每次访问时清理过期项。适合单进程部署与测试; 多副本部署时各进程内存互不共享,应改用基于共享存储的实现。

参数:

名称 类型 描述 默认

ttl

float

记录的存活时长(秒),超过后视为未见过。默认 3600

3600.0

now

Callable[[], float]

单调时钟函数,默认 time.monotonic,可注入以便测试。

monotonic

示例:

Python Console Session
1
2
3
4
5
6
7
>>> import asyncio
>>> store = InMemorySeenStore()
>>> asyncio.run(store.seen("evt_1"))
False
>>> asyncio.run(store.mark("evt_1"))
>>> asyncio.run(store.seen("evt_1"))
True
源代码位于: feishu/events/idempotency.py
Python
class InMemorySeenStore:
    r"""
    基于进程内存、带 TTL 的 [SeenStore][feishu.events.idempotency.SeenStore] 实现。

    将已处理的 `event_id` 连同过期时间存入字典,每次访问时清理过期项。适合单进程部署与测试;
    多副本部署时各进程内存互不共享,应改用基于共享存储的实现。

    Args:
        ttl: 记录的存活时长(秒),超过后视为未见过。默认 `3600`。
        now: 单调时钟函数,默认 `time.monotonic`,可注入以便测试。

    Examples:
        >>> import asyncio
        >>> store = InMemorySeenStore()
        >>> asyncio.run(store.seen("evt_1"))
        False
        >>> asyncio.run(store.mark("evt_1"))
        >>> asyncio.run(store.seen("evt_1"))
        True
    """

    def __init__(self, ttl: float = 3600.0, *, now: Callable[[], float] = time.monotonic) -> None:
        self._ttl = ttl
        self._now = now
        self._store: dict[str, float] = {}
        self._lock = asyncio.Lock()

    async def add(self, event_id: str) -> bool:
        r"""
        原子地认领 `event_id`:此前未标记(或已过期)则标记并返回 `True`,否则返回 `False`。

        将 `seen` 检查与 `mark` 标记合并在同一把锁内完成,消除二者之间的检查-标记竞态——
        并发投递的重复事件中只有一个会得到 `True`。

        Args:
            event_id: 待认领的事件标识。

        Returns:
            首次认领返回 `True`(应处理),重复返回 `False`(应跳过)。
        """
        async with self._lock:
            self._purge()
            if event_id in self._store:
                return False
            self._store[event_id] = self._now() + self._ttl
            return True

    async def seen(self, event_id: str) -> bool:
        r"""
        查询 `event_id` 是否在 TTL 内被标记过。

        Args:
            event_id: 待查询的事件标识。

        Returns:
            已标记且未过期返回 `True`,否则返回 `False`。
        """
        async with self._lock:
            self._purge()
            return event_id in self._store

    async def mark(self, event_id: str) -> None:
        r"""
        标记 `event_id` 为已处理,并按 TTL 设置过期时间。

        Args:
            event_id: 待标记的事件标识。
        """
        async with self._lock:
            self._purge()
            self._store[event_id] = self._now() + self._ttl

    def _purge(self) -> None:
        now = self._now()
        expired = [k for k, exp in self._store.items() if exp <= now]
        for k in expired:
            del self._store[k]

add async

Python
add(event_id: str) -> bool

原子地认领 event_id:此前未标记(或已过期)则标记并返回 True,否则返回 False

seen 检查与 mark 标记合并在同一把锁内完成,消除二者之间的检查-标记竞态—— 并发投递的重复事件中只有一个会得到 True

参数:

名称 类型 描述 默认
event_id
str

待认领的事件标识。

必需

返回:

类型 描述
bool

首次认领返回 True(应处理),重复返回 False(应跳过)。

源代码位于: feishu/events/idempotency.py
Python
async def add(self, event_id: str) -> bool:
    r"""
    原子地认领 `event_id`:此前未标记(或已过期)则标记并返回 `True`,否则返回 `False`。

    将 `seen` 检查与 `mark` 标记合并在同一把锁内完成,消除二者之间的检查-标记竞态——
    并发投递的重复事件中只有一个会得到 `True`。

    Args:
        event_id: 待认领的事件标识。

    Returns:
        首次认领返回 `True`(应处理),重复返回 `False`(应跳过)。
    """
    async with self._lock:
        self._purge()
        if event_id in self._store:
            return False
        self._store[event_id] = self._now() + self._ttl
        return True

seen async

Python
seen(event_id: str) -> bool

查询 event_id 是否在 TTL 内被标记过。

参数:

名称 类型 描述 默认
event_id
str

待查询的事件标识。

必需

返回:

类型 描述
bool

已标记且未过期返回 True,否则返回 False

源代码位于: feishu/events/idempotency.py
Python
async def seen(self, event_id: str) -> bool:
    r"""
    查询 `event_id` 是否在 TTL 内被标记过。

    Args:
        event_id: 待查询的事件标识。

    Returns:
        已标记且未过期返回 `True`,否则返回 `False`。
    """
    async with self._lock:
        self._purge()
        return event_id in self._store

mark async

Python
mark(event_id: str) -> None

标记 event_id 为已处理,并按 TTL 设置过期时间。

参数:

名称 类型 描述 默认
event_id
str

待标记的事件标识。

必需
源代码位于: feishu/events/idempotency.py
Python
async def mark(self, event_id: str) -> None:
    r"""
    标记 `event_id` 为已处理,并按 TTL 设置过期时间。

    Args:
        event_id: 待标记的事件标识。
    """
    async with self._lock:
        self._purge()
        self._store[event_id] = self._now() + self._ttl

SeenStore

Bases: Protocol

事件去重存储的协议接口。

飞书在投递超时时会重试推送,导致同一 event_id 多次到达。实现本协议即可为接收器 (create_event_routecreate_card_route)或 EventDispatcher 提供幂等保证。 通过 InMemorySeenStore 即得到一个内置实现, 生产环境可改用基于 Redis 等共享存储的实现。

本协议使用 runtime_checkable,可用 isinstance 进行结构化校验。

飞书文档

接收事件

源代码位于: feishu/events/idempotency.py
Python
@runtime_checkable
class SeenStore(Protocol):
    r"""
    事件去重存储的协议接口。

    飞书在投递超时时会重试推送,导致同一 `event_id` 多次到达。实现本协议即可为接收器
    ([create_event_route][feishu.events.receiver.create_event_route]、
    [create_card_route][feishu.events.receiver.create_card_route])或
    [EventDispatcher][feishu.events.dispatcher.EventDispatcher] 提供幂等保证。
    通过 [InMemorySeenStore][feishu.events.idempotency.InMemorySeenStore] 即得到一个内置实现,
    生产环境可改用基于 Redis 等共享存储的实现。

    本协议使用 `runtime_checkable`,可用 `isinstance` 进行结构化校验。

    飞书文档:
        [接收事件](https://open.feishu.cn/document/server-docs/event-subscription-guide/event-subscription-configure-/request-url-configuration-case)
    """

    async def seen(self, event_id: str) -> bool:
        r"""
        查询 `event_id` 是否已被处理过。
        """
        ...

    async def mark(self, event_id: str) -> None:
        r"""
        将 `event_id` 标记为已处理。
        """
        ...

seen async

Python
seen(event_id: str) -> bool

查询 event_id 是否已被处理过。

源代码位于: feishu/events/idempotency.py
Python
async def seen(self, event_id: str) -> bool:
    r"""
    查询 `event_id` 是否已被处理过。
    """
    ...

mark async

Python
mark(event_id: str) -> None

event_id 标记为已处理。

源代码位于: feishu/events/idempotency.py
Python
async def mark(self, event_id: str) -> None:
    r"""
    将 `event_id` 标记为已处理。
    """
    ...

verify_signature

Python
verify_signature(timestamp: str, nonce: str, encrypt_key: str, raw_body: bytes, signature: str) -> bool

校验飞书事件推送的签名。

飞书在请求头中携带 X-Lark-Request-TimestampX-Lark-Request-NonceX-Lark-Signature。 签名为 sha256(timestamp + nonce + encrypt_key + raw_body) 的十六进制摘要。 本函数使用常量时间比较以避免计时攻击。

参数:

名称 类型 描述 默认

timestamp

str

请求头 X-Lark-Request-Timestamp 的值。

必需

nonce

str

请求头 X-Lark-Request-Nonce 的值。

必需

encrypt_key

str

应用配置的 Encrypt Key。

必需

raw_body

bytes

HTTP 请求的原始字节体(必须在解析 JSON 之前读取,不能被改动)。

必需

signature

str

请求头 X-Lark-Signature 的值。

必需

返回:

类型 描述
bool

签名匹配返回 True,否则返回 False

飞书文档

配置加密推送

示例:

Python Console Session
1
2
3
4
5
6
7
>>> import hashlib
>>> ts, nonce, key, body = "1700000000", "abc123", "ek_secret", b'{"encrypt":"payload"}'
>>> sig = hashlib.sha256((ts + nonce + key).encode("utf-8") + body).hexdigest()
>>> verify_signature(ts, nonce, key, body, sig)
True
>>> verify_signature(ts, nonce, key, b'{"encrypt":"TAMPERED"}', sig)
False
源代码位于: feishu/signature.py
Python
def verify_signature(timestamp: str, nonce: str, encrypt_key: str, raw_body: bytes, signature: str) -> bool:
    r"""
    校验飞书事件推送的签名。

    飞书在请求头中携带 `X-Lark-Request-Timestamp`、`X-Lark-Request-Nonce` 与 `X-Lark-Signature`。
    签名为 `sha256(timestamp + nonce + encrypt_key + raw_body)` 的十六进制摘要。
    本函数使用常量时间比较以避免计时攻击。

    Args:
        timestamp: 请求头 `X-Lark-Request-Timestamp` 的值。
        nonce: 请求头 `X-Lark-Request-Nonce` 的值。
        encrypt_key: 应用配置的 Encrypt Key。
        raw_body: HTTP 请求的原始字节体(必须在解析 JSON 之前读取,不能被改动)。
        signature: 请求头 `X-Lark-Signature` 的值。

    Returns:
        签名匹配返回 `True`,否则返回 `False`。

    飞书文档:
        [配置加密推送](https://open.feishu.cn/document/server-docs/event-subscription-guide/event-subscriptions/encrypt-key-encryption-configuration-case)

    Examples:
        >>> import hashlib
        >>> ts, nonce, key, body = "1700000000", "abc123", "ek_secret", b'{"encrypt":"payload"}'
        >>> sig = hashlib.sha256((ts + nonce + key).encode("utf-8") + body).hexdigest()
        >>> verify_signature(ts, nonce, key, body, sig)
        True
        >>> verify_signature(ts, nonce, key, b'{"encrypt":"TAMPERED"}', sig)
        False
    """
    expected = hashlib.sha256((timestamp + nonce + encrypt_key).encode("utf-8") + raw_body).hexdigest()
    return hmac.compare_digest(expected, signature)

decrypt

Python
decrypt(encrypt_key: str, b64_ciphertext: str) -> bytes

解密飞书事件的 encrypt 密文。

飞书在开启加密推送后,会将事件体加密为一段 Base64 字符串放在 encrypt 字段中。 其算法为 AES-256-CBC:密钥取 sha256(encrypt_key),前 16 字节为 IV,采用 PKCS7 填充。

参数:

名称 类型 描述 默认

encrypt_key

str

应用在飞书开放平台配置的 Encrypt Key。

必需

b64_ciphertext

str

encrypt 字段中的 Base64 密文。

必需

返回:

类型 描述
bytes

解密并去除填充后的原始明文字节(通常是一段 JSON)。

引发:

类型 描述
[feishu.errors.FeishuCryptoError][]

当 Base64 非法、密文长度不足一个 AES 块, 或密钥错误导致去填充失败时抛出。

飞书文档

订阅事件

示例:

Python Console Session
1
2
3
4
5
6
7
>>> decrypt("test key", "P37w+VZImNgPEO1RBhJ6RtKl7n6zymIbEG1pReEzghk=")
b'hello world'
>>> try:
...     decrypt("test key", "!!!not base64!!!")
... except FeishuCryptoError as exc:
...     print(type(exc).__name__)
FeishuCryptoError
源代码位于: feishu/events/crypto.py
Python
def decrypt(encrypt_key: str, b64_ciphertext: str) -> bytes:
    r"""
    解密飞书事件的 `encrypt` 密文。

    飞书在开启加密推送后,会将事件体加密为一段 Base64 字符串放在 `encrypt` 字段中。
    其算法为 AES-256-CBC:密钥取 `sha256(encrypt_key)`,前 16 字节为 IV,采用 PKCS7 填充。

    Args:
        encrypt_key: 应用在飞书开放平台配置的 Encrypt Key。
        b64_ciphertext: `encrypt` 字段中的 Base64 密文。

    Returns:
        解密并去除填充后的原始明文字节(通常是一段 JSON)。

    Raises:
        [feishu.errors.FeishuCryptoError][]: 当 Base64 非法、密文长度不足一个 AES 块,
            或密钥错误导致去填充失败时抛出。

    飞书文档:
        [订阅事件](https://open.feishu.cn/document/server-docs/event-subscription-guide/event-subscriptions/encrypt-key-encryption-configuration-case)

    Examples:
        >>> decrypt("test key", "P37w+VZImNgPEO1RBhJ6RtKl7n6zymIbEG1pReEzghk=")
        b'hello world'
        >>> try:
        ...     decrypt("test key", "!!!not base64!!!")
        ... except FeishuCryptoError as exc:
        ...     print(type(exc).__name__)
        FeishuCryptoError
    """
    key = hashlib.sha256(encrypt_key.encode("utf-8")).digest()
    try:
        blob = base64.b64decode(b64_ciphertext)
    except ValueError as exc:
        raise FeishuCryptoError(-1, f"invalid base64 ciphertext: {exc}") from exc
    if len(blob) < 16:
        raise FeishuCryptoError(-1, "ciphertext shorter than one AES block")
    iv, ct = blob[:16], blob[16:]
    try:
        plain = AES.new(key, AES.MODE_CBC, iv).decrypt(ct)
        return unpad(plain, AES.block_size)
    except ValueError as exc:
        raise FeishuCryptoError(-1, f"decrypt/unpad failed: {exc}") from exc

create_card_route

Python
create_card_route(dispatcher: EventDispatcher, *, path: str = '/feishu/card', encrypt_key: str | None = None, verification_token: str | None = None, seen_store: Any = _DEFAULT_SEEN_STORE, max_age_seconds: float | None = 300, now: Callable[[], float] = time) -> Route

创建处理飞书卡片交互回调的 Starlette POST 路由。

create_event_route 不同,本路由会 **同步**等待分发器执行(不使用后台任务),并将处理函数返回的 {toast, card} 字典 作为同步 JSON 响应返回,以满足飞书对卡片交互约 3 秒的响应时限。当处理函数返回 None 时, 响应为 200 {}

安全模型与 create_event_route 一致:

  • url_verification 握手仅通过内层 verification_token 鉴权(签名豁免在此生效)。
  • 其余事件在配置了 encrypt_key 时,必须携带并经 SignatureVerifier 通过 X-Lark-Signature 校验,且时间戳须在 max_age_seconds 时间窗内(防重放);缺失、过期或非法签名将返回 401,且处理函数不会被调用。

由于飞书在超时时会重试卡片回调,命中 seen_store 的重复事件会直接返回 {},避免重复触发副作用。

参数:

名称 类型 描述 默认

dispatcher

EventDispatcher

事件分发器。

必需

path

str

路由路径,默认 /feishu/card

'/feishu/card'

encrypt_key

str | None

应用配置的 Encrypt Key;设置后启用解密与签名强校验。

None

verification_token

str | None

握手校验 Token;设置后会校验 url_verification 的内层 token。

None

seen_store

Any

事件去重存储;缺省时使用新建的 InMemorySeenStore, 从而开箱即用地提供去重/防重放保护;显式传入 None 可关闭去重, 也可传入自定义实现。

_DEFAULT_SEEN_STORE

max_age_seconds

float | None

签名请求允许的最大时延(秒),默认 300;用于拒绝过期的 重放请求。设为 None 可关闭新鲜度校验(但绝不会跳过 MAC 校验)。

300

now

Callable[[], float]

返回当前 epoch 秒的可调用对象,默认 time.time;可注入以编写确定性测试。

time

返回:

类型 描述
Route

可挂载到 Starlette 应用的 [Route][starlette.routing.Route]。

飞书文档

卡片回传交互

示例:

Python Console Session
>>> dispatcher = EventDispatcher()
>>> route = create_card_route(dispatcher, encrypt_key="ek_secret")
源代码位于: feishu/events/receiver.py
Python
def create_card_route(
    dispatcher: EventDispatcher,
    *,
    path: str = "/feishu/card",
    encrypt_key: str | None = None,
    verification_token: str | None = None,
    seen_store: Any = _DEFAULT_SEEN_STORE,
    max_age_seconds: float | None = 300,
    now: Callable[[], float] = time.time,
) -> Route:
    r"""
    创建处理飞书卡片交互回调的 Starlette POST 路由。

    与 [create_event_route][feishu.events.receiver.create_event_route] 不同,本路由会
    **同步**等待分发器执行(不使用后台任务),并将处理函数返回的 `{toast, card}` 字典
    作为同步 JSON 响应返回,以满足飞书对卡片交互约 3 秒的响应时限。当处理函数返回 `None` 时,
    响应为 `200 {}`。

    安全模型与 [create_event_route][feishu.events.receiver.create_event_route] 一致:

    * `url_verification` 握手仅通过内层 `verification_token` 鉴权(签名豁免在此生效)。
    * 其余事件在配置了 `encrypt_key` 时,必须携带并经
      [SignatureVerifier][feishu.signature.SignatureVerifier] 通过 `X-Lark-Signature`
      校验,且时间戳须在 `max_age_seconds` 时间窗内(防重放);缺失、过期或非法签名将返回
      401,且处理函数不会被调用。

    由于飞书在超时时会重试卡片回调,命中 `seen_store` 的重复事件会直接返回 `{}`,避免重复触发副作用。

    Args:
        dispatcher: 事件分发器。
        path: 路由路径,默认 `/feishu/card`。
        encrypt_key: 应用配置的 Encrypt Key;设置后启用解密与签名强校验。
        verification_token: 握手校验 Token;设置后会校验 `url_verification` 的内层 token。
        seen_store: 事件去重存储;缺省时使用新建的
            [InMemorySeenStore][feishu.events.idempotency.InMemorySeenStore],
            从而开箱即用地提供去重/防重放保护;显式传入 `None` 可关闭去重,
            也可传入自定义实现。
        max_age_seconds: 签名请求允许的最大时延(秒),默认 `300`;用于拒绝过期的
            重放请求。设为 `None` 可关闭新鲜度校验(但绝不会跳过 MAC 校验)。
        now: 返回当前 epoch 秒的可调用对象,默认 [time.time][];可注入以编写确定性测试。

    Returns:
        可挂载到 Starlette 应用的 [Route][starlette.routing.Route]。

    飞书文档:
        [卡片回传交互](https://open.feishu.cn/document/server-docs/im-v1/message-card/handle-card-actions)

    Examples:
        >>> dispatcher = EventDispatcher()
        >>> route = create_card_route(dispatcher, encrypt_key="ek_secret")  # doctest: +SKIP
    """
    store = _resolve_seen_store(seen_store)
    verifier = _build_verifier(encrypt_key, max_age_seconds, now)

    async def endpoint(request: Request) -> Response:
        raw = await request.body()
        result = await _read_payload(request, raw, encrypt_key, verifier)
        if isinstance(result, Response):
            return result
        payload, sig_verified = result

        if payload.get("type") == "url_verification":
            # Handshake: authenticated by inner token, not by MAC signature.
            if verification_token is not None and not hmac.compare_digest(
                str(payload.get("token", "")), verification_token
            ):
                return JSONResponse({"msg": "token mismatch"}, status_code=401)
            return JSONResponse({"challenge": payload.get("challenge")})

        # Real card events require MAC authentication when encrypt_key is configured.
        if encrypt_key is not None and not sig_verified:
            return JSONResponse({"msg": "signature required"}, status_code=401)

        event = Event.from_payload(payload)
        # Feishu retries card-action callbacks on timeout; returning {} prevents re-running side effects.
        if store is not None and not await claim(store, event.event_id):
            return JSONResponse({})

        handler_result = await dispatcher.dispatch(event)
        return JSONResponse(handler_result if handler_result is not None else {})

    return Route(path, endpoint, methods=["POST"])

create_event_app

Python
create_event_app(dispatcher: EventDispatcher, *, event_path: str = '/feishu/event', card_path: str | None = '/feishu/card', encrypt_key: str | None = None, verification_token: str | None = None, seen_store: Any = _DEFAULT_SEEN_STORE, card_seen_store: Any = _DEFAULT_SEEN_STORE, max_age_seconds: float | None = 300, now: Callable[[], float] = time) -> Starlette

返回一个可独立运行、处理飞书 Webhook 推送的 Starlette 应用。

始终在 event_path 挂载事件路由;当 card_path 不为 None 时,额外在该路径挂载卡片回调路由。 全部安全与路由逻辑分别委托给 create_event_routecreate_card_route,因此默认即带有 签名新鲜度(防重放)时间窗与去重保护。

参数:

名称 类型 描述 默认

dispatcher

EventDispatcher

事件分发器。

必需

event_path

str

事件路由路径,默认 /feishu/event

'/feishu/event'

card_path

str | None

卡片回调路由路径,默认 /feishu/card;为 None 时不挂载卡片路由。

'/feishu/card'

encrypt_key

str | None

应用配置的 Encrypt Key;设置后启用解密与签名强校验。

None

verification_token

str | None

握手校验 Token;设置后会校验 url_verification 的内层 token。

None

seen_store

Any

事件路由的去重存储;缺省时各路由自建 InMemorySeenStore, 显式传入 None 关闭去重,也可传入自定义实现。

_DEFAULT_SEEN_STORE

card_seen_store

Any

卡片路由的去重存储;语义同 seen_store

_DEFAULT_SEEN_STORE

max_age_seconds

float | None

签名请求允许的最大时延(秒),默认 300;设为 None 关闭新鲜度校验。

300

now

Callable[[], float]

返回当前 epoch 秒的可调用对象,默认 time.time;可注入以编写确定性测试。

time

返回:

类型 描述
Starlette

已挂载相应路由的 [Starlette][starlette.applications.Starlette] 应用。

飞书文档

接收事件

示例:

Python Console Session
>>> dispatcher = EventDispatcher()
>>> app = create_event_app(dispatcher, encrypt_key="ek_secret")
源代码位于: feishu/events/receiver.py
Python
def create_event_app(
    dispatcher: EventDispatcher,
    *,
    event_path: str = "/feishu/event",
    card_path: str | None = "/feishu/card",
    encrypt_key: str | None = None,
    verification_token: str | None = None,
    seen_store: Any = _DEFAULT_SEEN_STORE,
    card_seen_store: Any = _DEFAULT_SEEN_STORE,
    max_age_seconds: float | None = 300,
    now: Callable[[], float] = time.time,
) -> Starlette:
    r"""
    返回一个可独立运行、处理飞书 Webhook 推送的 Starlette 应用。

    始终在 `event_path` 挂载事件路由;当 `card_path` 不为 `None` 时,额外在该路径挂载卡片回调路由。
    全部安全与路由逻辑分别委托给 [create_event_route][feishu.events.receiver.create_event_route]
    与 [create_card_route][feishu.events.receiver.create_card_route],因此默认即带有
    签名新鲜度(防重放)时间窗与去重保护。

    Args:
        dispatcher: 事件分发器。
        event_path: 事件路由路径,默认 `/feishu/event`。
        card_path: 卡片回调路由路径,默认 `/feishu/card`;为 `None` 时不挂载卡片路由。
        encrypt_key: 应用配置的 Encrypt Key;设置后启用解密与签名强校验。
        verification_token: 握手校验 Token;设置后会校验 `url_verification` 的内层 token。
        seen_store: 事件路由的去重存储;缺省时各路由自建
            [InMemorySeenStore][feishu.events.idempotency.InMemorySeenStore],
            显式传入 `None` 关闭去重,也可传入自定义实现。
        card_seen_store: 卡片路由的去重存储;语义同 `seen_store`。
        max_age_seconds: 签名请求允许的最大时延(秒),默认 `300`;设为 `None` 关闭新鲜度校验。
        now: 返回当前 epoch 秒的可调用对象,默认 [time.time][];可注入以编写确定性测试。

    Returns:
        已挂载相应路由的 [Starlette][starlette.applications.Starlette] 应用。

    飞书文档:
        [接收事件](https://open.feishu.cn/document/server-docs/event-subscription-guide/event-subscription-configure-/request-url-configuration-case)

    Examples:
        >>> dispatcher = EventDispatcher()
        >>> app = create_event_app(dispatcher, encrypt_key="ek_secret")  # doctest: +SKIP
    """
    routes = [
        create_event_route(
            dispatcher,
            path=event_path,
            encrypt_key=encrypt_key,
            verification_token=verification_token,
            seen_store=seen_store,
            max_age_seconds=max_age_seconds,
            now=now,
        )
    ]
    if card_path is not None:
        routes.append(
            create_card_route(
                dispatcher,
                path=card_path,
                encrypt_key=encrypt_key,
                verification_token=verification_token,
                seen_store=card_seen_store,
                max_age_seconds=max_age_seconds,
                now=now,
            )
        )
    return Starlette(routes=routes)

create_event_route

Python
create_event_route(dispatcher: EventDispatcher, *, path: str = '/feishu/event', encrypt_key: str | None = None, verification_token: str | None = None, seen_store: Any = _DEFAULT_SEEN_STORE, max_age_seconds: float | None = 300, now: Callable[[], float] = time) -> Route

创建处理飞书事件推送的 Starlette POST 路由。

该端点的处理流程:

  1. 先读取原始请求体字节(签名校验与 AES 解密都依赖未经改动的原始字节)。
  2. 当配置了 encrypt_key 且请求头包含 X-Lark-Signature 时, 经 SignatureVerifier 校验签名 并校验时间戳新鲜度(重放时间窗),记录校验结果。
  3. 若请求体被 encrypt 包裹,则先行解密。
  4. 处理 url_verification 握手:握手仅通过内层 verification_token 鉴权, 不要求 MAC 签名(签名豁免仅在此处生效)。
  5. 其余正常事件:当配置了 encrypt_key 时,签名必须存在、时间戳在 max_age_seconds 时间窗内且校验通过;缺失签名或时间戳过期将返回 401 (防止 Webhook 注入与重放攻击绕过)。
  6. 通过 seen_store 去重(默认即开启),其余事件以后台任务(BackgroundTask) 异步分发,端点立即返回 200 {}

参数:

名称 类型 描述 默认

dispatcher

EventDispatcher

事件分发器。

必需

path

str

路由路径,默认 /feishu/event

'/feishu/event'

encrypt_key

str | None

应用配置的 Encrypt Key;设置后启用解密与签名强校验。

None

verification_token

str | None

握手校验 Token;设置后会校验 url_verification 的内层 token。

None

seen_store

Any

事件去重存储;缺省时使用新建的 InMemorySeenStore, 从而开箱即用地提供去重/防重放保护;显式传入 None 可关闭去重, 也可传入自定义实现(如基于 Redis 的共享存储)。

_DEFAULT_SEEN_STORE

max_age_seconds

float | None

签名请求允许的最大时延(秒),默认 300;用于拒绝过期的 重放请求。设为 None 可关闭新鲜度校验(但绝不会跳过 MAC 校验)。

300

now

Callable[[], float]

返回当前 epoch 秒的可调用对象,默认 time.time;可注入以编写确定性测试。

time

返回:

类型 描述
Route

可挂载到 Starlette 应用的 [Route][starlette.routing.Route]。

飞书文档

接收事件

示例:

Python Console Session
1
2
3
4
>>> from starlette.applications import Starlette
>>> dispatcher = EventDispatcher()
>>> route = create_event_route(dispatcher, encrypt_key="ek_secret")
>>> app = Starlette(routes=[route])
源代码位于: feishu/events/receiver.py
Python
def create_event_route(
    dispatcher: EventDispatcher,
    *,
    path: str = "/feishu/event",
    encrypt_key: str | None = None,
    verification_token: str | None = None,
    seen_store: Any = _DEFAULT_SEEN_STORE,
    max_age_seconds: float | None = 300,
    now: Callable[[], float] = time.time,
) -> Route:
    r"""
    创建处理飞书事件推送的 Starlette POST 路由。

    该端点的处理流程:

    1. 先读取原始请求体字节(签名校验与 AES 解密都依赖未经改动的原始字节)。
    2. 当配置了 `encrypt_key` 且请求头包含 `X-Lark-Signature` 时,
       经 [SignatureVerifier][feishu.signature.SignatureVerifier] 校验签名
       **并校验时间戳新鲜度**(重放时间窗),记录校验结果。
    3. 若请求体被 `encrypt` 包裹,则先行解密。
    4. 处理 `url_verification` 握手:握手仅通过内层 `verification_token` 鉴权,
       不要求 MAC 签名(签名豁免仅在此处生效)。
    5. 其余正常事件:当配置了 `encrypt_key` 时,签名必须存在、时间戳在
       `max_age_seconds` 时间窗内且校验通过;缺失签名或时间戳过期将返回 401
       (防止 Webhook 注入与重放攻击绕过)。
    6. 通过 `seen_store` 去重(默认即开启),其余事件以后台任务(BackgroundTask)
       异步分发,端点立即返回 `200 {}`。

    Args:
        dispatcher: 事件分发器。
        path: 路由路径,默认 `/feishu/event`。
        encrypt_key: 应用配置的 Encrypt Key;设置后启用解密与签名强校验。
        verification_token: 握手校验 Token;设置后会校验 `url_verification` 的内层 token。
        seen_store: 事件去重存储;缺省时使用新建的
            [InMemorySeenStore][feishu.events.idempotency.InMemorySeenStore],
            从而开箱即用地提供去重/防重放保护;显式传入 `None` 可关闭去重,
            也可传入自定义实现(如基于 Redis 的共享存储)。
        max_age_seconds: 签名请求允许的最大时延(秒),默认 `300`;用于拒绝过期的
            重放请求。设为 `None` 可关闭新鲜度校验(但绝不会跳过 MAC 校验)。
        now: 返回当前 epoch 秒的可调用对象,默认 [time.time][];可注入以编写确定性测试。

    Returns:
        可挂载到 Starlette 应用的 [Route][starlette.routing.Route]。

    飞书文档:
        [接收事件](https://open.feishu.cn/document/server-docs/event-subscription-guide/event-subscription-configure-/request-url-configuration-case)

    Examples:
        >>> from starlette.applications import Starlette
        >>> dispatcher = EventDispatcher()
        >>> route = create_event_route(dispatcher, encrypt_key="ek_secret")  # doctest: +SKIP
        >>> app = Starlette(routes=[route])  # doctest: +SKIP
    """
    store = _resolve_seen_store(seen_store)
    verifier = _build_verifier(encrypt_key, max_age_seconds, now)

    async def endpoint(request: Request) -> Response:
        raw = await request.body()
        result = await _read_payload(request, raw, encrypt_key, verifier)
        if isinstance(result, Response):
            return result
        payload, sig_verified = result

        if payload.get("type") == "url_verification":
            # Handshake: authenticated by inner token, not by MAC signature.
            if verification_token is not None and not hmac.compare_digest(
                str(payload.get("token", "")), verification_token
            ):
                return JSONResponse({"msg": "token mismatch"}, status_code=401)
            return JSONResponse({"challenge": payload.get("challenge")})

        # Normal events require MAC authentication when encrypt_key is configured.
        if encrypt_key is not None and not sig_verified:
            return JSONResponse({"msg": "signature required"}, status_code=401)

        event = Event.from_payload(payload)
        if store is not None and not await claim(store, event.event_id):
            return JSONResponse({})

        return JSONResponse({}, background=BackgroundTask(dispatcher.dispatch, event))

    return Route(path, endpoint, methods=["POST"])