forked from
tangled.org/core
Monorepo for Tangled — https://tangled.org
1package db
2
3import (
4 "context"
5 "log"
6 "maps"
7 "slices"
8
9 "github.com/bluesky-social/indigo/atproto/syntax"
10 "tangled.org/core/appview/db"
11 "tangled.org/core/appview/models"
12 "tangled.org/core/appview/notify"
13 "tangled.org/core/idresolver"
14)
15
16type databaseNotifier struct {
17 db *db.DB
18 res *idresolver.Resolver
19}
20
21func NewDatabaseNotifier(database *db.DB, resolver *idresolver.Resolver) notify.Notifier {
22 return &databaseNotifier{
23 db: database,
24 res: resolver,
25 }
26}
27
28var _ notify.Notifier = &databaseNotifier{}
29
30func (n *databaseNotifier) NewRepo(ctx context.Context, repo *models.Repo) {
31 // no-op for now
32}
33
34func (n *databaseNotifier) NewStar(ctx context.Context, star *models.Star) {
35 var err error
36 repo, err := db.GetRepo(n.db, db.FilterEq("at_uri", string(star.RepoAt)))
37 if err != nil {
38 log.Printf("NewStar: failed to get repos: %v", err)
39 return
40 }
41
42 actorDid := syntax.DID(star.StarredByDid)
43 recipients := []syntax.DID{syntax.DID(repo.Did)}
44 eventType := models.NotificationTypeRepoStarred
45 entityType := "repo"
46 entityId := star.RepoAt.String()
47 repoId := &repo.Id
48 var issueId *int64
49 var pullId *int64
50
51 n.notifyEvent(
52 actorDid,
53 recipients,
54 eventType,
55 entityType,
56 entityId,
57 repoId,
58 issueId,
59 pullId,
60 )
61}
62
63func (n *databaseNotifier) DeleteStar(ctx context.Context, star *models.Star) {
64 // no-op
65}
66
67func (n *databaseNotifier) NewIssue(ctx context.Context, issue *models.Issue) {
68
69 // build the recipients list
70 // - owner of the repo
71 // - collaborators in the repo
72 var recipients []syntax.DID
73 recipients = append(recipients, syntax.DID(issue.Repo.Did))
74 collaborators, err := db.GetCollaborators(n.db, db.FilterEq("repo_at", issue.Repo.RepoAt()))
75 if err != nil {
76 log.Printf("failed to fetch collaborators: %v", err)
77 return
78 }
79 for _, c := range collaborators {
80 recipients = append(recipients, c.SubjectDid)
81 }
82
83 actorDid := syntax.DID(issue.Did)
84 eventType := models.NotificationTypeIssueCreated
85 entityType := "issue"
86 entityId := issue.AtUri().String()
87 repoId := &issue.Repo.Id
88 issueId := &issue.Id
89 var pullId *int64
90
91 n.notifyEvent(
92 actorDid,
93 recipients,
94 eventType,
95 entityType,
96 entityId,
97 repoId,
98 issueId,
99 pullId,
100 )
101}
102
103func (n *databaseNotifier) NewIssueComment(ctx context.Context, comment *models.IssueComment) {
104 issues, err := db.GetIssues(n.db, db.FilterEq("at_uri", comment.IssueAt))
105 if err != nil {
106 log.Printf("NewIssueComment: failed to get issues: %v", err)
107 return
108 }
109 if len(issues) == 0 {
110 log.Printf("NewIssueComment: no issue found for %s", comment.IssueAt)
111 return
112 }
113 issue := issues[0]
114
115 var recipients []syntax.DID
116 recipients = append(recipients, syntax.DID(issue.Repo.Did))
117
118 if comment.IsReply() {
119 // if this comment is a reply, then notify everybody in that thread
120 parentAtUri := *comment.ReplyTo
121 allThreads := issue.CommentList()
122
123 // find the parent thread, and add all DIDs from here to the recipient list
124 for _, t := range allThreads {
125 if t.Self.AtUri().String() == parentAtUri {
126 recipients = append(recipients, t.Participants()...)
127 }
128 }
129 } else {
130 // not a reply, notify just the issue author
131 recipients = append(recipients, syntax.DID(issue.Did))
132 }
133
134 actorDid := syntax.DID(comment.Did)
135 eventType := models.NotificationTypeIssueCommented
136 entityType := "issue"
137 entityId := issue.AtUri().String()
138 repoId := &issue.Repo.Id
139 issueId := &issue.Id
140 var pullId *int64
141
142 n.notifyEvent(
143 actorDid,
144 recipients,
145 eventType,
146 entityType,
147 entityId,
148 repoId,
149 issueId,
150 pullId,
151 )
152}
153
154func (n *databaseNotifier) NewFollow(ctx context.Context, follow *models.Follow) {
155 actorDid := syntax.DID(follow.UserDid)
156 recipients := []syntax.DID{syntax.DID(follow.SubjectDid)}
157 eventType := models.NotificationTypeFollowed
158 entityType := "follow"
159 entityId := follow.UserDid
160 var repoId, issueId, pullId *int64
161
162 n.notifyEvent(
163 actorDid,
164 recipients,
165 eventType,
166 entityType,
167 entityId,
168 repoId,
169 issueId,
170 pullId,
171 )
172}
173
174func (n *databaseNotifier) DeleteFollow(ctx context.Context, follow *models.Follow) {
175 // no-op
176}
177
178func (n *databaseNotifier) NewPull(ctx context.Context, pull *models.Pull) {
179 repo, err := db.GetRepo(n.db, db.FilterEq("at_uri", string(pull.RepoAt)))
180 if err != nil {
181 log.Printf("NewPull: failed to get repos: %v", err)
182 return
183 }
184
185 // build the recipients list
186 // - owner of the repo
187 // - collaborators in the repo
188 var recipients []syntax.DID
189 recipients = append(recipients, syntax.DID(repo.Did))
190 collaborators, err := db.GetCollaborators(n.db, db.FilterEq("repo_at", repo.RepoAt()))
191 if err != nil {
192 log.Printf("failed to fetch collaborators: %v", err)
193 return
194 }
195 for _, c := range collaborators {
196 recipients = append(recipients, c.SubjectDid)
197 }
198
199 actorDid := syntax.DID(pull.OwnerDid)
200 eventType := models.NotificationTypePullCreated
201 entityType := "pull"
202 entityId := pull.PullAt().String()
203 repoId := &repo.Id
204 var issueId *int64
205 p := int64(pull.ID)
206 pullId := &p
207
208 n.notifyEvent(
209 actorDid,
210 recipients,
211 eventType,
212 entityType,
213 entityId,
214 repoId,
215 issueId,
216 pullId,
217 )
218}
219
220func (n *databaseNotifier) NewPullComment(ctx context.Context, comment *models.PullComment) {
221 pull, err := db.GetPull(n.db,
222 syntax.ATURI(comment.RepoAt),
223 comment.PullId,
224 )
225 if err != nil {
226 log.Printf("NewPullComment: failed to get pulls: %v", err)
227 return
228 }
229
230 repo, err := db.GetRepo(n.db, db.FilterEq("at_uri", comment.RepoAt))
231 if err != nil {
232 log.Printf("NewPullComment: failed to get repos: %v", err)
233 return
234 }
235
236 // build up the recipients list:
237 // - repo owner
238 // - all pull participants
239 var recipients []syntax.DID
240 recipients = append(recipients, syntax.DID(repo.Did))
241 for _, p := range pull.Participants() {
242 recipients = append(recipients, syntax.DID(p))
243 }
244
245 actorDid := syntax.DID(comment.OwnerDid)
246 eventType := models.NotificationTypePullCommented
247 entityType := "pull"
248 entityId := pull.PullAt().String()
249 repoId := &repo.Id
250 var issueId *int64
251 p := int64(pull.ID)
252 pullId := &p
253
254 n.notifyEvent(
255 actorDid,
256 recipients,
257 eventType,
258 entityType,
259 entityId,
260 repoId,
261 issueId,
262 pullId,
263 )
264}
265
266func (n *databaseNotifier) UpdateProfile(ctx context.Context, profile *models.Profile) {
267 // no-op
268}
269
270func (n *databaseNotifier) DeleteString(ctx context.Context, did, rkey string) {
271 // no-op
272}
273
274func (n *databaseNotifier) EditString(ctx context.Context, string *models.String) {
275 // no-op
276}
277
278func (n *databaseNotifier) NewString(ctx context.Context, string *models.String) {
279 // no-op
280}
281
282func (n *databaseNotifier) NewIssueClosed(ctx context.Context, issue *models.Issue) {
283 // build up the recipients list:
284 // - repo owner
285 // - repo collaborators
286 // - all issue participants
287 var recipients []syntax.DID
288 recipients = append(recipients, syntax.DID(issue.Repo.Did))
289 collaborators, err := db.GetCollaborators(n.db, db.FilterEq("repo_at", issue.Repo.RepoAt()))
290 if err != nil {
291 log.Printf("failed to fetch collaborators: %v", err)
292 return
293 }
294 for _, c := range collaborators {
295 recipients = append(recipients, c.SubjectDid)
296 }
297 for _, p := range issue.Participants() {
298 recipients = append(recipients, syntax.DID(p))
299 }
300
301 actorDid := syntax.DID(issue.Repo.Did)
302 eventType := models.NotificationTypeIssueClosed
303 entityType := "pull"
304 entityId := issue.AtUri().String()
305 repoId := &issue.Repo.Id
306 issueId := &issue.Id
307 var pullId *int64
308
309 n.notifyEvent(
310 actorDid,
311 recipients,
312 eventType,
313 entityType,
314 entityId,
315 repoId,
316 issueId,
317 pullId,
318 )
319}
320
321func (n *databaseNotifier) NewPullMerged(ctx context.Context, pull *models.Pull) {
322 // Get repo details
323 repo, err := db.GetRepo(n.db, db.FilterEq("at_uri", string(pull.RepoAt)))
324 if err != nil {
325 log.Printf("NewPullMerged: failed to get repos: %v", err)
326 return
327 }
328
329 // build up the recipients list:
330 // - repo owner
331 // - all pull participants
332 var recipients []syntax.DID
333 recipients = append(recipients, syntax.DID(repo.Did))
334 collaborators, err := db.GetCollaborators(n.db, db.FilterEq("repo_at", repo.RepoAt()))
335 if err != nil {
336 log.Printf("failed to fetch collaborators: %v", err)
337 return
338 }
339 for _, c := range collaborators {
340 recipients = append(recipients, c.SubjectDid)
341 }
342 for _, p := range pull.Participants() {
343 recipients = append(recipients, syntax.DID(p))
344 }
345
346 actorDid := syntax.DID(repo.Did)
347 eventType := models.NotificationTypePullMerged
348 entityType := "pull"
349 entityId := pull.PullAt().String()
350 repoId := &repo.Id
351 var issueId *int64
352 p := int64(pull.ID)
353 pullId := &p
354
355 n.notifyEvent(
356 actorDid,
357 recipients,
358 eventType,
359 entityType,
360 entityId,
361 repoId,
362 issueId,
363 pullId,
364 )
365}
366
367func (n *databaseNotifier) NewPullClosed(ctx context.Context, pull *models.Pull) {
368 // Get repo details
369 repo, err := db.GetRepo(n.db, db.FilterEq("at_uri", string(pull.RepoAt)))
370 if err != nil {
371 log.Printf("NewPullMerged: failed to get repos: %v", err)
372 return
373 }
374
375 // build up the recipients list:
376 // - repo owner
377 // - all pull participants
378 var recipients []syntax.DID
379 recipients = append(recipients, syntax.DID(repo.Did))
380 collaborators, err := db.GetCollaborators(n.db, db.FilterEq("repo_at", repo.RepoAt()))
381 if err != nil {
382 log.Printf("failed to fetch collaborators: %v", err)
383 return
384 }
385 for _, c := range collaborators {
386 recipients = append(recipients, c.SubjectDid)
387 }
388 for _, p := range pull.Participants() {
389 recipients = append(recipients, syntax.DID(p))
390 }
391
392 actorDid := syntax.DID(repo.Did)
393 eventType := models.NotificationTypePullClosed
394 entityType := "pull"
395 entityId := pull.PullAt().String()
396 repoId := &repo.Id
397 var issueId *int64
398 p := int64(pull.ID)
399 pullId := &p
400
401 n.notifyEvent(
402 actorDid,
403 recipients,
404 eventType,
405 entityType,
406 entityId,
407 repoId,
408 issueId,
409 pullId,
410 )
411}
412
413func (n *databaseNotifier) notifyEvent(
414 actorDid syntax.DID,
415 recipients []syntax.DID,
416 eventType models.NotificationType,
417 entityType string,
418 entityId string,
419 repoId *int64,
420 issueId *int64,
421 pullId *int64,
422) {
423 recipientSet := make(map[syntax.DID]struct{})
424 for _, did := range recipients {
425 // everybody except actor themselves
426 if did != actorDid {
427 recipientSet[did] = struct{}{}
428 }
429 }
430
431 prefMap, err := db.GetNotificationPreferences(
432 n.db,
433 db.FilterIn("user_did", slices.Collect(maps.Keys(recipientSet))),
434 )
435 if err != nil {
436 // failed to get prefs for users
437 return
438 }
439
440 // create a transaction for bulk notification storage
441 tx, err := n.db.Begin()
442 if err != nil {
443 // failed to start tx
444 return
445 }
446 defer tx.Rollback()
447
448 // filter based on preferences
449 for recipientDid := range recipientSet {
450 prefs, ok := prefMap[recipientDid]
451 if !ok {
452 prefs = models.DefaultNotificationPreferences(recipientDid)
453 }
454
455 // skip users who don’t want this type
456 if !prefs.ShouldNotify(eventType) {
457 continue
458 }
459
460 // create notification
461 notif := &models.Notification{
462 RecipientDid: recipientDid.String(),
463 ActorDid: actorDid.String(),
464 Type: eventType,
465 EntityType: entityType,
466 EntityId: entityId,
467 RepoId: repoId,
468 IssueId: issueId,
469 PullId: pullId,
470 }
471
472 if err := db.CreateNotification(tx, notif); err != nil {
473 log.Printf("notifyEvent: failed to create notification for %s: %v", recipientDid, err)
474 }
475 }
476
477 if err := tx.Commit(); err != nil {
478 // failed to commit
479 return
480 }
481}