forked from tangled.org/core
Monorepo for Tangled — https://tangled.org
at master 12 kB view raw
1package db 2 3import ( 4 "context" 5 "log" 6 "slices" 7 8 "github.com/bluesky-social/indigo/atproto/syntax" 9 "tangled.org/core/api/tangled" 10 "tangled.org/core/appview/db" 11 "tangled.org/core/appview/models" 12 "tangled.org/core/appview/notify" 13 "tangled.org/core/idresolver" 14 "tangled.org/core/orm" 15 "tangled.org/core/sets" 16) 17 18const ( 19 maxMentions = 8 20) 21 22type databaseNotifier struct { 23 db *db.DB 24 res *idresolver.Resolver 25} 26 27func NewDatabaseNotifier(database *db.DB, resolver *idresolver.Resolver) notify.Notifier { 28 return &databaseNotifier{ 29 db: database, 30 res: resolver, 31 } 32} 33 34var _ notify.Notifier = &databaseNotifier{} 35 36func (n *databaseNotifier) NewRepo(ctx context.Context, repo *models.Repo) { 37 // no-op for now 38} 39 40func (n *databaseNotifier) NewStar(ctx context.Context, star *models.Star) { 41 if star.RepoAt.Collection().String() != tangled.RepoNSID { 42 // skip string stars for now 43 return 44 } 45 var err error 46 repo, err := db.GetRepo(n.db, orm.FilterEq("at_uri", string(star.RepoAt))) 47 if err != nil { 48 log.Printf("NewStar: failed to get repos: %v", err) 49 return 50 } 51 52 actorDid := syntax.DID(star.Did) 53 recipients := sets.Singleton(syntax.DID(repo.Did)) 54 eventType := models.NotificationTypeRepoStarred 55 entityType := "repo" 56 entityId := star.RepoAt.String() 57 repoId := &repo.Id 58 var issueId *int64 59 var pullId *int64 60 61 n.notifyEvent( 62 actorDid, 63 recipients, 64 eventType, 65 entityType, 66 entityId, 67 repoId, 68 issueId, 69 pullId, 70 ) 71} 72 73func (n *databaseNotifier) DeleteStar(ctx context.Context, star *models.Star) { 74 // no-op 75} 76 77func (n *databaseNotifier) NewIssue(ctx context.Context, issue *models.Issue, mentions []syntax.DID) { 78 collaborators, err := db.GetCollaborators(n.db, orm.FilterEq("repo_at", issue.Repo.RepoAt())) 79 if err != nil { 80 log.Printf("failed to fetch collaborators: %v", err) 81 return 82 } 83 84 // build the recipients list 85 // - owner of the repo 86 // - collaborators in the repo 87 // - remove users already mentioned 88 recipients := sets.Singleton(syntax.DID(issue.Repo.Did)) 89 for _, c := range collaborators { 90 recipients.Insert(c.SubjectDid) 91 } 92 for _, m := range mentions { 93 recipients.Remove(m) 94 } 95 96 actorDid := syntax.DID(issue.Did) 97 entityType := "issue" 98 entityId := issue.AtUri().String() 99 repoId := &issue.Repo.Id 100 issueId := &issue.Id 101 var pullId *int64 102 103 n.notifyEvent( 104 actorDid, 105 recipients, 106 models.NotificationTypeIssueCreated, 107 entityType, 108 entityId, 109 repoId, 110 issueId, 111 pullId, 112 ) 113 n.notifyEvent( 114 actorDid, 115 sets.Collect(slices.Values(mentions)), 116 models.NotificationTypeUserMentioned, 117 entityType, 118 entityId, 119 repoId, 120 issueId, 121 pullId, 122 ) 123} 124 125func (n *databaseNotifier) NewIssueComment(ctx context.Context, comment *models.IssueComment, mentions []syntax.DID) { 126 issues, err := db.GetIssues(n.db, orm.FilterEq("at_uri", comment.IssueAt)) 127 if err != nil { 128 log.Printf("NewIssueComment: failed to get issues: %v", err) 129 return 130 } 131 if len(issues) == 0 { 132 log.Printf("NewIssueComment: no issue found for %s", comment.IssueAt) 133 return 134 } 135 issue := issues[0] 136 137 // built the recipients list: 138 // - the owner of the repo 139 // - | if the comment is a reply -> everybody on that thread 140 // | if the comment is a top level -> just the issue owner 141 // - remove mentioned users from the recipients list 142 recipients := sets.Singleton(syntax.DID(issue.Repo.Did)) 143 144 if comment.IsReply() { 145 // if this comment is a reply, then notify everybody in that thread 146 parentAtUri := *comment.ReplyTo 147 148 // find the parent thread, and add all DIDs from here to the recipient list 149 for _, t := range issue.CommentList() { 150 if t.Self.AtUri().String() == parentAtUri { 151 for _, p := range t.Participants() { 152 recipients.Insert(p) 153 } 154 } 155 } 156 } else { 157 // not a reply, notify just the issue author 158 recipients.Insert(syntax.DID(issue.Did)) 159 } 160 161 for _, m := range mentions { 162 recipients.Remove(m) 163 } 164 165 actorDid := syntax.DID(comment.Did) 166 entityType := "issue" 167 entityId := issue.AtUri().String() 168 repoId := &issue.Repo.Id 169 issueId := &issue.Id 170 var pullId *int64 171 172 n.notifyEvent( 173 actorDid, 174 recipients, 175 models.NotificationTypeIssueCommented, 176 entityType, 177 entityId, 178 repoId, 179 issueId, 180 pullId, 181 ) 182 n.notifyEvent( 183 actorDid, 184 sets.Collect(slices.Values(mentions)), 185 models.NotificationTypeUserMentioned, 186 entityType, 187 entityId, 188 repoId, 189 issueId, 190 pullId, 191 ) 192} 193 194func (n *databaseNotifier) DeleteIssue(ctx context.Context, issue *models.Issue) { 195 // no-op for now 196} 197 198func (n *databaseNotifier) NewFollow(ctx context.Context, follow *models.Follow) { 199 actorDid := syntax.DID(follow.UserDid) 200 recipients := sets.Singleton(syntax.DID(follow.SubjectDid)) 201 eventType := models.NotificationTypeFollowed 202 entityType := "follow" 203 entityId := follow.UserDid 204 var repoId, issueId, pullId *int64 205 206 n.notifyEvent( 207 actorDid, 208 recipients, 209 eventType, 210 entityType, 211 entityId, 212 repoId, 213 issueId, 214 pullId, 215 ) 216} 217 218func (n *databaseNotifier) DeleteFollow(ctx context.Context, follow *models.Follow) { 219 // no-op 220} 221 222func (n *databaseNotifier) NewPull(ctx context.Context, pull *models.Pull) { 223 repo, err := db.GetRepo(n.db, orm.FilterEq("at_uri", string(pull.RepoAt))) 224 if err != nil { 225 log.Printf("NewPull: failed to get repos: %v", err) 226 return 227 } 228 collaborators, err := db.GetCollaborators(n.db, orm.FilterEq("repo_at", repo.RepoAt())) 229 if err != nil { 230 log.Printf("failed to fetch collaborators: %v", err) 231 return 232 } 233 234 // build the recipients list 235 // - owner of the repo 236 // - collaborators in the repo 237 recipients := sets.Singleton(syntax.DID(repo.Did)) 238 for _, c := range collaborators { 239 recipients.Insert(c.SubjectDid) 240 } 241 242 actorDid := syntax.DID(pull.OwnerDid) 243 eventType := models.NotificationTypePullCreated 244 entityType := "pull" 245 entityId := pull.AtUri().String() 246 repoId := &repo.Id 247 var issueId *int64 248 p := int64(pull.ID) 249 pullId := &p 250 251 n.notifyEvent( 252 actorDid, 253 recipients, 254 eventType, 255 entityType, 256 entityId, 257 repoId, 258 issueId, 259 pullId, 260 ) 261} 262 263func (n *databaseNotifier) NewPullComment(ctx context.Context, comment *models.PullComment, mentions []syntax.DID) { 264 pull, err := db.GetPull(n.db, 265 syntax.ATURI(comment.RepoAt), 266 comment.PullId, 267 ) 268 if err != nil { 269 log.Printf("NewPullComment: failed to get pulls: %v", err) 270 return 271 } 272 273 repo, err := db.GetRepo(n.db, orm.FilterEq("at_uri", comment.RepoAt)) 274 if err != nil { 275 log.Printf("NewPullComment: failed to get repos: %v", err) 276 return 277 } 278 279 // build up the recipients list: 280 // - repo owner 281 // - all pull participants 282 // - remove those already mentioned 283 recipients := sets.Singleton(syntax.DID(repo.Did)) 284 for _, p := range pull.Participants() { 285 recipients.Insert(syntax.DID(p)) 286 } 287 for _, m := range mentions { 288 recipients.Remove(m) 289 } 290 291 actorDid := syntax.DID(comment.OwnerDid) 292 eventType := models.NotificationTypePullCommented 293 entityType := "pull" 294 entityId := pull.AtUri().String() 295 repoId := &repo.Id 296 var issueId *int64 297 p := int64(pull.ID) 298 pullId := &p 299 300 n.notifyEvent( 301 actorDid, 302 recipients, 303 eventType, 304 entityType, 305 entityId, 306 repoId, 307 issueId, 308 pullId, 309 ) 310 n.notifyEvent( 311 actorDid, 312 sets.Collect(slices.Values(mentions)), 313 models.NotificationTypeUserMentioned, 314 entityType, 315 entityId, 316 repoId, 317 issueId, 318 pullId, 319 ) 320} 321 322func (n *databaseNotifier) UpdateProfile(ctx context.Context, profile *models.Profile) { 323 // no-op 324} 325 326func (n *databaseNotifier) DeleteString(ctx context.Context, did, rkey string) { 327 // no-op 328} 329 330func (n *databaseNotifier) EditString(ctx context.Context, string *models.String) { 331 // no-op 332} 333 334func (n *databaseNotifier) NewString(ctx context.Context, string *models.String) { 335 // no-op 336} 337 338func (n *databaseNotifier) NewIssueState(ctx context.Context, actor syntax.DID, issue *models.Issue) { 339 collaborators, err := db.GetCollaborators(n.db, orm.FilterEq("repo_at", issue.Repo.RepoAt())) 340 if err != nil { 341 log.Printf("failed to fetch collaborators: %v", err) 342 return 343 } 344 345 // build up the recipients list: 346 // - repo owner 347 // - repo collaborators 348 // - all issue participants 349 recipients := sets.Singleton(syntax.DID(issue.Repo.Did)) 350 for _, c := range collaborators { 351 recipients.Insert(c.SubjectDid) 352 } 353 for _, p := range issue.Participants() { 354 recipients.Insert(syntax.DID(p)) 355 } 356 357 entityType := "pull" 358 entityId := issue.AtUri().String() 359 repoId := &issue.Repo.Id 360 issueId := &issue.Id 361 var pullId *int64 362 var eventType models.NotificationType 363 364 if issue.Open { 365 eventType = models.NotificationTypeIssueReopen 366 } else { 367 eventType = models.NotificationTypeIssueClosed 368 } 369 370 n.notifyEvent( 371 actor, 372 recipients, 373 eventType, 374 entityType, 375 entityId, 376 repoId, 377 issueId, 378 pullId, 379 ) 380} 381 382func (n *databaseNotifier) NewPullState(ctx context.Context, actor syntax.DID, pull *models.Pull) { 383 // Get repo details 384 repo, err := db.GetRepo(n.db, orm.FilterEq("at_uri", string(pull.RepoAt))) 385 if err != nil { 386 log.Printf("NewPullState: failed to get repos: %v", err) 387 return 388 } 389 390 collaborators, err := db.GetCollaborators(n.db, orm.FilterEq("repo_at", repo.RepoAt())) 391 if err != nil { 392 log.Printf("failed to fetch collaborators: %v", err) 393 return 394 } 395 396 // build up the recipients list: 397 // - repo owner 398 // - all pull participants 399 recipients := sets.Singleton(syntax.DID(repo.Did)) 400 for _, c := range collaborators { 401 recipients.Insert(c.SubjectDid) 402 } 403 for _, p := range pull.Participants() { 404 recipients.Insert(syntax.DID(p)) 405 } 406 407 entityType := "pull" 408 entityId := pull.AtUri().String() 409 repoId := &repo.Id 410 var issueId *int64 411 var eventType models.NotificationType 412 switch pull.State { 413 case models.PullClosed: 414 eventType = models.NotificationTypePullClosed 415 case models.PullOpen: 416 eventType = models.NotificationTypePullReopen 417 case models.PullMerged: 418 eventType = models.NotificationTypePullMerged 419 default: 420 log.Println("NewPullState: unexpected new PR state:", pull.State) 421 return 422 } 423 p := int64(pull.ID) 424 pullId := &p 425 426 n.notifyEvent( 427 actor, 428 recipients, 429 eventType, 430 entityType, 431 entityId, 432 repoId, 433 issueId, 434 pullId, 435 ) 436} 437 438func (n *databaseNotifier) notifyEvent( 439 actorDid syntax.DID, 440 recipients sets.Set[syntax.DID], 441 eventType models.NotificationType, 442 entityType string, 443 entityId string, 444 repoId *int64, 445 issueId *int64, 446 pullId *int64, 447) { 448 // if the user is attempting to mention >maxMentions users, this is probably spam, do not mention anybody 449 if eventType == models.NotificationTypeUserMentioned && recipients.Len() > maxMentions { 450 return 451 } 452 453 recipients.Remove(actorDid) 454 455 prefMap, err := db.GetNotificationPreferences( 456 n.db, 457 orm.FilterIn("user_did", slices.Collect(recipients.All())), 458 ) 459 if err != nil { 460 // failed to get prefs for users 461 return 462 } 463 464 // create a transaction for bulk notification storage 465 tx, err := n.db.Begin() 466 if err != nil { 467 // failed to start tx 468 return 469 } 470 defer tx.Rollback() 471 472 // filter based on preferences 473 for recipientDid := range recipients.All() { 474 prefs, ok := prefMap[recipientDid] 475 if !ok { 476 prefs = models.DefaultNotificationPreferences(recipientDid) 477 } 478 479 // skip users who don’t want this type 480 if !prefs.ShouldNotify(eventType) { 481 continue 482 } 483 484 // create notification 485 notif := &models.Notification{ 486 RecipientDid: recipientDid.String(), 487 ActorDid: actorDid.String(), 488 Type: eventType, 489 EntityType: entityType, 490 EntityId: entityId, 491 RepoId: repoId, 492 IssueId: issueId, 493 PullId: pullId, 494 } 495 496 if err := db.CreateNotification(tx, notif); err != nil { 497 log.Printf("notifyEvent: failed to create notification for %s: %v", recipientDid, err) 498 } 499 } 500 501 if err := tx.Commit(); err != nil { 502 // failed to commit 503 return 504 } 505}