package tangledalertbot

import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"io"
	"log/slog"
	"net/http"
	"net/url"
	"os"
	"strings"
	"sync"
	"time"

	"github.com/pkg/errors"
)

const (
	httpClientTimeoutDuration        = time.Second * 5
	transportIdleConnTimeoutDuration = time.Second * 90
	baseBskyURL                      = "https://bsky.social/xrpc"
)

// auth holds the fields decoded from a createSession / refreshSession
// response.
type auth struct {
	AccessJwt  string `json:"accessJwt"`
	RefershJWT string `json:"refreshJwt"`
	Did        string `json:"did"`
}

// accessData holds the credentials sent when creating a session.
type accessData struct {
	handle      string
	appPassword string
}

// ListConvosResponse is the decoded body of a chat.bsky.convo.listConvos call.
type ListConvosResponse struct {
	Cursor string  `json:"cursor"`
	Convos []Convo `json:"convos"`
}

// Convo is a single conversation entry in a ListConvosResponse.
type Convo struct {
	ID          string        `json:"id"`
	Members     []ConvoMember `json:"members"`
	UnreadCount int           `json:"unreadCount"`
}

// ConvoMember identifies one participant of a Convo.
type ConvoMember struct {
	Did    string `json:"did"`
	Handle string `json:"handle"`
}

// ErrorResponse is the body decoded from a non-200 response.
type ErrorResponse struct {
	Error string `json:"error"`
}

// MessageResp is the decoded body of a chat.bsky.convo.getMessages call.
type MessageResp struct {
	Messages []Message `json:"messages"`
	Cursor   string    `json:"cursor"`
}

// Message is a single message in a MessageResp.
type Message struct {
	ID     string        `json:"id"`
	Sender MessageSender `json:"sender"`
	Text   string        `json:"text"`
}

// MessageSender identifies the author of a Message.
type MessageSender struct {
	Did string `json:"did"`
}

// UpdateMessageReadRequest is the JSON body posted to
// chat.bsky.convo.updateRead.
type UpdateMessageReadRequest struct {
	ConvoID   string `json:"convoId"`
	MessageID string `json:"messageId"`
}

// User is the record handed to the Store when a sender subscribes.
// CreatedAt is populated from time.Now().UnixMilli().
type User struct {
	DID       string
	Handle    string
	ConvoID   string
	CreatedAt int
}

// DmService polls for unread direct messages and registers senders of
// "subscribe" messages in the Store.
type DmService struct {
	httpClient    *http.Client
	accessData    accessData
	timerDuration time.Duration
	pdsURL        string
	store         Store

	// authMu guards auth: RefreshTask replaces it from its own goroutine
	// while the polling loop started by Start reads it.
	authMu sync.RWMutex
	auth   auth
}

// NewDmService builds a DmService from the MESSAGING_ACCESS_HANDLE,
// MESSAGING_ACCESS_APP_PASSWORD and MESSAGING_PDS_URL environment variables
// and creates an initial session. It returns an error if that session cannot
// be created.
func NewDmService(store Store, timerDuration time.Duration) (*DmService, error) {
	service := &DmService{
		httpClient: &http.Client{
			Timeout: httpClientTimeoutDuration,
			Transport: &http.Transport{
				IdleConnTimeout: transportIdleConnTimeoutDuration,
			},
		},
		accessData: accessData{
			handle:      os.Getenv("MESSAGING_ACCESS_HANDLE"),
			appPassword: os.Getenv("MESSAGING_ACCESS_APP_PASSWORD"),
		},
		timerDuration: timerDuration,
		pdsURL:        os.Getenv("MESSAGING_PDS_URL"),
		store:         store,
	}

	session, err := service.Authenicate()
	if err != nil {
		return nil, fmt.Errorf("authenticating: %w", err)
	}
	service.setAuth(session)

	return service, nil
}

