forked from tangled.org/core
Monorepo for Tangled — https://tangled.org
1package db 2 3import ( 4 "context" 5 "database/sql" 6 "errors" 7 "fmt" 8 "strings" 9 "time" 10 11 "github.com/bluesky-social/indigo/atproto/syntax" 12 "tangled.org/core/appview/models" 13 "tangled.org/core/appview/pagination" 14) 15 16func CreateNotification(e Execer, notification *models.Notification) error { 17 query := ` 18 INSERT INTO notifications (recipient_did, actor_did, type, entity_type, entity_id, read, repo_id, issue_id, pull_id) 19 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) 20 ` 21 22 result, err := e.Exec(query, 23 notification.RecipientDid, 24 notification.ActorDid, 25 string(notification.Type), 26 notification.EntityType, 27 notification.EntityId, 28 notification.Read, 29 notification.RepoId, 30 notification.IssueId, 31 notification.PullId, 32 ) 33 if err != nil { 34 return fmt.Errorf("failed to create notification: %w", err) 35 } 36 37 id, err := result.LastInsertId() 38 if err != nil { 39 return fmt.Errorf("failed to get notification ID: %w", err) 40 } 41 42 notification.ID = id 43 return nil 44} 45 46// GetNotificationsPaginated retrieves notifications with filters and pagination 47func GetNotificationsPaginated(e Execer, page pagination.Page, filters ...filter) ([]*models.Notification, error) { 48 var conditions []string 49 var args []any 50 51 for _, filter := range filters { 52 conditions = append(conditions, filter.Condition()) 53 args = append(args, filter.Arg()...) 54 } 55 56 whereClause := "" 57 if len(conditions) > 0 { 58 whereClause = "WHERE " + conditions[0] 59 for _, condition := range conditions[1:] { 60 whereClause += " AND " + condition 61 } 62 } 63 pageClause := "" 64 if page.Limit > 0 { 65 pageClause = " limit ? offset ? " 66 args = append(args, page.Limit, page.Offset) 67 } 68 69 query := fmt.Sprintf(` 70 select id, recipient_did, actor_did, type, entity_type, entity_id, read, created, repo_id, issue_id, pull_id 71 from notifications 72 %s 73 order by created desc 74 %s 75 `, whereClause, pageClause) 76 77 rows, err := e.QueryContext(context.Background(), query, args...) 78 if err != nil { 79 return nil, fmt.Errorf("failed to query notifications: %w", err) 80 } 81 defer rows.Close() 82 83 var notifications []*models.Notification 84 for rows.Next() { 85 var n models.Notification 86 var typeStr string 87 var createdStr string 88 err := rows.Scan( 89 &n.ID, 90 &n.RecipientDid, 91 &n.ActorDid, 92 &typeStr, 93 &n.EntityType, 94 &n.EntityId, 95 &n.Read, 96 &createdStr, 97 &n.RepoId, 98 &n.IssueId, 99 &n.PullId, 100 ) 101 if err != nil { 102 return nil, fmt.Errorf("failed to scan notification: %w", err) 103 } 104 n.Type = models.NotificationType(typeStr) 105 n.Created, err = time.Parse(time.RFC3339, createdStr) 106 if err != nil { 107 return nil, fmt.Errorf("failed to parse created timestamp: %w", err) 108 } 109 notifications = append(notifications, &n) 110 } 111 112 return notifications, nil 113} 114 115// GetNotificationsWithEntities retrieves notifications with their related entities 116func GetNotificationsWithEntities(e Execer, page pagination.Page, filters ...filter) ([]*models.NotificationWithEntity, error) { 117 var conditions []string 118 var args []any 119 120 for _, filter := range filters { 121 conditions = append(conditions, filter.Condition()) 122 args = append(args, filter.Arg()...) 123 } 124 125 whereClause := "" 126 if len(conditions) > 0 { 127 whereClause = "WHERE " + conditions[0] 128 for _, condition := range conditions[1:] { 129 whereClause += " AND " + condition 130 } 131 } 132 133 query := fmt.Sprintf(` 134 select 135 n.id, n.recipient_did, n.actor_did, n.type, n.entity_type, n.entity_id, 136 n.read, n.created, n.repo_id, n.issue_id, n.pull_id, 137 r.id as r_id, r.did as r_did, r.name as r_name, r.description as r_description, 138 i.id as i_id, i.did as i_did, i.issue_id as i_issue_id, i.title as i_title, i.open as i_open, 139 p.id as p_id, p.owner_did as p_owner_did, p.pull_id as p_pull_id, p.title as p_title, p.state as p_state 140 from notifications n 141 left join repos r on n.repo_id = r.id 142 left join issues i on n.issue_id = i.id 143 left join pulls p on n.pull_id = p.id 144 %s 145 order by n.created desc 146 limit ? offset ? 147 `, whereClause) 148 149 args = append(args, page.Limit, page.Offset) 150 151 rows, err := e.QueryContext(context.Background(), query, args...) 152 if err != nil { 153 return nil, fmt.Errorf("failed to query notifications with entities: %w", err) 154 } 155 defer rows.Close() 156 157 var notifications []*models.NotificationWithEntity 158 for rows.Next() { 159 var n models.Notification 160 var typeStr string 161 var createdStr string 162 var repo models.Repo 163 var issue models.Issue 164 var pull models.Pull 165 var rId, iId, pId sql.NullInt64 166 var rDid, rName, rDescription sql.NullString 167 var iDid sql.NullString 168 var iIssueId sql.NullInt64 169 var iTitle sql.NullString 170 var iOpen sql.NullBool 171 var pOwnerDid sql.NullString 172 var pPullId sql.NullInt64 173 var pTitle sql.NullString 174 var pState sql.NullInt64 175 176 err := rows.Scan( 177 &n.ID, &n.RecipientDid, &n.ActorDid, &typeStr, &n.EntityType, &n.EntityId, 178 &n.Read, &createdStr, &n.RepoId, &n.IssueId, &n.PullId, 179 &rId, &rDid, &rName, &rDescription, 180 &iId, &iDid, &iIssueId, &iTitle, &iOpen, 181 &pId, &pOwnerDid, &pPullId, &pTitle, &pState, 182 ) 183 if err != nil { 184 return nil, fmt.Errorf("failed to scan notification with entities: %w", err) 185 } 186 187 n.Type = models.NotificationType(typeStr) 188 n.Created, err = time.Parse(time.RFC3339, createdStr) 189 if err != nil { 190 return nil, fmt.Errorf("failed to parse created timestamp: %w", err) 191 } 192 193 nwe := &models.NotificationWithEntity{Notification: &n} 194 195 // populate repo if present 196 if rId.Valid { 197 repo.Id = rId.Int64 198 if rDid.Valid { 199 repo.Did = rDid.String 200 } 201 if rName.Valid { 202 repo.Name = rName.String 203 } 204 if rDescription.Valid { 205 repo.Description = rDescription.String 206 } 207 nwe.Repo = &repo 208 } 209 210 // populate issue if present 211 if iId.Valid { 212 issue.Id = iId.Int64 213 if iDid.Valid { 214 issue.Did = iDid.String 215 } 216 if iIssueId.Valid { 217 issue.IssueId = int(iIssueId.Int64) 218 } 219 if iTitle.Valid { 220 issue.Title = iTitle.String 221 } 222 if iOpen.Valid { 223 issue.Open = iOpen.Bool 224 } 225 nwe.Issue = &issue 226 } 227 228 // populate pull if present 229 if pId.Valid { 230 pull.ID = int(pId.Int64) 231 if pOwnerDid.Valid { 232 pull.OwnerDid = pOwnerDid.String 233 } 234 if pPullId.Valid { 235 pull.PullId = int(pPullId.Int64) 236 } 237 if pTitle.Valid { 238 pull.Title = pTitle.String 239 } 240 if pState.Valid { 241 pull.State = models.PullState(pState.Int64) 242 } 243 nwe.Pull = &pull 244 } 245 246 notifications = append(notifications, nwe) 247 } 248 249 return notifications, nil 250} 251 252// GetNotifications retrieves notifications with filters 253func GetNotifications(e Execer, filters ...filter) ([]*models.Notification, error) { 254 return GetNotificationsPaginated(e, pagination.FirstPage(), filters...) 255} 256 257func CountNotifications(e Execer, filters ...filter) (int64, error) { 258 var conditions []string 259 var args []any 260 for _, filter := range filters { 261 conditions = append(conditions, filter.Condition()) 262 args = append(args, filter.Arg()...) 263 } 264 265 whereClause := "" 266 if conditions != nil { 267 whereClause = " where " + strings.Join(conditions, " and ") 268 } 269 270 query := fmt.Sprintf(`select count(1) from notifications %s`, whereClause) 271 var count int64 272 err := e.QueryRow(query, args...).Scan(&count) 273 274 if !errors.Is(err, sql.ErrNoRows) && err != nil { 275 return 0, err 276 } 277 278 return count, nil 279} 280 281func MarkNotificationRead(e Execer, notificationID int64, userDID string) error { 282 idFilter := FilterEq("id", notificationID) 283 recipientFilter := FilterEq("recipient_did", userDID) 284 285 query := fmt.Sprintf(` 286 UPDATE notifications 287 SET read = 1 288 WHERE %s AND %s 289 `, idFilter.Condition(), recipientFilter.Condition()) 290 291 args := append(idFilter.Arg(), recipientFilter.Arg()...) 292 293 result, err := e.Exec(query, args...) 294 if err != nil { 295 return fmt.Errorf("failed to mark notification as read: %w", err) 296 } 297 298 rowsAffected, err := result.RowsAffected() 299 if err != nil { 300 return fmt.Errorf("failed to get rows affected: %w", err) 301 } 302 303 if rowsAffected == 0 { 304 return fmt.Errorf("notification not found or access denied") 305 } 306 307 return nil 308} 309 310func MarkAllNotificationsRead(e Execer, userDID string) error { 311 recipientFilter := FilterEq("recipient_did", userDID) 312 readFilter := FilterEq("read", 0) 313 314 query := fmt.Sprintf(` 315 UPDATE notifications 316 SET read = 1 317 WHERE %s AND %s 318 `, recipientFilter.Condition(), readFilter.Condition()) 319 320 args := append(recipientFilter.Arg(), readFilter.Arg()...) 321 322 _, err := e.Exec(query, args...) 323 if err != nil { 324 return fmt.Errorf("failed to mark all notifications as read: %w", err) 325 } 326 327 return nil 328} 329 330func DeleteNotification(e Execer, notificationID int64, userDID string) error { 331 idFilter := FilterEq("id", notificationID) 332 recipientFilter := FilterEq("recipient_did", userDID) 333 334 query := fmt.Sprintf(` 335 DELETE FROM notifications 336 WHERE %s AND %s 337 `, idFilter.Condition(), recipientFilter.Condition()) 338 339 args := append(idFilter.Arg(), recipientFilter.Arg()...) 340 341 result, err := e.Exec(query, args...) 342 if err != nil { 343 return fmt.Errorf("failed to delete notification: %w", err) 344 } 345 346 rowsAffected, err := result.RowsAffected() 347 if err != nil { 348 return fmt.Errorf("failed to get rows affected: %w", err) 349 } 350 351 if rowsAffected == 0 { 352 return fmt.Errorf("notification not found or access denied") 353 } 354 355 return nil 356} 357 358func GetNotificationPreference(e Execer, userDid string) (*models.NotificationPreferences, error) { 359 prefs, err := GetNotificationPreferences(e, FilterEq("user_did", userDid)) 360 if err != nil { 361 return nil, err 362 } 363 364 p, ok := prefs[syntax.DID(userDid)] 365 if !ok { 366 return models.DefaultNotificationPreferences(syntax.DID(userDid)), nil 367 } 368 369 return p, nil 370} 371 372func GetNotificationPreferences(e Execer, filters ...filter) (map[syntax.DID]*models.NotificationPreferences, error) { 373 prefsMap := make(map[syntax.DID]*models.NotificationPreferences) 374 375 var conditions []string 376 var args []any 377 for _, filter := range filters { 378 conditions = append(conditions, filter.Condition()) 379 args = append(args, filter.Arg()...) 380 } 381 382 whereClause := "" 383 if conditions != nil { 384 whereClause = " where " + strings.Join(conditions, " and ") 385 } 386 387 query := fmt.Sprintf(` 388 select 389 id, 390 user_did, 391 repo_starred, 392 issue_created, 393 issue_commented, 394 pull_created, 395 pull_commented, 396 followed, 397 pull_merged, 398 issue_closed, 399 email_notifications 400 from 401 notification_preferences 402 %s 403 `, whereClause) 404 405 rows, err := e.Query(query, args...) 406 if err != nil { 407 return nil, err 408 } 409 defer rows.Close() 410 411 for rows.Next() { 412 var prefs models.NotificationPreferences 413 if err := rows.Scan( 414 &prefs.ID, 415 &prefs.UserDid, 416 &prefs.RepoStarred, 417 &prefs.IssueCreated, 418 &prefs.IssueCommented, 419 &prefs.PullCreated, 420 &prefs.PullCommented, 421 &prefs.Followed, 422 &prefs.PullMerged, 423 &prefs.IssueClosed, 424 &prefs.EmailNotifications, 425 ); err != nil { 426 return nil, err 427 } 428 429 prefsMap[prefs.UserDid] = &prefs 430 } 431 432 if err := rows.Err(); err != nil { 433 return nil, err 434 } 435 436 return prefsMap, nil 437} 438 439func (d *DB) UpdateNotificationPreferences(ctx context.Context, prefs *models.NotificationPreferences) error { 440 query := ` 441 INSERT OR REPLACE INTO notification_preferences 442 (user_did, repo_starred, issue_created, issue_commented, pull_created, 443 pull_commented, followed, pull_merged, issue_closed, email_notifications) 444 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) 445 ` 446 447 result, err := d.DB.ExecContext(ctx, query, 448 prefs.UserDid, 449 prefs.RepoStarred, 450 prefs.IssueCreated, 451 prefs.IssueCommented, 452 prefs.PullCreated, 453 prefs.PullCommented, 454 prefs.Followed, 455 prefs.PullMerged, 456 prefs.IssueClosed, 457 prefs.EmailNotifications, 458 ) 459 if err != nil { 460 return fmt.Errorf("failed to update notification preferences: %w", err) 461 } 462 463 if prefs.ID == 0 { 464 id, err := result.LastInsertId() 465 if err != nil { 466 return fmt.Errorf("failed to get preferences ID: %w", err) 467 } 468 prefs.ID = id 469 } 470 471 return nil 472} 473 474func (d *DB) ClearOldNotifications(ctx context.Context, olderThan time.Duration) error { 475 cutoff := time.Now().Add(-olderThan) 476 createdFilter := FilterLte("created", cutoff) 477 478 query := fmt.Sprintf(` 479 DELETE FROM notifications 480 WHERE %s 481 `, createdFilter.Condition()) 482 483 _, err := d.DB.ExecContext(ctx, query, createdFilter.Arg()...) 484 if err != nil { 485 return fmt.Errorf("failed to cleanup old notifications: %w", err) 486 } 487 488 return nil 489}