this repo has no description
1package tangledalertbot
2
3import (
4 "context"
5 "encoding/json"
6
7 "fmt"
8 "log/slog"
9 "time"
10
11 "github.com/bluesky-social/jetstream/pkg/client"
12 "github.com/bluesky-social/jetstream/pkg/client/schedulers/sequential"
13 "github.com/bluesky-social/jetstream/pkg/models"
14 "github.com/bugsnag/bugsnag-go"
15 "tangled.sh/tangled.sh/core/api/tangled"
16)
17
18type Issue struct {
19 AuthorDID string `json:"authorDID"`
20 RKey string `json:"rkey"`
21 Title string `json:"title"`
22 Body string `json:"body"`
23 Repo string `json:"repo"`
24 CreatedAt int64 `json:"createdAt"`
25}
26
27type Comment struct {
28 AuthorDID string `json:"authorDID"`
29 RKey string `json:"rkey"`
30 Body string `json:"body"`
31 Issue string `json:"issue" `
32 ReplyTo string `json:"replyTo"`
33 CreatedAt int64 `json:"createdAt"`
34}
35
36type Store interface {
37 CreateIssue(issue Issue) error
38 CreateComment(comment Comment) error
39}
40
41// JetstreamConsumer is responsible for consuming from a jetstream instance
42type JetstreamConsumer struct {
43 cfg *client.ClientConfig
44 handler *Handler
45 logger *slog.Logger
46}
47
48// NewJetstreamConsumer configures a new jetstream consumer. To run or start you should call the Consume function
49func NewJetstreamConsumer(jsAddr string, logger *slog.Logger, handler *Handler) *JetstreamConsumer {
50 cfg := client.DefaultClientConfig()
51 if jsAddr != "" {
52 cfg.WebsocketURL = jsAddr
53 }
54 cfg.WantedCollections = []string{
55 tangled.RepoIssueNSID,
56 tangled.RepoIssueCommentNSID,
57 }
58 cfg.WantedDids = []string{}
59
60 return &JetstreamConsumer{
61 cfg: cfg,
62 logger: logger,
63 handler: handler,
64 }
65}
66
67// Consume will connect to a Jetstream client and start to consume and handle messages from it
68func (c *JetstreamConsumer) Consume(ctx context.Context) error {
69 scheduler := sequential.NewScheduler("jetstream", c.logger, c.handler.HandleEvent)
70 defer scheduler.Shutdown()
71
72 client, err := client.NewClient(c.cfg, c.logger, scheduler)
73 if err != nil {
74 return fmt.Errorf("failed to create client: %w", err)
75 }
76
77 cursor := time.Now().Add(1 * -time.Minute).UnixMicro()
78
79 if err := client.ConnectAndRead(ctx, &cursor); err != nil {
80 return fmt.Errorf("connect and read: %w", err)
81 }
82
83 slog.Info("stopping consume")
84 return nil
85}
86
87// Handler is responsible for handling a message consumed from Jetstream
88type Handler struct {
89 store Store
90}
91
92// NewFeedHandler returns a new handler
93func NewFeedHandler(store Store) *Handler {
94 return &Handler{store: store}
95}
96
97// HandleEvent will handle an event based on the event's commit operation
98func (h *Handler) HandleEvent(ctx context.Context, event *models.Event) error {
99 if event.Commit == nil {
100 return nil
101 }
102
103 switch event.Commit.Operation {
104 case models.CommitOperationCreate, models.CommitOperationUpdate:
105 return h.handleCreateEvent(ctx, event)
106 // TODO: handle deletes too
107 default:
108 return nil
109 }
110}
111
112func (h *Handler) handleCreateEvent(ctx context.Context, event *models.Event) error {
113 switch event.Commit.Collection {
114 case tangled.RepoIssueNSID:
115 h.handleIssueEvent(ctx, event)
116 case tangled.RepoIssueCommentNSID:
117 h.handleIssueCommentEvent(ctx, event)
118 default:
119 slog.Info("create event was not for expected collection", "RKey", "did", event.Did, event.Commit.RKey, "collection", event.Commit.Collection)
120 return nil
121 }
122
123 return nil
124}
125
126func (h *Handler) handleIssueEvent(ctx context.Context, event *models.Event) {
127 var issue tangled.RepoIssue
128
129 err := json.Unmarshal(event.Commit.Record, &issue)
130 if err != nil {
131 bugsnag.Notify(err)
132 slog.Error("error unmarshalling event record to issue", "error", err)
133 return
134 }
135
136 did := event.Did
137 rkey := event.Commit.RKey
138
139 createdAt, err := time.Parse(time.RFC3339, issue.CreatedAt)
140 if err != nil {
141 bugsnag.Notify(err)
142 slog.Error("parsing createdAt time from issue", "error", err, "timestamp", issue.CreatedAt)
143 createdAt = time.Now().UTC()
144 }
145 body := ""
146 if issue.Body != nil {
147 body = *issue.Body
148 }
149 err = h.store.CreateIssue(Issue{
150 AuthorDID: did,
151 RKey: rkey,
152 Title: issue.Title,
153 Body: body,
154 CreatedAt: createdAt.UnixMilli(),
155 Repo: issue.Repo,
156 })
157 if err != nil {
158 bugsnag.Notify(err)
159 slog.Error("create issue", "error", err, "did", did, "rkey", rkey)
160 return
161 }
162 slog.Info("created issue ", "value", issue, "did", did, "rkey", rkey)
163}
164
165func (h *Handler) handleIssueCommentEvent(ctx context.Context, event *models.Event) {
166 var comment tangled.RepoIssueComment
167
168 err := json.Unmarshal(event.Commit.Record, &comment)
169 if err != nil {
170 bugsnag.Notify(err)
171 slog.Error("error unmarshalling event record to comment", "error", err)
172 return
173 }
174
175 did := event.Did
176 rkey := event.Commit.RKey
177
178 createdAt, err := time.Parse(time.RFC3339, comment.CreatedAt)
179 if err != nil {
180 bugsnag.Notify(err)
181 slog.Error("parsing createdAt time from comment", "error", err, "timestamp", comment.CreatedAt)
182 createdAt = time.Now().UTC()
183 }
184 err = h.store.CreateComment(Comment{
185 AuthorDID: did,
186 RKey: rkey,
187 Body: comment.Body,
188 Issue: comment.Issue,
189 CreatedAt: createdAt.UnixMilli(),
190 //ReplyTo: comment, // TODO: there should be a ReplyTo field that can be used as well once the right type is imported
191 })
192 if err != nil {
193 bugsnag.Notify(err)
194 slog.Error("create comment", "error", err, "did", did, "rkey", rkey)
195 return
196 }
197
198 // TODO: now send a notification to either the issue creator or whoever the comment was a reply to
199
200 slog.Info("created comment ", "value", comment, "did", did, "rkey", rkey)
201}