package tangledalertbot import ( "context" "encoding/json" "fmt" "log/slog" "time" "github.com/bluesky-social/jetstream/pkg/client" "github.com/bluesky-social/jetstream/pkg/client/schedulers/sequential" "github.com/bluesky-social/jetstream/pkg/models" "github.com/bugsnag/bugsnag-go" "tangled.sh/tangled.sh/core/api/tangled" ) type Issue struct { AuthorDID string `json:"authorDID"` RKey string `json:"rkey"` Title string `json:"title"` Body string `json:"body"` Repo string `json:"repo"` CreatedAt int64 `json:"createdAt"` } type Comment struct { AuthorDID string `json:"authorDID"` RKey string `json:"rkey"` Body string `json:"body"` Issue string `json:"issue" ` ReplyTo string `json:"replyTo"` CreatedAt int64 `json:"createdAt"` } type Store interface { CreateIssue(issue Issue) error CreateComment(comment Comment) error } // JetstreamConsumer is responsible for consuming from a jetstream instance type JetstreamConsumer struct { cfg *client.ClientConfig handler *Handler logger *slog.Logger } // NewJetstreamConsumer configures a new jetstream consumer. To run or start you should call the Consume function func NewJetstreamConsumer(jsAddr string, logger *slog.Logger, handler *Handler) *JetstreamConsumer { cfg := client.DefaultClientConfig() if jsAddr != "" { cfg.WebsocketURL = jsAddr } cfg.WantedCollections = []string{ tangled.RepoIssueNSID, tangled.RepoIssueCommentNSID, } cfg.WantedDids = []string{} return &JetstreamConsumer{ cfg: cfg, logger: logger, handler: handler, } } // Consume will connect to a Jetstream client and start to consume and handle messages from it func (c *JetstreamConsumer) Consume(ctx context.Context) error { scheduler := sequential.NewScheduler("jetstream", c.logger, c.handler.HandleEvent) defer scheduler.Shutdown() client, err := client.NewClient(c.cfg, c.logger, scheduler) if err != nil { return fmt.Errorf("failed to create client: %w", err) } cursor := time.Now().Add(1 * -time.Minute).UnixMicro() if err := client.ConnectAndRead(ctx, &cursor); err != nil { return fmt.Errorf("connect and read: %w", err) } slog.Info("stopping consume") return nil } // Handler is responsible for handling a message consumed from Jetstream type Handler struct { store Store } // NewFeedHandler returns a new handler func NewFeedHandler(store Store) *Handler { return &Handler{store: store} } // HandleEvent will handle an event based on the event's commit operation func (h *Handler) HandleEvent(ctx context.Context, event *models.Event) error { if event.Commit == nil { return nil } switch event.Commit.Operation { case models.CommitOperationCreate, models.CommitOperationUpdate: return h.handleCreateEvent(ctx, event) // TODO: handle deletes too default: return nil } } func (h *Handler) handleCreateEvent(ctx context.Context, event *models.Event) error { switch event.Commit.Collection { case tangled.RepoIssueNSID: h.handleIssueEvent(ctx, event) case tangled.RepoIssueCommentNSID: h.handleIssueCommentEvent(ctx, event) default: slog.Info("create event was not for expected collection", "RKey", "did", event.Did, event.Commit.RKey, "collection", event.Commit.Collection) return nil } return nil } func (h *Handler) handleIssueEvent(ctx context.Context, event *models.Event) { var issue tangled.RepoIssue err := json.Unmarshal(event.Commit.Record, &issue) if err != nil { bugsnag.Notify(err) slog.Error("error unmarshalling event record to issue", "error", err) return } did := event.Did rkey := event.Commit.RKey createdAt, err := time.Parse(time.RFC3339, issue.CreatedAt) if err != nil { bugsnag.Notify(err) slog.Error("parsing createdAt time from issue", "error", err, "timestamp", issue.CreatedAt) createdAt = time.Now().UTC() } body := "" if issue.Body != nil { body = *issue.Body } err = h.store.CreateIssue(Issue{ AuthorDID: did, RKey: rkey, Title: issue.Title, Body: body, CreatedAt: createdAt.UnixMilli(), Repo: issue.Repo, }) if err != nil { bugsnag.Notify(err) slog.Error("create issue", "error", err, "did", did, "rkey", rkey) return } slog.Info("created issue ", "value", issue, "did", did, "rkey", rkey) } func (h *Handler) handleIssueCommentEvent(ctx context.Context, event *models.Event) { var comment tangled.RepoIssueComment err := json.Unmarshal(event.Commit.Record, &comment) if err != nil { bugsnag.Notify(err) slog.Error("error unmarshalling event record to comment", "error", err) return } did := event.Did rkey := event.Commit.RKey createdAt, err := time.Parse(time.RFC3339, comment.CreatedAt) if err != nil { bugsnag.Notify(err) slog.Error("parsing createdAt time from comment", "error", err, "timestamp", comment.CreatedAt) createdAt = time.Now().UTC() } err = h.store.CreateComment(Comment{ AuthorDID: did, RKey: rkey, Body: comment.Body, Issue: comment.Issue, CreatedAt: createdAt.UnixMilli(), //ReplyTo: comment, // TODO: there should be a ReplyTo field that can be used as well once the right type is imported }) if err != nil { bugsnag.Notify(err) slog.Error("create comment", "error", err, "did", did, "rkey", rkey) return } // TODO: now send a notification to either the issue creator or whoever the comment was a reply to slog.Info("created comment ", "value", comment, "did", did, "rkey", rkey) }