// Package tangledalertbot consumes tangled issue and issue comment records
// from a Jetstream instance and mirrors them into a Store.
package tangledalertbot

import (
	"context"
	"encoding/json"
	"fmt"
	"log/slog"
	"time"

	"github.com/bluesky-social/jetstream/pkg/client"
	"github.com/bluesky-social/jetstream/pkg/client/schedulers/sequential"
	"github.com/bluesky-social/jetstream/pkg/models"
	"github.com/bugsnag/bugsnag-go"
	"tangled.org/core/api/tangled"
)

// Issue is the stored form of a tangled issue record.
type Issue struct {
	// AuthorDID is the DID of the repo the event was received for.
	AuthorDID string `json:"authorDID"`
	// RKey is the record key of the issue within the author's repo.
	RKey  string `json:"rkey"`
	Title string `json:"title"`
	Body  string `json:"body"`
	Repo  string `json:"repo"`
	// CreatedAt is a Unix timestamp in milliseconds.
	CreatedAt int64 `json:"createdAt"`
}

// Comment is the stored form of a tangled issue comment record.
type Comment struct {
	// AuthorDID is the DID of the repo the event was received for.
	AuthorDID string `json:"authorDID"`
	// RKey is the record key of the comment within the author's repo.
	RKey string `json:"rkey"`
	Body string `json:"body"`
	// Issue is copied verbatim from the comment record's issue field.
	// NOTE(review): presumably the AT URI of the parent issue (the same shape
	// that is passed to Store.DeleteCommentsForIssue) - confirm.
	Issue string `json:"issue"`
	// CreatedAt is a Unix timestamp in milliseconds.
	CreatedAt int64 `json:"createdAt"`
}

// Store is the persistence layer the Handler writes issues and comments to.
type Store interface {
	// CreateIssue is called for both create and update issue events.
	CreateIssue(issue Issue) error
	// CreateComment is called for both create and update comment events.
	CreateComment(comment Comment) error
	// DeleteIssue removes the issue identified by its author DID and record key.
	DeleteIssue(did, rkey string) error
	// DeleteComment removes the comment identified by its author DID and record key.
	DeleteComment(did, rkey string) error
	// DeleteCommentsForIssue removes the comments belonging to the issue with
	// the given AT URI.
	DeleteCommentsForIssue(issueURI string) error
}

// JetstreamConsumer is responsible for consuming from a jetstream instance
type JetstreamConsumer struct {
	cfg     *client.ClientConfig
	handler *Handler
	logger  *slog.Logger
}

// NewJetstreamConsumer configures a new jetstream consumer.
// To run or start you should call the Consume function.
//
// If jsAddr is empty the default websocket URL from the jetstream client
// configuration is used. The consumer subscribes to the tangled issue and
// issue comment collections only, with no DID filter.
func NewJetstreamConsumer(jsAddr string, logger *slog.Logger, handler *Handler) *JetstreamConsumer {
	cfg := client.DefaultClientConfig()
	if jsAddr != "" {
		cfg.WebsocketURL = jsAddr
	}
	cfg.WantedCollections = []string{
		tangled.RepoIssueNSID,
		tangled.RepoIssueCommentNSID,
	}
	cfg.WantedDids = []string{}

	return &JetstreamConsumer{
		cfg:     cfg,
		logger:  logger,
		handler: handler,
	}
}

// Consume will connect to a Jetstream client and start to consume and handle messages from it
//
// Events are dispatched to the handler through a sequential scheduler, which
// is shut down when Consume returns. It returns a wrapped error if the client
// cannot be created or if ConnectAndRead fails, and nil otherwise.
func (c *JetstreamConsumer) Consume(ctx context.Context) error {
	scheduler := sequential.NewScheduler("jetstream", c.logger, c.handler.HandleEvent)
	defer scheduler.Shutdown()

	// Named jsClient so the imported client package is not shadowed.
	jsClient, err := client.NewClient(c.cfg, c.logger, scheduler)
	if err != nil {
		return fmt.Errorf("failed to create client: %w", err)
	}

	// Start reading from one minute in the past.
	cursor := time.Now().Add(-time.Minute).UnixMicro()

	if err := jsClient.ConnectAndRead(ctx, &cursor); err != nil {
		return fmt.Errorf("connect and read: %w", err)
	}

	slog.Info("stopping consume")
	return nil
}

// Handler is responsible for handling a message consumed from Jetstream
type Handler struct {
	store Store
}

// NewFeedHandler returns a new handler
func NewFeedHandler(store Store) *Handler {
	return &Handler{store: store}
}

// HandleEvent will handle an event based on the event's commit operation
//
// Events without a commit, and commits with an operation other than create,
// update or delete, are ignored.
func (h *Handler) HandleEvent(ctx context.Context, event *models.Event) error {
	if event.Commit == nil {
		return nil
	}

	switch event.Commit.Operation {
	case models.CommitOperationCreate, models.CommitOperationUpdate:
		return h.handleCreateUpdateEvent(ctx, event)
	case models.CommitOperationDelete:
		return h.handleDeleteEvent(ctx, event)
	default:
		return nil
	}
}

// handleCreateUpdateEvent routes a create or update commit to the handler for
// its collection. It always returns nil: the per-collection handlers report
// their own failures and do not propagate them.
func (h *Handler) handleCreateUpdateEvent(ctx context.Context, event *models.Event) error {
	switch event.Commit.Collection {
	case tangled.RepoIssueNSID:
		h.handleCreateUpdateIssueEvent(ctx, event)
	case tangled.RepoIssueCommentNSID:
		h.handleCreateUpdateIssueCommentEvent(ctx, event)
	default:
		slog.Info("create event was not for expected collection", "did", event.Did, "RKey", event.Commit.RKey, "collection", event.Commit.Collection)
	}
	return nil
}

