this repo has no description
1package tangledalertbot 2 3import ( 4 "context" 5 "encoding/json" 6 7 "fmt" 8 "log/slog" 9 "time" 10 11 "github.com/bluesky-social/jetstream/pkg/client" 12 "github.com/bluesky-social/jetstream/pkg/client/schedulers/sequential" 13 "github.com/bluesky-social/jetstream/pkg/models" 14 "tangled.sh/tangled.sh/core/api/tangled" 15) 16 17type Issue struct { 18 AuthorDID string `json:"authorDID"` 19 RKey string `json:"rkey"` 20 Title string `json:"title"` 21 Body string `json:"body"` 22 Repo string `json:"repo"` 23 CreatedAt int64 `json:"createdAt"` 24} 25 26type Comment struct { 27 AuthorDID string `json:"authorDID"` 28 RKey string `json:"rkey"` 29 Body string `json:"body"` 30 Issue string `json:"issue" ` 31 ReplyTo string `json:"replyTo"` 32 CreatedAt int64 `json:"createdAt"` 33} 34 35type Store interface { 36 CreateIssue(issue Issue) error 37 CreateComment(comment Comment) error 38} 39 40// JetstreamConsumer is responsible for consuming from a jetstream instance 41type JetstreamConsumer struct { 42 cfg *client.ClientConfig 43 handler *Handler 44 logger *slog.Logger 45} 46 47// NewJetstreamConsumer configures a new jetstream consumer. To run or start you should call the Consume function 48func NewJetstreamConsumer(jsAddr string, logger *slog.Logger, handler *Handler) *JetstreamConsumer { 49 cfg := client.DefaultClientConfig() 50 if jsAddr != "" { 51 cfg.WebsocketURL = jsAddr 52 } 53 cfg.WantedCollections = []string{ 54 tangled.RepoIssueNSID, 55 tangled.RepoIssueCommentNSID, 56 } 57 cfg.WantedDids = []string{} 58 59 return &JetstreamConsumer{ 60 cfg: cfg, 61 logger: logger, 62 handler: handler, 63 } 64} 65 66// Consume will connect to a Jetstream client and start to consume and handle messages from it 67func (c *JetstreamConsumer) Consume(ctx context.Context) error { 68 scheduler := sequential.NewScheduler("jetstream", c.logger, c.handler.HandleEvent) 69 defer scheduler.Shutdown() 70 71 client, err := client.NewClient(c.cfg, c.logger, scheduler) 72 if err != nil { 73 return fmt.Errorf("failed to create client: %w", err) 74 } 75 76 cursor := time.Now().Add(1 * -time.Minute).UnixMicro() 77 78 if err := client.ConnectAndRead(ctx, &cursor); err != nil { 79 return fmt.Errorf("connect and read: %w", err) 80 } 81 82 slog.Info("stopping consume") 83 return nil 84} 85 86// Handler is responsible for handling a message consumed from Jetstream 87type Handler struct { 88 store Store 89} 90 91// NewFeedHandler returns a new handler 92func NewFeedHandler(store Store) *Handler { 93 return &Handler{store: store} 94} 95 96// HandleEvent will handle an event based on the event's commit operation 97func (h *Handler) HandleEvent(ctx context.Context, event *models.Event) error { 98 if event.Commit == nil { 99 return nil 100 } 101 102 switch event.Commit.Operation { 103 case models.CommitOperationCreate: 104 return h.handleCreateEvent(ctx, event) 105 // TODO: handle deletes too 106 default: 107 return nil 108 } 109} 110 111func (h *Handler) handleCreateEvent(ctx context.Context, event *models.Event) error { 112 switch event.Commit.Collection { 113 case tangled.RepoIssueNSID: 114 h.handleIssueEvent(ctx, event) 115 case tangled.RepoIssueCommentNSID: 116 h.handleIssueCommentEvent(ctx, event) 117 default: 118 slog.Info("create event was not for expected collection", "RKey", "did", event.Did, event.Commit.RKey, "collection", event.Commit.Collection) 119 return nil 120 } 121 122 return nil 123} 124 125func (h *Handler) handleIssueEvent(ctx context.Context, event *models.Event) { 126 var issue tangled.RepoIssue 127 128 err := json.Unmarshal(event.Commit.Record, &issue) 129 if err != nil { 130 slog.Error("error unmarshalling event record to issue", "error", err) 131 return 132 } 133 134 did := event.Did 135 rkey := event.Commit.RKey 136 137 createdAt, err := time.Parse(time.RFC3339, issue.CreatedAt) 138 if err != nil { 139 slog.Error("parsing createdAt time from issue", "error", err, "timestamp", issue.CreatedAt) 140 createdAt = time.Now().UTC() 141 } 142 body := "" 143 if issue.Body != nil { 144 body = *&body 145 } 146 err = h.store.CreateIssue(Issue{ 147 AuthorDID: did, 148 RKey: rkey, 149 Title: issue.Title, 150 Body: body, 151 CreatedAt: createdAt.UnixMilli(), 152 Repo: issue.Repo, 153 }) 154 if err != nil { 155 slog.Error("create issue", "error", err, "did", did, "rkey", rkey) 156 return 157 } 158 slog.Info("created issue ", "value", issue, "did", did, "rkey", rkey) 159} 160 161func (h *Handler) handleIssueCommentEvent(ctx context.Context, event *models.Event) { 162 var comment tangled.RepoIssueComment 163 164 err := json.Unmarshal(event.Commit.Record, &comment) 165 if err != nil { 166 slog.Error("error unmarshalling event record to comment", "error", err) 167 return 168 } 169 170 did := event.Did 171 rkey := event.Commit.RKey 172 173 createdAt, err := time.Parse(time.RFC3339, comment.CreatedAt) 174 if err != nil { 175 slog.Error("parsing createdAt time from comment", "error", err, "timestamp", comment.CreatedAt) 176 createdAt = time.Now().UTC() 177 } 178 err = h.store.CreateComment(Comment{ 179 AuthorDID: did, 180 RKey: rkey, 181 Body: comment.Body, 182 Issue: comment.Issue, 183 CreatedAt: createdAt.UnixMilli(), 184 //ReplyTo: comment, // TODO: there should be a ReplyTo field that can be used as well once the right type is imported 185 }) 186 if err != nil { 187 slog.Error("create comment", "error", err, "did", did, "rkey", rkey) 188 return 189 } 190 191 // TODO: now send a notification to either the issue creator or whoever the comment was a reply to 192 193 slog.Info("created comment ", "value", comment, "did", did, "rkey", rkey) 194}