this repo has no description
1package tangledalertbot 2 3import ( 4 "bytes" 5 "context" 6 "encoding/json" 7 "fmt" 8 "io" 9 "log/slog" 10 "net/http" 11 "os" 12 "strings" 13 "time" 14 15 "github.com/pkg/errors" 16) 17 18const ( 19 httpClientTimeoutDuration = time.Second * 5 20 transportIdleConnTimeoutDuration = time.Second * 90 21 baseBskyURL = "https://bsky.social/xrpc" 22) 23 24type auth struct { 25 AccessJwt string `json:"accessJwt"` 26 RefershJWT string `json:"refreshJwt"` 27 Did string `json:"did"` 28} 29 30type accessData struct { 31 handle string 32 appPassword string 33} 34 35type ListConvosResponse struct { 36 Cursor string `json:"cursor"` 37 Convos []Convo `json:"convos"` 38} 39 40type Convo struct { 41 ID string `json:"id"` 42 Members []ConvoMember `json:"members"` 43 UnreadCount int `json:"unreadCount"` 44} 45 46type ConvoMember struct { 47 Did string `json:"did"` 48 Handle string `json:"handle"` 49} 50 51type ErrorResponse struct { 52 Error string `json:"error"` 53} 54 55type MessageResp struct { 56 Messages []Message `json:"messages"` 57 Cursor string `json:"cursor"` 58} 59 60type Message struct { 61 ID string `json:"id"` 62 Sender MessageSender `json:"sender"` 63 Text string `json:"text"` 64} 65 66type MessageSender struct { 67 Did string `json:"did"` 68} 69 70type UpdateMessageReadRequest struct { 71 ConvoID string `json:"convoId"` 72 MessageID string `json:"messageId"` 73} 74 75type User struct { 76 DID string 77 Handle string 78 ConvoID string 79 CreatedAt int 80} 81 82type DmService struct { 83 httpClient *http.Client 84 accessData accessData 85 auth auth 86 timerDuration time.Duration 87 pdsURL string 88 store Store 89} 90 91func NewDmService(store Store, timerDuration time.Duration) (*DmService, error) { 92 httpClient := http.Client{ 93 Timeout: httpClientTimeoutDuration, 94 Transport: &http.Transport{ 95 IdleConnTimeout: transportIdleConnTimeoutDuration, 96 }, 97 } 98 99 accessHandle := os.Getenv("MESSAGING_ACCESS_HANDLE") 100 accessAppPassword := os.Getenv("MESSAGING_ACCESS_APP_PASSWORD") 101 pdsURL := os.Getenv("MESSAGING_PDS_URL") 102 103 service := DmService{ 104 httpClient: &httpClient, 105 accessData: accessData{ 106 handle: accessHandle, 107 appPassword: accessAppPassword, 108 }, 109 timerDuration: timerDuration, 110 pdsURL: pdsURL, 111 store: store, 112 } 113 114 auth, err := service.Authenicate() 115 if err != nil { 116 return nil, fmt.Errorf("authenticating: %w", err) 117 } 118 119 service.auth = auth 120 121 return &service, nil 122} 123 124func (d *DmService) Start(ctx context.Context) { 125 go d.RefreshTask(ctx) 126 127 timer := time.NewTimer(d.timerDuration) 128 defer timer.Stop() 129 130 for { 131 select { 132 case <-ctx.Done(): 133 slog.Warn("context canceled - stopping dm task") 134 return 135 case <-timer.C: 136 err := d.HandleMessageTimer(ctx) 137 if err != nil { 138 slog.Error("handle message timer", "error", err) 139 } 140 timer.Reset(d.timerDuration) 141 } 142 } 143} 144 145func (d *DmService) RefreshTask(ctx context.Context) { 146 timer := time.NewTimer(time.Hour) 147 defer timer.Stop() 148 149 for { 150 select { 151 case <-ctx.Done(): 152 return 153 case <-timer.C: 154 err := d.RefreshAuthenication(ctx) 155 if err != nil { 156 slog.Error("handle refresh auth timer", "error", err) 157 // TODO: better retry with backoff probably 158 timer.Reset(time.Minute) 159 continue 160 } 161 timer.Reset(time.Hour) 162 } 163 } 164} 165 166func (d *DmService) HandleMessageTimer(ctx context.Context) error { 167 convoResp, err := d.GetUnreadMessages() 168 if err != nil { 169 return fmt.Errorf("get unread messages: %w", err) 170 } 171 172 // TODO: handle the cursor pagination 173 174 for _, convo := range convoResp.Convos { 175 if convo.UnreadCount == 0 { 176 continue 177 } 178 179 messageResp, err := d.GetMessages(ctx, convo.ID) 180 if err != nil { 181 slog.Error("failed to get messages for convo", "error", err, "convo id", convo.ID) 182 continue 183 } 184 185 unreadCount := convo.UnreadCount 186 unreadMessages := make([]Message, 0, convo.UnreadCount) 187 // TODO: handle cursor pagination 188 for _, msg := range messageResp.Messages { 189 // TODO: techincally if I get to a message that's from the bot account, then there shouldn't be 190 // an more unread messages? 191 if msg.Sender.Did == d.auth.Did { 192 continue 193 } 194 195 unreadMessages = append(unreadMessages, msg) 196 unreadCount-- 197 if unreadCount == 0 { 198 break 199 } 200 } 201 202 for _, msg := range unreadMessages { 203 d.handleMessage(msg, convo) 204 205 err = d.MarkMessageRead(msg.ID, convo.ID) 206 if err != nil { 207 slog.Error("marking message read", "error", err) 208 continue 209 } 210 } 211 } 212 213 return nil 214} 215 216func (d *DmService) handleMessage(msg Message, convo Convo) { 217 // TODO: add or remote user the list of "subsribed" users 218 if strings.ToLower(msg.Text) == "subscribe" { 219 userHandle := "" 220 for _, member := range convo.Members { 221 if member.Did == msg.Sender.Did { 222 userHandle = member.Handle 223 break 224 } 225 } 226 227 if userHandle == "" { 228 slog.Error("user handle for sent message not found", "sender did", msg.Sender.Did, "convo members", convo.Members) 229 return 230 } 231 232 user := User{ 233 DID: msg.Sender.Did, 234 ConvoID: convo.ID, 235 Handle: userHandle, 236 CreatedAt: int(time.Now().UnixMilli()), 237 } 238 239 err := d.store.CreateUser(user) 240 if err != nil { 241 slog.Error("error creating user", "error", err, "user", user) 242 return 243 } 244 } 245} 246 247func (d *DmService) GetUnreadMessages() (ListConvosResponse, error) { 248 url := fmt.Sprintf("%s/xrpc/chat.bsky.convo.listConvos?readState=unread", d.pdsURL) 249 request, err := http.NewRequest("GET", url, nil) 250 if err != nil { 251 return ListConvosResponse{}, fmt.Errorf("create new list convos http request: %w", err) 252 } 253 254 request.Header.Add("Content-Type", "application/json") 255 request.Header.Add("Accept", "application/json") 256 request.Header.Add("Atproto-Proxy", "did:web:api.bsky.chat#bsky_chat") 257 request.Header.Add("Authorization", fmt.Sprintf("Bearer %s", d.auth.AccessJwt)) 258 259 resp, err := d.httpClient.Do(request) 260 if err != nil { 261 return ListConvosResponse{}, fmt.Errorf("do http request to list convos: %w", err) 262 } 263 defer resp.Body.Close() 264 265 if resp.StatusCode != http.StatusOK { 266 var errorResp ErrorResponse 267 err = decodeResp(resp.Body, &errorResp) 268 if err != nil { 269 return ListConvosResponse{}, err 270 } 271 272 return ListConvosResponse{}, fmt.Errorf("listing convos responded with code %d: %s", resp.StatusCode, errorResp.Error) 273 } 274 275 var listConvoResp ListConvosResponse 276 err = decodeResp(resp.Body, &listConvoResp) 277 if err != nil { 278 return ListConvosResponse{}, err 279 } 280 281 return listConvoResp, nil 282} 283 284func (d *DmService) MarkMessageRead(messageID, convoID string) error { 285 bodyReq := UpdateMessageReadRequest{ 286 ConvoID: convoID, 287 MessageID: messageID, 288 } 289 290 bodyB, err := json.Marshal(bodyReq) 291 if err != nil { 292 return fmt.Errorf("marshal update message request body: %w", err) 293 } 294 295 r := bytes.NewReader(bodyB) 296 297 url := fmt.Sprintf("%s/xrpc/chat.bsky.convo.updateRead", d.pdsURL) 298 request, err := http.NewRequest("POST", url, r) 299 if err != nil { 300 return fmt.Errorf("create new list convos http request: %w", err) 301 } 302 303 request.Header.Add("Content-Type", "application/json") 304 request.Header.Add("Accept", "application/json") 305 request.Header.Add("Atproto-Proxy", "did:web:api.bsky.chat#bsky_chat") 306 request.Header.Add("Authorization", fmt.Sprintf("Bearer %s", d.auth.AccessJwt)) 307 308 resp, err := d.httpClient.Do(request) 309 if err != nil { 310 return fmt.Errorf("do http request to update message read: %w", err) 311 } 312 defer resp.Body.Close() 313 314 if resp.StatusCode == http.StatusOK { 315 return nil 316 } 317 318 var errorResp ErrorResponse 319 err = decodeResp(resp.Body, &errorResp) 320 if err != nil { 321 return err 322 } 323 324 return fmt.Errorf("listing convos responded with code %d: %s", resp.StatusCode, errorResp.Error) 325 326} 327 328func (d *DmService) Authenicate() (auth, error) { 329 url := fmt.Sprintf("%s/com.atproto.server.createSession", baseBskyURL) 330 331 requestData := map[string]interface{}{ 332 "identifier": d.accessData.handle, 333 "password": d.accessData.appPassword, 334 } 335 336 data, err := json.Marshal(requestData) 337 if err != nil { 338 return auth{}, errors.Wrap(err, "failed to marshal request") 339 } 340 341 r := bytes.NewReader(data) 342 343 request, err := http.NewRequest("POST", url, r) 344 if err != nil { 345 return auth{}, errors.Wrap(err, "failed to create request") 346 } 347 348 request.Header.Add("Content-Type", "application/json") 349 350 resp, err := d.httpClient.Do(request) 351 if err != nil { 352 return auth{}, errors.Wrap(err, "failed to make request") 353 } 354 defer resp.Body.Close() 355 356 var loginResp auth 357 err = decodeResp(resp.Body, &loginResp) 358 if err != nil { 359 return auth{}, err 360 } 361 362 return loginResp, nil 363} 364 365func (d *DmService) RefreshAuthenication(ctx context.Context) error { 366 url := fmt.Sprintf("%s/com.atproto.server.refreshSession", baseBskyURL) 367 368 request, err := http.NewRequest("POST", url, nil) 369 if err != nil { 370 return errors.Wrap(err, "failed to create request") 371 } 372 373 request.Header.Add("Content-Type", "application/json") 374 request.Header.Add("Authorization", fmt.Sprintf("Bearer %s", d.auth.RefershJWT)) 375 376 resp, err := d.httpClient.Do(request) 377 if err != nil { 378 return errors.Wrap(err, "failed to make request") 379 } 380 defer resp.Body.Close() 381 382 var loginResp auth 383 err = decodeResp(resp.Body, &loginResp) 384 if err != nil { 385 return err 386 } 387 388 d.auth = loginResp 389 390 return nil 391} 392 393func (d *DmService) GetMessages(ctx context.Context, convoID string) (MessageResp, error) { 394 url := fmt.Sprintf("%s/xrpc/chat.bsky.convo.getMessages?convoId=%s", d.pdsURL, convoID) 395 request, err := http.NewRequest("GET", url, nil) 396 if err != nil { 397 return MessageResp{}, fmt.Errorf("create new get messages http request: %w", err) 398 } 399 400 request.Header.Add("Content-Type", "application/json") 401 request.Header.Add("Accept", "application/json") 402 request.Header.Add("Atproto-Proxy", "did:web:api.bsky.chat#bsky_chat") 403 request.Header.Add("Authorization", fmt.Sprintf("Bearer %s", d.auth.AccessJwt)) 404 405 resp, err := d.httpClient.Do(request) 406 if err != nil { 407 return MessageResp{}, fmt.Errorf("do http request to get messages: %w", err) 408 } 409 defer resp.Body.Close() 410 411 if resp.StatusCode != http.StatusOK { 412 var errorResp ErrorResponse 413 err = decodeResp(resp.Body, &errorResp) 414 if err != nil { 415 return MessageResp{}, err 416 } 417 418 return MessageResp{}, fmt.Errorf("listing convos responded with code %d: %s", resp.StatusCode, errorResp.Error) 419 } 420 421 var messageResp MessageResp 422 err = decodeResp(resp.Body, &messageResp) 423 if err != nil { 424 return MessageResp{}, err 425 } 426 427 return messageResp, nil 428} 429 430func decodeResp(body io.Reader, result any) error { 431 resBody, err := io.ReadAll(body) 432 if err != nil { 433 return errors.Wrap(err, "failed to read response") 434 } 435 436 err = json.Unmarshal(resBody, result) 437 if err != nil { 438 return errors.Wrap(err, "failed to unmarshal response") 439 } 440 return nil 441}