// Start launches RefreshTask in its own goroutine and then blocks, calling
// HandleMessageTimer each time the timer fires, until ctx is canceled.
func (d *DmService) Start(ctx context.Context) {
	go d.RefreshTask(ctx)

	timer := time.NewTimer(d.timerDuration)
	defer timer.Stop()

	for {
		select {
		case <-ctx.Done():
			slog.Warn("context canceled - stopping dm task")
			return
		case <-timer.C:
			if err := d.HandleMessageTimer(ctx); err != nil {
				slog.Error("handle message timer", "error", err)
			}
			// Re-arm only after the work is done so runs never overlap.
			timer.Reset(d.timerDuration)
		}
	}
}

// RefreshTask refreshes the session once an hour until ctx is canceled. After
// a failed refresh it retries a minute later.
func (d *DmService) RefreshTask(ctx context.Context) {
	timer := time.NewTimer(time.Hour)
	defer timer.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-timer.C:
			if err := d.RefreshAuthenication(ctx); err != nil {
				slog.Error("handle refresh auth timer", "error", err)
				// TODO: better retry with backoff probably
				timer.Reset(time.Minute)
				continue
			}
			timer.Reset(time.Hour)
		}
	}
}

// HandleMessageTimer lists the unread conversations, passes each unread
// message that was not sent by the bot account to handleMessage, and then
// marks that message as read. Failures for a single conversation or message
// are logged and skipped; an error is returned only when the conversation
// list cannot be fetched or ctx is done.
func (d *DmService) HandleMessageTimer(ctx context.Context) error {
	convoResp, err := d.listUnreadConvos(ctx)
	if err != nil {
		return fmt.Errorf("get unread messages: %w", err)
	}

	botDID := d.currentAuth().Did

	// TODO: handle the cursor pagination
	for _, convo := range convoResp.Convos {
		// Stop promptly on shutdown instead of failing every remaining request.
		if err := ctx.Err(); err != nil {
			return err
		}

		// A non-positive count means nothing to do; it also keeps the make
		// call below from panicking on a negative capacity.
		if convo.UnreadCount <= 0 {
			continue
		}

		messageResp, err := d.GetMessages(ctx, convo.ID)
		if err != nil {
			slog.Error("failed to get messages for convo", "error", err, "convo id", convo.ID)
			continue
		}

		unreadCount := convo.UnreadCount
		unreadMessages := make([]Message, 0, convo.UnreadCount)

		// TODO: handle cursor pagination
		for _, msg := range messageResp.Messages {
			// TODO: technically, once a message from the bot account is
			// reached there should be no more unread messages.
			if msg.Sender.Did == botDID {
				continue
			}

			unreadMessages = append(unreadMessages, msg)
			unreadCount--
			if unreadCount == 0 {
				break
			}
		}

		for _, msg := range unreadMessages {
			d.handleMessage(msg, convo)

			if err := d.updateRead(ctx, msg.ID, convo.ID); err != nil {
				slog.Error("marking message read", "error", err)
				continue
			}
		}
	}

	return nil
}

// handleMessage stores the sender as a User when the message text is
// "subscribe" (compared case-insensitively). Failures are logged, not
// returned.
func (d *DmService) handleMessage(msg Message, convo Convo) {
	// TODO: add or remove the user from the list of "subscribed" users
	if strings.ToLower(msg.Text) != "subscribe" {
		return
	}

	userHandle := ""
	for _, member := range convo.Members {
		if member.Did == msg.Sender.Did {
			userHandle = member.Handle
			break
		}
	}

	if userHandle == "" {
		slog.Error("user handle for sent message not found", "sender did", msg.Sender.Did, "convo members", convo.Members)
		return
	}

	user := User{
		DID:       msg.Sender.Did,
		ConvoID:   convo.ID,
		Handle:    userHandle,
		CreatedAt: int(time.Now().UnixMilli()),
	}

	if err := d.store.CreateUser(user); err != nil {
		slog.Error("error creating user", "error", err, "user", user)
		return
	}
}

