"""Pydantic models for Netdata Cloud webhook payloads.""" from datetime import datetime from enum import Enum from pydantic import BaseModel, ConfigDict, Field, field_validator class AlertSeverity(str, Enum): """Alert severity levels.""" WARNING = "warning" CRITICAL = "critical" CLEAR = "clear" class ReachabilitySeverity(str, Enum): """Reachability severity levels.""" INFO = "info" CRITICAL = "critical" class ReachabilityStatus(BaseModel): """Reachability status information.""" reachable: bool text: str # "reachable" or "unreachable" class AlertNotification(BaseModel): """Alert notification payload from Netdata Cloud.""" message: str alert: str info: str chart: str context: str space: str severity: AlertSeverity date: datetime alert_url: str # Additional fields from full schema Rooms: dict | None = None family: str | None = None class_: str | None = Field(None, alias="class") # 'class' is a Python keyword duration: str | None = None additional_active_critical_alerts: int | None = None additional_active_warning_alerts: int | None = None @field_validator("date", mode="before") @classmethod def parse_date(cls, v): if isinstance(v, str): return datetime.fromisoformat(v.replace("Z", "+00:00")) return v class ReachabilityNotification(BaseModel): """Reachability notification payload from Netdata Cloud.""" message: str url: str host: str severity: ReachabilitySeverity status: ReachabilityStatus class TestNotification(BaseModel): """Test notification payload from Netdata Cloud.""" message: str class WebhookPayload(BaseModel): """Union type for webhook payloads.""" @classmethod def parse( cls, data: dict ) -> AlertNotification | ReachabilityNotification | TestNotification: """Parse webhook payload and determine notification type.""" if "alert" in data and "chart" in data: return AlertNotification(**data) elif "status" in data and "host" in data: return ReachabilityNotification(**data) elif len(data) == 1 and "message" in data: # Test notification - only has a message field return TestNotification(**data) else: raise ValueError(f"Unknown notification type: {data}") class ZulipConfig(BaseModel): """Zulip bot configuration.""" site: str | None = None email: str | None = None api_key: str | None = None stream: str = "netdata-alerts" model_config = ConfigDict(env_prefix="ZULIP_") class ServerConfig(BaseModel): """Server configuration.""" host: str = "0.0.0.0" port: int = 8080 # Default HTTP port challenge_secret: str | None = None # Netdata webhook challenge secret model_config = ConfigDict(env_prefix="SERVER_")