forked from
tangled.org/core
Monorepo for Tangled — https://tangled.org
1package db
2
3import (
4 "context"
5 "log"
6 "maps"
7 "slices"
8
9 "github.com/bluesky-social/indigo/atproto/syntax"
10 "tangled.org/core/appview/db"
11 "tangled.org/core/appview/models"
12 "tangled.org/core/appview/notify"
13 "tangled.org/core/idresolver"
14)
15
16type databaseNotifier struct {
17 db *db.DB
18 res *idresolver.Resolver
19}
20
21func NewDatabaseNotifier(database *db.DB, resolver *idresolver.Resolver) notify.Notifier {
22 return &databaseNotifier{
23 db: database,
24 res: resolver,
25 }
26}
27
28var _ notify.Notifier = &databaseNotifier{}
29
30func (n *databaseNotifier) NewRepo(ctx context.Context, repo *models.Repo) {
31 // no-op for now
32}
33
34func (n *databaseNotifier) NewStar(ctx context.Context, star *models.Star) {
35 var err error
36 repo, err := db.GetRepo(n.db, db.FilterEq("at_uri", string(star.RepoAt)))
37 if err != nil {
38 log.Printf("NewStar: failed to get repos: %v", err)
39 return
40 }
41
42 actorDid := syntax.DID(star.StarredByDid)
43 recipients := []syntax.DID{syntax.DID(repo.Did)}
44 eventType := models.NotificationTypeRepoStarred
45 entityType := "repo"
46 entityId := star.RepoAt.String()
47 repoId := &repo.Id
48 var issueId *int64
49 var pullId *int64
50
51 n.notifyEvent(
52 actorDid,
53 recipients,
54 eventType,
55 entityType,
56 entityId,
57 repoId,
58 issueId,
59 pullId,
60 )
61}
62
63func (n *databaseNotifier) DeleteStar(ctx context.Context, star *models.Star) {
64 // no-op
65}
66
67func (n *databaseNotifier) NewIssue(ctx context.Context, issue *models.Issue) {
68
69 // build the recipients list
70 // - owner of the repo
71 // - collaborators in the repo
72 var recipients []syntax.DID
73 recipients = append(recipients, syntax.DID(issue.Repo.Did))
74 collaborators, err := db.GetCollaborators(n.db, db.FilterEq("repo_at", issue.Repo.RepoAt()))
75 if err != nil {
76 log.Printf("failed to fetch collaborators: %v", err)
77 return
78 }
79 for _, c := range collaborators {
80 recipients = append(recipients, c.SubjectDid)
81 }
82
83 actorDid := syntax.DID(issue.Did)
84 eventType := models.NotificationTypeIssueCreated
85 entityType := "issue"
86 entityId := issue.AtUri().String()
87 repoId := &issue.Repo.Id
88 issueId := &issue.Id
89 var pullId *int64
90
91 n.notifyEvent(
92 actorDid,
93 recipients,
94 eventType,
95 entityType,
96 entityId,
97 repoId,
98 issueId,
99 pullId,
100 )
101}
102
103func (n *databaseNotifier) NewIssueComment(ctx context.Context, comment *models.IssueComment) {
104 issues, err := db.GetIssues(n.db, db.FilterEq("at_uri", comment.IssueAt))
105 if err != nil {
106 log.Printf("NewIssueComment: failed to get issues: %v", err)
107 return
108 }
109 if len(issues) == 0 {
110 log.Printf("NewIssueComment: no issue found for %s", comment.IssueAt)
111 return
112 }
113 issue := issues[0]
114
115 var recipients []syntax.DID
116 recipients = append(recipients, syntax.DID(issue.Repo.Did))
117
118 if comment.IsReply() {
119 // if this comment is a reply, then notify everybody in that thread
120 parentAtUri := *comment.ReplyTo
121 allThreads := issue.CommentList()
122
123 // find the parent thread, and add all DIDs from here to the recipient list
124 for _, t := range allThreads {
125 if t.Self.AtUri().String() == parentAtUri {
126 recipients = append(recipients, t.Participants()...)
127 }
128 }
129 } else {
130 // not a reply, notify just the issue author
131 recipients = append(recipients, syntax.DID(issue.Did))
132 }
133
134 actorDid := syntax.DID(comment.Did)
135 eventType := models.NotificationTypeIssueCommented
136 entityType := "issue"
137 entityId := issue.AtUri().String()
138 repoId := &issue.Repo.Id
139 issueId := &issue.Id
140 var pullId *int64
141
142 n.notifyEvent(
143 actorDid,
144 recipients,
145 eventType,
146 entityType,
147 entityId,
148 repoId,
149 issueId,
150 pullId,
151 )
152}
153
154func (n *databaseNotifier) DeleteIssue(ctx context.Context, issue *models.Issue) {
155 // no-op for now
156}
157
158func (n *databaseNotifier) NewFollow(ctx context.Context, follow *models.Follow) {
159 actorDid := syntax.DID(follow.UserDid)
160 recipients := []syntax.DID{syntax.DID(follow.SubjectDid)}
161 eventType := models.NotificationTypeFollowed
162 entityType := "follow"
163 entityId := follow.UserDid
164 var repoId, issueId, pullId *int64
165
166 n.notifyEvent(
167 actorDid,
168 recipients,
169 eventType,
170 entityType,
171 entityId,
172 repoId,
173 issueId,
174 pullId,
175 )
176}
177
178func (n *databaseNotifier) DeleteFollow(ctx context.Context, follow *models.Follow) {
179 // no-op
180}
181
182func (n *databaseNotifier) NewPull(ctx context.Context, pull *models.Pull) {
183 repo, err := db.GetRepo(n.db, db.FilterEq("at_uri", string(pull.RepoAt)))
184 if err != nil {
185 log.Printf("NewPull: failed to get repos: %v", err)
186 return
187 }
188
189 // build the recipients list
190 // - owner of the repo
191 // - collaborators in the repo
192 var recipients []syntax.DID
193 recipients = append(recipients, syntax.DID(repo.Did))
194 collaborators, err := db.GetCollaborators(n.db, db.FilterEq("repo_at", repo.RepoAt()))
195 if err != nil {
196 log.Printf("failed to fetch collaborators: %v", err)
197 return
198 }
199 for _, c := range collaborators {
200 recipients = append(recipients, c.SubjectDid)
201 }
202
203 actorDid := syntax.DID(pull.OwnerDid)
204 eventType := models.NotificationTypePullCreated
205 entityType := "pull"
206 entityId := pull.PullAt().String()
207 repoId := &repo.Id
208 var issueId *int64
209 p := int64(pull.ID)
210 pullId := &p
211
212 n.notifyEvent(
213 actorDid,
214 recipients,
215 eventType,
216 entityType,
217 entityId,
218 repoId,
219 issueId,
220 pullId,
221 )
222}
223
224func (n *databaseNotifier) NewPullComment(ctx context.Context, comment *models.PullComment) {
225 pull, err := db.GetPull(n.db,
226 syntax.ATURI(comment.RepoAt),
227 comment.PullId,
228 )
229 if err != nil {
230 log.Printf("NewPullComment: failed to get pulls: %v", err)
231 return
232 }
233
234 repo, err := db.GetRepo(n.db, db.FilterEq("at_uri", comment.RepoAt))
235 if err != nil {
236 log.Printf("NewPullComment: failed to get repos: %v", err)
237 return
238 }
239
240 // build up the recipients list:
241 // - repo owner
242 // - all pull participants
243 var recipients []syntax.DID
244 recipients = append(recipients, syntax.DID(repo.Did))
245 for _, p := range pull.Participants() {
246 recipients = append(recipients, syntax.DID(p))
247 }
248
249 actorDid := syntax.DID(comment.OwnerDid)
250 eventType := models.NotificationTypePullCommented
251 entityType := "pull"
252 entityId := pull.PullAt().String()
253 repoId := &repo.Id
254 var issueId *int64
255 p := int64(pull.ID)
256 pullId := &p
257
258 n.notifyEvent(
259 actorDid,
260 recipients,
261 eventType,
262 entityType,
263 entityId,
264 repoId,
265 issueId,
266 pullId,
267 )
268}
269
270func (n *databaseNotifier) UpdateProfile(ctx context.Context, profile *models.Profile) {
271 // no-op
272}
273
274func (n *databaseNotifier) DeleteString(ctx context.Context, did, rkey string) {
275 // no-op
276}
277
278func (n *databaseNotifier) EditString(ctx context.Context, string *models.String) {
279 // no-op
280}
281
282func (n *databaseNotifier) NewString(ctx context.Context, string *models.String) {
283 // no-op
284}
285
286func (n *databaseNotifier) NewIssueState(ctx context.Context, issue *models.Issue) {
287 // build up the recipients list:
288 // - repo owner
289 // - repo collaborators
290 // - all issue participants
291 var recipients []syntax.DID
292 recipients = append(recipients, syntax.DID(issue.Repo.Did))
293 collaborators, err := db.GetCollaborators(n.db, db.FilterEq("repo_at", issue.Repo.RepoAt()))
294 if err != nil {
295 log.Printf("failed to fetch collaborators: %v", err)
296 return
297 }
298 for _, c := range collaborators {
299 recipients = append(recipients, c.SubjectDid)
300 }
301 for _, p := range issue.Participants() {
302 recipients = append(recipients, syntax.DID(p))
303 }
304
305 actorDid := syntax.DID(issue.Repo.Did)
306 entityType := "pull"
307 entityId := issue.AtUri().String()
308 repoId := &issue.Repo.Id
309 issueId := &issue.Id
310 var pullId *int64
311 var eventType models.NotificationType
312
313 if issue.Open {
314 eventType = models.NotificationTypeIssueReopen
315 } else {
316 eventType = models.NotificationTypeIssueClosed
317 }
318
319 n.notifyEvent(
320 actorDid,
321 recipients,
322 eventType,
323 entityType,
324 entityId,
325 repoId,
326 issueId,
327 pullId,
328 )
329}
330
331func (n *databaseNotifier) NewPullState(ctx context.Context, pull *models.Pull) {
332 // Get repo details
333 repo, err := db.GetRepo(n.db, db.FilterEq("at_uri", string(pull.RepoAt)))
334 if err != nil {
335 log.Printf("NewPullState: failed to get repos: %v", err)
336 return
337 }
338
339 // build up the recipients list:
340 // - repo owner
341 // - all pull participants
342 var recipients []syntax.DID
343 recipients = append(recipients, syntax.DID(repo.Did))
344 collaborators, err := db.GetCollaborators(n.db, db.FilterEq("repo_at", repo.RepoAt()))
345 if err != nil {
346 log.Printf("failed to fetch collaborators: %v", err)
347 return
348 }
349 for _, c := range collaborators {
350 recipients = append(recipients, c.SubjectDid)
351 }
352 for _, p := range pull.Participants() {
353 recipients = append(recipients, syntax.DID(p))
354 }
355
356 actorDid := syntax.DID(repo.Did)
357 entityType := "pull"
358 entityId := pull.PullAt().String()
359 repoId := &repo.Id
360 var issueId *int64
361 var eventType models.NotificationType
362 switch pull.State {
363 case models.PullClosed:
364 eventType = models.NotificationTypePullClosed
365 case models.PullOpen:
366 eventType = models.NotificationTypePullReopen
367 case models.PullMerged:
368 eventType = models.NotificationTypePullMerged
369 default:
370 log.Println("NewPullState: unexpected new PR state:", pull.State)
371 return
372 }
373 p := int64(pull.ID)
374 pullId := &p
375
376 n.notifyEvent(
377 actorDid,
378 recipients,
379 eventType,
380 entityType,
381 entityId,
382 repoId,
383 issueId,
384 pullId,
385 )
386}
387
388func (n *databaseNotifier) notifyEvent(
389 actorDid syntax.DID,
390 recipients []syntax.DID,
391 eventType models.NotificationType,
392 entityType string,
393 entityId string,
394 repoId *int64,
395 issueId *int64,
396 pullId *int64,
397) {
398 recipientSet := make(map[syntax.DID]struct{})
399 for _, did := range recipients {
400 // everybody except actor themselves
401 if did != actorDid {
402 recipientSet[did] = struct{}{}
403 }
404 }
405
406 prefMap, err := db.GetNotificationPreferences(
407 n.db,
408 db.FilterIn("user_did", slices.Collect(maps.Keys(recipientSet))),
409 )
410 if err != nil {
411 // failed to get prefs for users
412 return
413 }
414
415 // create a transaction for bulk notification storage
416 tx, err := n.db.Begin()
417 if err != nil {
418 // failed to start tx
419 return
420 }
421 defer tx.Rollback()
422
423 // filter based on preferences
424 for recipientDid := range recipientSet {
425 prefs, ok := prefMap[recipientDid]
426 if !ok {
427 prefs = models.DefaultNotificationPreferences(recipientDid)
428 }
429
430 // skip users who don’t want this type
431 if !prefs.ShouldNotify(eventType) {
432 continue
433 }
434
435 // create notification
436 notif := &models.Notification{
437 RecipientDid: recipientDid.String(),
438 ActorDid: actorDid.String(),
439 Type: eventType,
440 EntityType: entityType,
441 EntityId: entityId,
442 RepoId: repoId,
443 IssueId: issueId,
444 PullId: pullId,
445 }
446
447 if err := db.CreateNotification(tx, notif); err != nil {
448 log.Printf("notifyEvent: failed to create notification for %s: %v", recipientDid, err)
449 }
450 }
451
452 if err := tx.Commit(); err != nil {
453 // failed to commit
454 return
455 }
456}