this repo has no description
1package tangledalertbot
2
3import (
4 "context"
5 "encoding/json"
6 "strings"
7
8 "fmt"
9 "log/slog"
10 "time"
11
12 "github.com/bluesky-social/jetstream/pkg/client"
13 "github.com/bluesky-social/jetstream/pkg/client/schedulers/sequential"
14 "github.com/bluesky-social/jetstream/pkg/models"
15 "github.com/bugsnag/bugsnag-go"
16 "tangled.org/core/api/tangled"
17)
18
19type Issue struct {
20 AuthorDID string `json:"authorDID"`
21 RKey string `json:"rkey"`
22 Title string `json:"title"`
23 Body string `json:"body"`
24 Repo string `json:"repo"`
25 CreatedAt int64 `json:"createdAt"`
26}
27
28type Comment struct {
29 AuthorDID string `json:"authorDID"`
30 RKey string `json:"rkey"`
31 Body string `json:"body"`
32 Issue string `json:"issue" `
33 CreatedAt int64 `json:"createdAt"`
34}
35
36type Store interface {
37 CreateIssue(issue Issue) error
38 CreateComment(comment Comment) error
39 DeleteIssue(did, rkey string) error
40 DeleteComment(did, rkey string) error
41 DeleteCommentsForIssue(issueURI string) error
42 GetUser(did string) (User, error)
43 CreateUser(user User) error
44}
45
46// JetstreamConsumer is responsible for consuming from a jetstream instance
47type JetstreamConsumer struct {
48 cfg *client.ClientConfig
49 handler *Handler
50 logger *slog.Logger
51}
52
53// NewJetstreamConsumer configures a new jetstream consumer. To run or start you should call the Consume function
54func NewJetstreamConsumer(jsAddr string, logger *slog.Logger, handler *Handler) *JetstreamConsumer {
55 cfg := client.DefaultClientConfig()
56 if jsAddr != "" {
57 cfg.WebsocketURL = jsAddr
58 }
59 cfg.WantedCollections = []string{
60 tangled.RepoIssueNSID,
61 tangled.RepoIssueCommentNSID,
62 }
63 cfg.WantedDids = []string{}
64
65 return &JetstreamConsumer{
66 cfg: cfg,
67 logger: logger,
68 handler: handler,
69 }
70}
71
72// Consume will connect to a Jetstream client and start to consume and handle messages from it
73func (c *JetstreamConsumer) Consume(ctx context.Context) error {
74 scheduler := sequential.NewScheduler("jetstream", c.logger, c.handler.HandleEvent)
75 defer scheduler.Shutdown()
76
77 client, err := client.NewClient(c.cfg, c.logger, scheduler)
78 if err != nil {
79 return fmt.Errorf("failed to create client: %w", err)
80 }
81
82 cursor := time.Now().Add(1 * -time.Minute).UnixMicro()
83
84 if err := client.ConnectAndRead(ctx, &cursor); err != nil {
85 return fmt.Errorf("connect and read: %w", err)
86 }
87
88 slog.Info("stopping consume")
89 return nil
90}
91
92// Handler is responsible for handling a message consumed from Jetstream
93type Handler struct {
94 store Store
95}
96
97// NewFeedHandler returns a new handler
98func NewFeedHandler(store Store) *Handler {
99 return &Handler{store: store}
100}
101
102// HandleEvent will handle an event based on the event's commit operation
103func (h *Handler) HandleEvent(ctx context.Context, event *models.Event) error {
104 if event.Commit == nil {
105 return nil
106 }
107
108 switch event.Commit.Operation {
109 case models.CommitOperationCreate, models.CommitOperationUpdate:
110 return h.handleCreateUpdateEvent(ctx, event)
111 case models.CommitOperationDelete:
112 return h.handleDeleteEvent(ctx, event)
113 default:
114 return nil
115 }
116}
117
118func (h *Handler) handleCreateUpdateEvent(ctx context.Context, event *models.Event) error {
119 switch event.Commit.Collection {
120 case tangled.RepoIssueNSID:
121 h.handleCreateUpdateIssueEvent(ctx, event)
122 case tangled.RepoIssueCommentNSID:
123 h.handleCreateUpdateIssueCommentEvent(ctx, event)
124 default:
125 slog.Info("create event was not for expected collection", "RKey", "did", event.Did, event.Commit.RKey, "collection", event.Commit.Collection)
126 return nil
127 }
128
129 return nil
130}
131
132func (h *Handler) handleDeleteEvent(ctx context.Context, event *models.Event) error {
133 switch event.Commit.Collection {
134 case tangled.RepoIssueNSID:
135 h.handleDeleteIssueEvent(ctx, event)
136 case tangled.RepoIssueCommentNSID:
137 h.handleDeleteIssueCommentEvent(ctx, event)
138 default:
139 slog.Info("create event was not for expected collection", "RKey", "did", event.Did, event.Commit.RKey, "collection", event.Commit.Collection)
140 return nil
141 }
142
143 return nil
144}
145
146func (h *Handler) handleCreateUpdateIssueEvent(ctx context.Context, event *models.Event) {
147 var issue tangled.RepoIssue
148
149 err := json.Unmarshal(event.Commit.Record, &issue)
150 if err != nil {
151 bugsnag.Notify(err)
152 slog.Error("error unmarshalling event record to issue", "error", err)
153 return
154 }
155
156 did := event.Did
157 rkey := event.Commit.RKey
158
159 createdAt, err := time.Parse(time.RFC3339, issue.CreatedAt)
160 if err != nil {
161 bugsnag.Notify(err)
162 slog.Error("parsing createdAt time from issue", "error", err, "timestamp", issue.CreatedAt)
163 createdAt = time.Now().UTC()
164 }
165 body := ""
166 if issue.Body != nil {
167 body = *issue.Body
168 }
169 err = h.store.CreateIssue(Issue{
170 AuthorDID: did,
171 RKey: rkey,
172 Title: issue.Title,
173 Body: body,
174 CreatedAt: createdAt.UnixMilli(),
175 Repo: issue.Repo,
176 })
177 if err != nil {
178 bugsnag.Notify(err)
179 slog.Error("create issue", "error", err, "did", did, "rkey", rkey)
180 return
181 }
182 slog.Info("created issue ", "value", issue, "did", did, "rkey", rkey)
183}
184
185func (h *Handler) handleCreateUpdateIssueCommentEvent(ctx context.Context, event *models.Event) {
186 var comment tangled.RepoIssueComment
187
188 err := json.Unmarshal(event.Commit.Record, &comment)
189 if err != nil {
190 bugsnag.Notify(err)
191 slog.Error("error unmarshalling event record to comment", "error", err)
192 return
193 }
194
195 did := event.Did
196 rkey := event.Commit.RKey
197
198 createdAt, err := time.Parse(time.RFC3339, comment.CreatedAt)
199 if err != nil {
200 bugsnag.Notify(err)
201 slog.Error("parsing createdAt time from comment", "error", err, "timestamp", comment.CreatedAt)
202 createdAt = time.Now().UTC()
203 }
204
205 // if there is a replyTo present, don't store the comment because replies can't be replied to so
206 // the reply comment doesn't need to be stored
207 if comment.ReplyTo == nil || *comment.ReplyTo == "" {
208 err = h.store.CreateComment(Comment{
209 AuthorDID: did,
210 RKey: rkey,
211 Body: comment.Body,
212 Issue: comment.Issue,
213 CreatedAt: createdAt.UnixMilli(),
214 })
215 if err != nil {
216 bugsnag.Notify(err)
217 slog.Error("create comment", "error", err, "did", did, "rkey", rkey)
218 return
219 }
220 }
221
222 // TODO: now send a notification to either the issue creator or whoever the comment was a reply to
223 didToNotify := getUserToAlert(comment)
224 if didToNotify == "" {
225 slog.Info("could not work out did to send alert to", "comment", comment)
226 return
227 }
228
229 user, err := h.store.GetUser(didToNotify)
230 if err != nil {
231 slog.Error("getting user to send alert to", "error", err, "did", didToNotify)
232 return
233 }
234
235 slog.Info("sending alert to user", "value", comment, "did", didToNotify, "convo", user.ConvoID)
236}
237
238func (h *Handler) handleDeleteIssueEvent(ctx context.Context, event *models.Event) {
239 did := event.Did
240 rkey := event.Commit.RKey
241
242 err := h.store.DeleteIssue(did, rkey)
243 if err != nil {
244 bugsnag.Notify(err)
245 slog.Error("delete issue", "error", err, "did", did, "rkey", rkey)
246 return
247 }
248
249 // now attempt to delete any comments on that issue since they can't be replied to now.
250 // Note: if unsuccessful it doesn't matter because a deleted issue and its comments are
251 // not visible on the UI and so no one will be able to reply to them so this is just a
252 // cleanup operation
253 issueURI := fmt.Sprintf("at://%s/%s/%s", did, tangled.RepoIssueNSID, rkey)
254 err = h.store.DeleteCommentsForIssue(issueURI)
255 if err != nil {
256 bugsnag.Notify(err)
257 slog.Error("delete comments for issue", "error", err, "issue URI", issueURI)
258 }
259
260 slog.Info("deleted issue ", "did", did, "rkey", rkey)
261}
262
263func (h *Handler) handleDeleteIssueCommentEvent(ctx context.Context, event *models.Event) {
264 did := event.Did
265 rkey := event.Commit.RKey
266
267 err := h.store.DeleteComment(did, rkey)
268 if err != nil {
269 bugsnag.Notify(err)
270 slog.Error("delete comment", "error", err, "did", did, "rkey", rkey)
271 return
272 }
273
274 slog.Info("deleted comment ", "did", did, "rkey", rkey)
275}
276
277// at://did:plc:dadhhalkfcq3gucaq25hjqon/sh.tangled.repo.issue.comment/3lzkp4va62m22
278func getUserToAlert(comment tangled.RepoIssueComment) string {
279 if comment.ReplyTo != nil {
280 return getDidFromCommentURI(*comment.ReplyTo)
281 }
282 return getDidFromIssueURI(comment.Issue)
283}
284
285func getDidFromCommentURI(uri string) string {
286 split := strings.Split(uri, tangled.RepoIssueCommentNSID)
287 if len(split) != 2 {
288 slog.Error("invalid comment URI received", "uri", uri)
289 return ""
290 }
291
292 did := strings.TrimPrefix(split[0], "at://")
293 did = strings.TrimSuffix(did, "/")
294
295 return did
296}
297
298func getDidFromIssueURI(uri string) string {
299 split := strings.Split(uri, tangled.RepoIssueNSID)
300 if len(split) != 2 {
301 slog.Error("invalid issue URI received", "uri", uri)
302 return ""
303 }
304
305 did := strings.TrimPrefix(split[0], "at://")
306 did = strings.TrimSuffix(did, "/")
307
308 return did
309}