this repo has no description
1package tangledalertbot
2
3import (
4 "bytes"
5 "context"
6 "encoding/json"
7 "fmt"
8 "io"
9 "log/slog"
10 "net/http"
11 "os"
12 "strings"
13 "time"
14
15 "github.com/pkg/errors"
16)
17
// Network tuning values and the default endpoint used by the DM service.
const (
	// httpClientTimeoutDuration is the overall per-request timeout set on the service's HTTP client.
	httpClientTimeoutDuration = 5 * time.Second
	// transportIdleConnTimeoutDuration is the idle-connection timeout set on the client's transport.
	transportIdleConnTimeoutDuration = 90 * time.Second
	// baseBskyURL is the XRPC base used for the session create and refresh calls.
	baseBskyURL = "https://bsky.social/xrpc"
)
23
// Credential types used for the session endpoints.
type (
	// auth holds the session values decoded from the create/refresh session responses.
	auth struct {
		// AccessJwt is sent as the Bearer token on the chat requests.
		AccessJwt string `json:"accessJwt"`
		// RefershJWT is sent as the Bearer token when refreshing the session.
		RefershJWT string `json:"refreshJwt"`
		// Did is the logged-in account's DID; messages sent by it are skipped when polling.
		Did string `json:"did"`
	}

	// accessData carries the login identifier and app password read from the environment.
	accessData struct {
		handle      string
		appPassword string
	}
)
34
// JSON shapes exchanged with the chat endpoints.
type (
	// ListConvosResponse is the decoded body of the listConvos call.
	ListConvosResponse struct {
		Cursor string  `json:"cursor"`
		Convos []Convo `json:"convos"`
	}

	// Convo is a single conversation entry, including its members and unread count.
	Convo struct {
		ID          string        `json:"id"`
		Members     []ConvoMember `json:"members"`
		UnreadCount int           `json:"unreadCount"`
	}

	// ConvoMember identifies one participant of a conversation.
	ConvoMember struct {
		Did    string `json:"did"`
		Handle string `json:"handle"`
	}

	// ErrorResponse is the body decoded when an endpoint answers with a non-200 status.
	ErrorResponse struct {
		Error string `json:"error"`
	}

	// MessageResp is the decoded body of the getMessages call.
	MessageResp struct {
		Messages []Message `json:"messages"`
		Cursor   string    `json:"cursor"`
	}

	// Message is a single chat message.
	Message struct {
		ID     string        `json:"id"`
		Sender MessageSender `json:"sender"`
		Text   string        `json:"text"`
	}

	// MessageSender identifies who sent a message.
	MessageSender struct {
		Did string `json:"did"`
	}

	// UpdateMessageReadRequest is the JSON body posted to the updateRead call.
	UpdateMessageReadRequest struct {
		ConvoID   string `json:"convoId"`
		MessageID string `json:"messageId"`
	}
)
74
75type User struct {
76 DID string
77 Handle string
78 ConvoID string
79 CreatedAt int
80}
81
82type DmService struct {
83 httpClient *http.Client
84 accessData accessData
85 auth auth
86 timerDuration time.Duration
87 pdsURL string
88 store Store
89}
90
91func NewDmService(store Store, timerDuration time.Duration) (*DmService, error) {
92 httpClient := http.Client{
93 Timeout: httpClientTimeoutDuration,
94 Transport: &http.Transport{
95 IdleConnTimeout: transportIdleConnTimeoutDuration,
96 },
97 }
98
99 accessHandle := os.Getenv("MESSAGING_ACCESS_HANDLE")
100 accessAppPassword := os.Getenv("MESSAGING_ACCESS_APP_PASSWORD")
101 pdsURL := os.Getenv("MESSAGING_PDS_URL")
102
103 service := DmService{
104 httpClient: &httpClient,
105 accessData: accessData{
106 handle: accessHandle,
107 appPassword: accessAppPassword,
108 },
109 timerDuration: timerDuration,
110 pdsURL: pdsURL,
111 store: store,
112 }
113
114 auth, err := service.Authenicate()
115 if err != nil {
116 return nil, fmt.Errorf("authenticating: %w", err)
117 }
118
119 service.auth = auth
120
121 return &service, nil
122}
123
124func (d *DmService) Start(ctx context.Context) {
125 go d.RefreshTask(ctx)
126
127 timer := time.NewTimer(d.timerDuration)
128 defer timer.Stop()
129
130 for {
131 select {
132 case <-ctx.Done():
133 slog.Warn("context canceled - stopping dm task")
134 return
135 case <-timer.C:
136 err := d.HandleMessageTimer(ctx)
137 if err != nil {
138 slog.Error("handle message timer", "error", err)
139 }
140 timer.Reset(d.timerDuration)
141 }
142 }
143}
144
145func (d *DmService) RefreshTask(ctx context.Context) {
146 timer := time.NewTimer(time.Hour)
147 defer timer.Stop()
148
149 for {
150 select {
151 case <-ctx.Done():
152 return
153 case <-timer.C:
154 err := d.RefreshAuthenication(ctx)
155 if err != nil {
156 slog.Error("handle refresh auth timer", "error", err)
157 // TODO: better retry with backoff probably
158 timer.Reset(time.Minute)
159 continue
160 }
161 timer.Reset(time.Hour)
162 }
163 }
164}
165
166func (d *DmService) HandleMessageTimer(ctx context.Context) error {
167 convoResp, err := d.GetUnreadMessages()
168 if err != nil {
169 return fmt.Errorf("get unread messages: %w", err)
170 }
171
172 // TODO: handle the cursor pagination
173
174 for _, convo := range convoResp.Convos {
175 if convo.UnreadCount == 0 {
176 continue
177 }
178
179 messageResp, err := d.GetMessages(ctx, convo.ID)
180 if err != nil {
181 slog.Error("failed to get messages for convo", "error", err, "convo id", convo.ID)
182 continue
183 }
184
185 unreadCount := convo.UnreadCount
186 unreadMessages := make([]Message, 0, convo.UnreadCount)
187 // TODO: handle cursor pagination
188 for _, msg := range messageResp.Messages {
189 // TODO: techincally if I get to a message that's from the bot account, then there shouldn't be
190 // an more unread messages?
191 if msg.Sender.Did == d.auth.Did {
192 continue
193 }
194
195 unreadMessages = append(unreadMessages, msg)
196 unreadCount--
197 if unreadCount == 0 {
198 break
199 }
200 }
201
202 for _, msg := range unreadMessages {
203 d.handleMessage(msg, convo)
204
205 err = d.MarkMessageRead(msg.ID, convo.ID)
206 if err != nil {
207 slog.Error("marking message read", "error", err)
208 continue
209 }
210 }
211 }
212
213 return nil
214}
215
216func (d *DmService) handleMessage(msg Message, convo Convo) {
217 // TODO: add or remote user the list of "subsribed" users
218 if strings.ToLower(msg.Text) == "subscribe" {
219 userHandle := ""
220 for _, member := range convo.Members {
221 if member.Did == msg.Sender.Did {
222 userHandle = member.Handle
223 break
224 }
225 }
226
227 if userHandle == "" {
228 slog.Error("user handle for sent message not found", "sender did", msg.Sender.Did, "convo members", convo.Members)
229 return
230 }
231
232 user := User{
233 DID: msg.Sender.Did,
234 ConvoID: convo.ID,
235 Handle: userHandle,
236 CreatedAt: int(time.Now().UnixMilli()),
237 }
238
239 err := d.store.CreateUser(user)
240 if err != nil {
241 slog.Error("error creating user", "error", err, "user", user)
242 return
243 }
244 }
245}
246
247func (d *DmService) GetUnreadMessages() (ListConvosResponse, error) {
248 url := fmt.Sprintf("%s/xrpc/chat.bsky.convo.listConvos?readState=unread", d.pdsURL)
249 request, err := http.NewRequest("GET", url, nil)
250 if err != nil {
251 return ListConvosResponse{}, fmt.Errorf("create new list convos http request: %w", err)
252 }
253
254 request.Header.Add("Content-Type", "application/json")
255 request.Header.Add("Accept", "application/json")
256 request.Header.Add("Atproto-Proxy", "did:web:api.bsky.chat#bsky_chat")
257 request.Header.Add("Authorization", fmt.Sprintf("Bearer %s", d.auth.AccessJwt))
258
259 resp, err := d.httpClient.Do(request)
260 if err != nil {
261 return ListConvosResponse{}, fmt.Errorf("do http request to list convos: %w", err)
262 }
263 defer resp.Body.Close()
264
265 if resp.StatusCode != http.StatusOK {
266 var errorResp ErrorResponse
267 err = decodeResp(resp.Body, &errorResp)
268 if err != nil {
269 return ListConvosResponse{}, err
270 }
271
272 return ListConvosResponse{}, fmt.Errorf("listing convos responded with code %d: %s", resp.StatusCode, errorResp.Error)
273 }
274
275 var listConvoResp ListConvosResponse
276 err = decodeResp(resp.Body, &listConvoResp)
277 if err != nil {
278 return ListConvosResponse{}, err
279 }
280
281 return listConvoResp, nil
282}
283
284func (d *DmService) MarkMessageRead(messageID, convoID string) error {
285 bodyReq := UpdateMessageReadRequest{
286 ConvoID: convoID,
287 MessageID: messageID,
288 }
289
290 bodyB, err := json.Marshal(bodyReq)
291 if err != nil {
292 return fmt.Errorf("marshal update message request body: %w", err)
293 }
294
295 r := bytes.NewReader(bodyB)
296
297 url := fmt.Sprintf("%s/xrpc/chat.bsky.convo.updateRead", d.pdsURL)
298 request, err := http.NewRequest("POST", url, r)
299 if err != nil {
300 return fmt.Errorf("create new list convos http request: %w", err)
301 }
302
303 request.Header.Add("Content-Type", "application/json")
304 request.Header.Add("Accept", "application/json")
305 request.Header.Add("Atproto-Proxy", "did:web:api.bsky.chat#bsky_chat")
306 request.Header.Add("Authorization", fmt.Sprintf("Bearer %s", d.auth.AccessJwt))
307
308 resp, err := d.httpClient.Do(request)
309 if err != nil {
310 return fmt.Errorf("do http request to update message read: %w", err)
311 }
312 defer resp.Body.Close()
313
314 if resp.StatusCode == http.StatusOK {
315 return nil
316 }
317
318 var errorResp ErrorResponse
319 err = decodeResp(resp.Body, &errorResp)
320 if err != nil {
321 return err
322 }
323
324 return fmt.Errorf("listing convos responded with code %d: %s", resp.StatusCode, errorResp.Error)
325
326}
327
328func (d *DmService) Authenicate() (auth, error) {
329 url := fmt.Sprintf("%s/com.atproto.server.createSession", baseBskyURL)
330
331 requestData := map[string]interface{}{
332 "identifier": d.accessData.handle,
333 "password": d.accessData.appPassword,
334 }
335
336 data, err := json.Marshal(requestData)
337 if err != nil {
338 return auth{}, errors.Wrap(err, "failed to marshal request")
339 }
340
341 r := bytes.NewReader(data)
342
343 request, err := http.NewRequest("POST", url, r)
344 if err != nil {
345 return auth{}, errors.Wrap(err, "failed to create request")
346 }
347
348 request.Header.Add("Content-Type", "application/json")
349
350 resp, err := d.httpClient.Do(request)
351 if err != nil {
352 return auth{}, errors.Wrap(err, "failed to make request")
353 }
354 defer resp.Body.Close()
355
356 var loginResp auth
357 err = decodeResp(resp.Body, &loginResp)
358 if err != nil {
359 return auth{}, err
360 }
361
362 return loginResp, nil
363}
364
365func (d *DmService) RefreshAuthenication(ctx context.Context) error {
366 url := fmt.Sprintf("%s/com.atproto.server.refreshSession", baseBskyURL)
367
368 request, err := http.NewRequest("POST", url, nil)
369 if err != nil {
370 return errors.Wrap(err, "failed to create request")
371 }
372
373 request.Header.Add("Content-Type", "application/json")
374 request.Header.Add("Authorization", fmt.Sprintf("Bearer %s", d.auth.RefershJWT))
375
376 resp, err := d.httpClient.Do(request)
377 if err != nil {
378 return errors.Wrap(err, "failed to make request")
379 }
380 defer resp.Body.Close()
381
382 var loginResp auth
383 err = decodeResp(resp.Body, &loginResp)
384 if err != nil {
385 return err
386 }
387
388 d.auth = loginResp
389
390 return nil
391}
392
393func (d *DmService) GetMessages(ctx context.Context, convoID string) (MessageResp, error) {
394 url := fmt.Sprintf("%s/xrpc/chat.bsky.convo.getMessages?convoId=%s", d.pdsURL, convoID)
395 request, err := http.NewRequest("GET", url, nil)
396 if err != nil {
397 return MessageResp{}, fmt.Errorf("create new get messages http request: %w", err)
398 }
399
400 request.Header.Add("Content-Type", "application/json")
401 request.Header.Add("Accept", "application/json")
402 request.Header.Add("Atproto-Proxy", "did:web:api.bsky.chat#bsky_chat")
403 request.Header.Add("Authorization", fmt.Sprintf("Bearer %s", d.auth.AccessJwt))
404
405 resp, err := d.httpClient.Do(request)
406 if err != nil {
407 return MessageResp{}, fmt.Errorf("do http request to get messages: %w", err)
408 }
409 defer resp.Body.Close()
410
411 if resp.StatusCode != http.StatusOK {
412 var errorResp ErrorResponse
413 err = decodeResp(resp.Body, &errorResp)
414 if err != nil {
415 return MessageResp{}, err
416 }
417
418 return MessageResp{}, fmt.Errorf("listing convos responded with code %d: %s", resp.StatusCode, errorResp.Error)
419 }
420
421 var messageResp MessageResp
422 err = decodeResp(resp.Body, &messageResp)
423 if err != nil {
424 return MessageResp{}, err
425 }
426
427 return messageResp, nil
428}
429
// decodeResp reads body to the end and unmarshals it as JSON into result.
//
// result must be a pointer accepted by json.Unmarshal. Read and unmarshal
// failures are wrapped with %w, so the underlying error remains reachable via
// errors.Is and errors.As.
func decodeResp(body io.Reader, result any) error {
	resBody, err := io.ReadAll(body)
	if err != nil {
		return fmt.Errorf("failed to read response: %w", err)
	}

	if err := json.Unmarshal(resBody, result); err != nil {
		return fmt.Errorf("failed to unmarshal response: %w", err)
	}

	return nil
}