···
1
+
package tangledalertbot
15
+
"github.com/pkg/errors"
19
+
httpClientTimeoutDuration = time.Second * 5
20
+
transportIdleConnTimeoutDuration = time.Second * 90
21
+
baseBskyURL = "https://bsky.social/xrpc"
25
+
AccessJwt string `json:"accessJwt"`
26
+
RefershJWT string `json:"refreshJwt"`
27
+
Did string `json:"did"`
30
+
type accessData struct {
35
+
type ListConvosResponse struct {
36
+
Cursor string `json:"cursor"`
37
+
Convos []Convo `json:"convos"`
41
+
ID string `json:"id"`
42
+
Members []ConvoMember `json:"members"`
43
+
UnreadCount int `json:"unreadCount"`
46
+
type ConvoMember struct {
47
+
Did string `json:"did"`
48
+
Handle string `json:"handle"`
51
+
type ErrorResponse struct {
52
+
Error string `json:"error"`
55
+
type MessageResp struct {
56
+
Messages []Message `json:"messages"`
57
+
Cursor string `json:"cursor"`
60
+
type Message struct {
61
+
ID string `json:"id"`
62
+
Sender MessageSender `json:"sender"`
63
+
Text string `json:"text"`
66
+
type MessageSender struct {
67
+
Did string `json:"did"`
70
+
type UpdateMessageReadRequest struct {
71
+
ConvoID string `json:"convoId"`
72
+
MessageID string `json:"messageId"`
82
+
type DmService struct {
83
+
httpClient *http.Client
84
+
accessData accessData
86
+
timerDuration time.Duration
91
+
func NewDmService(store Store, timerDuration time.Duration) (*DmService, error) {
92
+
httpClient := http.Client{
93
+
Timeout: httpClientTimeoutDuration,
94
+
Transport: &http.Transport{
95
+
IdleConnTimeout: transportIdleConnTimeoutDuration,
99
+
accessHandle := os.Getenv("MESSAGING_ACCESS_HANDLE")
100
+
accessAppPassword := os.Getenv("MESSAGING_ACCESS_APP_PASSWORD")
101
+
pdsURL := os.Getenv("MESSAGING_PDS_URL")
103
+
service := DmService{
104
+
httpClient: &httpClient,
105
+
accessData: accessData{
106
+
handle: accessHandle,
107
+
appPassword: accessAppPassword,
109
+
timerDuration: timerDuration,
114
+
auth, err := service.Authenicate()
116
+
return nil, fmt.Errorf("authenticating: %w", err)
119
+
service.auth = auth
121
+
return &service, nil
124
+
func (d *DmService) Start(ctx context.Context) {
125
+
go d.RefreshTask(ctx)
127
+
timer := time.NewTimer(d.timerDuration)
133
+
slog.Warn("context canceled - stopping dm task")
136
+
err := d.HandleMessageTimer(ctx)
138
+
slog.Error("handle message timer", "error", err)
140
+
timer.Reset(d.timerDuration)
145
+
func (d *DmService) RefreshTask(ctx context.Context) {
146
+
timer := time.NewTimer(time.Hour)
154
+
err := d.RefreshAuthenication(ctx)
156
+
slog.Error("handle refresh auth timer", "error", err)
157
+
// TODO: better retry with backoff probably
158
+
timer.Reset(time.Minute)
161
+
timer.Reset(time.Hour)
166
+
func (d *DmService) HandleMessageTimer(ctx context.Context) error {
167
+
convoResp, err := d.GetUnreadMessages()
169
+
return fmt.Errorf("get unread messages: %w", err)
172
+
// TODO: handle the cursor pagination
174
+
for _, convo := range convoResp.Convos {
175
+
if convo.UnreadCount == 0 {
179
+
messageResp, err := d.GetMessages(ctx, convo.ID)
181
+
slog.Error("failed to get messages for convo", "error", err, "convo id", convo.ID)
185
+
unreadCount := convo.UnreadCount
186
+
unreadMessages := make([]Message, 0, convo.UnreadCount)
187
+
// TODO: handle cursor pagination
188
+
for _, msg := range messageResp.Messages {
189
+
// TODO: techincally if I get to a message that's from the bot account, then there shouldn't be
190
+
// an more unread messages?
191
+
if msg.Sender.Did == d.auth.Did {
195
+
unreadMessages = append(unreadMessages, msg)
197
+
if unreadCount == 0 {
202
+
for _, msg := range unreadMessages {
203
+
d.handleMessage(msg, convo)
205
+
err = d.MarkMessageRead(msg.ID, convo.ID)
207
+
slog.Error("marking message read", "error", err)
216
+
func (d *DmService) handleMessage(msg Message, convo Convo) {
217
+
// TODO: add or remote user the list of "subsribed" users
218
+
if strings.ToLower(msg.Text) == "subscribe" {
220
+
for _, member := range convo.Members {
221
+
if member.Did == msg.Sender.Did {
222
+
userHandle = member.Handle
227
+
if userHandle == "" {
228
+
slog.Error("user handle for sent message not found", "sender did", msg.Sender.Did, "convo members", convo.Members)
233
+
DID: msg.Sender.Did,
235
+
Handle: userHandle,
236
+
CreatedAt: int(time.Now().UnixMilli()),
239
+
err := d.store.CreateUser(user)
241
+
slog.Error("error creating user", "error", err, "user", user)
247
+
func (d *DmService) GetUnreadMessages() (ListConvosResponse, error) {
248
+
url := fmt.Sprintf("%s/xrpc/chat.bsky.convo.listConvos?readState=unread", d.pdsURL)
249
+
request, err := http.NewRequest("GET", url, nil)
251
+
return ListConvosResponse{}, fmt.Errorf("create new list convos http request: %w", err)
254
+
request.Header.Add("Content-Type", "application/json")
255
+
request.Header.Add("Accept", "application/json")
256
+
request.Header.Add("Atproto-Proxy", "did:web:api.bsky.chat#bsky_chat")
257
+
request.Header.Add("Authorization", fmt.Sprintf("Bearer %s", d.auth.AccessJwt))
259
+
resp, err := d.httpClient.Do(request)
261
+
return ListConvosResponse{}, fmt.Errorf("do http request to list convos: %w", err)
263
+
defer resp.Body.Close()
265
+
if resp.StatusCode != http.StatusOK {
266
+
var errorResp ErrorResponse
267
+
err = decodeResp(resp.Body, &errorResp)
269
+
return ListConvosResponse{}, err
272
+
return ListConvosResponse{}, fmt.Errorf("listing convos responded with code %d: %s", resp.StatusCode, errorResp.Error)
275
+
var listConvoResp ListConvosResponse
276
+
err = decodeResp(resp.Body, &listConvoResp)
278
+
return ListConvosResponse{}, err
281
+
return listConvoResp, nil
284
+
func (d *DmService) MarkMessageRead(messageID, convoID string) error {
285
+
bodyReq := UpdateMessageReadRequest{
287
+
MessageID: messageID,
290
+
bodyB, err := json.Marshal(bodyReq)
292
+
return fmt.Errorf("marshal update message request body: %w", err)
295
+
r := bytes.NewReader(bodyB)
297
+
url := fmt.Sprintf("%s/xrpc/chat.bsky.convo.updateRead", d.pdsURL)
298
+
request, err := http.NewRequest("POST", url, r)
300
+
return fmt.Errorf("create new list convos http request: %w", err)
303
+
request.Header.Add("Content-Type", "application/json")
304
+
request.Header.Add("Accept", "application/json")
305
+
request.Header.Add("Atproto-Proxy", "did:web:api.bsky.chat#bsky_chat")
306
+
request.Header.Add("Authorization", fmt.Sprintf("Bearer %s", d.auth.AccessJwt))
308
+
resp, err := d.httpClient.Do(request)
310
+
return fmt.Errorf("do http request to update message read: %w", err)
312
+
defer resp.Body.Close()
314
+
if resp.StatusCode == http.StatusOK {
318
+
var errorResp ErrorResponse
319
+
err = decodeResp(resp.Body, &errorResp)
324
+
return fmt.Errorf("listing convos responded with code %d: %s", resp.StatusCode, errorResp.Error)
328
+
func (d *DmService) Authenicate() (auth, error) {
329
+
url := fmt.Sprintf("%s/com.atproto.server.createSession", baseBskyURL)
331
+
requestData := map[string]interface{}{
332
+
"identifier": d.accessData.handle,
333
+
"password": d.accessData.appPassword,
336
+
data, err := json.Marshal(requestData)
338
+
return auth{}, errors.Wrap(err, "failed to marshal request")
341
+
r := bytes.NewReader(data)
343
+
request, err := http.NewRequest("POST", url, r)
345
+
return auth{}, errors.Wrap(err, "failed to create request")
348
+
request.Header.Add("Content-Type", "application/json")
350
+
resp, err := d.httpClient.Do(request)
352
+
return auth{}, errors.Wrap(err, "failed to make request")
354
+
defer resp.Body.Close()
357
+
err = decodeResp(resp.Body, &loginResp)
362
+
return loginResp, nil
365
+
func (d *DmService) RefreshAuthenication(ctx context.Context) error {
366
+
url := fmt.Sprintf("%s/com.atproto.server.refreshSession", baseBskyURL)
368
+
request, err := http.NewRequest("POST", url, nil)
370
+
return errors.Wrap(err, "failed to create request")
373
+
request.Header.Add("Content-Type", "application/json")
374
+
request.Header.Add("Authorization", fmt.Sprintf("Bearer %s", d.auth.RefershJWT))
376
+
resp, err := d.httpClient.Do(request)
378
+
return errors.Wrap(err, "failed to make request")
380
+
defer resp.Body.Close()
383
+
err = decodeResp(resp.Body, &loginResp)
393
+
func (d *DmService) GetMessages(ctx context.Context, convoID string) (MessageResp, error) {
394
+
url := fmt.Sprintf("%s/xrpc/chat.bsky.convo.getMessages?convoId=%s", d.pdsURL, convoID)
395
+
request, err := http.NewRequest("GET", url, nil)
397
+
return MessageResp{}, fmt.Errorf("create new get messages http request: %w", err)
400
+
request.Header.Add("Content-Type", "application/json")
401
+
request.Header.Add("Accept", "application/json")
402
+
request.Header.Add("Atproto-Proxy", "did:web:api.bsky.chat#bsky_chat")
403
+
request.Header.Add("Authorization", fmt.Sprintf("Bearer %s", d.auth.AccessJwt))
405
+
resp, err := d.httpClient.Do(request)
407
+
return MessageResp{}, fmt.Errorf("do http request to get messages: %w", err)
409
+
defer resp.Body.Close()
411
+
if resp.StatusCode != http.StatusOK {
412
+
var errorResp ErrorResponse
413
+
err = decodeResp(resp.Body, &errorResp)
415
+
return MessageResp{}, err
418
+
return MessageResp{}, fmt.Errorf("listing convos responded with code %d: %s", resp.StatusCode, errorResp.Error)
421
+
var messageResp MessageResp
422
+
err = decodeResp(resp.Body, &messageResp)
424
+
return MessageResp{}, err
427
+
return messageResp, nil
430
+
func decodeResp(body io.Reader, result any) error {
431
+
resBody, err := io.ReadAll(body)
433
+
return errors.Wrap(err, "failed to read response")
436
+
err = json.Unmarshal(resBody, result)
438
+
return errors.Wrap(err, "failed to unmarshal response")