providing password reset services for a long while: circa 2025
1import { Database } from "bun:sqlite";
2import { mkdir } from "node:fs/promises";
3import type { Block, SlackAPIClient } from "slack-edge";
4
5export interface SlackMessage {
6 channel: string;
7 blocks?: Block[];
8 text: string;
9 timestamp?: number;
10 status: "pending" | "sent" | "failed";
11}
12
13export default class SlackMessageQueue {
14 private db: Database;
15 private slack: SlackAPIClient;
16 private isProcessing = false;
17 private batchSize = 50;
18 private rateLimitDelay = 1000; // 1 message per second per channel
19 private channelLastMessageTime: Map<string, number> = new Map();
20 private totalMessageCount = 0;
21 private messageCountResetTime = 0;
22 private backoffDelay = 1000;
23 private maxBackoff = 30000; // 30 seconds
24
25 constructor(slackClient: SlackAPIClient, dbPath = "slack-queue.db") {
26 this.slack = slackClient;
27 const dir = dbPath.split("/").slice(0, -1).join("/");
28 if (dir) {
29 try {
30 mkdir(dir);
31 } catch (e) {
32 // Directory may already exist
33 }
34 }
35 this.db = new Database(dbPath);
36 this.initDatabase();
37 this.processQueue();
38 }
39
40 private initDatabase() {
41 this.db.run(`
42 CREATE TABLE IF NOT EXISTS messages (
43 id INTEGER PRIMARY KEY AUTOINCREMENT,
44 channel TEXT NOT NULL,
45 blocks TEXT,
46 text TEXT NOT NULL,
47 timestamp INTEGER NOT NULL,
48 status TEXT NOT NULL
49 )
50 `);
51 this.db.run("CREATE INDEX IF NOT EXISTS idx_status ON messages(status)");
52 }
53
54 async enqueue(message: SlackMessage): Promise<void> {
55 const stmt = this.db.prepare(`
56 INSERT INTO messages (channel, blocks, text, timestamp, status)
57 VALUES (?, ?, ?, ?, ?)
58 `);
59
60 stmt.run(
61 message.channel ?? null,
62 JSON.stringify(message.blocks) ?? null,
63 message.text,
64 Date.now(),
65 "pending",
66 );
67
68 if (!this.isProcessing) {
69 this.processQueue();
70 }
71 }
72
73 private async sleep(ms: number): Promise<void> {
74 return new Promise((resolve) => setTimeout(resolve, ms));
75 }
76
77 private async sendWithRateLimit(
78 message: SlackMessage & { id: number },
79 ): Promise<void> {
80 const now = Date.now();
81
82 // Check per-minute total limit
83 if (now - this.messageCountResetTime >= 60000) {
84 this.totalMessageCount = 0;
85 this.messageCountResetTime = now;
86 }
87
88 if (this.totalMessageCount >= 350) {
89 const waitTime = 60000 - (now - this.messageCountResetTime);
90 await this.sleep(waitTime);
91 this.totalMessageCount = 0;
92 this.messageCountResetTime = Date.now();
93 }
94
95 // Check per-channel rate limit
96 const channelLastTime =
97 this.channelLastMessageTime.get(message.channel) || 0;
98 const timeSinceLastChannelMessage = now - channelLastTime;
99
100 if (timeSinceLastChannelMessage < this.rateLimitDelay) {
101 await this.sleep(this.rateLimitDelay - timeSinceLastChannelMessage);
102 }
103
104 let currentBackoff = this.backoffDelay;
105 let attempts = 0;
106 const maxAttempts = 3;
107
108 while (attempts < maxAttempts) {
109 try {
110 await this.slack.chat.postMessage({
111 channel: message.channel,
112 blocks: JSON.parse(message.blocks as unknown as string) ?? undefined,
113 text: message.text,
114 });
115
116 this.channelLastMessageTime.set(message.channel, Date.now());
117 this.totalMessageCount++;
118
119 this.db
120 .prepare(
121 `
122 UPDATE messages
123 SET status = 'sent'
124 WHERE id = ?
125 `,
126 )
127 .run(message.id);
128
129 return;
130 } catch (error) {
131 console.error(
132 `Error sending message (attempt ${attempts + 1}/${maxAttempts})`,
133 error,
134 );
135 attempts++;
136
137 if (attempts === maxAttempts) {
138 this.db
139 .prepare(
140 `
141 UPDATE messages
142 SET status = 'failed'
143 WHERE id = ?
144 `,
145 )
146 .run(message.id);
147 return;
148 }
149
150 await this.sleep(currentBackoff);
151 currentBackoff = Math.min(currentBackoff * 2, this.maxBackoff);
152 }
153 }
154 }
155
156 private async processQueue() {
157 if (this.isProcessing) return;
158 this.isProcessing = true;
159
160 console.log("Processing queue");
161
162 try {
163 while (true) {
164 const messages = this.db
165 .prepare(
166 `
167 SELECT * FROM messages
168 WHERE status = 'pending'
169 LIMIT ?
170 `,
171 )
172 .all(this.batchSize) as (SlackMessage & { id: number })[];
173
174 if (messages.length === 0) break;
175
176 // Process messages sequentially to maintain rate limiting
177 for (const message of messages) {
178 await this.sendWithRateLimit(message);
179 }
180 }
181 } finally {
182 this.isProcessing = false;
183 }
184 }
185
186 async cleanup(olderThan: number = 7 * 24 * 60 * 60 * 1000) {
187 const cutoff = Date.now() - olderThan;
188 this.db
189 .prepare(
190 `
191 DELETE FROM messages
192 WHERE timestamp < ? AND status != 'pending'
193 `,
194 )
195 .run(cutoff);
196 }
197
198 async queueLength() {
199 const result = this.db
200 .prepare(
201 `
202 SELECT COUNT(*) as count
203 FROM messages
204 WHERE status = 'pending'
205 `,
206 )
207 .get();
208 return (result as { count: number }).count;
209 }
210}