1package db
2
3import (
4 "context"
5 "log"
6 "maps"
7 "slices"
8
9 "github.com/bluesky-social/indigo/atproto/syntax"
10 "tangled.org/core/appview/db"
11 "tangled.org/core/appview/models"
12 "tangled.org/core/appview/notify"
13 "tangled.org/core/idresolver"
14)
15
// maxMentions caps how many mentioned users are considered for a single
// "user mentioned" event; notifyEvent truncates longer mention lists to this
// length.
const maxMentions = 5
19
// databaseNotifier reacts to application events by storing notification rows
// in the database.
type databaseNotifier struct {
	// db is the handle used for every lookup (repos, issues, pulls,
	// collaborators, notification preferences) and for storing notifications.
	db *db.DB
	// res is the identity resolver supplied at construction time; none of the
	// methods in this section read it.
	res *idresolver.Resolver
}
24
25func NewDatabaseNotifier(database *db.DB, resolver *idresolver.Resolver) notify.Notifier {
26 return &databaseNotifier{
27 db: database,
28 res: resolver,
29 }
30}
31
// Compile-time check that *databaseNotifier satisfies notify.Notifier.
var _ notify.Notifier = &databaseNotifier{}
33
// NewRepo currently does nothing: no notification is stored when a repository
// is created.
func (n *databaseNotifier) NewRepo(ctx context.Context, repo *models.Repo) {
	// no-op for now
}
37
38func (n *databaseNotifier) NewStar(ctx context.Context, star *models.Star) {
39 var err error
40 repo, err := db.GetRepo(n.db, db.FilterEq("at_uri", string(star.RepoAt)))
41 if err != nil {
42 log.Printf("NewStar: failed to get repos: %v", err)
43 return
44 }
45
46 actorDid := syntax.DID(star.StarredByDid)
47 recipients := []syntax.DID{syntax.DID(repo.Did)}
48 eventType := models.NotificationTypeRepoStarred
49 entityType := "repo"
50 entityId := star.RepoAt.String()
51 repoId := &repo.Id
52 var issueId *int64
53 var pullId *int64
54
55 n.notifyEvent(
56 actorDid,
57 recipients,
58 eventType,
59 entityType,
60 entityId,
61 repoId,
62 issueId,
63 pullId,
64 )
65}
66
// DeleteStar does nothing: removing a star stores no notification.
func (n *databaseNotifier) DeleteStar(ctx context.Context, star *models.Star) {
	// no-op
}
70
71func (n *databaseNotifier) NewIssue(ctx context.Context, issue *models.Issue, mentions []syntax.DID) {
72
73 // build the recipients list
74 // - owner of the repo
75 // - collaborators in the repo
76 var recipients []syntax.DID
77 recipients = append(recipients, syntax.DID(issue.Repo.Did))
78 collaborators, err := db.GetCollaborators(n.db, db.FilterEq("repo_at", issue.Repo.RepoAt()))
79 if err != nil {
80 log.Printf("failed to fetch collaborators: %v", err)
81 return
82 }
83 for _, c := range collaborators {
84 recipients = append(recipients, c.SubjectDid)
85 }
86
87 actorDid := syntax.DID(issue.Did)
88 entityType := "issue"
89 entityId := issue.AtUri().String()
90 repoId := &issue.Repo.Id
91 issueId := &issue.Id
92 var pullId *int64
93
94 n.notifyEvent(
95 actorDid,
96 recipients,
97 models.NotificationTypeIssueCreated,
98 entityType,
99 entityId,
100 repoId,
101 issueId,
102 pullId,
103 )
104 n.notifyEvent(
105 actorDid,
106 mentions,
107 models.NotificationTypeUserMentioned,
108 entityType,
109 entityId,
110 repoId,
111 issueId,
112 pullId,
113 )
114}
115
116func (n *databaseNotifier) NewIssueComment(ctx context.Context, comment *models.IssueComment, mentions []syntax.DID) {
117 issues, err := db.GetIssues(n.db, db.FilterEq("at_uri", comment.IssueAt))
118 if err != nil {
119 log.Printf("NewIssueComment: failed to get issues: %v", err)
120 return
121 }
122 if len(issues) == 0 {
123 log.Printf("NewIssueComment: no issue found for %s", comment.IssueAt)
124 return
125 }
126 issue := issues[0]
127
128 var recipients []syntax.DID
129 recipients = append(recipients, syntax.DID(issue.Repo.Did))
130
131 if comment.IsReply() {
132 // if this comment is a reply, then notify everybody in that thread
133 parentAtUri := *comment.ReplyTo
134 allThreads := issue.CommentList()
135
136 // find the parent thread, and add all DIDs from here to the recipient list
137 for _, t := range allThreads {
138 if t.Self.AtUri().String() == parentAtUri {
139 recipients = append(recipients, t.Participants()...)
140 }
141 }
142 } else {
143 // not a reply, notify just the issue author
144 recipients = append(recipients, syntax.DID(issue.Did))
145 }
146
147 actorDid := syntax.DID(comment.Did)
148 entityType := "issue"
149 entityId := issue.AtUri().String()
150 repoId := &issue.Repo.Id
151 issueId := &issue.Id
152 var pullId *int64
153
154 n.notifyEvent(
155 actorDid,
156 recipients,
157 models.NotificationTypeIssueCommented,
158 entityType,
159 entityId,
160 repoId,
161 issueId,
162 pullId,
163 )
164 n.notifyEvent(
165 actorDid,
166 mentions,
167 models.NotificationTypeUserMentioned,
168 entityType,
169 entityId,
170 repoId,
171 issueId,
172 pullId,
173 )
174}
175
// DeleteIssue currently does nothing: deleting an issue stores no
// notification.
func (n *databaseNotifier) DeleteIssue(ctx context.Context, issue *models.Issue) {
	// no-op for now
}
179
180func (n *databaseNotifier) NewFollow(ctx context.Context, follow *models.Follow) {
181 actorDid := syntax.DID(follow.UserDid)
182 recipients := []syntax.DID{syntax.DID(follow.SubjectDid)}
183 eventType := models.NotificationTypeFollowed
184 entityType := "follow"
185 entityId := follow.UserDid
186 var repoId, issueId, pullId *int64
187
188 n.notifyEvent(
189 actorDid,
190 recipients,
191 eventType,
192 entityType,
193 entityId,
194 repoId,
195 issueId,
196 pullId,
197 )
198}
199
// DeleteFollow does nothing: unfollowing stores no notification.
func (n *databaseNotifier) DeleteFollow(ctx context.Context, follow *models.Follow) {
	// no-op
}
203
204func (n *databaseNotifier) NewPull(ctx context.Context, pull *models.Pull) {
205 repo, err := db.GetRepo(n.db, db.FilterEq("at_uri", string(pull.RepoAt)))
206 if err != nil {
207 log.Printf("NewPull: failed to get repos: %v", err)
208 return
209 }
210
211 // build the recipients list
212 // - owner of the repo
213 // - collaborators in the repo
214 var recipients []syntax.DID
215 recipients = append(recipients, syntax.DID(repo.Did))
216 collaborators, err := db.GetCollaborators(n.db, db.FilterEq("repo_at", repo.RepoAt()))
217 if err != nil {
218 log.Printf("failed to fetch collaborators: %v", err)
219 return
220 }
221 for _, c := range collaborators {
222 recipients = append(recipients, c.SubjectDid)
223 }
224
225 actorDid := syntax.DID(pull.OwnerDid)
226 eventType := models.NotificationTypePullCreated
227 entityType := "pull"
228 entityId := pull.AtUri().String()
229 repoId := &repo.Id
230 var issueId *int64
231 p := int64(pull.ID)
232 pullId := &p
233
234 n.notifyEvent(
235 actorDid,
236 recipients,
237 eventType,
238 entityType,
239 entityId,
240 repoId,
241 issueId,
242 pullId,
243 )
244}
245
246func (n *databaseNotifier) NewPullComment(ctx context.Context, comment *models.PullComment, mentions []syntax.DID) {
247 pull, err := db.GetPull(n.db,
248 syntax.ATURI(comment.RepoAt),
249 comment.PullId,
250 )
251 if err != nil {
252 log.Printf("NewPullComment: failed to get pulls: %v", err)
253 return
254 }
255
256 repo, err := db.GetRepo(n.db, db.FilterEq("at_uri", comment.RepoAt))
257 if err != nil {
258 log.Printf("NewPullComment: failed to get repos: %v", err)
259 return
260 }
261
262 // build up the recipients list:
263 // - repo owner
264 // - all pull participants
265 var recipients []syntax.DID
266 recipients = append(recipients, syntax.DID(repo.Did))
267 for _, p := range pull.Participants() {
268 recipients = append(recipients, syntax.DID(p))
269 }
270
271 actorDid := syntax.DID(comment.OwnerDid)
272 eventType := models.NotificationTypePullCommented
273 entityType := "pull"
274 entityId := pull.AtUri().String()
275 repoId := &repo.Id
276 var issueId *int64
277 p := int64(pull.ID)
278 pullId := &p
279
280 n.notifyEvent(
281 actorDid,
282 recipients,
283 eventType,
284 entityType,
285 entityId,
286 repoId,
287 issueId,
288 pullId,
289 )
290 n.notifyEvent(
291 actorDid,
292 mentions,
293 models.NotificationTypeUserMentioned,
294 entityType,
295 entityId,
296 repoId,
297 issueId,
298 pullId,
299 )
300}
301
// UpdateProfile does nothing: profile updates store no notification.
func (n *databaseNotifier) UpdateProfile(ctx context.Context, profile *models.Profile) {
	// no-op
}
305
// DeleteString does nothing: deleting a string stores no notification.
func (n *databaseNotifier) DeleteString(ctx context.Context, did, rkey string) {
	// no-op
}
309
// EditString does nothing: editing a string stores no notification.
//
// NOTE(review): the parameter name shadows the predeclared type `string`
// inside this method; harmless while the body is empty.
func (n *databaseNotifier) EditString(ctx context.Context, string *models.String) {
	// no-op
}
313
// NewString does nothing: creating a string stores no notification.
//
// NOTE(review): the parameter name shadows the predeclared type `string`
// inside this method; harmless while the body is empty.
func (n *databaseNotifier) NewString(ctx context.Context, string *models.String) {
	// no-op
}
317
318func (n *databaseNotifier) NewIssueState(ctx context.Context, actor syntax.DID, issue *models.Issue) {
319 // build up the recipients list:
320 // - repo owner
321 // - repo collaborators
322 // - all issue participants
323 var recipients []syntax.DID
324 recipients = append(recipients, syntax.DID(issue.Repo.Did))
325 collaborators, err := db.GetCollaborators(n.db, db.FilterEq("repo_at", issue.Repo.RepoAt()))
326 if err != nil {
327 log.Printf("failed to fetch collaborators: %v", err)
328 return
329 }
330 for _, c := range collaborators {
331 recipients = append(recipients, c.SubjectDid)
332 }
333 for _, p := range issue.Participants() {
334 recipients = append(recipients, syntax.DID(p))
335 }
336
337 entityType := "pull"
338 entityId := issue.AtUri().String()
339 repoId := &issue.Repo.Id
340 issueId := &issue.Id
341 var pullId *int64
342 var eventType models.NotificationType
343
344 if issue.Open {
345 eventType = models.NotificationTypeIssueReopen
346 } else {
347 eventType = models.NotificationTypeIssueClosed
348 }
349
350 n.notifyEvent(
351 actor,
352 recipients,
353 eventType,
354 entityType,
355 entityId,
356 repoId,
357 issueId,
358 pullId,
359 )
360}
361
362func (n *databaseNotifier) NewPullState(ctx context.Context, actor syntax.DID, pull *models.Pull) {
363 // Get repo details
364 repo, err := db.GetRepo(n.db, db.FilterEq("at_uri", string(pull.RepoAt)))
365 if err != nil {
366 log.Printf("NewPullState: failed to get repos: %v", err)
367 return
368 }
369
370 // build up the recipients list:
371 // - repo owner
372 // - all pull participants
373 var recipients []syntax.DID
374 recipients = append(recipients, syntax.DID(repo.Did))
375 collaborators, err := db.GetCollaborators(n.db, db.FilterEq("repo_at", repo.RepoAt()))
376 if err != nil {
377 log.Printf("failed to fetch collaborators: %v", err)
378 return
379 }
380 for _, c := range collaborators {
381 recipients = append(recipients, c.SubjectDid)
382 }
383 for _, p := range pull.Participants() {
384 recipients = append(recipients, syntax.DID(p))
385 }
386
387 entityType := "pull"
388 entityId := pull.AtUri().String()
389 repoId := &repo.Id
390 var issueId *int64
391 var eventType models.NotificationType
392 switch pull.State {
393 case models.PullClosed:
394 eventType = models.NotificationTypePullClosed
395 case models.PullOpen:
396 eventType = models.NotificationTypePullReopen
397 case models.PullMerged:
398 eventType = models.NotificationTypePullMerged
399 default:
400 log.Println("NewPullState: unexpected new PR state:", pull.State)
401 return
402 }
403 p := int64(pull.ID)
404 pullId := &p
405
406 n.notifyEvent(
407 actor,
408 recipients,
409 eventType,
410 entityType,
411 entityId,
412 repoId,
413 issueId,
414 pullId,
415 )
416}
417
418func (n *databaseNotifier) notifyEvent(
419 actorDid syntax.DID,
420 recipients []syntax.DID,
421 eventType models.NotificationType,
422 entityType string,
423 entityId string,
424 repoId *int64,
425 issueId *int64,
426 pullId *int64,
427) {
428 if eventType == models.NotificationTypeUserMentioned && len(recipients) > maxMentions {
429 recipients = recipients[:maxMentions]
430 }
431 recipientSet := make(map[syntax.DID]struct{})
432 for _, did := range recipients {
433 // everybody except actor themselves
434 if did != actorDid {
435 recipientSet[did] = struct{}{}
436 }
437 }
438
439 prefMap, err := db.GetNotificationPreferences(
440 n.db,
441 db.FilterIn("user_did", slices.Collect(maps.Keys(recipientSet))),
442 )
443 if err != nil {
444 // failed to get prefs for users
445 return
446 }
447
448 // create a transaction for bulk notification storage
449 tx, err := n.db.Begin()
450 if err != nil {
451 // failed to start tx
452 return
453 }
454 defer tx.Rollback()
455
456 // filter based on preferences
457 for recipientDid := range recipientSet {
458 prefs, ok := prefMap[recipientDid]
459 if !ok {
460 prefs = models.DefaultNotificationPreferences(recipientDid)
461 }
462
463 // skip users who don’t want this type
464 if !prefs.ShouldNotify(eventType) {
465 continue
466 }
467
468 // create notification
469 notif := &models.Notification{
470 RecipientDid: recipientDid.String(),
471 ActorDid: actorDid.String(),
472 Type: eventType,
473 EntityType: entityType,
474 EntityId: entityId,
475 RepoId: repoId,
476 IssueId: issueId,
477 PullId: pullId,
478 }
479
480 if err := db.CreateNotification(tx, notif); err != nil {
481 log.Printf("notifyEvent: failed to create notification for %s: %v", recipientDid, err)
482 }
483 }
484
485 if err := tx.Commit(); err != nil {
486 // failed to commit
487 return
488 }
489}