forked from tangled.org/core
Monorepo for Tangled — https://tangled.org
1package db 2 3import ( 4 "context" 5 "log" 6 "maps" 7 "slices" 8 9 "github.com/bluesky-social/indigo/atproto/syntax" 10 "tangled.org/core/appview/db" 11 "tangled.org/core/appview/models" 12 "tangled.org/core/appview/notify" 13 "tangled.org/core/idresolver" 14) 15 16const ( 17 maxMentions = 5 18) 19 20type databaseNotifier struct { 21 db *db.DB 22 res *idresolver.Resolver 23} 24 25func NewDatabaseNotifier(database *db.DB, resolver *idresolver.Resolver) notify.Notifier { 26 return &databaseNotifier{ 27 db: database, 28 res: resolver, 29 } 30} 31 32var _ notify.Notifier = &databaseNotifier{} 33 34func (n *databaseNotifier) NewRepo(ctx context.Context, repo *models.Repo) { 35 // no-op for now 36} 37 38func (n *databaseNotifier) NewStar(ctx context.Context, star *models.Star) { 39 var err error 40 repo, err := db.GetRepo(n.db, db.FilterEq("at_uri", string(star.RepoAt))) 41 if err != nil { 42 log.Printf("NewStar: failed to get repos: %v", err) 43 return 44 } 45 46 actorDid := syntax.DID(star.StarredByDid) 47 recipients := []syntax.DID{syntax.DID(repo.Did)} 48 eventType := models.NotificationTypeRepoStarred 49 entityType := "repo" 50 entityId := star.RepoAt.String() 51 repoId := &repo.Id 52 var issueId *int64 53 var pullId *int64 54 55 n.notifyEvent( 56 actorDid, 57 recipients, 58 eventType, 59 entityType, 60 entityId, 61 repoId, 62 issueId, 63 pullId, 64 ) 65} 66 67func (n *databaseNotifier) DeleteStar(ctx context.Context, star *models.Star) { 68 // no-op 69} 70 71func (n *databaseNotifier) NewIssue(ctx context.Context, issue *models.Issue, mentions []syntax.DID) { 72 73 // build the recipients list 74 // - owner of the repo 75 // - collaborators in the repo 76 var recipients []syntax.DID 77 recipients = append(recipients, syntax.DID(issue.Repo.Did)) 78 collaborators, err := db.GetCollaborators(n.db, db.FilterEq("repo_at", issue.Repo.RepoAt())) 79 if err != nil { 80 log.Printf("failed to fetch collaborators: %v", err) 81 return 82 } 83 for _, c := range collaborators { 84 recipients = append(recipients, c.SubjectDid) 85 } 86 87 actorDid := syntax.DID(issue.Did) 88 entityType := "issue" 89 entityId := issue.AtUri().String() 90 repoId := &issue.Repo.Id 91 issueId := &issue.Id 92 var pullId *int64 93 94 n.notifyEvent( 95 actorDid, 96 recipients, 97 models.NotificationTypeIssueCreated, 98 entityType, 99 entityId, 100 repoId, 101 issueId, 102 pullId, 103 ) 104 n.notifyEvent( 105 actorDid, 106 mentions, 107 models.NotificationTypeUserMentioned, 108 entityType, 109 entityId, 110 repoId, 111 issueId, 112 pullId, 113 ) 114} 115 116func (n *databaseNotifier) NewIssueComment(ctx context.Context, comment *models.IssueComment, mentions []syntax.DID) { 117 issues, err := db.GetIssues(n.db, db.FilterEq("at_uri", comment.IssueAt)) 118 if err != nil { 119 log.Printf("NewIssueComment: failed to get issues: %v", err) 120 return 121 } 122 if len(issues) == 0 { 123 log.Printf("NewIssueComment: no issue found for %s", comment.IssueAt) 124 return 125 } 126 issue := issues[0] 127 128 var recipients []syntax.DID 129 recipients = append(recipients, syntax.DID(issue.Repo.Did)) 130 131 if comment.IsReply() { 132 // if this comment is a reply, then notify everybody in that thread 133 parentAtUri := *comment.ReplyTo 134 allThreads := issue.CommentList() 135 136 // find the parent thread, and add all DIDs from here to the recipient list 137 for _, t := range allThreads { 138 if t.Self.AtUri().String() == parentAtUri { 139 recipients = append(recipients, t.Participants()...) 140 } 141 } 142 } else { 143 // not a reply, notify just the issue author 144 recipients = append(recipients, syntax.DID(issue.Did)) 145 } 146 147 actorDid := syntax.DID(comment.Did) 148 entityType := "issue" 149 entityId := issue.AtUri().String() 150 repoId := &issue.Repo.Id 151 issueId := &issue.Id 152 var pullId *int64 153 154 n.notifyEvent( 155 actorDid, 156 recipients, 157 models.NotificationTypeIssueCommented, 158 entityType, 159 entityId, 160 repoId, 161 issueId, 162 pullId, 163 ) 164 n.notifyEvent( 165 actorDid, 166 mentions, 167 models.NotificationTypeUserMentioned, 168 entityType, 169 entityId, 170 repoId, 171 issueId, 172 pullId, 173 ) 174} 175 176func (n *databaseNotifier) DeleteIssue(ctx context.Context, issue *models.Issue) { 177 // no-op for now 178} 179 180func (n *databaseNotifier) NewFollow(ctx context.Context, follow *models.Follow) { 181 actorDid := syntax.DID(follow.UserDid) 182 recipients := []syntax.DID{syntax.DID(follow.SubjectDid)} 183 eventType := models.NotificationTypeFollowed 184 entityType := "follow" 185 entityId := follow.UserDid 186 var repoId, issueId, pullId *int64 187 188 n.notifyEvent( 189 actorDid, 190 recipients, 191 eventType, 192 entityType, 193 entityId, 194 repoId, 195 issueId, 196 pullId, 197 ) 198} 199 200func (n *databaseNotifier) DeleteFollow(ctx context.Context, follow *models.Follow) { 201 // no-op 202} 203 204func (n *databaseNotifier) NewPull(ctx context.Context, pull *models.Pull) { 205 repo, err := db.GetRepo(n.db, db.FilterEq("at_uri", string(pull.RepoAt))) 206 if err != nil { 207 log.Printf("NewPull: failed to get repos: %v", err) 208 return 209 } 210 211 // build the recipients list 212 // - owner of the repo 213 // - collaborators in the repo 214 var recipients []syntax.DID 215 recipients = append(recipients, syntax.DID(repo.Did)) 216 collaborators, err := db.GetCollaborators(n.db, db.FilterEq("repo_at", repo.RepoAt())) 217 if err != nil { 218 log.Printf("failed to fetch collaborators: %v", err) 219 return 220 } 221 for _, c := range collaborators { 222 recipients = append(recipients, c.SubjectDid) 223 } 224 225 actorDid := syntax.DID(pull.OwnerDid) 226 eventType := models.NotificationTypePullCreated 227 entityType := "pull" 228 entityId := pull.AtUri().String() 229 repoId := &repo.Id 230 var issueId *int64 231 p := int64(pull.ID) 232 pullId := &p 233 234 n.notifyEvent( 235 actorDid, 236 recipients, 237 eventType, 238 entityType, 239 entityId, 240 repoId, 241 issueId, 242 pullId, 243 ) 244} 245 246func (n *databaseNotifier) NewPullComment(ctx context.Context, comment *models.PullComment, mentions []syntax.DID) { 247 pull, err := db.GetPull(n.db, 248 syntax.ATURI(comment.RepoAt), 249 comment.PullId, 250 ) 251 if err != nil { 252 log.Printf("NewPullComment: failed to get pulls: %v", err) 253 return 254 } 255 256 repo, err := db.GetRepo(n.db, db.FilterEq("at_uri", comment.RepoAt)) 257 if err != nil { 258 log.Printf("NewPullComment: failed to get repos: %v", err) 259 return 260 } 261 262 // build up the recipients list: 263 // - repo owner 264 // - all pull participants 265 var recipients []syntax.DID 266 recipients = append(recipients, syntax.DID(repo.Did)) 267 for _, p := range pull.Participants() { 268 recipients = append(recipients, syntax.DID(p)) 269 } 270 271 actorDid := syntax.DID(comment.OwnerDid) 272 eventType := models.NotificationTypePullCommented 273 entityType := "pull" 274 entityId := pull.AtUri().String() 275 repoId := &repo.Id 276 var issueId *int64 277 p := int64(pull.ID) 278 pullId := &p 279 280 n.notifyEvent( 281 actorDid, 282 recipients, 283 eventType, 284 entityType, 285 entityId, 286 repoId, 287 issueId, 288 pullId, 289 ) 290 n.notifyEvent( 291 actorDid, 292 mentions, 293 models.NotificationTypeUserMentioned, 294 entityType, 295 entityId, 296 repoId, 297 issueId, 298 pullId, 299 ) 300} 301 302func (n *databaseNotifier) UpdateProfile(ctx context.Context, profile *models.Profile) { 303 // no-op 304} 305 306func (n *databaseNotifier) DeleteString(ctx context.Context, did, rkey string) { 307 // no-op 308} 309 310func (n *databaseNotifier) EditString(ctx context.Context, string *models.String) { 311 // no-op 312} 313 314func (n *databaseNotifier) NewString(ctx context.Context, string *models.String) { 315 // no-op 316} 317 318func (n *databaseNotifier) NewIssueState(ctx context.Context, actor syntax.DID, issue *models.Issue) { 319 // build up the recipients list: 320 // - repo owner 321 // - repo collaborators 322 // - all issue participants 323 var recipients []syntax.DID 324 recipients = append(recipients, syntax.DID(issue.Repo.Did)) 325 collaborators, err := db.GetCollaborators(n.db, db.FilterEq("repo_at", issue.Repo.RepoAt())) 326 if err != nil { 327 log.Printf("failed to fetch collaborators: %v", err) 328 return 329 } 330 for _, c := range collaborators { 331 recipients = append(recipients, c.SubjectDid) 332 } 333 for _, p := range issue.Participants() { 334 recipients = append(recipients, syntax.DID(p)) 335 } 336 337 entityType := "pull" 338 entityId := issue.AtUri().String() 339 repoId := &issue.Repo.Id 340 issueId := &issue.Id 341 var pullId *int64 342 var eventType models.NotificationType 343 344 if issue.Open { 345 eventType = models.NotificationTypeIssueReopen 346 } else { 347 eventType = models.NotificationTypeIssueClosed 348 } 349 350 n.notifyEvent( 351 actor, 352 recipients, 353 eventType, 354 entityType, 355 entityId, 356 repoId, 357 issueId, 358 pullId, 359 ) 360} 361 362func (n *databaseNotifier) NewPullState(ctx context.Context, actor syntax.DID, pull *models.Pull) { 363 // Get repo details 364 repo, err := db.GetRepo(n.db, db.FilterEq("at_uri", string(pull.RepoAt))) 365 if err != nil { 366 log.Printf("NewPullState: failed to get repos: %v", err) 367 return 368 } 369 370 // build up the recipients list: 371 // - repo owner 372 // - all pull participants 373 var recipients []syntax.DID 374 recipients = append(recipients, syntax.DID(repo.Did)) 375 collaborators, err := db.GetCollaborators(n.db, db.FilterEq("repo_at", repo.RepoAt())) 376 if err != nil { 377 log.Printf("failed to fetch collaborators: %v", err) 378 return 379 } 380 for _, c := range collaborators { 381 recipients = append(recipients, c.SubjectDid) 382 } 383 for _, p := range pull.Participants() { 384 recipients = append(recipients, syntax.DID(p)) 385 } 386 387 entityType := "pull" 388 entityId := pull.AtUri().String() 389 repoId := &repo.Id 390 var issueId *int64 391 var eventType models.NotificationType 392 switch pull.State { 393 case models.PullClosed: 394 eventType = models.NotificationTypePullClosed 395 case models.PullOpen: 396 eventType = models.NotificationTypePullReopen 397 case models.PullMerged: 398 eventType = models.NotificationTypePullMerged 399 default: 400 log.Println("NewPullState: unexpected new PR state:", pull.State) 401 return 402 } 403 p := int64(pull.ID) 404 pullId := &p 405 406 n.notifyEvent( 407 actor, 408 recipients, 409 eventType, 410 entityType, 411 entityId, 412 repoId, 413 issueId, 414 pullId, 415 ) 416} 417 418func (n *databaseNotifier) notifyEvent( 419 actorDid syntax.DID, 420 recipients []syntax.DID, 421 eventType models.NotificationType, 422 entityType string, 423 entityId string, 424 repoId *int64, 425 issueId *int64, 426 pullId *int64, 427) { 428 if eventType == models.NotificationTypeUserMentioned && len(recipients) > maxMentions { 429 recipients = recipients[:maxMentions] 430 } 431 recipientSet := make(map[syntax.DID]struct{}) 432 for _, did := range recipients { 433 // everybody except actor themselves 434 if did != actorDid { 435 recipientSet[did] = struct{}{} 436 } 437 } 438 439 prefMap, err := db.GetNotificationPreferences( 440 n.db, 441 db.FilterIn("user_did", slices.Collect(maps.Keys(recipientSet))), 442 ) 443 if err != nil { 444 // failed to get prefs for users 445 return 446 } 447 448 // create a transaction for bulk notification storage 449 tx, err := n.db.Begin() 450 if err != nil { 451 // failed to start tx 452 return 453 } 454 defer tx.Rollback() 455 456 // filter based on preferences 457 for recipientDid := range recipientSet { 458 prefs, ok := prefMap[recipientDid] 459 if !ok { 460 prefs = models.DefaultNotificationPreferences(recipientDid) 461 } 462 463 // skip users who don’t want this type 464 if !prefs.ShouldNotify(eventType) { 465 continue 466 } 467 468 // create notification 469 notif := &models.Notification{ 470 RecipientDid: recipientDid.String(), 471 ActorDid: actorDid.String(), 472 Type: eventType, 473 EntityType: entityType, 474 EntityId: entityId, 475 RepoId: repoId, 476 IssueId: issueId, 477 PullId: pullId, 478 } 479 480 if err := db.CreateNotification(tx, notif); err != nil { 481 log.Printf("notifyEvent: failed to create notification for %s: %v", recipientDid, err) 482 } 483 } 484 485 if err := tx.Commit(); err != nil { 486 // failed to commit 487 return 488 } 489}