// GetUnreadMessages lists the conversations that have unread messages. It
// uses context.Background(), so only the HTTP client timeout bounds the call.
func (d *DmService) GetUnreadMessages() (ListConvosResponse, error) {
	return d.listUnreadConvos(context.Background())
}

// listUnreadConvos is the context-aware implementation of GetUnreadMessages.
func (d *DmService) listUnreadConvos(ctx context.Context) (ListConvosResponse, error) {
	endpoint := fmt.Sprintf("%s/xrpc/chat.bsky.convo.listConvos?readState=unread", d.pdsURL)

	request, err := d.newChatRequest(ctx, http.MethodGet, endpoint, nil)
	if err != nil {
		return ListConvosResponse{}, fmt.Errorf("create new list convos http request: %w", err)
	}

	resp, err := d.httpClient.Do(request)
	if err != nil {
		return ListConvosResponse{}, fmt.Errorf("do http request to list convos: %w", err)
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		return ListConvosResponse{}, responseError("listing convos", resp)
	}

	var listConvoResp ListConvosResponse
	if err := decodeResp(resp.Body, &listConvoResp); err != nil {
		return ListConvosResponse{}, err
	}

	return listConvoResp, nil
}

// MarkMessageRead marks messageID in conversation convoID as read. It uses
// context.Background(), so only the HTTP client timeout bounds the call.
func (d *DmService) MarkMessageRead(messageID, convoID string) error {
	return d.updateRead(context.Background(), messageID, convoID)
}

// updateRead is the context-aware implementation of MarkMessageRead.
func (d *DmService) updateRead(ctx context.Context, messageID, convoID string) error {
	bodyB, err := json.Marshal(UpdateMessageReadRequest{
		ConvoID:   convoID,
		MessageID: messageID,
	})
	if err != nil {
		return fmt.Errorf("marshal update message request body: %w", err)
	}

	endpoint := fmt.Sprintf("%s/xrpc/chat.bsky.convo.updateRead", d.pdsURL)

	request, err := d.newChatRequest(ctx, http.MethodPost, endpoint, bytes.NewReader(bodyB))
	if err != nil {
		return fmt.Errorf("create new update message read http request: %w", err)
	}

	resp, err := d.httpClient.Do(request)
	if err != nil {
		return fmt.Errorf("do http request to update message read: %w", err)
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		return responseError("updating message read", resp)
	}

	return nil
}

// Authenicate creates a new session from the configured handle and app
// password and returns it; it does not store the result on the service. It
// uses context.Background(), so only the HTTP client timeout bounds the call.
func (d *DmService) Authenicate() (auth, error) {
	return d.createSession(context.Background())
}

// createSession is the context-aware implementation of Authenicate.
func (d *DmService) createSession(ctx context.Context) (auth, error) {
	endpoint := fmt.Sprintf("%s/com.atproto.server.createSession", baseBskyURL)

	data, err := json.Marshal(map[string]string{
		"identifier": d.accessData.handle,
		"password":   d.accessData.appPassword,
	})
	if err != nil {
		return auth{}, errors.Wrap(err, "failed to marshal request")
	}

	request, err := http.NewRequestWithContext(ctx, http.MethodPost, endpoint, bytes.NewReader(data))
	if err != nil {
		return auth{}, errors.Wrap(err, "failed to create request")
	}
	request.Header.Add("Content-Type", "application/json")

	resp, err := d.httpClient.Do(request)
	if err != nil {
		return auth{}, errors.Wrap(err, "failed to make request")
	}
	defer resp.Body.Close()

	return decodeSession("creating session", resp)
}

// RefreshAuthenication exchanges the stored refresh token for a new session
// and stores it. On any failure the previously stored session is left
// untouched.
func (d *DmService) RefreshAuthenication(ctx context.Context) error {
	endpoint := fmt.Sprintf("%s/com.atproto.server.refreshSession", baseBskyURL)

	request, err := http.NewRequestWithContext(ctx, http.MethodPost, endpoint, nil)
	if err != nil {
		return errors.Wrap(err, "failed to create request")
	}
	request.Header.Add("Content-Type", "application/json")
	request.Header.Add("Authorization", fmt.Sprintf("Bearer %s", d.currentAuth().RefershJWT))

	resp, err := d.httpClient.Do(request)
	if err != nil {
		return errors.Wrap(err, "failed to make request")
	}
	defer resp.Body.Close()

	session, err := decodeSession("refreshing session", resp)
	if err != nil {
		return err
	}
	d.setAuth(session)

	return nil
}

