this repo has no description
at send-alerts 8.7 kB view raw
1package tangledalertbot 2 3import ( 4 "context" 5 "encoding/json" 6 "strings" 7 8 "fmt" 9 "log/slog" 10 "time" 11 12 "github.com/bluesky-social/jetstream/pkg/client" 13 "github.com/bluesky-social/jetstream/pkg/client/schedulers/sequential" 14 "github.com/bluesky-social/jetstream/pkg/models" 15 "github.com/bugsnag/bugsnag-go" 16 "tangled.org/core/api/tangled" 17) 18 19type Issue struct { 20 AuthorDID string `json:"authorDID"` 21 RKey string `json:"rkey"` 22 Title string `json:"title"` 23 Body string `json:"body"` 24 Repo string `json:"repo"` 25 CreatedAt int64 `json:"createdAt"` 26} 27 28type Comment struct { 29 AuthorDID string `json:"authorDID"` 30 RKey string `json:"rkey"` 31 Body string `json:"body"` 32 Issue string `json:"issue" ` 33 CreatedAt int64 `json:"createdAt"` 34} 35 36type Store interface { 37 CreateIssue(issue Issue) error 38 CreateComment(comment Comment) error 39 DeleteIssue(did, rkey string) error 40 DeleteComment(did, rkey string) error 41 DeleteCommentsForIssue(issueURI string) error 42 GetUser(did string) (User, error) 43 CreateUser(user User) error 44} 45 46// JetstreamConsumer is responsible for consuming from a jetstream instance 47type JetstreamConsumer struct { 48 cfg *client.ClientConfig 49 handler *Handler 50 logger *slog.Logger 51} 52 53// NewJetstreamConsumer configures a new jetstream consumer. To run or start you should call the Consume function 54func NewJetstreamConsumer(jsAddr string, logger *slog.Logger, handler *Handler) *JetstreamConsumer { 55 cfg := client.DefaultClientConfig() 56 if jsAddr != "" { 57 cfg.WebsocketURL = jsAddr 58 } 59 cfg.WantedCollections = []string{ 60 tangled.RepoIssueNSID, 61 tangled.RepoIssueCommentNSID, 62 } 63 cfg.WantedDids = []string{} 64 65 return &JetstreamConsumer{ 66 cfg: cfg, 67 logger: logger, 68 handler: handler, 69 } 70} 71 72// Consume will connect to a Jetstream client and start to consume and handle messages from it 73func (c *JetstreamConsumer) Consume(ctx context.Context) error { 74 scheduler := sequential.NewScheduler("jetstream", c.logger, c.handler.HandleEvent) 75 defer scheduler.Shutdown() 76 77 client, err := client.NewClient(c.cfg, c.logger, scheduler) 78 if err != nil { 79 return fmt.Errorf("failed to create client: %w", err) 80 } 81 82 cursor := time.Now().Add(1 * -time.Minute).UnixMicro() 83 84 if err := client.ConnectAndRead(ctx, &cursor); err != nil { 85 return fmt.Errorf("connect and read: %w", err) 86 } 87 88 slog.Info("stopping consume") 89 return nil 90} 91 92// Handler is responsible for handling a message consumed from Jetstream 93type Handler struct { 94 store Store 95} 96 97// NewFeedHandler returns a new handler 98func NewFeedHandler(store Store) *Handler { 99 return &Handler{store: store} 100} 101 102// HandleEvent will handle an event based on the event's commit operation 103func (h *Handler) HandleEvent(ctx context.Context, event *models.Event) error { 104 if event.Commit == nil { 105 return nil 106 } 107 108 switch event.Commit.Operation { 109 case models.CommitOperationCreate, models.CommitOperationUpdate: 110 return h.handleCreateUpdateEvent(ctx, event) 111 case models.CommitOperationDelete: 112 return h.handleDeleteEvent(ctx, event) 113 default: 114 return nil 115 } 116} 117 118func (h *Handler) handleCreateUpdateEvent(ctx context.Context, event *models.Event) error { 119 switch event.Commit.Collection { 120 case tangled.RepoIssueNSID: 121 h.handleCreateUpdateIssueEvent(ctx, event) 122 case tangled.RepoIssueCommentNSID: 123 h.handleCreateUpdateIssueCommentEvent(ctx, event) 124 default: 125 slog.Info("create event was not for expected collection", "RKey", "did", event.Did, event.Commit.RKey, "collection", event.Commit.Collection) 126 return nil 127 } 128 129 return nil 130} 131 132func (h *Handler) handleDeleteEvent(ctx context.Context, event *models.Event) error { 133 switch event.Commit.Collection { 134 case tangled.RepoIssueNSID: 135 h.handleDeleteIssueEvent(ctx, event) 136 case tangled.RepoIssueCommentNSID: 137 h.handleDeleteIssueCommentEvent(ctx, event) 138 default: 139 slog.Info("create event was not for expected collection", "RKey", "did", event.Did, event.Commit.RKey, "collection", event.Commit.Collection) 140 return nil 141 } 142 143 return nil 144} 145 146func (h *Handler) handleCreateUpdateIssueEvent(ctx context.Context, event *models.Event) { 147 var issue tangled.RepoIssue 148 149 err := json.Unmarshal(event.Commit.Record, &issue) 150 if err != nil { 151 bugsnag.Notify(err) 152 slog.Error("error unmarshalling event record to issue", "error", err) 153 return 154 } 155 156 did := event.Did 157 rkey := event.Commit.RKey 158 159 createdAt, err := time.Parse(time.RFC3339, issue.CreatedAt) 160 if err != nil { 161 bugsnag.Notify(err) 162 slog.Error("parsing createdAt time from issue", "error", err, "timestamp", issue.CreatedAt) 163 createdAt = time.Now().UTC() 164 } 165 body := "" 166 if issue.Body != nil { 167 body = *issue.Body 168 } 169 err = h.store.CreateIssue(Issue{ 170 AuthorDID: did, 171 RKey: rkey, 172 Title: issue.Title, 173 Body: body, 174 CreatedAt: createdAt.UnixMilli(), 175 Repo: issue.Repo, 176 }) 177 if err != nil { 178 bugsnag.Notify(err) 179 slog.Error("create issue", "error", err, "did", did, "rkey", rkey) 180 return 181 } 182 slog.Info("created issue ", "value", issue, "did", did, "rkey", rkey) 183} 184 185func (h *Handler) handleCreateUpdateIssueCommentEvent(ctx context.Context, event *models.Event) { 186 var comment tangled.RepoIssueComment 187 188 err := json.Unmarshal(event.Commit.Record, &comment) 189 if err != nil { 190 bugsnag.Notify(err) 191 slog.Error("error unmarshalling event record to comment", "error", err) 192 return 193 } 194 195 did := event.Did 196 rkey := event.Commit.RKey 197 198 createdAt, err := time.Parse(time.RFC3339, comment.CreatedAt) 199 if err != nil { 200 bugsnag.Notify(err) 201 slog.Error("parsing createdAt time from comment", "error", err, "timestamp", comment.CreatedAt) 202 createdAt = time.Now().UTC() 203 } 204 205 // if there is a replyTo present, don't store the comment because replies can't be replied to so 206 // the reply comment doesn't need to be stored 207 if comment.ReplyTo == nil || *comment.ReplyTo == "" { 208 err = h.store.CreateComment(Comment{ 209 AuthorDID: did, 210 RKey: rkey, 211 Body: comment.Body, 212 Issue: comment.Issue, 213 CreatedAt: createdAt.UnixMilli(), 214 }) 215 if err != nil { 216 bugsnag.Notify(err) 217 slog.Error("create comment", "error", err, "did", did, "rkey", rkey) 218 return 219 } 220 } 221 222 // TODO: now send a notification to either the issue creator or whoever the comment was a reply to 223 didToNotify := getUserToAlert(comment) 224 if didToNotify == "" { 225 slog.Info("could not work out did to send alert to", "comment", comment) 226 return 227 } 228 229 user, err := h.store.GetUser(didToNotify) 230 if err != nil { 231 slog.Error("getting user to send alert to", "error", err, "did", didToNotify) 232 return 233 } 234 235 slog.Info("sending alert to user", "value", comment, "did", didToNotify, "convo", user.ConvoID) 236} 237 238func (h *Handler) handleDeleteIssueEvent(ctx context.Context, event *models.Event) { 239 did := event.Did 240 rkey := event.Commit.RKey 241 242 err := h.store.DeleteIssue(did, rkey) 243 if err != nil { 244 bugsnag.Notify(err) 245 slog.Error("delete issue", "error", err, "did", did, "rkey", rkey) 246 return 247 } 248 249 // now attempt to delete any comments on that issue since they can't be replied to now. 250 // Note: if unsuccessful it doesn't matter because a deleted issue and its comments are 251 // not visible on the UI and so no one will be able to reply to them so this is just a 252 // cleanup operation 253 issueURI := fmt.Sprintf("at://%s/%s/%s", did, tangled.RepoIssueNSID, rkey) 254 err = h.store.DeleteCommentsForIssue(issueURI) 255 if err != nil { 256 bugsnag.Notify(err) 257 slog.Error("delete comments for issue", "error", err, "issue URI", issueURI) 258 } 259 260 slog.Info("deleted issue ", "did", did, "rkey", rkey) 261} 262 263func (h *Handler) handleDeleteIssueCommentEvent(ctx context.Context, event *models.Event) { 264 did := event.Did 265 rkey := event.Commit.RKey 266 267 err := h.store.DeleteComment(did, rkey) 268 if err != nil { 269 bugsnag.Notify(err) 270 slog.Error("delete comment", "error", err, "did", did, "rkey", rkey) 271 return 272 } 273 274 slog.Info("deleted comment ", "did", did, "rkey", rkey) 275} 276 277// at://did:plc:dadhhalkfcq3gucaq25hjqon/sh.tangled.repo.issue.comment/3lzkp4va62m22 278func getUserToAlert(comment tangled.RepoIssueComment) string { 279 if comment.ReplyTo != nil { 280 return getDidFromCommentURI(*comment.ReplyTo) 281 } 282 return getDidFromIssueURI(comment.Issue) 283} 284 285func getDidFromCommentURI(uri string) string { 286 split := strings.Split(uri, tangled.RepoIssueCommentNSID) 287 if len(split) != 2 { 288 slog.Error("invalid comment URI received", "uri", uri) 289 return "" 290 } 291 292 did := strings.TrimPrefix(split[0], "at://") 293 did = strings.TrimSuffix(did, "/") 294 295 return did 296} 297 298func getDidFromIssueURI(uri string) string { 299 split := strings.Split(uri, tangled.RepoIssueNSID) 300 if len(split) != 2 { 301 slog.Error("invalid issue URI received", "uri", uri) 302 return "" 303 } 304 305 did := strings.TrimPrefix(split[0], "at://") 306 did = strings.TrimSuffix(did, "/") 307 308 return did 309}