forked from tangled.org/core
Monorepo for Tangled — https://tangled.org
at master 11 kB view raw
1package db 2 3import ( 4 "context" 5 "log" 6 "maps" 7 "slices" 8 9 "github.com/bluesky-social/indigo/atproto/syntax" 10 "tangled.org/core/api/tangled" 11 "tangled.org/core/appview/db" 12 "tangled.org/core/appview/models" 13 "tangled.org/core/appview/notify" 14 "tangled.org/core/idresolver" 15) 16 17const ( 18 maxMentions = 5 19) 20 21type databaseNotifier struct { 22 db *db.DB 23 res *idresolver.Resolver 24} 25 26func NewDatabaseNotifier(database *db.DB, resolver *idresolver.Resolver) notify.Notifier { 27 return &databaseNotifier{ 28 db: database, 29 res: resolver, 30 } 31} 32 33var _ notify.Notifier = &databaseNotifier{} 34 35func (n *databaseNotifier) NewRepo(ctx context.Context, repo *models.Repo) { 36 // no-op for now 37} 38 39func (n *databaseNotifier) NewStar(ctx context.Context, star *models.Star) { 40 if star.RepoAt.Collection().String() != tangled.RepoNSID { 41 // skip string stars for now 42 return 43 } 44 var err error 45 repo, err := db.GetRepo(n.db, db.FilterEq("at_uri", string(star.RepoAt))) 46 if err != nil { 47 log.Printf("NewStar: failed to get repos: %v", err) 48 return 49 } 50 51 actorDid := syntax.DID(star.Did) 52 recipients := []syntax.DID{syntax.DID(repo.Did)} 53 eventType := models.NotificationTypeRepoStarred 54 entityType := "repo" 55 entityId := star.RepoAt.String() 56 repoId := &repo.Id 57 var issueId *int64 58 var pullId *int64 59 60 n.notifyEvent( 61 actorDid, 62 recipients, 63 eventType, 64 entityType, 65 entityId, 66 repoId, 67 issueId, 68 pullId, 69 ) 70} 71 72func (n *databaseNotifier) DeleteStar(ctx context.Context, star *models.Star) { 73 // no-op 74} 75 76func (n *databaseNotifier) NewIssue(ctx context.Context, issue *models.Issue, mentions []syntax.DID) { 77 78 // build the recipients list 79 // - owner of the repo 80 // - collaborators in the repo 81 var recipients []syntax.DID 82 recipients = append(recipients, syntax.DID(issue.Repo.Did)) 83 collaborators, err := db.GetCollaborators(n.db, db.FilterEq("repo_at", issue.Repo.RepoAt())) 84 if err != nil { 85 log.Printf("failed to fetch collaborators: %v", err) 86 return 87 } 88 for _, c := range collaborators { 89 recipients = append(recipients, c.SubjectDid) 90 } 91 92 actorDid := syntax.DID(issue.Did) 93 entityType := "issue" 94 entityId := issue.AtUri().String() 95 repoId := &issue.Repo.Id 96 issueId := &issue.Id 97 var pullId *int64 98 99 n.notifyEvent( 100 actorDid, 101 recipients, 102 models.NotificationTypeIssueCreated, 103 entityType, 104 entityId, 105 repoId, 106 issueId, 107 pullId, 108 ) 109 n.notifyEvent( 110 actorDid, 111 mentions, 112 models.NotificationTypeUserMentioned, 113 entityType, 114 entityId, 115 repoId, 116 issueId, 117 pullId, 118 ) 119} 120 121func (n *databaseNotifier) NewIssueComment(ctx context.Context, comment *models.IssueComment, mentions []syntax.DID) { 122 issues, err := db.GetIssues(n.db, db.FilterEq("at_uri", comment.IssueAt)) 123 if err != nil { 124 log.Printf("NewIssueComment: failed to get issues: %v", err) 125 return 126 } 127 if len(issues) == 0 { 128 log.Printf("NewIssueComment: no issue found for %s", comment.IssueAt) 129 return 130 } 131 issue := issues[0] 132 133 var recipients []syntax.DID 134 recipients = append(recipients, syntax.DID(issue.Repo.Did)) 135 136 if comment.IsReply() { 137 // if this comment is a reply, then notify everybody in that thread 138 parentAtUri := *comment.ReplyTo 139 allThreads := issue.CommentList() 140 141 // find the parent thread, and add all DIDs from here to the recipient list 142 for _, t := range allThreads { 143 if t.Self.AtUri().String() == parentAtUri { 144 recipients = append(recipients, t.Participants()...) 145 } 146 } 147 } else { 148 // not a reply, notify just the issue author 149 recipients = append(recipients, syntax.DID(issue.Did)) 150 } 151 152 actorDid := syntax.DID(comment.Did) 153 entityType := "issue" 154 entityId := issue.AtUri().String() 155 repoId := &issue.Repo.Id 156 issueId := &issue.Id 157 var pullId *int64 158 159 n.notifyEvent( 160 actorDid, 161 recipients, 162 models.NotificationTypeIssueCommented, 163 entityType, 164 entityId, 165 repoId, 166 issueId, 167 pullId, 168 ) 169 n.notifyEvent( 170 actorDid, 171 mentions, 172 models.NotificationTypeUserMentioned, 173 entityType, 174 entityId, 175 repoId, 176 issueId, 177 pullId, 178 ) 179} 180 181func (n *databaseNotifier) DeleteIssue(ctx context.Context, issue *models.Issue) { 182 // no-op for now 183} 184 185func (n *databaseNotifier) NewFollow(ctx context.Context, follow *models.Follow) { 186 actorDid := syntax.DID(follow.UserDid) 187 recipients := []syntax.DID{syntax.DID(follow.SubjectDid)} 188 eventType := models.NotificationTypeFollowed 189 entityType := "follow" 190 entityId := follow.UserDid 191 var repoId, issueId, pullId *int64 192 193 n.notifyEvent( 194 actorDid, 195 recipients, 196 eventType, 197 entityType, 198 entityId, 199 repoId, 200 issueId, 201 pullId, 202 ) 203} 204 205func (n *databaseNotifier) DeleteFollow(ctx context.Context, follow *models.Follow) { 206 // no-op 207} 208 209func (n *databaseNotifier) NewPull(ctx context.Context, pull *models.Pull) { 210 repo, err := db.GetRepo(n.db, db.FilterEq("at_uri", string(pull.RepoAt))) 211 if err != nil { 212 log.Printf("NewPull: failed to get repos: %v", err) 213 return 214 } 215 216 // build the recipients list 217 // - owner of the repo 218 // - collaborators in the repo 219 var recipients []syntax.DID 220 recipients = append(recipients, syntax.DID(repo.Did)) 221 collaborators, err := db.GetCollaborators(n.db, db.FilterEq("repo_at", repo.RepoAt())) 222 if err != nil { 223 log.Printf("failed to fetch collaborators: %v", err) 224 return 225 } 226 for _, c := range collaborators { 227 recipients = append(recipients, c.SubjectDid) 228 } 229 230 actorDid := syntax.DID(pull.OwnerDid) 231 eventType := models.NotificationTypePullCreated 232 entityType := "pull" 233 entityId := pull.AtUri().String() 234 repoId := &repo.Id 235 var issueId *int64 236 p := int64(pull.ID) 237 pullId := &p 238 239 n.notifyEvent( 240 actorDid, 241 recipients, 242 eventType, 243 entityType, 244 entityId, 245 repoId, 246 issueId, 247 pullId, 248 ) 249} 250 251func (n *databaseNotifier) NewPullComment(ctx context.Context, comment *models.PullComment, mentions []syntax.DID) { 252 pull, err := db.GetPull(n.db, 253 syntax.ATURI(comment.RepoAt), 254 comment.PullId, 255 ) 256 if err != nil { 257 log.Printf("NewPullComment: failed to get pulls: %v", err) 258 return 259 } 260 261 repo, err := db.GetRepo(n.db, db.FilterEq("at_uri", comment.RepoAt)) 262 if err != nil { 263 log.Printf("NewPullComment: failed to get repos: %v", err) 264 return 265 } 266 267 // build up the recipients list: 268 // - repo owner 269 // - all pull participants 270 var recipients []syntax.DID 271 recipients = append(recipients, syntax.DID(repo.Did)) 272 for _, p := range pull.Participants() { 273 recipients = append(recipients, syntax.DID(p)) 274 } 275 276 actorDid := syntax.DID(comment.OwnerDid) 277 eventType := models.NotificationTypePullCommented 278 entityType := "pull" 279 entityId := pull.AtUri().String() 280 repoId := &repo.Id 281 var issueId *int64 282 p := int64(pull.ID) 283 pullId := &p 284 285 n.notifyEvent( 286 actorDid, 287 recipients, 288 eventType, 289 entityType, 290 entityId, 291 repoId, 292 issueId, 293 pullId, 294 ) 295 n.notifyEvent( 296 actorDid, 297 mentions, 298 models.NotificationTypeUserMentioned, 299 entityType, 300 entityId, 301 repoId, 302 issueId, 303 pullId, 304 ) 305} 306 307func (n *databaseNotifier) UpdateProfile(ctx context.Context, profile *models.Profile) { 308 // no-op 309} 310 311func (n *databaseNotifier) DeleteString(ctx context.Context, did, rkey string) { 312 // no-op 313} 314 315func (n *databaseNotifier) EditString(ctx context.Context, string *models.String) { 316 // no-op 317} 318 319func (n *databaseNotifier) NewString(ctx context.Context, string *models.String) { 320 // no-op 321} 322 323func (n *databaseNotifier) NewIssueState(ctx context.Context, actor syntax.DID, issue *models.Issue) { 324 // build up the recipients list: 325 // - repo owner 326 // - repo collaborators 327 // - all issue participants 328 var recipients []syntax.DID 329 recipients = append(recipients, syntax.DID(issue.Repo.Did)) 330 collaborators, err := db.GetCollaborators(n.db, db.FilterEq("repo_at", issue.Repo.RepoAt())) 331 if err != nil { 332 log.Printf("failed to fetch collaborators: %v", err) 333 return 334 } 335 for _, c := range collaborators { 336 recipients = append(recipients, c.SubjectDid) 337 } 338 for _, p := range issue.Participants() { 339 recipients = append(recipients, syntax.DID(p)) 340 } 341 342 entityType := "pull" 343 entityId := issue.AtUri().String() 344 repoId := &issue.Repo.Id 345 issueId := &issue.Id 346 var pullId *int64 347 var eventType models.NotificationType 348 349 if issue.Open { 350 eventType = models.NotificationTypeIssueReopen 351 } else { 352 eventType = models.NotificationTypeIssueClosed 353 } 354 355 n.notifyEvent( 356 actor, 357 recipients, 358 eventType, 359 entityType, 360 entityId, 361 repoId, 362 issueId, 363 pullId, 364 ) 365} 366 367func (n *databaseNotifier) NewPullState(ctx context.Context, actor syntax.DID, pull *models.Pull) { 368 // Get repo details 369 repo, err := db.GetRepo(n.db, db.FilterEq("at_uri", string(pull.RepoAt))) 370 if err != nil { 371 log.Printf("NewPullState: failed to get repos: %v", err) 372 return 373 } 374 375 // build up the recipients list: 376 // - repo owner 377 // - all pull participants 378 var recipients []syntax.DID 379 recipients = append(recipients, syntax.DID(repo.Did)) 380 collaborators, err := db.GetCollaborators(n.db, db.FilterEq("repo_at", repo.RepoAt())) 381 if err != nil { 382 log.Printf("failed to fetch collaborators: %v", err) 383 return 384 } 385 for _, c := range collaborators { 386 recipients = append(recipients, c.SubjectDid) 387 } 388 for _, p := range pull.Participants() { 389 recipients = append(recipients, syntax.DID(p)) 390 } 391 392 entityType := "pull" 393 entityId := pull.AtUri().String() 394 repoId := &repo.Id 395 var issueId *int64 396 var eventType models.NotificationType 397 switch pull.State { 398 case models.PullClosed: 399 eventType = models.NotificationTypePullClosed 400 case models.PullOpen: 401 eventType = models.NotificationTypePullReopen 402 case models.PullMerged: 403 eventType = models.NotificationTypePullMerged 404 default: 405 log.Println("NewPullState: unexpected new PR state:", pull.State) 406 return 407 } 408 p := int64(pull.ID) 409 pullId := &p 410 411 n.notifyEvent( 412 actor, 413 recipients, 414 eventType, 415 entityType, 416 entityId, 417 repoId, 418 issueId, 419 pullId, 420 ) 421} 422 423func (n *databaseNotifier) notifyEvent( 424 actorDid syntax.DID, 425 recipients []syntax.DID, 426 eventType models.NotificationType, 427 entityType string, 428 entityId string, 429 repoId *int64, 430 issueId *int64, 431 pullId *int64, 432) { 433 if eventType == models.NotificationTypeUserMentioned && len(recipients) > maxMentions { 434 recipients = recipients[:maxMentions] 435 } 436 recipientSet := make(map[syntax.DID]struct{}) 437 for _, did := range recipients { 438 // everybody except actor themselves 439 if did != actorDid { 440 recipientSet[did] = struct{}{} 441 } 442 } 443 444 prefMap, err := db.GetNotificationPreferences( 445 n.db, 446 db.FilterIn("user_did", slices.Collect(maps.Keys(recipientSet))), 447 ) 448 if err != nil { 449 // failed to get prefs for users 450 return 451 } 452 453 // create a transaction for bulk notification storage 454 tx, err := n.db.Begin() 455 if err != nil { 456 // failed to start tx 457 return 458 } 459 defer tx.Rollback() 460 461 // filter based on preferences 462 for recipientDid := range recipientSet { 463 prefs, ok := prefMap[recipientDid] 464 if !ok { 465 prefs = models.DefaultNotificationPreferences(recipientDid) 466 } 467 468 // skip users who don’t want this type 469 if !prefs.ShouldNotify(eventType) { 470 continue 471 } 472 473 // create notification 474 notif := &models.Notification{ 475 RecipientDid: recipientDid.String(), 476 ActorDid: actorDid.String(), 477 Type: eventType, 478 EntityType: entityType, 479 EntityId: entityId, 480 RepoId: repoId, 481 IssueId: issueId, 482 PullId: pullId, 483 } 484 485 if err := db.CreateNotification(tx, notif); err != nil { 486 log.Printf("notifyEvent: failed to create notification for %s: %v", recipientDid, err) 487 } 488 } 489 490 if err := tx.Commit(); err != nil { 491 // failed to commit 492 return 493 } 494}