跳转至

grafana

feishu.plugins.grafana

GrafanaBundle

无额外依赖的 Grafana / Alertmanager 告警载荷归一化工具 bundle。

源代码位于: feishu/plugins/grafana.py
Python
class GrafanaBundle:
    r"""无额外依赖的 Grafana / Alertmanager 告警载荷归一化工具 bundle。"""

    def register(self, registry: ToolRegistry, context: BundleContext) -> None:
        registry.add(
            Tool(
                name="normalize_grafana_alerts",
                description="把 Grafana 或 Alertmanager 告警 webhook 载荷归一化为紧凑告警事实。",
                input_schema={
                    "type": "object",
                    "properties": {"payload": {"type": "object", "description": "Grafana/Alertmanager webhook JSON。"}},
                    "required": ["payload"],
                    "additionalProperties": False,
                },
                handler=lambda payload: normalize_grafana_alerts(payload),
            )
        )
        registry.add(
            Tool(
                name="search_grafana_dashboards",
                description="搜索 Grafana dashboard,用于先定位集群状态或服务状态面板。",
                input_schema={
                    "type": "object",
                    "properties": {
                        "query": {"type": "string", "description": "Dashboard 标题关键词。"},
                        "limit": {"type": "integer", "description": "最多返回多少个 dashboard,默认 10。"},
                    },
                    "additionalProperties": False,
                },
                handler=lambda query=None, limit=10: _search_dashboards(context, query=query, limit=limit),
            )
        )
        registry.add(
            Tool(
                name="get_grafana_dashboard",
                description="读取 Grafana dashboard 的标题、panel 与 Prometheus 查询摘要。",
                input_schema={
                    "type": "object",
                    "properties": {"uid": {"type": "string", "description": "Grafana dashboard uid。"}},
                    "required": ["uid"],
                    "additionalProperties": False,
                },
                handler=lambda uid: _get_dashboard(context, uid=uid),
            )
        )
        registry.add(
            Tool(
                name="query_grafana_prometheus",
                description="通过 Grafana 的 Prometheus datasource 执行只读 instant query。",
                input_schema={
                    "type": "object",
                    "properties": {
                        "query": {"type": "string", "description": "PromQL instant query。"},
                        "datasource_uid": {"type": "string", "description": "可选 Grafana datasource uid。"},
                        "max_results": {"type": "integer", "description": "最多返回多少条时间序列,默认 20。"},
                    },
                    "required": ["query"],
                    "additionalProperties": False,
                },
                handler=lambda query, datasource_uid=None, max_results=20: _query_prometheus(
                    context, query=query, datasource_uid=datasource_uid, max_results=max_results
                ),
            )
        )

GrafanaClient

Small async Grafana HTTP client for read-only dashboard and datasource queries.

源代码位于: feishu/plugins/grafana.py
Python
class GrafanaClient:
    r"""Small async Grafana HTTP client for read-only dashboard and datasource queries."""

    def __init__(
        self,
        base_url: str,
        *,
        api_token: str | None = None,
        datasource_uid: str | None = None,
        timeout: float = 10.0,
        transport: httpx.AsyncBaseTransport | None = None,
    ) -> None:
        self.base_url = base_url.rstrip("/")
        self.api_token = api_token
        self.datasource_uid = datasource_uid
        self.timeout = timeout
        self.transport = transport

    async def search_dashboards(self, *, query: str | None = None, limit: int = 10) -> list[dict[str, Any]]:
        params: dict[str, Any] = {"type": "dash-db", "limit": max(1, min(int(limit), 100))}
        if query:
            params["query"] = query
        payload = await self._request("GET", "/api/search", params=params)
        items = payload if isinstance(payload, list) else []
        return [_compact_dashboard_hit(item) for item in items if isinstance(item, dict)]

    async def get_dashboard(self, uid: str) -> dict[str, Any]:
        payload = await self._request("GET", f"/api/dashboards/uid/{uid}")
        data: dict[str, Any] = payload if isinstance(payload, dict) else {}
        dashboard_value = data.get("dashboard")
        dashboard: dict[str, Any] = dashboard_value if isinstance(dashboard_value, dict) else data
        meta_value = data.get("meta")
        meta = meta_value if isinstance(meta_value, dict) else {}
        return _compact_dashboard(dashboard, meta=meta)

    async def query_prometheus(
        self,
        *,
        query: str,
        datasource_uid: str | None = None,
        max_results: int = 20,
    ) -> dict[str, Any]:
        uid = datasource_uid or self.datasource_uid or await self._default_prometheus_datasource_uid()
        payload = await self._request(
            "GET",
            f"/api/datasources/proxy/uid/{uid}/api/v1/query",
            params={"query": query},
        )
        return _compact_prometheus_response(payload, query=query, datasource_uid=uid, max_results=max_results)

    async def _default_prometheus_datasource_uid(self) -> str:
        payload = await self._request("GET", "/api/datasources")
        items = payload if isinstance(payload, list) else []
        prometheus = [item for item in items if isinstance(item, dict) and item.get("type") == "prometheus"]
        for item in prometheus:
            uid = _text(item.get("uid"))
            if item.get("isDefault") and uid:
                return uid
        for item in prometheus:
            uid = _text(item.get("uid"))
            if uid:
                return uid
        raise RuntimeError("Grafana Prometheus datasource is not configured")

    async def _request(self, method: str, path: str, **kwargs: Any) -> Any:
        headers = {"Accept": "application/json"}
        if self.api_token:
            headers["Authorization"] = f"Bearer {self.api_token}"
        async with httpx.AsyncClient(
            base_url=self.base_url,
            timeout=self.timeout,
            headers=headers,
            transport=self.transport,
        ) as client:
            response = await client.request(method, path, **kwargs)
            response.raise_for_status()
            return response.json()

normalize_grafana_alerts

Python
normalize_grafana_alerts(payload: dict[str, Any]) -> dict[str, Any]

把 Grafana / Alertmanager webhook 载荷归一化为适合模型消费的告警事实。

源代码位于: feishu/plugins/grafana.py
Python
def normalize_grafana_alerts(payload: dict[str, Any]) -> dict[str, Any]:
    r"""把 Grafana / Alertmanager webhook 载荷归一化为适合模型消费的告警事实。"""
    alerts_value = payload.get("alerts")
    alerts = alerts_value if isinstance(alerts_value, list) else []
    normalized = [_alert(item) for item in alerts if isinstance(item, dict)]
    return {
        "status": _text(payload.get("status")) or _aggregate_status(normalized),
        "alert_count": len(normalized),
        "alerts": normalized,
    }