Netdata.cloud bot for Zulip
"""Pydantic models for Netdata Cloud webhook payloads."""

from collections.abc import Mapping
from datetime import datetime
from enum import Enum
from typing import Optional, Union

from pydantic import BaseModel, Field, field_validator, ConfigDict


class AlertSeverity(str, Enum):
    """Alert severity levels."""

    WARNING = "warning"
    CRITICAL = "critical"
    CLEAR = "clear"


class ReachabilitySeverity(str, Enum):
    """Reachability severity levels."""

    INFO = "info"
    CRITICAL = "critical"


class ReachabilityStatus(BaseModel):
    """Reachability status information."""

    reachable: bool
    text: str  # "reachable" or "unreachable"


class AlertNotification(BaseModel):
    """Alert notification payload from Netdata Cloud."""

    message: str
    alert: str
    info: str
    chart: str
    context: str
    space: str
    severity: AlertSeverity
    date: datetime
    alert_url: str

    @field_validator('date', mode='before')
    @classmethod
    def parse_date(cls, v):
        """Convert an ISO-8601 string into a ``datetime`` before validation.

        A ``Z`` (UTC) designator is rewritten to ``+00:00`` because
        ``datetime.fromisoformat`` only accepts ``Z`` from Python 3.11 on.
        Non-string values are returned unchanged for Pydantic to validate.
        """
        if isinstance(v, str):
            return datetime.fromisoformat(v.replace('Z', '+00:00'))
        return v


class ReachabilityNotification(BaseModel):
    """Reachability notification payload from Netdata Cloud."""

    message: str
    url: str
    host: str
    severity: ReachabilitySeverity
    status: ReachabilityStatus


class WebhookPayload(BaseModel):
    """Dispatcher that turns a raw webhook payload into a typed notification.

    The class declares no fields of its own; it only hosts :meth:`parse`.
    """

    @classmethod
    def parse(cls, data: dict) -> Union[AlertNotification, ReachabilityNotification]:
        """Parse webhook payload and determine notification type.

        The notification type is inferred from the keys present:
        ``alert`` + ``chart`` selects :class:`AlertNotification`,
        ``status`` + ``host`` selects :class:`ReachabilityNotification`.

        Args:
            data: Decoded webhook body (a mapping of field names to values).

        Returns:
            The notification model built from ``data``.

        Raises:
            ValueError: If ``data`` is not a mapping, or matches neither
                notification type. Field validation failures surface as
                Pydantic's ``ValidationError`` from the model constructor.
        """
        # Reject non-mapping bodies up front: ``in`` on a str would do
        # substring matching and ``in`` on None would raise TypeError,
        # neither of which is a meaningful "unknown payload" signal.
        if not isinstance(data, Mapping):
            raise ValueError(
                f"Webhook payload must be a mapping, got {type(data).__name__}"
            )

        if 'alert' in data and 'chart' in data:
            return AlertNotification(**data)
        if 'status' in data and 'host' in data:
            return ReachabilityNotification(**data)
        raise ValueError(f"Unknown notification type: {data}")


class ZulipConfig(BaseModel):
    """Zulip bot configuration."""

    site: Optional[str] = None
    email: Optional[str] = None
    api_key: Optional[str] = None
    stream: str = "netdata-alerts"

    # NOTE(review): ``env_prefix`` is a pydantic-settings (``BaseSettings``)
    # option; on a plain ``BaseModel`` it does not appear to load values from
    # ZULIP_* environment variables. Confirm how this config is populated.
    model_config = ConfigDict(env_prefix="ZULIP_")


class ServerConfig(BaseModel):
    """Server configuration."""

    host: str = "0.0.0.0"  # all IPv4 interfaces
    port: int = 8080  # common alternate HTTP port (the default HTTP port is 80)

    # NOTE(review): same caveat as ZulipConfig -- ``env_prefix`` presumably has
    # no effect on a ``BaseModel``; SERVER_* variables must be read elsewhere.
    model_config = ConfigDict(env_prefix="SERVER_")