// GetMessages fetches the messages of the conversation identified by convoID.
func (d *DmService) GetMessages(ctx context.Context, convoID string) (MessageResp, error) {
	endpoint := fmt.Sprintf("%s/xrpc/chat.bsky.convo.getMessages?convoId=%s", d.pdsURL, url.QueryEscape(convoID))

	request, err := d.newChatRequest(ctx, http.MethodGet, endpoint, nil)
	if err != nil {
		return MessageResp{}, fmt.Errorf("create new get messages http request: %w", err)
	}

	resp, err := d.httpClient.Do(request)
	if err != nil {
		return MessageResp{}, fmt.Errorf("do http request to get messages: %w", err)
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		return MessageResp{}, responseError("getting messages", resp)
	}

	var messageResp MessageResp
	if err := decodeResp(resp.Body, &messageResp); err != nil {
		return MessageResp{}, err
	}

	return messageResp, nil
}

// currentAuth returns a copy of the stored session.
func (d *DmService) currentAuth() auth {
	d.authMu.RLock()
	defer d.authMu.RUnlock()

	return d.auth
}

// setAuth replaces the stored session.
func (d *DmService) setAuth(session auth) {
	d.authMu.Lock()
	defer d.authMu.Unlock()

	d.auth = session
}

// newChatRequest builds a request carrying the JSON, Atproto-Proxy and bearer
// token headers shared by the chat.bsky.convo.* calls.
func (d *DmService) newChatRequest(ctx context.Context, method, endpoint string, body io.Reader) (*http.Request, error) {
	request, err := http.NewRequestWithContext(ctx, method, endpoint, body)
	if err != nil {
		return nil, err
	}

	request.Header.Add("Content-Type", "application/json")
	request.Header.Add("Accept", "application/json")
	request.Header.Add("Atproto-Proxy", "did:web:api.bsky.chat#bsky_chat")
	request.Header.Add("Authorization", fmt.Sprintf("Bearer %s", d.currentAuth().AccessJwt))

	return request, nil
}

// decodeSession turns a createSession / refreshSession response into an auth
// value. It rejects non-200 responses and bodies without tokens, so an error
// reply is never mistaken for a valid (empty) session.
func decodeSession(action string, resp *http.Response) (auth, error) {
	if resp.StatusCode != http.StatusOK {
		return auth{}, responseError(action, resp)
	}

	var session auth
	if err := decodeResp(resp.Body, &session); err != nil {
		return auth{}, err
	}

	if session.AccessJwt == "" || session.RefershJWT == "" {
		return auth{}, fmt.Errorf("%s: response did not contain session tokens", action)
	}

	return session, nil
}

// responseError builds the error for a non-200 response. The status code is
// always included, even when the body is not a decodable ErrorResponse.
func responseError(action string, resp *http.Response) error {
	var errorResp ErrorResponse
	if err := decodeResp(resp.Body, &errorResp); err != nil {
		return fmt.Errorf("%s responded with code %d: %w", action, resp.StatusCode, err)
	}

	return fmt.Errorf("%s responded with code %d: %s", action, resp.StatusCode, errorResp.Error)
}

// decodeResp reads body to the end and unmarshals it as JSON into result.
func decodeResp(body io.Reader, result any) error {
	resBody, err := io.ReadAll(body)
	if err != nil {
		return errors.Wrap(err, "failed to read response")
	}

	if err := json.Unmarshal(resBody, result); err != nil {
		return errors.Wrap(err, "failed to unmarshal response")
	}

	return nil
}