this repo has no description
at main 7.4 kB view raw
1package tangledalertbot 2 3import ( 4 "context" 5 "encoding/json" 6 7 "fmt" 8 "log/slog" 9 "time" 10 11 "github.com/bluesky-social/jetstream/pkg/client" 12 "github.com/bluesky-social/jetstream/pkg/client/schedulers/sequential" 13 "github.com/bluesky-social/jetstream/pkg/models" 14 "github.com/bugsnag/bugsnag-go" 15 "tangled.org/core/api/tangled" 16) 17 18type Issue struct { 19 AuthorDID string `json:"authorDID"` 20 RKey string `json:"rkey"` 21 Title string `json:"title"` 22 Body string `json:"body"` 23 Repo string `json:"repo"` 24 CreatedAt int64 `json:"createdAt"` 25} 26 27type Comment struct { 28 AuthorDID string `json:"authorDID"` 29 RKey string `json:"rkey"` 30 Body string `json:"body"` 31 Issue string `json:"issue" ` 32 CreatedAt int64 `json:"createdAt"` 33} 34 35type Store interface { 36 CreateIssue(issue Issue) error 37 CreateComment(comment Comment) error 38 DeleteIssue(did, rkey string) error 39 DeleteComment(did, rkey string) error 40 DeleteCommentsForIssue(issueURI string) error 41} 42 43// JetstreamConsumer is responsible for consuming from a jetstream instance 44type JetstreamConsumer struct { 45 cfg *client.ClientConfig 46 handler *Handler 47 logger *slog.Logger 48} 49 50// NewJetstreamConsumer configures a new jetstream consumer. To run or start you should call the Consume function 51func NewJetstreamConsumer(jsAddr string, logger *slog.Logger, handler *Handler) *JetstreamConsumer { 52 cfg := client.DefaultClientConfig() 53 if jsAddr != "" { 54 cfg.WebsocketURL = jsAddr 55 } 56 cfg.WantedCollections = []string{ 57 tangled.RepoIssueNSID, 58 tangled.RepoIssueCommentNSID, 59 } 60 cfg.WantedDids = []string{} 61 62 return &JetstreamConsumer{ 63 cfg: cfg, 64 logger: logger, 65 handler: handler, 66 } 67} 68 69// Consume will connect to a Jetstream client and start to consume and handle messages from it 70func (c *JetstreamConsumer) Consume(ctx context.Context) error { 71 scheduler := sequential.NewScheduler("jetstream", c.logger, c.handler.HandleEvent) 72 defer scheduler.Shutdown() 73 74 client, err := client.NewClient(c.cfg, c.logger, scheduler) 75 if err != nil { 76 return fmt.Errorf("failed to create client: %w", err) 77 } 78 79 cursor := time.Now().Add(1 * -time.Minute).UnixMicro() 80 81 if err := client.ConnectAndRead(ctx, &cursor); err != nil { 82 return fmt.Errorf("connect and read: %w", err) 83 } 84 85 slog.Info("stopping consume") 86 return nil 87} 88 89// Handler is responsible for handling a message consumed from Jetstream 90type Handler struct { 91 store Store 92} 93 94// NewFeedHandler returns a new handler 95func NewFeedHandler(store Store) *Handler { 96 return &Handler{store: store} 97} 98 99// HandleEvent will handle an event based on the event's commit operation 100func (h *Handler) HandleEvent(ctx context.Context, event *models.Event) error { 101 if event.Commit == nil { 102 return nil 103 } 104 105 switch event.Commit.Operation { 106 case models.CommitOperationCreate, models.CommitOperationUpdate: 107 return h.handleCreateUpdateEvent(ctx, event) 108 case models.CommitOperationDelete: 109 return h.handleDeleteEvent(ctx, event) 110 default: 111 return nil 112 } 113} 114 115func (h *Handler) handleCreateUpdateEvent(ctx context.Context, event *models.Event) error { 116 switch event.Commit.Collection { 117 case tangled.RepoIssueNSID: 118 h.handleCreateUpdateIssueEvent(ctx, event) 119 case tangled.RepoIssueCommentNSID: 120 h.handleCreateUpdateIssueCommentEvent(ctx, event) 121 default: 122 slog.Info("create event was not for expected collection", "RKey", "did", event.Did, event.Commit.RKey, "collection", event.Commit.Collection) 123 return nil 124 } 125 126 return nil 127} 128 129func (h *Handler) handleDeleteEvent(ctx context.Context, event *models.Event) error { 130 switch event.Commit.Collection { 131 case tangled.RepoIssueNSID: 132 h.handleDeleteIssueEvent(ctx, event) 133 case tangled.RepoIssueCommentNSID: 134 h.handleDeleteIssueCommentEvent(ctx, event) 135 default: 136 slog.Info("create event was not for expected collection", "RKey", "did", event.Did, event.Commit.RKey, "collection", event.Commit.Collection) 137 return nil 138 } 139 140 return nil 141} 142 143func (h *Handler) handleCreateUpdateIssueEvent(ctx context.Context, event *models.Event) { 144 var issue tangled.RepoIssue 145 146 err := json.Unmarshal(event.Commit.Record, &issue) 147 if err != nil { 148 bugsnag.Notify(err) 149 slog.Error("error unmarshalling event record to issue", "error", err) 150 return 151 } 152 153 did := event.Did 154 rkey := event.Commit.RKey 155 156 createdAt, err := time.Parse(time.RFC3339, issue.CreatedAt) 157 if err != nil { 158 bugsnag.Notify(err) 159 slog.Error("parsing createdAt time from issue", "error", err, "timestamp", issue.CreatedAt) 160 createdAt = time.Now().UTC() 161 } 162 body := "" 163 if issue.Body != nil { 164 body = *issue.Body 165 } 166 err = h.store.CreateIssue(Issue{ 167 AuthorDID: did, 168 RKey: rkey, 169 Title: issue.Title, 170 Body: body, 171 CreatedAt: createdAt.UnixMilli(), 172 Repo: issue.Repo, 173 }) 174 if err != nil { 175 bugsnag.Notify(err) 176 slog.Error("create issue", "error", err, "did", did, "rkey", rkey) 177 return 178 } 179 slog.Info("created issue ", "value", issue, "did", did, "rkey", rkey) 180} 181 182func (h *Handler) handleCreateUpdateIssueCommentEvent(ctx context.Context, event *models.Event) { 183 var comment tangled.RepoIssueComment 184 185 err := json.Unmarshal(event.Commit.Record, &comment) 186 if err != nil { 187 bugsnag.Notify(err) 188 slog.Error("error unmarshalling event record to comment", "error", err) 189 return 190 } 191 192 did := event.Did 193 rkey := event.Commit.RKey 194 195 createdAt, err := time.Parse(time.RFC3339, comment.CreatedAt) 196 if err != nil { 197 bugsnag.Notify(err) 198 slog.Error("parsing createdAt time from comment", "error", err, "timestamp", comment.CreatedAt) 199 createdAt = time.Now().UTC() 200 } 201 202 // if there is a replyTo present, don't store the comment because replies can't be replied to so 203 // the reply comment doesn't need to be stored 204 if comment.ReplyTo == nil || *comment.ReplyTo == "" { 205 err = h.store.CreateComment(Comment{ 206 AuthorDID: did, 207 RKey: rkey, 208 Body: comment.Body, 209 Issue: comment.Issue, 210 CreatedAt: createdAt.UnixMilli(), 211 }) 212 if err != nil { 213 bugsnag.Notify(err) 214 slog.Error("create comment", "error", err, "did", did, "rkey", rkey) 215 return 216 } 217 } 218 219 // TODO: now send a notification to either the issue creator or whoever the comment was a reply to 220 221 slog.Info("created comment ", "value", comment, "did", did, "rkey", rkey) 222} 223 224func (h *Handler) handleDeleteIssueEvent(ctx context.Context, event *models.Event) { 225 did := event.Did 226 rkey := event.Commit.RKey 227 228 err := h.store.DeleteIssue(did, rkey) 229 if err != nil { 230 bugsnag.Notify(err) 231 slog.Error("delete issue", "error", err, "did", did, "rkey", rkey) 232 return 233 } 234 235 // now attempt to delete any comments on that issue since they can't be replied to now. 236 // Note: if unsuccessful it doesn't matter because a deleted issue and its comments are 237 // not visible on the UI and so no one will be able to reply to them so this is just a 238 // cleanup operation 239 issueURI := fmt.Sprintf("at://%s/%s/%s", did, tangled.RepoIssueNSID, rkey) 240 err = h.store.DeleteCommentsForIssue(issueURI) 241 if err != nil { 242 bugsnag.Notify(err) 243 slog.Error("delete comments for issue", "error", err, "issue URI", issueURI) 244 } 245 246 slog.Info("deleted issue ", "did", did, "rkey", rkey) 247} 248 249func (h *Handler) handleDeleteIssueCommentEvent(ctx context.Context, event *models.Event) { 250 did := event.Did 251 rkey := event.Commit.RKey 252 253 err := h.store.DeleteComment(did, rkey) 254 if err != nil { 255 bugsnag.Notify(err) 256 slog.Error("delete comment", "error", err, "did", did, "rkey", rkey) 257 return 258 } 259 260 slog.Info("deleted comment ", "did", did, "rkey", rkey) 261}