forked from tangled.org/core
Monorepo for Tangled — https://tangled.org
1package db 2 3import ( 4 "context" 5 "log" 6 "maps" 7 "slices" 8 9 "github.com/bluesky-social/indigo/atproto/syntax" 10 "tangled.org/core/appview/db" 11 "tangled.org/core/appview/models" 12 "tangled.org/core/appview/notify" 13 "tangled.org/core/idresolver" 14) 15 16type databaseNotifier struct { 17 db *db.DB 18 res *idresolver.Resolver 19} 20 21func NewDatabaseNotifier(database *db.DB, resolver *idresolver.Resolver) notify.Notifier { 22 return &databaseNotifier{ 23 db: database, 24 res: resolver, 25 } 26} 27 28var _ notify.Notifier = &databaseNotifier{} 29 30func (n *databaseNotifier) NewRepo(ctx context.Context, repo *models.Repo) { 31 // no-op for now 32} 33 34func (n *databaseNotifier) NewStar(ctx context.Context, star *models.Star) { 35 var err error 36 repo, err := db.GetRepo(n.db, db.FilterEq("at_uri", string(star.RepoAt))) 37 if err != nil { 38 log.Printf("NewStar: failed to get repos: %v", err) 39 return 40 } 41 42 actorDid := syntax.DID(star.StarredByDid) 43 recipients := []syntax.DID{syntax.DID(repo.Did)} 44 eventType := models.NotificationTypeRepoStarred 45 entityType := "repo" 46 entityId := star.RepoAt.String() 47 repoId := &repo.Id 48 var issueId *int64 49 var pullId *int64 50 51 n.notifyEvent( 52 actorDid, 53 recipients, 54 eventType, 55 entityType, 56 entityId, 57 repoId, 58 issueId, 59 pullId, 60 ) 61} 62 63func (n *databaseNotifier) DeleteStar(ctx context.Context, star *models.Star) { 64 // no-op 65} 66 67func (n *databaseNotifier) NewIssue(ctx context.Context, issue *models.Issue) { 68 69 // build the recipients list 70 // - owner of the repo 71 // - collaborators in the repo 72 var recipients []syntax.DID 73 recipients = append(recipients, syntax.DID(issue.Repo.Did)) 74 collaborators, err := db.GetCollaborators(n.db, db.FilterEq("repo_at", issue.Repo.RepoAt())) 75 if err != nil { 76 log.Printf("failed to fetch collaborators: %v", err) 77 return 78 } 79 for _, c := range collaborators { 80 recipients = append(recipients, c.SubjectDid) 81 } 82 83 actorDid := syntax.DID(issue.Did) 84 eventType := models.NotificationTypeIssueCreated 85 entityType := "issue" 86 entityId := issue.AtUri().String() 87 repoId := &issue.Repo.Id 88 issueId := &issue.Id 89 var pullId *int64 90 91 n.notifyEvent( 92 actorDid, 93 recipients, 94 eventType, 95 entityType, 96 entityId, 97 repoId, 98 issueId, 99 pullId, 100 ) 101} 102 103func (n *databaseNotifier) NewIssueComment(ctx context.Context, comment *models.IssueComment) { 104 issues, err := db.GetIssues(n.db, db.FilterEq("at_uri", comment.IssueAt)) 105 if err != nil { 106 log.Printf("NewIssueComment: failed to get issues: %v", err) 107 return 108 } 109 if len(issues) == 0 { 110 log.Printf("NewIssueComment: no issue found for %s", comment.IssueAt) 111 return 112 } 113 issue := issues[0] 114 115 var recipients []syntax.DID 116 recipients = append(recipients, syntax.DID(issue.Repo.Did)) 117 118 if comment.IsReply() { 119 // if this comment is a reply, then notify everybody in that thread 120 parentAtUri := *comment.ReplyTo 121 allThreads := issue.CommentList() 122 123 // find the parent thread, and add all DIDs from here to the recipient list 124 for _, t := range allThreads { 125 if t.Self.AtUri().String() == parentAtUri { 126 recipients = append(recipients, t.Participants()...) 127 } 128 } 129 } else { 130 // not a reply, notify just the issue author 131 recipients = append(recipients, syntax.DID(issue.Did)) 132 } 133 134 actorDid := syntax.DID(comment.Did) 135 eventType := models.NotificationTypeIssueCommented 136 entityType := "issue" 137 entityId := issue.AtUri().String() 138 repoId := &issue.Repo.Id 139 issueId := &issue.Id 140 var pullId *int64 141 142 n.notifyEvent( 143 actorDid, 144 recipients, 145 eventType, 146 entityType, 147 entityId, 148 repoId, 149 issueId, 150 pullId, 151 ) 152} 153 154func (n *databaseNotifier) NewFollow(ctx context.Context, follow *models.Follow) { 155 actorDid := syntax.DID(follow.UserDid) 156 recipients := []syntax.DID{syntax.DID(follow.SubjectDid)} 157 eventType := models.NotificationTypeFollowed 158 entityType := "follow" 159 entityId := follow.UserDid 160 var repoId, issueId, pullId *int64 161 162 n.notifyEvent( 163 actorDid, 164 recipients, 165 eventType, 166 entityType, 167 entityId, 168 repoId, 169 issueId, 170 pullId, 171 ) 172} 173 174func (n *databaseNotifier) DeleteFollow(ctx context.Context, follow *models.Follow) { 175 // no-op 176} 177 178func (n *databaseNotifier) NewPull(ctx context.Context, pull *models.Pull) { 179 repo, err := db.GetRepo(n.db, db.FilterEq("at_uri", string(pull.RepoAt))) 180 if err != nil { 181 log.Printf("NewPull: failed to get repos: %v", err) 182 return 183 } 184 185 // build the recipients list 186 // - owner of the repo 187 // - collaborators in the repo 188 var recipients []syntax.DID 189 recipients = append(recipients, syntax.DID(repo.Did)) 190 collaborators, err := db.GetCollaborators(n.db, db.FilterEq("repo_at", repo.RepoAt())) 191 if err != nil { 192 log.Printf("failed to fetch collaborators: %v", err) 193 return 194 } 195 for _, c := range collaborators { 196 recipients = append(recipients, c.SubjectDid) 197 } 198 199 actorDid := syntax.DID(pull.OwnerDid) 200 eventType := models.NotificationTypePullCreated 201 entityType := "pull" 202 entityId := pull.PullAt().String() 203 repoId := &repo.Id 204 var issueId *int64 205 p := int64(pull.ID) 206 pullId := &p 207 208 n.notifyEvent( 209 actorDid, 210 recipients, 211 eventType, 212 entityType, 213 entityId, 214 repoId, 215 issueId, 216 pullId, 217 ) 218} 219 220func (n *databaseNotifier) NewPullComment(ctx context.Context, comment *models.PullComment) { 221 pull, err := db.GetPull(n.db, 222 syntax.ATURI(comment.RepoAt), 223 comment.PullId, 224 ) 225 if err != nil { 226 log.Printf("NewPullComment: failed to get pulls: %v", err) 227 return 228 } 229 230 repo, err := db.GetRepo(n.db, db.FilterEq("at_uri", comment.RepoAt)) 231 if err != nil { 232 log.Printf("NewPullComment: failed to get repos: %v", err) 233 return 234 } 235 236 // build up the recipients list: 237 // - repo owner 238 // - all pull participants 239 var recipients []syntax.DID 240 recipients = append(recipients, syntax.DID(repo.Did)) 241 for _, p := range pull.Participants() { 242 recipients = append(recipients, syntax.DID(p)) 243 } 244 245 actorDid := syntax.DID(comment.OwnerDid) 246 eventType := models.NotificationTypePullCommented 247 entityType := "pull" 248 entityId := pull.PullAt().String() 249 repoId := &repo.Id 250 var issueId *int64 251 p := int64(pull.ID) 252 pullId := &p 253 254 n.notifyEvent( 255 actorDid, 256 recipients, 257 eventType, 258 entityType, 259 entityId, 260 repoId, 261 issueId, 262 pullId, 263 ) 264} 265 266func (n *databaseNotifier) UpdateProfile(ctx context.Context, profile *models.Profile) { 267 // no-op 268} 269 270func (n *databaseNotifier) DeleteString(ctx context.Context, did, rkey string) { 271 // no-op 272} 273 274func (n *databaseNotifier) EditString(ctx context.Context, string *models.String) { 275 // no-op 276} 277 278func (n *databaseNotifier) NewString(ctx context.Context, string *models.String) { 279 // no-op 280} 281 282func (n *databaseNotifier) NewIssueClosed(ctx context.Context, issue *models.Issue) { 283 // build up the recipients list: 284 // - repo owner 285 // - repo collaborators 286 // - all issue participants 287 var recipients []syntax.DID 288 recipients = append(recipients, syntax.DID(issue.Repo.Did)) 289 collaborators, err := db.GetCollaborators(n.db, db.FilterEq("repo_at", issue.Repo.RepoAt())) 290 if err != nil { 291 log.Printf("failed to fetch collaborators: %v", err) 292 return 293 } 294 for _, c := range collaborators { 295 recipients = append(recipients, c.SubjectDid) 296 } 297 for _, p := range issue.Participants() { 298 recipients = append(recipients, syntax.DID(p)) 299 } 300 301 actorDid := syntax.DID(issue.Repo.Did) 302 eventType := models.NotificationTypeIssueClosed 303 entityType := "pull" 304 entityId := issue.AtUri().String() 305 repoId := &issue.Repo.Id 306 issueId := &issue.Id 307 var pullId *int64 308 309 n.notifyEvent( 310 actorDid, 311 recipients, 312 eventType, 313 entityType, 314 entityId, 315 repoId, 316 issueId, 317 pullId, 318 ) 319} 320 321func (n *databaseNotifier) NewPullMerged(ctx context.Context, pull *models.Pull) { 322 // Get repo details 323 repo, err := db.GetRepo(n.db, db.FilterEq("at_uri", string(pull.RepoAt))) 324 if err != nil { 325 log.Printf("NewPullMerged: failed to get repos: %v", err) 326 return 327 } 328 329 // build up the recipients list: 330 // - repo owner 331 // - all pull participants 332 var recipients []syntax.DID 333 recipients = append(recipients, syntax.DID(repo.Did)) 334 collaborators, err := db.GetCollaborators(n.db, db.FilterEq("repo_at", repo.RepoAt())) 335 if err != nil { 336 log.Printf("failed to fetch collaborators: %v", err) 337 return 338 } 339 for _, c := range collaborators { 340 recipients = append(recipients, c.SubjectDid) 341 } 342 for _, p := range pull.Participants() { 343 recipients = append(recipients, syntax.DID(p)) 344 } 345 346 actorDid := syntax.DID(repo.Did) 347 eventType := models.NotificationTypePullMerged 348 entityType := "pull" 349 entityId := pull.PullAt().String() 350 repoId := &repo.Id 351 var issueId *int64 352 p := int64(pull.ID) 353 pullId := &p 354 355 n.notifyEvent( 356 actorDid, 357 recipients, 358 eventType, 359 entityType, 360 entityId, 361 repoId, 362 issueId, 363 pullId, 364 ) 365} 366 367func (n *databaseNotifier) NewPullClosed(ctx context.Context, pull *models.Pull) { 368 // Get repo details 369 repo, err := db.GetRepo(n.db, db.FilterEq("at_uri", string(pull.RepoAt))) 370 if err != nil { 371 log.Printf("NewPullMerged: failed to get repos: %v", err) 372 return 373 } 374 375 // build up the recipients list: 376 // - repo owner 377 // - all pull participants 378 var recipients []syntax.DID 379 recipients = append(recipients, syntax.DID(repo.Did)) 380 collaborators, err := db.GetCollaborators(n.db, db.FilterEq("repo_at", repo.RepoAt())) 381 if err != nil { 382 log.Printf("failed to fetch collaborators: %v", err) 383 return 384 } 385 for _, c := range collaborators { 386 recipients = append(recipients, c.SubjectDid) 387 } 388 for _, p := range pull.Participants() { 389 recipients = append(recipients, syntax.DID(p)) 390 } 391 392 actorDid := syntax.DID(repo.Did) 393 eventType := models.NotificationTypePullClosed 394 entityType := "pull" 395 entityId := pull.PullAt().String() 396 repoId := &repo.Id 397 var issueId *int64 398 p := int64(pull.ID) 399 pullId := &p 400 401 n.notifyEvent( 402 actorDid, 403 recipients, 404 eventType, 405 entityType, 406 entityId, 407 repoId, 408 issueId, 409 pullId, 410 ) 411} 412 413func (n *databaseNotifier) notifyEvent( 414 actorDid syntax.DID, 415 recipients []syntax.DID, 416 eventType models.NotificationType, 417 entityType string, 418 entityId string, 419 repoId *int64, 420 issueId *int64, 421 pullId *int64, 422) { 423 recipientSet := make(map[syntax.DID]struct{}) 424 for _, did := range recipients { 425 // everybody except actor themselves 426 if did != actorDid { 427 recipientSet[did] = struct{}{} 428 } 429 } 430 431 prefMap, err := db.GetNotificationPreferences( 432 n.db, 433 db.FilterIn("user_did", slices.Collect(maps.Keys(recipientSet))), 434 ) 435 if err != nil { 436 // failed to get prefs for users 437 return 438 } 439 440 // create a transaction for bulk notification storage 441 tx, err := n.db.Begin() 442 if err != nil { 443 // failed to start tx 444 return 445 } 446 defer tx.Rollback() 447 448 // filter based on preferences 449 for recipientDid := range recipientSet { 450 prefs, ok := prefMap[recipientDid] 451 if !ok { 452 prefs = models.DefaultNotificationPreferences(recipientDid) 453 } 454 455 // skip users who don’t want this type 456 if !prefs.ShouldNotify(eventType) { 457 continue 458 } 459 460 // create notification 461 notif := &models.Notification{ 462 RecipientDid: recipientDid.String(), 463 ActorDid: actorDid.String(), 464 Type: eventType, 465 EntityType: entityType, 466 EntityId: entityId, 467 RepoId: repoId, 468 IssueId: issueId, 469 PullId: pullId, 470 } 471 472 if err := db.CreateNotification(tx, notif); err != nil { 473 log.Printf("notifyEvent: failed to create notification for %s: %v", recipientDid, err) 474 } 475 } 476 477 if err := tx.Commit(); err != nil { 478 // failed to commit 479 return 480 } 481}