A community-based topic aggregation platform built on atproto.
1package unfurl 2 3import ( 4 "fmt" 5 "log" 6 "sync" 7 "time" 8) 9 10// circuitState represents the state of a circuit breaker 11type circuitState int 12 13const ( 14 stateClosed circuitState = iota // Normal operation 15 stateOpen // Circuit is open (provider failing) 16 stateHalfOpen // Testing if provider recovered 17) 18 19// circuitBreaker tracks failures per provider and stops trying failing providers 20type circuitBreaker struct { 21 failures map[string]int 22 lastFailure map[string]time.Time 23 state map[string]circuitState 24 lastStateLog map[string]time.Time 25 failureThreshold int 26 openDuration time.Duration 27 mu sync.RWMutex 28} 29 30// newCircuitBreaker creates a circuit breaker with default settings 31func newCircuitBreaker() *circuitBreaker { 32 return &circuitBreaker{ 33 failureThreshold: 3, // Open after 3 consecutive failures 34 openDuration: 5 * time.Minute, // Keep open for 5 minutes 35 failures: make(map[string]int), 36 lastFailure: make(map[string]time.Time), 37 state: make(map[string]circuitState), 38 lastStateLog: make(map[string]time.Time), 39 } 40} 41 42// canAttempt checks if we should attempt to call this provider 43// Returns true if circuit is closed or half-open (ready to retry) 44func (cb *circuitBreaker) canAttempt(provider string) (bool, error) { 45 cb.mu.RLock() 46 defer cb.mu.RUnlock() 47 48 state := cb.getState(provider) 49 50 switch state { 51 case stateClosed: 52 return true, nil 53 case stateOpen: 54 // Check if we should transition to half-open 55 lastFail := cb.lastFailure[provider] 56 if time.Since(lastFail) > cb.openDuration { 57 // Transition to half-open (allow one retry) 58 cb.mu.RUnlock() 59 cb.mu.Lock() 60 cb.state[provider] = stateHalfOpen 61 cb.logStateChange(provider, stateHalfOpen) 62 cb.mu.Unlock() 63 cb.mu.RLock() 64 return true, nil 65 } 66 // Still in open period 67 failCount := cb.failures[provider] 68 nextRetry := lastFail.Add(cb.openDuration) 69 return false, fmt.Errorf( 70 "circuit breaker open for 
provider '%s' (failures: %d, next retry: %s)", 71 provider, 72 failCount, 73 nextRetry.Format("15:04:05"), 74 ) 75 case stateHalfOpen: 76 return true, nil 77 default: 78 return true, nil 79 } 80} 81 82// recordSuccess records a successful unfurl, resetting failure count 83func (cb *circuitBreaker) recordSuccess(provider string) { 84 cb.mu.Lock() 85 defer cb.mu.Unlock() 86 87 oldState := cb.getState(provider) 88 89 // Reset failure tracking 90 delete(cb.failures, provider) 91 delete(cb.lastFailure, provider) 92 cb.state[provider] = stateClosed 93 94 // Log recovery if we were in a failure state 95 if oldState != stateClosed { 96 cb.logStateChange(provider, stateClosed) 97 } 98} 99 100// recordFailure records a failed unfurl attempt 101func (cb *circuitBreaker) recordFailure(provider string, err error) { 102 cb.mu.Lock() 103 defer cb.mu.Unlock() 104 105 // Increment failure count 106 cb.failures[provider]++ 107 cb.lastFailure[provider] = time.Now() 108 109 failCount := cb.failures[provider] 110 111 // Check if we should open the circuit 112 if failCount >= cb.failureThreshold { 113 oldState := cb.getState(provider) 114 cb.state[provider] = stateOpen 115 if oldState != stateOpen { 116 log.Printf( 117 "[UNFURL-CIRCUIT] Opening circuit for provider '%s' after %d consecutive failures. 
Last error: %v", 118 provider, 119 failCount, 120 err, 121 ) 122 cb.lastStateLog[provider] = time.Now() 123 } 124 } else { 125 log.Printf( 126 "[UNFURL-CIRCUIT] Failure %d/%d for provider '%s': %v", 127 failCount, 128 cb.failureThreshold, 129 provider, 130 err, 131 ) 132 } 133} 134 135// getState returns the current state (must be called with lock held) 136func (cb *circuitBreaker) getState(provider string) circuitState { 137 if state, exists := cb.state[provider]; exists { 138 return state 139 } 140 return stateClosed 141} 142 143// logStateChange logs state transitions (must be called with lock held) 144// Debounced to avoid log spam (max once per minute per provider) 145func (cb *circuitBreaker) logStateChange(provider string, newState circuitState) { 146 lastLog, exists := cb.lastStateLog[provider] 147 if exists && time.Since(lastLog) < time.Minute { 148 return // Don't spam logs 149 } 150 151 var stateStr string 152 switch newState { 153 case stateClosed: 154 stateStr = "CLOSED (recovered)" 155 case stateOpen: 156 stateStr = "OPEN (failing)" 157 case stateHalfOpen: 158 stateStr = "HALF-OPEN (testing)" 159 } 160 161 log.Printf("[UNFURL-CIRCUIT] Circuit for provider '%s' is now %s", provider, stateStr) 162 cb.lastStateLog[provider] = time.Now() 163} 164 165// getStats returns current circuit breaker stats (for debugging/monitoring) 166func (cb *circuitBreaker) getStats() map[string]interface{} { 167 cb.mu.RLock() 168 defer cb.mu.RUnlock() 169 170 stats := make(map[string]interface{}) 171 172 // Collect all providers with any activity (state, failures, or both) 173 providers := make(map[string]bool) 174 for provider := range cb.state { 175 providers[provider] = true 176 } 177 for provider := range cb.failures { 178 providers[provider] = true 179 } 180 181 for provider := range providers { 182 state := cb.getState(provider) 183 var stateStr string 184 switch state { 185 case stateClosed: 186 stateStr = "closed" 187 case stateOpen: 188 stateStr = "open" 189 case 
stateHalfOpen: 190 stateStr = "half-open" 191 } 192 193 stats[provider] = map[string]interface{}{ 194 "state": stateStr, 195 "failures": cb.failures[provider], 196 "last_failure": cb.lastFailure[provider], 197 } 198 } 199 return stats 200}