forked from tangled.org/core
Monorepo for Tangled — https://tangled.org
1package db 2 3import ( 4 "context" 5 "database/sql" 6 "fmt" 7 "time" 8 9 "tangled.org/core/appview/models" 10 "tangled.org/core/appview/pagination" 11) 12 13func (d *DB) CreateNotification(ctx context.Context, notification *models.Notification) error { 14 query := ` 15 INSERT INTO notifications (recipient_did, actor_did, type, entity_type, entity_id, read, repo_id, issue_id, pull_id) 16 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) 17 ` 18 19 result, err := d.DB.ExecContext(ctx, query, 20 notification.RecipientDid, 21 notification.ActorDid, 22 string(notification.Type), 23 notification.EntityType, 24 notification.EntityId, 25 notification.Read, 26 notification.RepoId, 27 notification.IssueId, 28 notification.PullId, 29 ) 30 if err != nil { 31 return fmt.Errorf("failed to create notification: %w", err) 32 } 33 34 id, err := result.LastInsertId() 35 if err != nil { 36 return fmt.Errorf("failed to get notification ID: %w", err) 37 } 38 39 notification.ID = id 40 return nil 41} 42 43// GetNotificationsPaginated retrieves notifications with filters and pagination 44func GetNotificationsPaginated(e Execer, page pagination.Page, filters ...filter) ([]*models.Notification, error) { 45 var conditions []string 46 var args []any 47 48 for _, filter := range filters { 49 conditions = append(conditions, filter.Condition()) 50 args = append(args, filter.Arg()...) 51 } 52 53 whereClause := "" 54 if len(conditions) > 0 { 55 whereClause = "WHERE " + conditions[0] 56 for _, condition := range conditions[1:] { 57 whereClause += " AND " + condition 58 } 59 } 60 61 query := fmt.Sprintf(` 62 select id, recipient_did, actor_did, type, entity_type, entity_id, read, created, repo_id, issue_id, pull_id 63 from notifications 64 %s 65 order by created desc 66 limit ? offset ? 67 `, whereClause) 68 69 args = append(args, page.Limit, page.Offset) 70 71 rows, err := e.QueryContext(context.Background(), query, args...) 72 if err != nil { 73 return nil, fmt.Errorf("failed to query notifications: %w", err) 74 } 75 defer rows.Close() 76 77 var notifications []*models.Notification 78 for rows.Next() { 79 var n models.Notification 80 var typeStr string 81 var createdStr string 82 err := rows.Scan( 83 &n.ID, 84 &n.RecipientDid, 85 &n.ActorDid, 86 &typeStr, 87 &n.EntityType, 88 &n.EntityId, 89 &n.Read, 90 &createdStr, 91 &n.RepoId, 92 &n.IssueId, 93 &n.PullId, 94 ) 95 if err != nil { 96 return nil, fmt.Errorf("failed to scan notification: %w", err) 97 } 98 n.Type = models.NotificationType(typeStr) 99 n.Created, err = time.Parse(time.RFC3339, createdStr) 100 if err != nil { 101 return nil, fmt.Errorf("failed to parse created timestamp: %w", err) 102 } 103 notifications = append(notifications, &n) 104 } 105 106 return notifications, nil 107} 108 109// GetNotificationsWithEntities retrieves notifications with their related entities 110func GetNotificationsWithEntities(e Execer, page pagination.Page, filters ...filter) ([]*models.NotificationWithEntity, error) { 111 var conditions []string 112 var args []any 113 114 for _, filter := range filters { 115 conditions = append(conditions, filter.Condition()) 116 args = append(args, filter.Arg()...) 117 } 118 119 whereClause := "" 120 if len(conditions) > 0 { 121 whereClause = "WHERE " + conditions[0] 122 for _, condition := range conditions[1:] { 123 whereClause += " AND " + condition 124 } 125 } 126 127 query := fmt.Sprintf(` 128 select 129 n.id, n.recipient_did, n.actor_did, n.type, n.entity_type, n.entity_id, 130 n.read, n.created, n.repo_id, n.issue_id, n.pull_id, 131 r.id as r_id, r.did as r_did, r.name as r_name, r.description as r_description, 132 i.id as i_id, i.did as i_did, i.issue_id as i_issue_id, i.title as i_title, i.open as i_open, 133 p.id as p_id, p.owner_did as p_owner_did, p.pull_id as p_pull_id, p.title as p_title, p.state as p_state 134 from notifications n 135 left join repos r on n.repo_id = r.id 136 left join issues i on n.issue_id = i.id 137 left join pulls p on n.pull_id = p.id 138 %s 139 order by n.created desc 140 limit ? offset ? 141 `, whereClause) 142 143 args = append(args, page.Limit, page.Offset) 144 145 rows, err := e.QueryContext(context.Background(), query, args...) 146 if err != nil { 147 return nil, fmt.Errorf("failed to query notifications with entities: %w", err) 148 } 149 defer rows.Close() 150 151 var notifications []*models.NotificationWithEntity 152 for rows.Next() { 153 var n models.Notification 154 var typeStr string 155 var createdStr string 156 var repo models.Repo 157 var issue models.Issue 158 var pull models.Pull 159 var rId, iId, pId sql.NullInt64 160 var rDid, rName, rDescription sql.NullString 161 var iDid sql.NullString 162 var iIssueId sql.NullInt64 163 var iTitle sql.NullString 164 var iOpen sql.NullBool 165 var pOwnerDid sql.NullString 166 var pPullId sql.NullInt64 167 var pTitle sql.NullString 168 var pState sql.NullInt64 169 170 err := rows.Scan( 171 &n.ID, &n.RecipientDid, &n.ActorDid, &typeStr, &n.EntityType, &n.EntityId, 172 &n.Read, &createdStr, &n.RepoId, &n.IssueId, &n.PullId, 173 &rId, &rDid, &rName, &rDescription, 174 &iId, &iDid, &iIssueId, &iTitle, &iOpen, 175 &pId, &pOwnerDid, &pPullId, &pTitle, &pState, 176 ) 177 if err != nil { 178 return nil, fmt.Errorf("failed to scan notification with entities: %w", err) 179 } 180 181 n.Type = models.NotificationType(typeStr) 182 n.Created, err = time.Parse(time.RFC3339, createdStr) 183 if err != nil { 184 return nil, fmt.Errorf("failed to parse created timestamp: %w", err) 185 } 186 187 nwe := &models.NotificationWithEntity{Notification: &n} 188 189 // populate repo if present 190 if rId.Valid { 191 repo.Id = rId.Int64 192 if rDid.Valid { 193 repo.Did = rDid.String 194 } 195 if rName.Valid { 196 repo.Name = rName.String 197 } 198 if rDescription.Valid { 199 repo.Description = rDescription.String 200 } 201 nwe.Repo = &repo 202 } 203 204 // populate issue if present 205 if iId.Valid { 206 issue.Id = iId.Int64 207 if iDid.Valid { 208 issue.Did = iDid.String 209 } 210 if iIssueId.Valid { 211 issue.IssueId = int(iIssueId.Int64) 212 } 213 if iTitle.Valid { 214 issue.Title = iTitle.String 215 } 216 if iOpen.Valid { 217 issue.Open = iOpen.Bool 218 } 219 nwe.Issue = &issue 220 } 221 222 // populate pull if present 223 if pId.Valid { 224 pull.ID = int(pId.Int64) 225 if pOwnerDid.Valid { 226 pull.OwnerDid = pOwnerDid.String 227 } 228 if pPullId.Valid { 229 pull.PullId = int(pPullId.Int64) 230 } 231 if pTitle.Valid { 232 pull.Title = pTitle.String 233 } 234 if pState.Valid { 235 pull.State = models.PullState(pState.Int64) 236 } 237 nwe.Pull = &pull 238 } 239 240 notifications = append(notifications, nwe) 241 } 242 243 return notifications, nil 244} 245 246// GetNotifications retrieves notifications with filters 247func GetNotifications(e Execer, filters ...filter) ([]*models.Notification, error) { 248 return GetNotificationsPaginated(e, pagination.FirstPage(), filters...) 249} 250 251// GetNotifications retrieves notifications for a user with pagination (legacy method for backward compatibility) 252func (d *DB) GetNotifications(ctx context.Context, userDID string, limit, offset int) ([]*models.Notification, error) { 253 page := pagination.Page{Limit: limit, Offset: offset} 254 return GetNotificationsPaginated(d.DB, page, FilterEq("recipient_did", userDID)) 255} 256 257// GetNotificationsWithEntities retrieves notifications with entities for a user with pagination 258func (d *DB) GetNotificationsWithEntities(ctx context.Context, userDID string, limit, offset int) ([]*models.NotificationWithEntity, error) { 259 page := pagination.Page{Limit: limit, Offset: offset} 260 return GetNotificationsWithEntities(d.DB, page, FilterEq("recipient_did", userDID)) 261} 262 263func (d *DB) GetUnreadNotificationCount(ctx context.Context, userDID string) (int, error) { 264 recipientFilter := FilterEq("recipient_did", userDID) 265 readFilter := FilterEq("read", 0) 266 267 query := fmt.Sprintf(` 268 SELECT COUNT(*) 269 FROM notifications 270 WHERE %s AND %s 271 `, recipientFilter.Condition(), readFilter.Condition()) 272 273 args := append(recipientFilter.Arg(), readFilter.Arg()...) 274 275 var count int 276 err := d.DB.QueryRowContext(ctx, query, args...).Scan(&count) 277 if err != nil { 278 return 0, fmt.Errorf("failed to get unread count: %w", err) 279 } 280 281 return count, nil 282} 283 284func (d *DB) MarkNotificationRead(ctx context.Context, notificationID int64, userDID string) error { 285 idFilter := FilterEq("id", notificationID) 286 recipientFilter := FilterEq("recipient_did", userDID) 287 288 query := fmt.Sprintf(` 289 UPDATE notifications 290 SET read = 1 291 WHERE %s AND %s 292 `, idFilter.Condition(), recipientFilter.Condition()) 293 294 args := append(idFilter.Arg(), recipientFilter.Arg()...) 295 296 result, err := d.DB.ExecContext(ctx, query, args...) 297 if err != nil { 298 return fmt.Errorf("failed to mark notification as read: %w", err) 299 } 300 301 rowsAffected, err := result.RowsAffected() 302 if err != nil { 303 return fmt.Errorf("failed to get rows affected: %w", err) 304 } 305 306 if rowsAffected == 0 { 307 return fmt.Errorf("notification not found or access denied") 308 } 309 310 return nil 311} 312 313func (d *DB) MarkAllNotificationsRead(ctx context.Context, userDID string) error { 314 recipientFilter := FilterEq("recipient_did", userDID) 315 readFilter := FilterEq("read", 0) 316 317 query := fmt.Sprintf(` 318 UPDATE notifications 319 SET read = 1 320 WHERE %s AND %s 321 `, recipientFilter.Condition(), readFilter.Condition()) 322 323 args := append(recipientFilter.Arg(), readFilter.Arg()...) 324 325 _, err := d.DB.ExecContext(ctx, query, args...) 326 if err != nil { 327 return fmt.Errorf("failed to mark all notifications as read: %w", err) 328 } 329 330 return nil 331} 332 333func (d *DB) DeleteNotification(ctx context.Context, notificationID int64, userDID string) error { 334 idFilter := FilterEq("id", notificationID) 335 recipientFilter := FilterEq("recipient_did", userDID) 336 337 query := fmt.Sprintf(` 338 DELETE FROM notifications 339 WHERE %s AND %s 340 `, idFilter.Condition(), recipientFilter.Condition()) 341 342 args := append(idFilter.Arg(), recipientFilter.Arg()...) 343 344 result, err := d.DB.ExecContext(ctx, query, args...) 345 if err != nil { 346 return fmt.Errorf("failed to delete notification: %w", err) 347 } 348 349 rowsAffected, err := result.RowsAffected() 350 if err != nil { 351 return fmt.Errorf("failed to get rows affected: %w", err) 352 } 353 354 if rowsAffected == 0 { 355 return fmt.Errorf("notification not found or access denied") 356 } 357 358 return nil 359} 360 361func (d *DB) GetNotificationPreferences(ctx context.Context, userDID string) (*models.NotificationPreferences, error) { 362 userFilter := FilterEq("user_did", userDID) 363 364 query := fmt.Sprintf(` 365 SELECT id, user_did, repo_starred, issue_created, issue_commented, pull_created, 366 pull_commented, followed, pull_merged, issue_closed, email_notifications 367 FROM notification_preferences 368 WHERE %s 369 `, userFilter.Condition()) 370 371 var prefs models.NotificationPreferences 372 err := d.DB.QueryRowContext(ctx, query, userFilter.Arg()...).Scan( 373 &prefs.ID, 374 &prefs.UserDid, 375 &prefs.RepoStarred, 376 &prefs.IssueCreated, 377 &prefs.IssueCommented, 378 &prefs.PullCreated, 379 &prefs.PullCommented, 380 &prefs.Followed, 381 &prefs.PullMerged, 382 &prefs.IssueClosed, 383 &prefs.EmailNotifications, 384 ) 385 386 if err != nil { 387 if err == sql.ErrNoRows { 388 return &models.NotificationPreferences{ 389 UserDid: userDID, 390 RepoStarred: true, 391 IssueCreated: true, 392 IssueCommented: true, 393 PullCreated: true, 394 PullCommented: true, 395 Followed: true, 396 PullMerged: true, 397 IssueClosed: true, 398 EmailNotifications: false, 399 }, nil 400 } 401 return nil, fmt.Errorf("failed to get notification preferences: %w", err) 402 } 403 404 return &prefs, nil 405} 406 407func (d *DB) UpdateNotificationPreferences(ctx context.Context, prefs *models.NotificationPreferences) error { 408 query := ` 409 INSERT OR REPLACE INTO notification_preferences 410 (user_did, repo_starred, issue_created, issue_commented, pull_created, 411 pull_commented, followed, pull_merged, issue_closed, email_notifications) 412 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) 413 ` 414 415 result, err := d.DB.ExecContext(ctx, query, 416 prefs.UserDid, 417 prefs.RepoStarred, 418 prefs.IssueCreated, 419 prefs.IssueCommented, 420 prefs.PullCreated, 421 prefs.PullCommented, 422 prefs.Followed, 423 prefs.PullMerged, 424 prefs.IssueClosed, 425 prefs.EmailNotifications, 426 ) 427 if err != nil { 428 return fmt.Errorf("failed to update notification preferences: %w", err) 429 } 430 431 if prefs.ID == 0 { 432 id, err := result.LastInsertId() 433 if err != nil { 434 return fmt.Errorf("failed to get preferences ID: %w", err) 435 } 436 prefs.ID = id 437 } 438 439 return nil 440} 441 442func (d *DB) ClearOldNotifications(ctx context.Context, olderThan time.Duration) error { 443 cutoff := time.Now().Add(-olderThan) 444 createdFilter := FilterLte("created", cutoff) 445 446 query := fmt.Sprintf(` 447 DELETE FROM notifications 448 WHERE %s 449 `, createdFilter.Condition()) 450 451 _, err := d.DB.ExecContext(ctx, query, createdFilter.Arg()...) 452 if err != nil { 453 return fmt.Errorf("failed to cleanup old notifications: %w", err) 454 } 455 456 return nil 457}