providing password reset services for a long while: circa 2025
at main 5.3 kB view raw
1import { Database } from "bun:sqlite"; 2import { mkdir } from "node:fs/promises"; 3import type { Block, SlackAPIClient } from "slack-edge"; 4 5export interface SlackMessage { 6 channel: string; 7 blocks?: Block[]; 8 text: string; 9 timestamp?: number; 10 status: "pending" | "sent" | "failed"; 11} 12 13export default class SlackMessageQueue { 14 private db: Database; 15 private slack: SlackAPIClient; 16 private isProcessing = false; 17 private batchSize = 50; 18 private rateLimitDelay = 1000; // 1 message per second per channel 19 private channelLastMessageTime: Map<string, number> = new Map(); 20 private totalMessageCount = 0; 21 private messageCountResetTime = 0; 22 private backoffDelay = 1000; 23 private maxBackoff = 30000; // 30 seconds 24 25 constructor(slackClient: SlackAPIClient, dbPath = "slack-queue.db") { 26 this.slack = slackClient; 27 const dir = dbPath.split("/").slice(0, -1).join("/"); 28 if (dir) { 29 try { 30 mkdir(dir); 31 } catch (e) { 32 // Directory may already exist 33 } 34 } 35 this.db = new Database(dbPath); 36 this.initDatabase(); 37 this.processQueue(); 38 } 39 40 private initDatabase() { 41 this.db.run(` 42 CREATE TABLE IF NOT EXISTS messages ( 43 id INTEGER PRIMARY KEY AUTOINCREMENT, 44 channel TEXT NOT NULL, 45 blocks TEXT, 46 text TEXT NOT NULL, 47 timestamp INTEGER NOT NULL, 48 status TEXT NOT NULL 49 ) 50 `); 51 this.db.run("CREATE INDEX IF NOT EXISTS idx_status ON messages(status)"); 52 } 53 54 async enqueue(message: SlackMessage): Promise<void> { 55 const stmt = this.db.prepare(` 56 INSERT INTO messages (channel, blocks, text, timestamp, status) 57 VALUES (?, ?, ?, ?, ?) 58 `); 59 60 stmt.run( 61 message.channel ?? null, 62 JSON.stringify(message.blocks) ?? null, 63 message.text, 64 Date.now(), 65 "pending", 66 ); 67 68 if (!this.isProcessing) { 69 this.processQueue(); 70 } 71 } 72 73 private async sleep(ms: number): Promise<void> { 74 return new Promise((resolve) => setTimeout(resolve, ms)); 75 } 76 77 private async sendWithRateLimit( 78 message: SlackMessage & { id: number }, 79 ): Promise<void> { 80 const now = Date.now(); 81 82 // Check per-minute total limit 83 if (now - this.messageCountResetTime >= 60000) { 84 this.totalMessageCount = 0; 85 this.messageCountResetTime = now; 86 } 87 88 if (this.totalMessageCount >= 350) { 89 const waitTime = 60000 - (now - this.messageCountResetTime); 90 await this.sleep(waitTime); 91 this.totalMessageCount = 0; 92 this.messageCountResetTime = Date.now(); 93 } 94 95 // Check per-channel rate limit 96 const channelLastTime = 97 this.channelLastMessageTime.get(message.channel) || 0; 98 const timeSinceLastChannelMessage = now - channelLastTime; 99 100 if (timeSinceLastChannelMessage < this.rateLimitDelay) { 101 await this.sleep(this.rateLimitDelay - timeSinceLastChannelMessage); 102 } 103 104 let currentBackoff = this.backoffDelay; 105 let attempts = 0; 106 const maxAttempts = 3; 107 108 while (attempts < maxAttempts) { 109 try { 110 await this.slack.chat.postMessage({ 111 channel: message.channel, 112 blocks: JSON.parse(message.blocks as unknown as string) ?? undefined, 113 text: message.text, 114 }); 115 116 this.channelLastMessageTime.set(message.channel, Date.now()); 117 this.totalMessageCount++; 118 119 this.db 120 .prepare( 121 ` 122 UPDATE messages 123 SET status = 'sent' 124 WHERE id = ? 125 `, 126 ) 127 .run(message.id); 128 129 return; 130 } catch (error) { 131 console.error( 132 `Error sending message (attempt ${attempts + 1}/${maxAttempts})`, 133 error, 134 ); 135 attempts++; 136 137 if (attempts === maxAttempts) { 138 this.db 139 .prepare( 140 ` 141 UPDATE messages 142 SET status = 'failed' 143 WHERE id = ? 144 `, 145 ) 146 .run(message.id); 147 return; 148 } 149 150 await this.sleep(currentBackoff); 151 currentBackoff = Math.min(currentBackoff * 2, this.maxBackoff); 152 } 153 } 154 } 155 156 private async processQueue() { 157 if (this.isProcessing) return; 158 this.isProcessing = true; 159 160 console.log("Processing queue"); 161 162 try { 163 while (true) { 164 const messages = this.db 165 .prepare( 166 ` 167 SELECT * FROM messages 168 WHERE status = 'pending' 169 LIMIT ? 170 `, 171 ) 172 .all(this.batchSize) as (SlackMessage & { id: number })[]; 173 174 if (messages.length === 0) break; 175 176 // Process messages sequentially to maintain rate limiting 177 for (const message of messages) { 178 await this.sendWithRateLimit(message); 179 } 180 } 181 } finally { 182 this.isProcessing = false; 183 } 184 } 185 186 async cleanup(olderThan: number = 7 * 24 * 60 * 60 * 1000) { 187 const cutoff = Date.now() - olderThan; 188 this.db 189 .prepare( 190 ` 191 DELETE FROM messages 192 WHERE timestamp < ? AND status != 'pending' 193 `, 194 ) 195 .run(cutoff); 196 } 197 198 async queueLength() { 199 const result = this.db 200 .prepare( 201 ` 202 SELECT COUNT(*) as count 203 FROM messages 204 WHERE status = 'pending' 205 `, 206 ) 207 .get(); 208 return (result as { count: number }).count; 209 } 210}