Netdata.cloud bot for Zulip
1"""Pydantic models for Netdata Cloud webhook payloads."""
2
3from datetime import datetime
4from enum import Enum
5from typing import Optional, Union
6
7from pydantic import BaseModel, Field, field_validator, ConfigDict
8
9
class AlertSeverity(str, Enum):
    """Severity values accepted in the ``severity`` field of an alert.

    The ``str`` mix-in makes every member compare equal to its raw string
    value, e.g. ``AlertSeverity.WARNING == "warning"``.
    """

    WARNING = "warning"
    CRITICAL = "critical"
    CLEAR = "clear"
15
16
class ReachabilitySeverity(str, Enum):
    """Severity values accepted in the ``severity`` field of a reachability event.

    The ``str`` mix-in makes every member compare equal to its raw string
    value, e.g. ``ReachabilitySeverity.INFO == "info"``.
    """

    INFO = "info"
    CRITICAL = "critical"
21
22
class ReachabilityStatus(BaseModel):
    """Reachability state: a boolean flag plus its textual form."""

    # Boolean reachability flag.
    reachable: bool
    # Textual form of the flag: "reachable" or "unreachable".
    text: str
27
28
class AlertNotification(BaseModel):
    """Schema of an alert webhook body sent by Netdata Cloud.

    Every field is required. ``date`` may arrive as an ISO 8601 string,
    including one that uses the ``Z`` (UTC) designator.
    """

    message: str
    alert: str
    info: str
    chart: str
    context: str
    space: str
    # Must be one of the AlertSeverity values.
    severity: AlertSeverity
    # Normalised by parse_date() below when supplied as a string.
    date: datetime
    alert_url: str

    @field_validator('date', mode='before')
    @classmethod
    def parse_date(cls, v):
        """Turn a string ``date`` into a ``datetime`` before field validation.

        ``Z`` is rewritten to ``+00:00`` because ``datetime.fromisoformat``
        only understands the ``Z`` suffix from Python 3.11 onwards. Values
        that are not strings are handed back unchanged for Pydantic to
        validate.
        """
        if not isinstance(v, str):
            return v
        normalized = v.replace('Z', '+00:00')
        return datetime.fromisoformat(normalized)
47
48
class ReachabilityNotification(BaseModel):
    """Schema of a reachability webhook body sent by Netdata Cloud.

    Every field is required.
    """

    message: str
    url: str
    host: str
    # Must be one of the ReachabilitySeverity values.
    severity: ReachabilitySeverity
    # Nested object validated as a ReachabilityStatus.
    status: ReachabilityStatus
56
57
class WebhookPayload(BaseModel):
    """Dispatcher that picks the concrete notification model for a payload.

    The class declares no fields of its own; it only hosts :meth:`parse`.
    """

    @classmethod
    def parse(cls, data: dict) -> Union[AlertNotification, ReachabilityNotification]:
        """Build the notification model that matches the keys present in *data*.

        Args:
            data: Decoded webhook body. A payload containing both ``alert``
                and ``chart`` is treated as an alert; otherwise one containing
                both ``status`` and ``host`` is treated as a reachability
                event.

        Returns:
            An ``AlertNotification`` or a ``ReachabilityNotification``.

        Raises:
            ValueError: If *data* is not a mapping, or if it matches neither
                notification shape. Field validation errors raised while
                constructing the selected model propagate unchanged.
        """
        # Imported locally so this block does not depend on a new file-level import.
        from collections.abc import Mapping

        # A decoded JSON body may be a string, list, number or null. Without
        # this guard such input fails with TypeError (from ``in`` or ``**``),
        # and a string body would even be substring-matched against the keys.
        # Report it as a ValueError like every other unrecognised payload.
        if not isinstance(data, Mapping):
            raise ValueError(
                "Unknown notification type: expected a JSON object, "
                f"got {type(data).__name__}"
            )

        if 'alert' in data and 'chart' in data:
            return AlertNotification(**data)
        if 'status' in data and 'host' in data:
            return ReachabilityNotification(**data)
        raise ValueError(f"Unknown notification type: {data}")
70
71
class ZulipConfig(BaseModel):
    """Connection settings for the Zulip bot.

    ``site``, ``email`` and ``api_key`` default to ``None``; ``stream``
    defaults to ``"netdata-alerts"``.
    """

    # NOTE(review): ``env_prefix`` is a pydantic-settings (``BaseSettings``)
    # option. On a plain ``BaseModel`` it is presumably ignored, so this class
    # by itself would not read ZULIP_* environment variables -- confirm where
    # the environment is actually loaded.
    model_config = ConfigDict(env_prefix="ZULIP_")

    site: Optional[str] = None
    email: Optional[str] = None
    api_key: Optional[str] = None
    stream: str = "netdata-alerts"
80
81
class ServerConfig(BaseModel):
    """Host and port settings for the server."""

    # NOTE(review): ``env_prefix`` is a pydantic-settings (``BaseSettings``)
    # option. On a plain ``BaseModel`` it is presumably ignored, so this class
    # by itself would not read SERVER_* environment variables -- confirm where
    # the environment is actually loaded.
    model_config = ConfigDict(env_prefix="SERVER_")

    # "0.0.0.0" conventionally means "all IPv4 interfaces" when used as a
    # bind address.
    host: str = "0.0.0.0"
    # 8080 is a common alternative HTTP port (the standard HTTP port is 80).
    port: int = 8080
87 model_config = ConfigDict(env_prefix="SERVER_")