"""Tests for webhook functionality.""" import pytest from datetime import datetime from unittest.mock import Mock, patch from netdata_zulip_bot.models import AlertNotification, ReachabilityNotification, WebhookPayload from netdata_zulip_bot.formatter import ZulipMessageFormatter class TestWebhookPayload: """Test webhook payload parsing.""" def test_parse_alert_notification(self): """Test parsing alert notification payload.""" data = { "message": "Critical alert on production server", "alert": "high_cpu_usage", "info": "CPU usage exceeded 90%", "chart": "system.cpu", "context": "cpu.utilization", "space": "production", "severity": "critical", "date": "2024-01-15T14:30:00Z", "alert_url": "https://app.netdata.cloud/spaces/abc/alerts/123" } notification = WebhookPayload.parse(data) assert isinstance(notification, AlertNotification) assert notification.severity.value == "critical" assert notification.alert == "high_cpu_usage" assert isinstance(notification.date, datetime) def test_parse_reachability_notification(self): """Test parsing reachability notification payload.""" data = { "message": "Host web-server-01 is unreachable", "url": "https://app.netdata.cloud/hosts/web-server-01", "host": "web-server-01", "severity": "critical", "status": { "reachable": False, "text": "unreachable" } } notification = WebhookPayload.parse(data) assert isinstance(notification, ReachabilityNotification) assert notification.severity.value == "critical" assert notification.host == "web-server-01" assert not notification.status.reachable def test_parse_unknown_payload_raises_error(self): """Test that unknown payload types raise ValueError.""" data = {"unknown_field": "value"} with pytest.raises(ValueError, match="Unknown notification type"): WebhookPayload.parse(data) class TestZulipMessageFormatter: """Test Zulip message formatting.""" def setup_method(self): """Set up test fixtures.""" self.formatter = ZulipMessageFormatter() def test_format_critical_alert(self): """Test formatting critical alert notification.""" alert = AlertNotification( message="Critical alert on production server", alert="high_cpu_usage", info="CPU usage exceeded 90% for 5 minutes", chart="system.cpu", context="cpu.utilization", space="production", severity="critical", date=datetime(2024, 1, 15, 14, 30, 0), alert_url="https://app.netdata.cloud/spaces/abc/alerts/123" ) topic, content = self.formatter.format_alert(alert) assert topic == "critical" assert "🔴" in content assert "**high_cpu_usage**" in content assert "production" in content assert "Critical" in content assert "2024-01-15 14:30:00 UTC" in content assert "[View Alert](https://app.netdata.cloud/spaces/abc/alerts/123)" in content def test_format_warning_alert(self): """Test formatting warning alert notification.""" alert = AlertNotification( message="Warning: High memory usage", alert="high_memory_usage", info="Memory usage at 85%", chart="system.ram", context="memory.utilization", space="staging", severity="warning", date=datetime(2024, 1, 15, 10, 15, 0), alert_url="https://app.netdata.cloud/spaces/def/alerts/456" ) topic, content = self.formatter.format_alert(alert) assert topic == "warning" assert "âš ī¸" in content assert "Warning" in content def test_format_clear_alert(self): """Test formatting clear alert notification.""" alert = AlertNotification( message="Alert cleared: CPU usage normal", alert="high_cpu_usage", info="CPU usage returned to normal levels", chart="system.cpu", context="cpu.utilization", space="production", severity="clear", date=datetime(2024, 1, 15, 15, 0, 0), alert_url="https://app.netdata.cloud/spaces/abc/alerts/123" ) topic, content = self.formatter.format_alert(alert) assert topic == "clear" assert "✅" in content assert "Clear" in content def test_format_reachability_unreachable(self): """Test formatting unreachable host notification.""" notification = ReachabilityNotification( message="Host web-server-01 is unreachable", url="https://app.netdata.cloud/hosts/web-server-01", host="web-server-01", severity="critical", status={ "reachable": False, "text": "unreachable" } ) topic, content = self.formatter.format_reachability(notification) assert topic == "reachability" assert "🔴" in content # critical severity assert "❌" in content # unreachable status assert "web-server-01" in content assert "Unreachable" in content assert "[View Host](https://app.netdata.cloud/hosts/web-server-01)" in content def test_format_reachability_reachable(self): """Test formatting reachable host notification.""" notification = ReachabilityNotification( message="Host web-server-01 is reachable again", url="https://app.netdata.cloud/hosts/web-server-01", host="web-server-01", severity="info", status={ "reachable": True, "text": "reachable" } ) topic, content = self.formatter.format_reachability(notification) assert topic == "reachability" assert "â„šī¸" in content # info severity assert "✅" in content # reachable status assert "Reachable" in content