"""Pydantic models for Netdata Cloud webhook payloads.""" from datetime import datetime from enum import Enum from typing import Optional, Union from pydantic import BaseModel, Field, field_validator, ConfigDict class AlertSeverity(str, Enum): """Alert severity levels.""" WARNING = "warning" CRITICAL = "critical" CLEAR = "clear" class ReachabilitySeverity(str, Enum): """Reachability severity levels.""" INFO = "info" CRITICAL = "critical" class ReachabilityStatus(BaseModel): """Reachability status information.""" reachable: bool text: str # "reachable" or "unreachable" class AlertNotification(BaseModel): """Alert notification payload from Netdata Cloud.""" message: str alert: str info: str chart: str context: str space: str severity: AlertSeverity date: datetime alert_url: str @field_validator('date', mode='before') @classmethod def parse_date(cls, v): if isinstance(v, str): return datetime.fromisoformat(v.replace('Z', '+00:00')) return v class ReachabilityNotification(BaseModel): """Reachability notification payload from Netdata Cloud.""" message: str url: str host: str severity: ReachabilitySeverity status: ReachabilityStatus class WebhookPayload(BaseModel): """Union type for webhook payloads.""" @classmethod def parse(cls, data: dict) -> Union[AlertNotification, ReachabilityNotification]: """Parse webhook payload and determine notification type.""" if 'alert' in data and 'chart' in data: return AlertNotification(**data) elif 'status' in data and 'host' in data: return ReachabilityNotification(**data) else: raise ValueError(f"Unknown notification type: {data}") class ZulipConfig(BaseModel): """Zulip bot configuration.""" site: Optional[str] = None email: Optional[str] = None api_key: Optional[str] = None stream: str = "netdata-alerts" model_config = ConfigDict(env_prefix="ZULIP_") class ServerConfig(BaseModel): """Server configuration.""" host: str = "0.0.0.0" port: int = 8080 # Default HTTP port model_config = ConfigDict(env_prefix="SERVER_")