this repo has no description
1package tangledalertbot 2 3import ( 4 "context" 5 "encoding/json" 6 7 "fmt" 8 "log/slog" 9 "time" 10 11 "github.com/bluesky-social/jetstream/pkg/client" 12 "github.com/bluesky-social/jetstream/pkg/client/schedulers/sequential" 13 "github.com/bluesky-social/jetstream/pkg/models" 14 "github.com/bugsnag/bugsnag-go" 15 "tangled.sh/tangled.sh/core/api/tangled" 16) 17 18type Issue struct { 19 AuthorDID string `json:"authorDID"` 20 RKey string `json:"rkey"` 21 Title string `json:"title"` 22 Body string `json:"body"` 23 Repo string `json:"repo"` 24 CreatedAt int64 `json:"createdAt"` 25} 26 27type Comment struct { 28 AuthorDID string `json:"authorDID"` 29 RKey string `json:"rkey"` 30 Body string `json:"body"` 31 Issue string `json:"issue" ` 32 ReplyTo string `json:"replyTo"` 33 CreatedAt int64 `json:"createdAt"` 34} 35 36type Store interface { 37 CreateIssue(issue Issue) error 38 CreateComment(comment Comment) error 39} 40 41// JetstreamConsumer is responsible for consuming from a jetstream instance 42type JetstreamConsumer struct { 43 cfg *client.ClientConfig 44 handler *Handler 45 logger *slog.Logger 46} 47 48// NewJetstreamConsumer configures a new jetstream consumer. To run or start you should call the Consume function 49func NewJetstreamConsumer(jsAddr string, logger *slog.Logger, handler *Handler) *JetstreamConsumer { 50 cfg := client.DefaultClientConfig() 51 if jsAddr != "" { 52 cfg.WebsocketURL = jsAddr 53 } 54 cfg.WantedCollections = []string{ 55 tangled.RepoIssueNSID, 56 tangled.RepoIssueCommentNSID, 57 } 58 cfg.WantedDids = []string{} 59 60 return &JetstreamConsumer{ 61 cfg: cfg, 62 logger: logger, 63 handler: handler, 64 } 65} 66 67// Consume will connect to a Jetstream client and start to consume and handle messages from it 68func (c *JetstreamConsumer) Consume(ctx context.Context) error { 69 scheduler := sequential.NewScheduler("jetstream", c.logger, c.handler.HandleEvent) 70 defer scheduler.Shutdown() 71 72 client, err := client.NewClient(c.cfg, c.logger, scheduler) 73 if err != nil { 74 return fmt.Errorf("failed to create client: %w", err) 75 } 76 77 cursor := time.Now().Add(1 * -time.Minute).UnixMicro() 78 79 if err := client.ConnectAndRead(ctx, &cursor); err != nil { 80 return fmt.Errorf("connect and read: %w", err) 81 } 82 83 slog.Info("stopping consume") 84 return nil 85} 86 87// Handler is responsible for handling a message consumed from Jetstream 88type Handler struct { 89 store Store 90} 91 92// NewFeedHandler returns a new handler 93func NewFeedHandler(store Store) *Handler { 94 return &Handler{store: store} 95} 96 97// HandleEvent will handle an event based on the event's commit operation 98func (h *Handler) HandleEvent(ctx context.Context, event *models.Event) error { 99 if event.Commit == nil { 100 return nil 101 } 102 103 switch event.Commit.Operation { 104 case models.CommitOperationCreate, models.CommitOperationUpdate: 105 return h.handleCreateEvent(ctx, event) 106 // TODO: handle deletes too 107 default: 108 return nil 109 } 110} 111 112func (h *Handler) handleCreateEvent(ctx context.Context, event *models.Event) error { 113 switch event.Commit.Collection { 114 case tangled.RepoIssueNSID: 115 h.handleIssueEvent(ctx, event) 116 case tangled.RepoIssueCommentNSID: 117 h.handleIssueCommentEvent(ctx, event) 118 default: 119 slog.Info("create event was not for expected collection", "RKey", "did", event.Did, event.Commit.RKey, "collection", event.Commit.Collection) 120 return nil 121 } 122 123 return nil 124} 125 126func (h *Handler) handleIssueEvent(ctx context.Context, event *models.Event) { 127 var issue tangled.RepoIssue 128 129 err := json.Unmarshal(event.Commit.Record, &issue) 130 if err != nil { 131 bugsnag.Notify(err) 132 slog.Error("error unmarshalling event record to issue", "error", err) 133 return 134 } 135 136 did := event.Did 137 rkey := event.Commit.RKey 138 139 createdAt, err := time.Parse(time.RFC3339, issue.CreatedAt) 140 if err != nil { 141 bugsnag.Notify(err) 142 slog.Error("parsing createdAt time from issue", "error", err, "timestamp", issue.CreatedAt) 143 createdAt = time.Now().UTC() 144 } 145 body := "" 146 if issue.Body != nil { 147 body = *issue.Body 148 } 149 err = h.store.CreateIssue(Issue{ 150 AuthorDID: did, 151 RKey: rkey, 152 Title: issue.Title, 153 Body: body, 154 CreatedAt: createdAt.UnixMilli(), 155 Repo: issue.Repo, 156 }) 157 if err != nil { 158 bugsnag.Notify(err) 159 slog.Error("create issue", "error", err, "did", did, "rkey", rkey) 160 return 161 } 162 slog.Info("created issue ", "value", issue, "did", did, "rkey", rkey) 163} 164 165func (h *Handler) handleIssueCommentEvent(ctx context.Context, event *models.Event) { 166 var comment tangled.RepoIssueComment 167 168 err := json.Unmarshal(event.Commit.Record, &comment) 169 if err != nil { 170 bugsnag.Notify(err) 171 slog.Error("error unmarshalling event record to comment", "error", err) 172 return 173 } 174 175 did := event.Did 176 rkey := event.Commit.RKey 177 178 createdAt, err := time.Parse(time.RFC3339, comment.CreatedAt) 179 if err != nil { 180 bugsnag.Notify(err) 181 slog.Error("parsing createdAt time from comment", "error", err, "timestamp", comment.CreatedAt) 182 createdAt = time.Now().UTC() 183 } 184 err = h.store.CreateComment(Comment{ 185 AuthorDID: did, 186 RKey: rkey, 187 Body: comment.Body, 188 Issue: comment.Issue, 189 CreatedAt: createdAt.UnixMilli(), 190 //ReplyTo: comment, // TODO: there should be a ReplyTo field that can be used as well once the right type is imported 191 }) 192 if err != nil { 193 bugsnag.Notify(err) 194 slog.Error("create comment", "error", err, "did", did, "rkey", rkey) 195 return 196 } 197 198 // TODO: now send a notification to either the issue creator or whoever the comment was a reply to 199 200 slog.Info("created comment ", "value", comment, "did", did, "rkey", rkey) 201}