forked from tangled.org/core
Monorepo for Tangled — https://tangled.org
1package db 2 3import ( 4 "context" 5 "log" 6 "maps" 7 "slices" 8 9 "github.com/bluesky-social/indigo/atproto/syntax" 10 "tangled.org/core/appview/db" 11 "tangled.org/core/appview/models" 12 "tangled.org/core/appview/notify" 13 "tangled.org/core/idresolver" 14) 15 16type databaseNotifier struct { 17 db *db.DB 18 res *idresolver.Resolver 19} 20 21func NewDatabaseNotifier(database *db.DB, resolver *idresolver.Resolver) notify.Notifier { 22 return &databaseNotifier{ 23 db: database, 24 res: resolver, 25 } 26} 27 28var _ notify.Notifier = &databaseNotifier{} 29 30func (n *databaseNotifier) NewRepo(ctx context.Context, repo *models.Repo) { 31 // no-op for now 32} 33 34func (n *databaseNotifier) NewStar(ctx context.Context, star *models.Star) { 35 var err error 36 repo, err := db.GetRepo(n.db, db.FilterEq("at_uri", string(star.RepoAt))) 37 if err != nil { 38 log.Printf("NewStar: failed to get repos: %v", err) 39 return 40 } 41 42 actorDid := syntax.DID(star.StarredByDid) 43 recipients := []syntax.DID{syntax.DID(repo.Did)} 44 eventType := models.NotificationTypeRepoStarred 45 entityType := "repo" 46 entityId := star.RepoAt.String() 47 repoId := &repo.Id 48 var issueId *int64 49 var pullId *int64 50 51 n.notifyEvent( 52 actorDid, 53 recipients, 54 eventType, 55 entityType, 56 entityId, 57 repoId, 58 issueId, 59 pullId, 60 ) 61} 62 63func (n *databaseNotifier) DeleteStar(ctx context.Context, star *models.Star) { 64 // no-op 65} 66 67func (n *databaseNotifier) NewIssue(ctx context.Context, issue *models.Issue) { 68 69 // build the recipients list 70 // - owner of the repo 71 // - collaborators in the repo 72 var recipients []syntax.DID 73 recipients = append(recipients, syntax.DID(issue.Repo.Did)) 74 collaborators, err := db.GetCollaborators(n.db, db.FilterEq("repo_at", issue.Repo.RepoAt())) 75 if err != nil { 76 log.Printf("failed to fetch collaborators: %v", err) 77 return 78 } 79 for _, c := range collaborators { 80 recipients = append(recipients, c.SubjectDid) 81 } 82 83 actorDid := syntax.DID(issue.Did) 84 eventType := models.NotificationTypeIssueCreated 85 entityType := "issue" 86 entityId := issue.AtUri().String() 87 repoId := &issue.Repo.Id 88 issueId := &issue.Id 89 var pullId *int64 90 91 n.notifyEvent( 92 actorDid, 93 recipients, 94 eventType, 95 entityType, 96 entityId, 97 repoId, 98 issueId, 99 pullId, 100 ) 101} 102 103func (n *databaseNotifier) NewIssueComment(ctx context.Context, comment *models.IssueComment) { 104 issues, err := db.GetIssues(n.db, db.FilterEq("at_uri", comment.IssueAt)) 105 if err != nil { 106 log.Printf("NewIssueComment: failed to get issues: %v", err) 107 return 108 } 109 if len(issues) == 0 { 110 log.Printf("NewIssueComment: no issue found for %s", comment.IssueAt) 111 return 112 } 113 issue := issues[0] 114 115 var recipients []syntax.DID 116 recipients = append(recipients, syntax.DID(issue.Repo.Did)) 117 118 if comment.IsReply() { 119 // if this comment is a reply, then notify everybody in that thread 120 parentAtUri := *comment.ReplyTo 121 allThreads := issue.CommentList() 122 123 // find the parent thread, and add all DIDs from here to the recipient list 124 for _, t := range allThreads { 125 if t.Self.AtUri().String() == parentAtUri { 126 recipients = append(recipients, t.Participants()...) 127 } 128 } 129 } else { 130 // not a reply, notify just the issue author 131 recipients = append(recipients, syntax.DID(issue.Did)) 132 } 133 134 actorDid := syntax.DID(comment.Did) 135 eventType := models.NotificationTypeIssueCommented 136 entityType := "issue" 137 entityId := issue.AtUri().String() 138 repoId := &issue.Repo.Id 139 issueId := &issue.Id 140 var pullId *int64 141 142 n.notifyEvent( 143 actorDid, 144 recipients, 145 eventType, 146 entityType, 147 entityId, 148 repoId, 149 issueId, 150 pullId, 151 ) 152} 153 154func (n *databaseNotifier) DeleteIssue(ctx context.Context, issue *models.Issue) { 155 // no-op for now 156} 157 158func (n *databaseNotifier) NewFollow(ctx context.Context, follow *models.Follow) { 159 actorDid := syntax.DID(follow.UserDid) 160 recipients := []syntax.DID{syntax.DID(follow.SubjectDid)} 161 eventType := models.NotificationTypeFollowed 162 entityType := "follow" 163 entityId := follow.UserDid 164 var repoId, issueId, pullId *int64 165 166 n.notifyEvent( 167 actorDid, 168 recipients, 169 eventType, 170 entityType, 171 entityId, 172 repoId, 173 issueId, 174 pullId, 175 ) 176} 177 178func (n *databaseNotifier) DeleteFollow(ctx context.Context, follow *models.Follow) { 179 // no-op 180} 181 182func (n *databaseNotifier) NewPull(ctx context.Context, pull *models.Pull) { 183 repo, err := db.GetRepo(n.db, db.FilterEq("at_uri", string(pull.RepoAt))) 184 if err != nil { 185 log.Printf("NewPull: failed to get repos: %v", err) 186 return 187 } 188 189 // build the recipients list 190 // - owner of the repo 191 // - collaborators in the repo 192 var recipients []syntax.DID 193 recipients = append(recipients, syntax.DID(repo.Did)) 194 collaborators, err := db.GetCollaborators(n.db, db.FilterEq("repo_at", repo.RepoAt())) 195 if err != nil { 196 log.Printf("failed to fetch collaborators: %v", err) 197 return 198 } 199 for _, c := range collaborators { 200 recipients = append(recipients, c.SubjectDid) 201 } 202 203 actorDid := syntax.DID(pull.OwnerDid) 204 eventType := models.NotificationTypePullCreated 205 entityType := "pull" 206 entityId := pull.PullAt().String() 207 repoId := &repo.Id 208 var issueId *int64 209 p := int64(pull.ID) 210 pullId := &p 211 212 n.notifyEvent( 213 actorDid, 214 recipients, 215 eventType, 216 entityType, 217 entityId, 218 repoId, 219 issueId, 220 pullId, 221 ) 222} 223 224func (n *databaseNotifier) NewPullComment(ctx context.Context, comment *models.PullComment) { 225 pull, err := db.GetPull(n.db, 226 syntax.ATURI(comment.RepoAt), 227 comment.PullId, 228 ) 229 if err != nil { 230 log.Printf("NewPullComment: failed to get pulls: %v", err) 231 return 232 } 233 234 repo, err := db.GetRepo(n.db, db.FilterEq("at_uri", comment.RepoAt)) 235 if err != nil { 236 log.Printf("NewPullComment: failed to get repos: %v", err) 237 return 238 } 239 240 // build up the recipients list: 241 // - repo owner 242 // - all pull participants 243 var recipients []syntax.DID 244 recipients = append(recipients, syntax.DID(repo.Did)) 245 for _, p := range pull.Participants() { 246 recipients = append(recipients, syntax.DID(p)) 247 } 248 249 actorDid := syntax.DID(comment.OwnerDid) 250 eventType := models.NotificationTypePullCommented 251 entityType := "pull" 252 entityId := pull.PullAt().String() 253 repoId := &repo.Id 254 var issueId *int64 255 p := int64(pull.ID) 256 pullId := &p 257 258 n.notifyEvent( 259 actorDid, 260 recipients, 261 eventType, 262 entityType, 263 entityId, 264 repoId, 265 issueId, 266 pullId, 267 ) 268} 269 270func (n *databaseNotifier) UpdateProfile(ctx context.Context, profile *models.Profile) { 271 // no-op 272} 273 274func (n *databaseNotifier) DeleteString(ctx context.Context, did, rkey string) { 275 // no-op 276} 277 278func (n *databaseNotifier) EditString(ctx context.Context, string *models.String) { 279 // no-op 280} 281 282func (n *databaseNotifier) NewString(ctx context.Context, string *models.String) { 283 // no-op 284} 285 286func (n *databaseNotifier) NewIssueState(ctx context.Context, issue *models.Issue) { 287 // build up the recipients list: 288 // - repo owner 289 // - repo collaborators 290 // - all issue participants 291 var recipients []syntax.DID 292 recipients = append(recipients, syntax.DID(issue.Repo.Did)) 293 collaborators, err := db.GetCollaborators(n.db, db.FilterEq("repo_at", issue.Repo.RepoAt())) 294 if err != nil { 295 log.Printf("failed to fetch collaborators: %v", err) 296 return 297 } 298 for _, c := range collaborators { 299 recipients = append(recipients, c.SubjectDid) 300 } 301 for _, p := range issue.Participants() { 302 recipients = append(recipients, syntax.DID(p)) 303 } 304 305 actorDid := syntax.DID(issue.Repo.Did) 306 entityType := "pull" 307 entityId := issue.AtUri().String() 308 repoId := &issue.Repo.Id 309 issueId := &issue.Id 310 var pullId *int64 311 var eventType models.NotificationType 312 313 if issue.Open { 314 eventType = models.NotificationTypeIssueReopen 315 } else { 316 eventType = models.NotificationTypeIssueClosed 317 } 318 319 n.notifyEvent( 320 actorDid, 321 recipients, 322 eventType, 323 entityType, 324 entityId, 325 repoId, 326 issueId, 327 pullId, 328 ) 329} 330 331func (n *databaseNotifier) NewPullState(ctx context.Context, pull *models.Pull) { 332 // Get repo details 333 repo, err := db.GetRepo(n.db, db.FilterEq("at_uri", string(pull.RepoAt))) 334 if err != nil { 335 log.Printf("NewPullState: failed to get repos: %v", err) 336 return 337 } 338 339 // build up the recipients list: 340 // - repo owner 341 // - all pull participants 342 var recipients []syntax.DID 343 recipients = append(recipients, syntax.DID(repo.Did)) 344 collaborators, err := db.GetCollaborators(n.db, db.FilterEq("repo_at", repo.RepoAt())) 345 if err != nil { 346 log.Printf("failed to fetch collaborators: %v", err) 347 return 348 } 349 for _, c := range collaborators { 350 recipients = append(recipients, c.SubjectDid) 351 } 352 for _, p := range pull.Participants() { 353 recipients = append(recipients, syntax.DID(p)) 354 } 355 356 actorDid := syntax.DID(repo.Did) 357 entityType := "pull" 358 entityId := pull.PullAt().String() 359 repoId := &repo.Id 360 var issueId *int64 361 var eventType models.NotificationType 362 switch pull.State { 363 case models.PullClosed: 364 eventType = models.NotificationTypePullClosed 365 case models.PullOpen: 366 eventType = models.NotificationTypePullReopen 367 case models.PullMerged: 368 eventType = models.NotificationTypePullMerged 369 default: 370 log.Println("NewPullState: unexpected new PR state:", pull.State) 371 return 372 } 373 p := int64(pull.ID) 374 pullId := &p 375 376 n.notifyEvent( 377 actorDid, 378 recipients, 379 eventType, 380 entityType, 381 entityId, 382 repoId, 383 issueId, 384 pullId, 385 ) 386} 387 388func (n *databaseNotifier) notifyEvent( 389 actorDid syntax.DID, 390 recipients []syntax.DID, 391 eventType models.NotificationType, 392 entityType string, 393 entityId string, 394 repoId *int64, 395 issueId *int64, 396 pullId *int64, 397) { 398 recipientSet := make(map[syntax.DID]struct{}) 399 for _, did := range recipients { 400 // everybody except actor themselves 401 if did != actorDid { 402 recipientSet[did] = struct{}{} 403 } 404 } 405 406 prefMap, err := db.GetNotificationPreferences( 407 n.db, 408 db.FilterIn("user_did", slices.Collect(maps.Keys(recipientSet))), 409 ) 410 if err != nil { 411 // failed to get prefs for users 412 return 413 } 414 415 // create a transaction for bulk notification storage 416 tx, err := n.db.Begin() 417 if err != nil { 418 // failed to start tx 419 return 420 } 421 defer tx.Rollback() 422 423 // filter based on preferences 424 for recipientDid := range recipientSet { 425 prefs, ok := prefMap[recipientDid] 426 if !ok { 427 prefs = models.DefaultNotificationPreferences(recipientDid) 428 } 429 430 // skip users who don’t want this type 431 if !prefs.ShouldNotify(eventType) { 432 continue 433 } 434 435 // create notification 436 notif := &models.Notification{ 437 RecipientDid: recipientDid.String(), 438 ActorDid: actorDid.String(), 439 Type: eventType, 440 EntityType: entityType, 441 EntityId: entityId, 442 RepoId: repoId, 443 IssueId: issueId, 444 PullId: pullId, 445 } 446 447 if err := db.CreateNotification(tx, notif); err != nil { 448 log.Printf("notifyEvent: failed to create notification for %s: %v", recipientDid, err) 449 } 450 } 451 452 if err := tx.Commit(); err != nil { 453 // failed to commit 454 return 455 } 456}