forked from tangled.org/core
Monorepo for Tangled — https://tangled.org
1package db 2 3import ( 4 "context" 5 "database/sql" 6 "errors" 7 "fmt" 8 "strings" 9 "time" 10 11 "github.com/bluesky-social/indigo/atproto/syntax" 12 "tangled.org/core/appview/models" 13 "tangled.org/core/appview/pagination" 14) 15 16func CreateNotification(e Execer, notification *models.Notification) error { 17 query := ` 18 INSERT INTO notifications (recipient_did, actor_did, type, entity_type, entity_id, read, repo_id, issue_id, pull_id) 19 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) 20 ` 21 22 result, err := e.Exec(query, 23 notification.RecipientDid, 24 notification.ActorDid, 25 string(notification.Type), 26 notification.EntityType, 27 notification.EntityId, 28 notification.Read, 29 notification.RepoId, 30 notification.IssueId, 31 notification.PullId, 32 ) 33 if err != nil { 34 return fmt.Errorf("failed to create notification: %w", err) 35 } 36 37 id, err := result.LastInsertId() 38 if err != nil { 39 return fmt.Errorf("failed to get notification ID: %w", err) 40 } 41 42 notification.ID = id 43 return nil 44} 45 46// GetNotificationsPaginated retrieves notifications with filters and pagination 47func GetNotificationsPaginated(e Execer, page pagination.Page, filters ...filter) ([]*models.Notification, error) { 48 var conditions []string 49 var args []any 50 51 for _, filter := range filters { 52 conditions = append(conditions, filter.Condition()) 53 args = append(args, filter.Arg()...) 54 } 55 56 whereClause := "" 57 if len(conditions) > 0 { 58 whereClause = "WHERE " + conditions[0] 59 for _, condition := range conditions[1:] { 60 whereClause += " AND " + condition 61 } 62 } 63 64 query := fmt.Sprintf(` 65 select id, recipient_did, actor_did, type, entity_type, entity_id, read, created, repo_id, issue_id, pull_id 66 from notifications 67 %s 68 order by created desc 69 limit ? offset ? 70 `, whereClause) 71 72 args = append(args, page.Limit, page.Offset) 73 74 rows, err := e.QueryContext(context.Background(), query, args...) 75 if err != nil { 76 return nil, fmt.Errorf("failed to query notifications: %w", err) 77 } 78 defer rows.Close() 79 80 var notifications []*models.Notification 81 for rows.Next() { 82 var n models.Notification 83 var typeStr string 84 var createdStr string 85 err := rows.Scan( 86 &n.ID, 87 &n.RecipientDid, 88 &n.ActorDid, 89 &typeStr, 90 &n.EntityType, 91 &n.EntityId, 92 &n.Read, 93 &createdStr, 94 &n.RepoId, 95 &n.IssueId, 96 &n.PullId, 97 ) 98 if err != nil { 99 return nil, fmt.Errorf("failed to scan notification: %w", err) 100 } 101 n.Type = models.NotificationType(typeStr) 102 n.Created, err = time.Parse(time.RFC3339, createdStr) 103 if err != nil { 104 return nil, fmt.Errorf("failed to parse created timestamp: %w", err) 105 } 106 notifications = append(notifications, &n) 107 } 108 109 return notifications, nil 110} 111 112// GetNotificationsWithEntities retrieves notifications with their related entities 113func GetNotificationsWithEntities(e Execer, page pagination.Page, filters ...filter) ([]*models.NotificationWithEntity, error) { 114 var conditions []string 115 var args []any 116 117 for _, filter := range filters { 118 conditions = append(conditions, filter.Condition()) 119 args = append(args, filter.Arg()...) 120 } 121 122 whereClause := "" 123 if len(conditions) > 0 { 124 whereClause = "WHERE " + conditions[0] 125 for _, condition := range conditions[1:] { 126 whereClause += " AND " + condition 127 } 128 } 129 130 query := fmt.Sprintf(` 131 select 132 n.id, n.recipient_did, n.actor_did, n.type, n.entity_type, n.entity_id, 133 n.read, n.created, n.repo_id, n.issue_id, n.pull_id, 134 r.id as r_id, r.did as r_did, r.name as r_name, r.description as r_description, 135 i.id as i_id, i.did as i_did, i.issue_id as i_issue_id, i.title as i_title, i.open as i_open, 136 p.id as p_id, p.owner_did as p_owner_did, p.pull_id as p_pull_id, p.title as p_title, p.state as p_state 137 from notifications n 138 left join repos r on n.repo_id = r.id 139 left join issues i on n.issue_id = i.id 140 left join pulls p on n.pull_id = p.id 141 %s 142 order by n.created desc 143 limit ? offset ? 144 `, whereClause) 145 146 args = append(args, page.Limit, page.Offset) 147 148 rows, err := e.QueryContext(context.Background(), query, args...) 149 if err != nil { 150 return nil, fmt.Errorf("failed to query notifications with entities: %w", err) 151 } 152 defer rows.Close() 153 154 var notifications []*models.NotificationWithEntity 155 for rows.Next() { 156 var n models.Notification 157 var typeStr string 158 var createdStr string 159 var repo models.Repo 160 var issue models.Issue 161 var pull models.Pull 162 var rId, iId, pId sql.NullInt64 163 var rDid, rName, rDescription sql.NullString 164 var iDid sql.NullString 165 var iIssueId sql.NullInt64 166 var iTitle sql.NullString 167 var iOpen sql.NullBool 168 var pOwnerDid sql.NullString 169 var pPullId sql.NullInt64 170 var pTitle sql.NullString 171 var pState sql.NullInt64 172 173 err := rows.Scan( 174 &n.ID, &n.RecipientDid, &n.ActorDid, &typeStr, &n.EntityType, &n.EntityId, 175 &n.Read, &createdStr, &n.RepoId, &n.IssueId, &n.PullId, 176 &rId, &rDid, &rName, &rDescription, 177 &iId, &iDid, &iIssueId, &iTitle, &iOpen, 178 &pId, &pOwnerDid, &pPullId, &pTitle, &pState, 179 ) 180 if err != nil { 181 return nil, fmt.Errorf("failed to scan notification with entities: %w", err) 182 } 183 184 n.Type = models.NotificationType(typeStr) 185 n.Created, err = time.Parse(time.RFC3339, createdStr) 186 if err != nil { 187 return nil, fmt.Errorf("failed to parse created timestamp: %w", err) 188 } 189 190 nwe := &models.NotificationWithEntity{Notification: &n} 191 192 // populate repo if present 193 if rId.Valid { 194 repo.Id = rId.Int64 195 if rDid.Valid { 196 repo.Did = rDid.String 197 } 198 if rName.Valid { 199 repo.Name = rName.String 200 } 201 if rDescription.Valid { 202 repo.Description = rDescription.String 203 } 204 nwe.Repo = &repo 205 } 206 207 // populate issue if present 208 if iId.Valid { 209 issue.Id = iId.Int64 210 if iDid.Valid { 211 issue.Did = iDid.String 212 } 213 if iIssueId.Valid { 214 issue.IssueId = int(iIssueId.Int64) 215 } 216 if iTitle.Valid { 217 issue.Title = iTitle.String 218 } 219 if iOpen.Valid { 220 issue.Open = iOpen.Bool 221 } 222 nwe.Issue = &issue 223 } 224 225 // populate pull if present 226 if pId.Valid { 227 pull.ID = int(pId.Int64) 228 if pOwnerDid.Valid { 229 pull.OwnerDid = pOwnerDid.String 230 } 231 if pPullId.Valid { 232 pull.PullId = int(pPullId.Int64) 233 } 234 if pTitle.Valid { 235 pull.Title = pTitle.String 236 } 237 if pState.Valid { 238 pull.State = models.PullState(pState.Int64) 239 } 240 nwe.Pull = &pull 241 } 242 243 notifications = append(notifications, nwe) 244 } 245 246 return notifications, nil 247} 248 249// GetNotifications retrieves notifications with filters 250func GetNotifications(e Execer, filters ...filter) ([]*models.Notification, error) { 251 return GetNotificationsPaginated(e, pagination.FirstPage(), filters...) 252} 253 254func CountNotifications(e Execer, filters ...filter) (int64, error) { 255 var conditions []string 256 var args []any 257 for _, filter := range filters { 258 conditions = append(conditions, filter.Condition()) 259 args = append(args, filter.Arg()...) 260 } 261 262 whereClause := "" 263 if conditions != nil { 264 whereClause = " where " + strings.Join(conditions, " and ") 265 } 266 267 query := fmt.Sprintf(`select count(1) from notifications %s`, whereClause) 268 var count int64 269 err := e.QueryRow(query, args...).Scan(&count) 270 271 if !errors.Is(err, sql.ErrNoRows) && err != nil { 272 return 0, err 273 } 274 275 return count, nil 276} 277 278func MarkNotificationRead(e Execer, notificationID int64, userDID string) error { 279 idFilter := FilterEq("id", notificationID) 280 recipientFilter := FilterEq("recipient_did", userDID) 281 282 query := fmt.Sprintf(` 283 UPDATE notifications 284 SET read = 1 285 WHERE %s AND %s 286 `, idFilter.Condition(), recipientFilter.Condition()) 287 288 args := append(idFilter.Arg(), recipientFilter.Arg()...) 289 290 result, err := e.Exec(query, args...) 291 if err != nil { 292 return fmt.Errorf("failed to mark notification as read: %w", err) 293 } 294 295 rowsAffected, err := result.RowsAffected() 296 if err != nil { 297 return fmt.Errorf("failed to get rows affected: %w", err) 298 } 299 300 if rowsAffected == 0 { 301 return fmt.Errorf("notification not found or access denied") 302 } 303 304 return nil 305} 306 307func MarkAllNotificationsRead(e Execer, userDID string) error { 308 recipientFilter := FilterEq("recipient_did", userDID) 309 readFilter := FilterEq("read", 0) 310 311 query := fmt.Sprintf(` 312 UPDATE notifications 313 SET read = 1 314 WHERE %s AND %s 315 `, recipientFilter.Condition(), readFilter.Condition()) 316 317 args := append(recipientFilter.Arg(), readFilter.Arg()...) 318 319 _, err := e.Exec(query, args...) 320 if err != nil { 321 return fmt.Errorf("failed to mark all notifications as read: %w", err) 322 } 323 324 return nil 325} 326 327func DeleteNotification(e Execer, notificationID int64, userDID string) error { 328 idFilter := FilterEq("id", notificationID) 329 recipientFilter := FilterEq("recipient_did", userDID) 330 331 query := fmt.Sprintf(` 332 DELETE FROM notifications 333 WHERE %s AND %s 334 `, idFilter.Condition(), recipientFilter.Condition()) 335 336 args := append(idFilter.Arg(), recipientFilter.Arg()...) 337 338 result, err := e.Exec(query, args...) 339 if err != nil { 340 return fmt.Errorf("failed to delete notification: %w", err) 341 } 342 343 rowsAffected, err := result.RowsAffected() 344 if err != nil { 345 return fmt.Errorf("failed to get rows affected: %w", err) 346 } 347 348 if rowsAffected == 0 { 349 return fmt.Errorf("notification not found or access denied") 350 } 351 352 return nil 353} 354 355func GetNotificationPreference(e Execer, userDid string) (*models.NotificationPreferences, error) { 356 prefs, err := GetNotificationPreferences(e, FilterEq("user_did", userDid)) 357 if err != nil { 358 return nil, err 359 } 360 361 p, ok := prefs[syntax.DID(userDid)] 362 if !ok { 363 return models.DefaultNotificationPreferences(syntax.DID(userDid)), nil 364 } 365 366 return p, nil 367} 368 369func GetNotificationPreferences(e Execer, filters ...filter) (map[syntax.DID]*models.NotificationPreferences, error) { 370 prefsMap := make(map[syntax.DID]*models.NotificationPreferences) 371 372 var conditions []string 373 var args []any 374 for _, filter := range filters { 375 conditions = append(conditions, filter.Condition()) 376 args = append(args, filter.Arg()...) 377 } 378 379 whereClause := "" 380 if conditions != nil { 381 whereClause = " where " + strings.Join(conditions, " and ") 382 } 383 384 query := fmt.Sprintf(` 385 select 386 id, 387 user_did, 388 repo_starred, 389 issue_created, 390 issue_commented, 391 pull_created, 392 pull_commented, 393 followed, 394 pull_merged, 395 issue_closed, 396 email_notifications 397 from 398 notification_preferences 399 %s 400 `, whereClause) 401 402 rows, err := e.Query(query, args...) 403 if err != nil { 404 return nil, err 405 } 406 defer rows.Close() 407 408 for rows.Next() { 409 var prefs models.NotificationPreferences 410 if err := rows.Scan( 411 &prefs.ID, 412 &prefs.UserDid, 413 &prefs.RepoStarred, 414 &prefs.IssueCreated, 415 &prefs.IssueCommented, 416 &prefs.PullCreated, 417 &prefs.PullCommented, 418 &prefs.Followed, 419 &prefs.PullMerged, 420 &prefs.IssueClosed, 421 &prefs.EmailNotifications, 422 ); err != nil { 423 return nil, err 424 } 425 426 prefsMap[prefs.UserDid] = &prefs 427 } 428 429 if err := rows.Err(); err != nil { 430 return nil, err 431 } 432 433 return prefsMap, nil 434} 435 436func (d *DB) UpdateNotificationPreferences(ctx context.Context, prefs *models.NotificationPreferences) error { 437 query := ` 438 INSERT OR REPLACE INTO notification_preferences 439 (user_did, repo_starred, issue_created, issue_commented, pull_created, 440 pull_commented, followed, pull_merged, issue_closed, email_notifications) 441 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) 442 ` 443 444 result, err := d.DB.ExecContext(ctx, query, 445 prefs.UserDid, 446 prefs.RepoStarred, 447 prefs.IssueCreated, 448 prefs.IssueCommented, 449 prefs.PullCreated, 450 prefs.PullCommented, 451 prefs.Followed, 452 prefs.PullMerged, 453 prefs.IssueClosed, 454 prefs.EmailNotifications, 455 ) 456 if err != nil { 457 return fmt.Errorf("failed to update notification preferences: %w", err) 458 } 459 460 if prefs.ID == 0 { 461 id, err := result.LastInsertId() 462 if err != nil { 463 return fmt.Errorf("failed to get preferences ID: %w", err) 464 } 465 prefs.ID = id 466 } 467 468 return nil 469} 470 471func (d *DB) ClearOldNotifications(ctx context.Context, olderThan time.Duration) error { 472 cutoff := time.Now().Add(-olderThan) 473 createdFilter := FilterLte("created", cutoff) 474 475 query := fmt.Sprintf(` 476 DELETE FROM notifications 477 WHERE %s 478 `, createdFilter.Condition()) 479 480 _, err := d.DB.ExecContext(ctx, query, createdFilter.Arg()...) 481 if err != nil { 482 return fmt.Errorf("failed to cleanup old notifications: %w", err) 483 } 484 485 return nil 486}