this repo has no description
1package tangledalertbot
2
3import (
4 "context"
5 "encoding/json"
6
7 "fmt"
8 "log/slog"
9 "time"
10
11 "github.com/bluesky-social/jetstream/pkg/client"
12 "github.com/bluesky-social/jetstream/pkg/client/schedulers/sequential"
13 "github.com/bluesky-social/jetstream/pkg/models"
14 "tangled.sh/tangled.sh/core/api/tangled"
15)
16
// Issue is the flattened form of a tangled repo issue record that is handed
// to the Store for persistence.
type Issue struct {
	// AuthorDID is the DID of the account whose event created the issue.
	AuthorDID string `json:"authorDID"`
	// RKey is the record key of the issue record.
	RKey string `json:"rkey"`
	// Title is the issue's title.
	Title string `json:"title"`
	// Body is the issue's body text; it may be empty.
	Body string `json:"body"`
	// Repo is the repo reference carried on the issue record.
	Repo string `json:"repo"`
	// CreatedAt is the creation time in Unix milliseconds.
	CreatedAt int64 `json:"createdAt"`
}
25
// Comment is the flattened form of a tangled issue comment record that is
// handed to the Store for persistence.
type Comment struct {
	// AuthorDID is the DID of the account whose event created the comment.
	AuthorDID string `json:"authorDID"`
	// RKey is the record key of the comment record.
	RKey string `json:"rkey"`
	// Body is the comment text.
	Body string `json:"body"`
	// Issue is the issue reference carried on the comment record.
	Issue string `json:"issue"`
	// ReplyTo is reserved for the comment being replied to; nothing in this
	// file populates it yet.
	ReplyTo string `json:"replyTo"`
	// CreatedAt is the creation time in Unix milliseconds.
	CreatedAt int64 `json:"createdAt"`
}
34
35type Store interface {
36 CreateIssue(issue Issue) error
37 CreateComment(comment Comment) error
38}
39
40// JetstreamConsumer is responsible for consuming from a jetstream instance
41type JetstreamConsumer struct {
42 cfg *client.ClientConfig
43 handler *Handler
44 logger *slog.Logger
45}
46
47// NewJetstreamConsumer configures a new jetstream consumer. To run or start you should call the Consume function
48func NewJetstreamConsumer(jsAddr string, logger *slog.Logger, handler *Handler) *JetstreamConsumer {
49 cfg := client.DefaultClientConfig()
50 if jsAddr != "" {
51 cfg.WebsocketURL = jsAddr
52 }
53 cfg.WantedCollections = []string{
54 tangled.RepoIssueNSID,
55 tangled.RepoIssueCommentNSID,
56 }
57 cfg.WantedDids = []string{}
58
59 return &JetstreamConsumer{
60 cfg: cfg,
61 logger: logger,
62 handler: handler,
63 }
64}
65
66// Consume will connect to a Jetstream client and start to consume and handle messages from it
67func (c *JetstreamConsumer) Consume(ctx context.Context) error {
68 scheduler := sequential.NewScheduler("jetstream", c.logger, c.handler.HandleEvent)
69 defer scheduler.Shutdown()
70
71 client, err := client.NewClient(c.cfg, c.logger, scheduler)
72 if err != nil {
73 return fmt.Errorf("failed to create client: %w", err)
74 }
75
76 cursor := time.Now().Add(1 * -time.Minute).UnixMicro()
77
78 if err := client.ConnectAndRead(ctx, &cursor); err != nil {
79 return fmt.Errorf("connect and read: %w", err)
80 }
81
82 slog.Info("stopping consume")
83 return nil
84}
85
86// Handler is responsible for handling a message consumed from Jetstream
87type Handler struct {
88 store Store
89}
90
91// NewFeedHandler returns a new handler
92func NewFeedHandler(store Store) *Handler {
93 return &Handler{store: store}
94}
95
96// HandleEvent will handle an event based on the event's commit operation
97func (h *Handler) HandleEvent(ctx context.Context, event *models.Event) error {
98 if event.Commit == nil {
99 return nil
100 }
101
102 switch event.Commit.Operation {
103 case models.CommitOperationCreate:
104 return h.handleCreateEvent(ctx, event)
105 // TODO: handle deletes too
106 default:
107 return nil
108 }
109}
110
111func (h *Handler) handleCreateEvent(ctx context.Context, event *models.Event) error {
112 switch event.Commit.Collection {
113 case tangled.RepoIssueNSID:
114 h.handleIssueEvent(ctx, event)
115 case tangled.RepoIssueCommentNSID:
116 h.handleIssueCommentEvent(ctx, event)
117 default:
118 slog.Info("create event was not for expected collection", "RKey", "did", event.Did, event.Commit.RKey, "collection", event.Commit.Collection)
119 return nil
120 }
121
122 return nil
123}
124
125func (h *Handler) handleIssueEvent(ctx context.Context, event *models.Event) {
126 var issue tangled.RepoIssue
127
128 err := json.Unmarshal(event.Commit.Record, &issue)
129 if err != nil {
130 slog.Error("error unmarshalling event record to issue", "error", err)
131 return
132 }
133
134 did := event.Did
135 rkey := event.Commit.RKey
136
137 createdAt, err := time.Parse(time.RFC3339, issue.CreatedAt)
138 if err != nil {
139 slog.Error("parsing createdAt time from issue", "error", err, "timestamp", issue.CreatedAt)
140 createdAt = time.Now().UTC()
141 }
142 body := ""
143 if issue.Body != nil {
144 body = *&body
145 }
146 err = h.store.CreateIssue(Issue{
147 AuthorDID: did,
148 RKey: rkey,
149 Title: issue.Title,
150 Body: body,
151 CreatedAt: createdAt.UnixMilli(),
152 Repo: issue.Repo,
153 })
154 if err != nil {
155 slog.Error("create issue", "error", err, "did", did, "rkey", rkey)
156 return
157 }
158 slog.Info("created issue ", "value", issue, "did", did, "rkey", rkey)
159}
160
161func (h *Handler) handleIssueCommentEvent(ctx context.Context, event *models.Event) {
162 var comment tangled.RepoIssueComment
163
164 err := json.Unmarshal(event.Commit.Record, &comment)
165 if err != nil {
166 slog.Error("error unmarshalling event record to comment", "error", err)
167 return
168 }
169
170 did := event.Did
171 rkey := event.Commit.RKey
172
173 createdAt, err := time.Parse(time.RFC3339, comment.CreatedAt)
174 if err != nil {
175 slog.Error("parsing createdAt time from comment", "error", err, "timestamp", comment.CreatedAt)
176 createdAt = time.Now().UTC()
177 }
178 err = h.store.CreateComment(Comment{
179 AuthorDID: did,
180 RKey: rkey,
181 Body: comment.Body,
182 Issue: comment.Issue,
183 CreatedAt: createdAt.UnixMilli(),
184 //ReplyTo: comment, // TODO: there should be a ReplyTo field that can be used as well once the right type is imported
185 })
186 if err != nil {
187 slog.Error("create comment", "error", err, "did", did, "rkey", rkey)
188 return
189 }
190
191 // TODO: now send a notification to either the issue creator or whoever the comment was a reply to
192
193 slog.Info("created comment ", "value", comment, "did", did, "rkey", rkey)
194}