// handleDeleteEvent routes a delete commit to the handler for its collection.
// It always returns nil: the per-collection handlers report their own
// failures and do not propagate them.
func (h *Handler) handleDeleteEvent(ctx context.Context, event *models.Event) error {
	switch event.Commit.Collection {
	case tangled.RepoIssueNSID:
		h.handleDeleteIssueEvent(ctx, event)
	case tangled.RepoIssueCommentNSID:
		h.handleDeleteIssueCommentEvent(ctx, event)
	default:
		slog.Info("delete event was not for expected collection", "did", event.Did, "RKey", event.Commit.RKey, "collection", event.Commit.Collection)
	}
	return nil
}

// handleCreateUpdateIssueEvent decodes the commit record as an issue and
// passes it to the store. Failures are sent to bugsnag and logged.
func (h *Handler) handleCreateUpdateIssueEvent(ctx context.Context, event *models.Event) {
	var issue tangled.RepoIssue

	err := json.Unmarshal(event.Commit.Record, &issue)
	if err != nil {
		bugsnag.Notify(err)
		slog.Error("error unmarshalling event record to issue", "error", err)
		return
	}

	did := event.Did
	rkey := event.Commit.RKey

	createdAt, err := time.Parse(time.RFC3339, issue.CreatedAt)
	if err != nil {
		// An unparseable timestamp is reported but does not drop the issue;
		// the current time is used instead.
		bugsnag.Notify(err)
		slog.Error("parsing createdAt time from issue", "error", err, "timestamp", issue.CreatedAt)
		createdAt = time.Now().UTC()
	}

	// A nil body is stored as the empty string.
	body := ""
	if issue.Body != nil {
		body = *issue.Body
	}

	err = h.store.CreateIssue(Issue{
		AuthorDID: did,
		RKey:      rkey,
		Title:     issue.Title,
		Body:      body,
		CreatedAt: createdAt.UnixMilli(),
		Repo:      issue.Repo,
	})
	if err != nil {
		bugsnag.Notify(err)
		slog.Error("create issue", "error", err, "did", did, "rkey", rkey)
		return
	}

	slog.Info("created issue ", "value", issue, "did", did, "rkey", rkey)
}

// handleCreateUpdateIssueCommentEvent decodes the commit record as an issue
// comment and, unless it is a reply, passes it to the store. Failures are sent
// to bugsnag and logged.
func (h *Handler) handleCreateUpdateIssueCommentEvent(ctx context.Context, event *models.Event) {
	var comment tangled.RepoIssueComment

	err := json.Unmarshal(event.Commit.Record, &comment)
	if err != nil {
		bugsnag.Notify(err)
		slog.Error("error unmarshalling event record to comment", "error", err)
		return
	}

	did := event.Did
	rkey := event.Commit.RKey

	createdAt, err := time.Parse(time.RFC3339, comment.CreatedAt)
	if err != nil {
		// An unparseable timestamp is reported but does not drop the comment;
		// the current time is used instead.
		bugsnag.Notify(err)
		slog.Error("parsing createdAt time from comment", "error", err, "timestamp", comment.CreatedAt)
		createdAt = time.Now().UTC()
	}

	// if there is a replyTo present, don't store the comment because replies can't be replied to so
	// the reply comment doesn't need to be stored
	if comment.ReplyTo == nil || *comment.ReplyTo == "" {
		err = h.store.CreateComment(Comment{
			AuthorDID: did,
			RKey:      rkey,
			Body:      comment.Body,
			Issue:     comment.Issue,
			CreatedAt: createdAt.UnixMilli(),
		})
		if err != nil {
			bugsnag.Notify(err)
			slog.Error("create comment", "error", err, "did", did, "rkey", rkey)
			return
		}
	}

	// TODO: now send a notification to either the issue creator or whoever the comment was a reply to

	slog.Info("created comment ", "value", comment, "did", did, "rkey", rkey)
}

// handleDeleteIssueEvent deletes the issue from the store and then attempts to
// delete the comments stored against it. Failures are sent to bugsnag and
// logged.
func (h *Handler) handleDeleteIssueEvent(ctx context.Context, event *models.Event) {
	did := event.Did
	rkey := event.Commit.RKey

	err := h.store.DeleteIssue(did, rkey)
	if err != nil {
		bugsnag.Notify(err)
		slog.Error("delete issue", "error", err, "did", did, "rkey", rkey)
		return
	}

	// now attempt to delete any comments on that issue since they can't be replied to now.
	// Note: if unsuccessful it doesn't matter because a deleted issue and its comments are
	// not visible on the UI and so no one will be able to reply to them so this is just a
	// cleanup operation
	issueURI := fmt.Sprintf("at://%s/%s/%s", did, tangled.RepoIssueNSID, rkey)
	err = h.store.DeleteCommentsForIssue(issueURI)
	if err != nil {
		bugsnag.Notify(err)
		slog.Error("delete comments for issue", "error", err, "issue URI", issueURI)
	}

	slog.Info("deleted issue ", "did", did, "rkey", rkey)
}

// handleDeleteIssueCommentEvent deletes the comment from the store. Failures
// are sent to bugsnag and logged.
func (h *Handler) handleDeleteIssueCommentEvent(ctx context.Context, event *models.Event) {
	did := event.Did
	rkey := event.Commit.RKey

	err := h.store.DeleteComment(did, rkey)
	if err != nil {
		bugsnag.Notify(err)
		slog.Error("delete comment", "error", err, "did", did, "rkey", rkey)
		return
	}

	slog.Info("deleted comment ", "did", did, "rkey", rkey)
}