this repo has no description
1package tangledalertbot
2
3import (
4 "context"
5 "encoding/json"
6
7 "fmt"
8 "log/slog"
9 "time"
10
11 "github.com/bluesky-social/jetstream/pkg/client"
12 "github.com/bluesky-social/jetstream/pkg/client/schedulers/sequential"
13 "github.com/bluesky-social/jetstream/pkg/models"
14 "github.com/bugsnag/bugsnag-go"
15 "tangled.org/core/api/tangled"
16)
17
// Issue is the record handed to Store.CreateIssue when an issue create or update
// event is handled.
type Issue struct {
	// AuthorDID is the DID of the repo that emitted the event.
	AuthorDID string `json:"authorDID"`
	// RKey is the record key of the issue commit.
	RKey string `json:"rkey"`
	// Title is the issue title taken from the record.
	Title string `json:"title"`
	// Body is the issue body; it is empty when the record carries no body.
	Body string `json:"body"`
	// Repo is the repo value taken from the issue record.
	Repo string `json:"repo"`
	// CreatedAt is the creation time in Unix milliseconds.
	CreatedAt int64 `json:"createdAt"`
}
26
// Comment is the record handed to Store.CreateComment when an issue comment create
// or update event is handled.
type Comment struct {
	// AuthorDID is the DID of the repo that emitted the event.
	AuthorDID string `json:"authorDID"`
	// RKey is the record key of the comment commit.
	RKey string `json:"rkey"`
	// Body is the comment text taken from the record.
	Body string `json:"body"`
	// Issue is the issue reference taken from the comment record.
	// NOTE(review): presumably an at:// URI of the issue, matching the URI passed to
	// Store.DeleteCommentsForIssue - confirm against the store implementation.
	Issue string `json:"issue"`
	// CreatedAt is the creation time in Unix milliseconds.
	CreatedAt int64 `json:"createdAt"`
}
34
35type Store interface {
36 CreateIssue(issue Issue) error
37 CreateComment(comment Comment) error
38 DeleteIssue(did, rkey string) error
39 DeleteComment(did, rkey string) error
40 DeleteCommentsForIssue(issueURI string) error
41}
42
43// JetstreamConsumer is responsible for consuming from a jetstream instance
44type JetstreamConsumer struct {
45 cfg *client.ClientConfig
46 handler *Handler
47 logger *slog.Logger
48}
49
50// NewJetstreamConsumer configures a new jetstream consumer. To run or start you should call the Consume function
51func NewJetstreamConsumer(jsAddr string, logger *slog.Logger, handler *Handler) *JetstreamConsumer {
52 cfg := client.DefaultClientConfig()
53 if jsAddr != "" {
54 cfg.WebsocketURL = jsAddr
55 }
56 cfg.WantedCollections = []string{
57 tangled.RepoIssueNSID,
58 tangled.RepoIssueCommentNSID,
59 }
60 cfg.WantedDids = []string{}
61
62 return &JetstreamConsumer{
63 cfg: cfg,
64 logger: logger,
65 handler: handler,
66 }
67}
68
69// Consume will connect to a Jetstream client and start to consume and handle messages from it
70func (c *JetstreamConsumer) Consume(ctx context.Context) error {
71 scheduler := sequential.NewScheduler("jetstream", c.logger, c.handler.HandleEvent)
72 defer scheduler.Shutdown()
73
74 client, err := client.NewClient(c.cfg, c.logger, scheduler)
75 if err != nil {
76 return fmt.Errorf("failed to create client: %w", err)
77 }
78
79 cursor := time.Now().Add(1 * -time.Minute).UnixMicro()
80
81 if err := client.ConnectAndRead(ctx, &cursor); err != nil {
82 return fmt.Errorf("connect and read: %w", err)
83 }
84
85 slog.Info("stopping consume")
86 return nil
87}
88
89// Handler is responsible for handling a message consumed from Jetstream
90type Handler struct {
91 store Store
92}
93
94// NewFeedHandler returns a new handler
95func NewFeedHandler(store Store) *Handler {
96 return &Handler{store: store}
97}
98
99// HandleEvent will handle an event based on the event's commit operation
100func (h *Handler) HandleEvent(ctx context.Context, event *models.Event) error {
101 if event.Commit == nil {
102 return nil
103 }
104
105 switch event.Commit.Operation {
106 case models.CommitOperationCreate, models.CommitOperationUpdate:
107 return h.handleCreateUpdateEvent(ctx, event)
108 case models.CommitOperationDelete:
109 return h.handleDeleteEvent(ctx, event)
110 default:
111 return nil
112 }
113}
114
115func (h *Handler) handleCreateUpdateEvent(ctx context.Context, event *models.Event) error {
116 switch event.Commit.Collection {
117 case tangled.RepoIssueNSID:
118 h.handleCreateUpdateIssueEvent(ctx, event)
119 case tangled.RepoIssueCommentNSID:
120 h.handleCreateUpdateIssueCommentEvent(ctx, event)
121 default:
122 slog.Info("create event was not for expected collection", "RKey", "did", event.Did, event.Commit.RKey, "collection", event.Commit.Collection)
123 return nil
124 }
125
126 return nil
127}
128
129func (h *Handler) handleDeleteEvent(ctx context.Context, event *models.Event) error {
130 switch event.Commit.Collection {
131 case tangled.RepoIssueNSID:
132 h.handleDeleteIssueEvent(ctx, event)
133 case tangled.RepoIssueCommentNSID:
134 h.handleDeleteIssueCommentEvent(ctx, event)
135 default:
136 slog.Info("create event was not for expected collection", "RKey", "did", event.Did, event.Commit.RKey, "collection", event.Commit.Collection)
137 return nil
138 }
139
140 return nil
141}
142
143func (h *Handler) handleCreateUpdateIssueEvent(ctx context.Context, event *models.Event) {
144 var issue tangled.RepoIssue
145
146 err := json.Unmarshal(event.Commit.Record, &issue)
147 if err != nil {
148 bugsnag.Notify(err)
149 slog.Error("error unmarshalling event record to issue", "error", err)
150 return
151 }
152
153 did := event.Did
154 rkey := event.Commit.RKey
155
156 createdAt, err := time.Parse(time.RFC3339, issue.CreatedAt)
157 if err != nil {
158 bugsnag.Notify(err)
159 slog.Error("parsing createdAt time from issue", "error", err, "timestamp", issue.CreatedAt)
160 createdAt = time.Now().UTC()
161 }
162 body := ""
163 if issue.Body != nil {
164 body = *issue.Body
165 }
166 err = h.store.CreateIssue(Issue{
167 AuthorDID: did,
168 RKey: rkey,
169 Title: issue.Title,
170 Body: body,
171 CreatedAt: createdAt.UnixMilli(),
172 Repo: issue.Repo,
173 })
174 if err != nil {
175 bugsnag.Notify(err)
176 slog.Error("create issue", "error", err, "did", did, "rkey", rkey)
177 return
178 }
179 slog.Info("created issue ", "value", issue, "did", did, "rkey", rkey)
180}
181
182func (h *Handler) handleCreateUpdateIssueCommentEvent(ctx context.Context, event *models.Event) {
183 var comment tangled.RepoIssueComment
184
185 err := json.Unmarshal(event.Commit.Record, &comment)
186 if err != nil {
187 bugsnag.Notify(err)
188 slog.Error("error unmarshalling event record to comment", "error", err)
189 return
190 }
191
192 did := event.Did
193 rkey := event.Commit.RKey
194
195 createdAt, err := time.Parse(time.RFC3339, comment.CreatedAt)
196 if err != nil {
197 bugsnag.Notify(err)
198 slog.Error("parsing createdAt time from comment", "error", err, "timestamp", comment.CreatedAt)
199 createdAt = time.Now().UTC()
200 }
201
202 // if there is a replyTo present, don't store the comment because replies can't be replied to so
203 // the reply comment doesn't need to be stored
204 if comment.ReplyTo == nil || *comment.ReplyTo == "" {
205 err = h.store.CreateComment(Comment{
206 AuthorDID: did,
207 RKey: rkey,
208 Body: comment.Body,
209 Issue: comment.Issue,
210 CreatedAt: createdAt.UnixMilli(),
211 })
212 if err != nil {
213 bugsnag.Notify(err)
214 slog.Error("create comment", "error", err, "did", did, "rkey", rkey)
215 return
216 }
217 }
218
219 // TODO: now send a notification to either the issue creator or whoever the comment was a reply to
220
221 slog.Info("created comment ", "value", comment, "did", did, "rkey", rkey)
222}
223
224func (h *Handler) handleDeleteIssueEvent(ctx context.Context, event *models.Event) {
225 did := event.Did
226 rkey := event.Commit.RKey
227
228 err := h.store.DeleteIssue(did, rkey)
229 if err != nil {
230 bugsnag.Notify(err)
231 slog.Error("delete issue", "error", err, "did", did, "rkey", rkey)
232 return
233 }
234
235 // now attempt to delete any comments on that issue since they can't be replied to now.
236 // Note: if unsuccessful it doesn't matter because a deleted issue and its comments are
237 // not visible on the UI and so no one will be able to reply to them so this is just a
238 // cleanup operation
239 issueURI := fmt.Sprintf("at://%s/%s/%s", did, tangled.RepoIssueNSID, rkey)
240 err = h.store.DeleteCommentsForIssue(issueURI)
241 if err != nil {
242 bugsnag.Notify(err)
243 slog.Error("delete comments for issue", "error", err, "issue URI", issueURI)
244 }
245
246 slog.Info("deleted issue ", "did", did, "rkey", rkey)
247}
248
249func (h *Handler) handleDeleteIssueCommentEvent(ctx context.Context, event *models.Event) {
250 did := event.Did
251 rkey := event.Commit.RKey
252
253 err := h.store.DeleteComment(did, rkey)
254 if err != nil {
255 bugsnag.Notify(err)
256 slog.Error("delete comment", "error", err, "did", did, "rkey", rkey)
257 return
258 }
259
260 slog.Info("deleted comment ", "did", did, "rkey", rkey)
261}