A community-based topic aggregation platform built on atproto
1package unfurl
2
3import (
4 "fmt"
5 "log"
6 "sync"
7 "time"
8)
9
// circuitState enumerates the states a provider's circuit can be in.
// Its zero value is stateClosed.
type circuitState int

const (
	// stateClosed is normal operation: attempts are allowed.
	stateClosed circuitState = iota
	// stateOpen means the provider is failing and attempts are rejected.
	stateOpen
	// stateHalfOpen means the provider is being probed to see whether it
	// has recovered.
	stateHalfOpen
)
18
19// circuitBreaker tracks failures per provider and stops trying failing providers
20type circuitBreaker struct {
21 failures map[string]int
22 lastFailure map[string]time.Time
23 state map[string]circuitState
24 lastStateLog map[string]time.Time
25 failureThreshold int
26 openDuration time.Duration
27 mu sync.RWMutex
28}
29
30// newCircuitBreaker creates a circuit breaker with default settings
31func newCircuitBreaker() *circuitBreaker {
32 return &circuitBreaker{
33 failureThreshold: 3, // Open after 3 consecutive failures
34 openDuration: 5 * time.Minute, // Keep open for 5 minutes
35 failures: make(map[string]int),
36 lastFailure: make(map[string]time.Time),
37 state: make(map[string]circuitState),
38 lastStateLog: make(map[string]time.Time),
39 }
40}
41
42// canAttempt checks if we should attempt to call this provider
43// Returns true if circuit is closed or half-open (ready to retry)
44func (cb *circuitBreaker) canAttempt(provider string) (bool, error) {
45 cb.mu.RLock()
46 defer cb.mu.RUnlock()
47
48 state := cb.getState(provider)
49
50 switch state {
51 case stateClosed:
52 return true, nil
53 case stateOpen:
54 // Check if we should transition to half-open
55 lastFail := cb.lastFailure[provider]
56 if time.Since(lastFail) > cb.openDuration {
57 // Transition to half-open (allow one retry)
58 cb.mu.RUnlock()
59 cb.mu.Lock()
60 cb.state[provider] = stateHalfOpen
61 cb.logStateChange(provider, stateHalfOpen)
62 cb.mu.Unlock()
63 cb.mu.RLock()
64 return true, nil
65 }
66 // Still in open period
67 failCount := cb.failures[provider]
68 nextRetry := lastFail.Add(cb.openDuration)
69 return false, fmt.Errorf(
70 "circuit breaker open for provider '%s' (failures: %d, next retry: %s)",
71 provider,
72 failCount,
73 nextRetry.Format("15:04:05"),
74 )
75 case stateHalfOpen:
76 return true, nil
77 default:
78 return true, nil
79 }
80}
81
82// recordSuccess records a successful unfurl, resetting failure count
83func (cb *circuitBreaker) recordSuccess(provider string) {
84 cb.mu.Lock()
85 defer cb.mu.Unlock()
86
87 oldState := cb.getState(provider)
88
89 // Reset failure tracking
90 delete(cb.failures, provider)
91 delete(cb.lastFailure, provider)
92 cb.state[provider] = stateClosed
93
94 // Log recovery if we were in a failure state
95 if oldState != stateClosed {
96 cb.logStateChange(provider, stateClosed)
97 }
98}
99
100// recordFailure records a failed unfurl attempt
101func (cb *circuitBreaker) recordFailure(provider string, err error) {
102 cb.mu.Lock()
103 defer cb.mu.Unlock()
104
105 // Increment failure count
106 cb.failures[provider]++
107 cb.lastFailure[provider] = time.Now()
108
109 failCount := cb.failures[provider]
110
111 // Check if we should open the circuit
112 if failCount >= cb.failureThreshold {
113 oldState := cb.getState(provider)
114 cb.state[provider] = stateOpen
115 if oldState != stateOpen {
116 log.Printf(
117 "[UNFURL-CIRCUIT] Opening circuit for provider '%s' after %d consecutive failures. Last error: %v",
118 provider,
119 failCount,
120 err,
121 )
122 cb.lastStateLog[provider] = time.Now()
123 }
124 } else {
125 log.Printf(
126 "[UNFURL-CIRCUIT] Failure %d/%d for provider '%s': %v",
127 failCount,
128 cb.failureThreshold,
129 provider,
130 err,
131 )
132 }
133}
134
135// getState returns the current state (must be called with lock held)
136func (cb *circuitBreaker) getState(provider string) circuitState {
137 if state, exists := cb.state[provider]; exists {
138 return state
139 }
140 return stateClosed
141}
142
143// logStateChange logs state transitions (must be called with lock held)
144// Debounced to avoid log spam (max once per minute per provider)
145func (cb *circuitBreaker) logStateChange(provider string, newState circuitState) {
146 lastLog, exists := cb.lastStateLog[provider]
147 if exists && time.Since(lastLog) < time.Minute {
148 return // Don't spam logs
149 }
150
151 var stateStr string
152 switch newState {
153 case stateClosed:
154 stateStr = "CLOSED (recovered)"
155 case stateOpen:
156 stateStr = "OPEN (failing)"
157 case stateHalfOpen:
158 stateStr = "HALF-OPEN (testing)"
159 }
160
161 log.Printf("[UNFURL-CIRCUIT] Circuit for provider '%s' is now %s", provider, stateStr)
162 cb.lastStateLog[provider] = time.Now()
163}
164
165// getStats returns current circuit breaker stats (for debugging/monitoring)
166func (cb *circuitBreaker) getStats() map[string]interface{} {
167 cb.mu.RLock()
168 defer cb.mu.RUnlock()
169
170 stats := make(map[string]interface{})
171
172 // Collect all providers with any activity (state, failures, or both)
173 providers := make(map[string]bool)
174 for provider := range cb.state {
175 providers[provider] = true
176 }
177 for provider := range cb.failures {
178 providers[provider] = true
179 }
180
181 for provider := range providers {
182 state := cb.getState(provider)
183 var stateStr string
184 switch state {
185 case stateClosed:
186 stateStr = "closed"
187 case stateOpen:
188 stateStr = "open"
189 case stateHalfOpen:
190 stateStr = "half-open"
191 }
192
193 stats[provider] = map[string]interface{}{
194 "state": stateStr,
195 "failures": cb.failures[provider],
196 "last_failure": cb.lastFailure[provider],
197 }
198 }
199 return stats
200}