1package db
2
3import (
4 "context"
5 "database/sql"
6 "fmt"
7 "time"
8
9 "tangled.org/core/appview/models"
10 "tangled.org/core/appview/pagination"
11)
12
13func (d *DB) CreateNotification(ctx context.Context, notification *models.Notification) error {
14 query := `
15 INSERT INTO notifications (recipient_did, actor_did, type, entity_type, entity_id, read, repo_id, issue_id, pull_id)
16 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
17 `
18
19 result, err := d.DB.ExecContext(ctx, query,
20 notification.RecipientDid,
21 notification.ActorDid,
22 string(notification.Type),
23 notification.EntityType,
24 notification.EntityId,
25 notification.Read,
26 notification.RepoId,
27 notification.IssueId,
28 notification.PullId,
29 )
30 if err != nil {
31 return fmt.Errorf("failed to create notification: %w", err)
32 }
33
34 id, err := result.LastInsertId()
35 if err != nil {
36 return fmt.Errorf("failed to get notification ID: %w", err)
37 }
38
39 notification.ID = id
40 return nil
41}
42
43// GetNotificationsPaginated retrieves notifications with filters and pagination
44func GetNotificationsPaginated(e Execer, page pagination.Page, filters ...filter) ([]*models.Notification, error) {
45 var conditions []string
46 var args []any
47
48 for _, filter := range filters {
49 conditions = append(conditions, filter.Condition())
50 args = append(args, filter.Arg()...)
51 }
52
53 whereClause := ""
54 if len(conditions) > 0 {
55 whereClause = "WHERE " + conditions[0]
56 for _, condition := range conditions[1:] {
57 whereClause += " AND " + condition
58 }
59 }
60
61 query := fmt.Sprintf(`
62 select id, recipient_did, actor_did, type, entity_type, entity_id, read, created, repo_id, issue_id, pull_id
63 from notifications
64 %s
65 order by created desc
66 limit ? offset ?
67 `, whereClause)
68
69 args = append(args, page.Limit, page.Offset)
70
71 rows, err := e.QueryContext(context.Background(), query, args...)
72 if err != nil {
73 return nil, fmt.Errorf("failed to query notifications: %w", err)
74 }
75 defer rows.Close()
76
77 var notifications []*models.Notification
78 for rows.Next() {
79 var n models.Notification
80 var typeStr string
81 var createdStr string
82 err := rows.Scan(
83 &n.ID,
84 &n.RecipientDid,
85 &n.ActorDid,
86 &typeStr,
87 &n.EntityType,
88 &n.EntityId,
89 &n.Read,
90 &createdStr,
91 &n.RepoId,
92 &n.IssueId,
93 &n.PullId,
94 )
95 if err != nil {
96 return nil, fmt.Errorf("failed to scan notification: %w", err)
97 }
98 n.Type = models.NotificationType(typeStr)
99 n.Created, err = time.Parse(time.RFC3339, createdStr)
100 if err != nil {
101 return nil, fmt.Errorf("failed to parse created timestamp: %w", err)
102 }
103 notifications = append(notifications, &n)
104 }
105
106 return notifications, nil
107}
108
109// GetNotificationsWithEntities retrieves notifications with their related entities
110func GetNotificationsWithEntities(e Execer, page pagination.Page, filters ...filter) ([]*models.NotificationWithEntity, error) {
111 var conditions []string
112 var args []any
113
114 for _, filter := range filters {
115 conditions = append(conditions, filter.Condition())
116 args = append(args, filter.Arg()...)
117 }
118
119 whereClause := ""
120 if len(conditions) > 0 {
121 whereClause = "WHERE " + conditions[0]
122 for _, condition := range conditions[1:] {
123 whereClause += " AND " + condition
124 }
125 }
126
127 query := fmt.Sprintf(`
128 select
129 n.id, n.recipient_did, n.actor_did, n.type, n.entity_type, n.entity_id,
130 n.read, n.created, n.repo_id, n.issue_id, n.pull_id,
131 r.id as r_id, r.did as r_did, r.name as r_name, r.description as r_description,
132 i.id as i_id, i.did as i_did, i.issue_id as i_issue_id, i.title as i_title, i.open as i_open,
133 p.id as p_id, p.owner_did as p_owner_did, p.pull_id as p_pull_id, p.title as p_title, p.state as p_state
134 from notifications n
135 left join repos r on n.repo_id = r.id
136 left join issues i on n.issue_id = i.id
137 left join pulls p on n.pull_id = p.id
138 %s
139 order by n.created desc
140 limit ? offset ?
141 `, whereClause)
142
143 args = append(args, page.Limit, page.Offset)
144
145 rows, err := e.QueryContext(context.Background(), query, args...)
146 if err != nil {
147 return nil, fmt.Errorf("failed to query notifications with entities: %w", err)
148 }
149 defer rows.Close()
150
151 var notifications []*models.NotificationWithEntity
152 for rows.Next() {
153 var n models.Notification
154 var typeStr string
155 var createdStr string
156 var repo models.Repo
157 var issue models.Issue
158 var pull models.Pull
159 var rId, iId, pId sql.NullInt64
160 var rDid, rName, rDescription sql.NullString
161 var iDid sql.NullString
162 var iIssueId sql.NullInt64
163 var iTitle sql.NullString
164 var iOpen sql.NullBool
165 var pOwnerDid sql.NullString
166 var pPullId sql.NullInt64
167 var pTitle sql.NullString
168 var pState sql.NullInt64
169
170 err := rows.Scan(
171 &n.ID, &n.RecipientDid, &n.ActorDid, &typeStr, &n.EntityType, &n.EntityId,
172 &n.Read, &createdStr, &n.RepoId, &n.IssueId, &n.PullId,
173 &rId, &rDid, &rName, &rDescription,
174 &iId, &iDid, &iIssueId, &iTitle, &iOpen,
175 &pId, &pOwnerDid, &pPullId, &pTitle, &pState,
176 )
177 if err != nil {
178 return nil, fmt.Errorf("failed to scan notification with entities: %w", err)
179 }
180
181 n.Type = models.NotificationType(typeStr)
182 n.Created, err = time.Parse(time.RFC3339, createdStr)
183 if err != nil {
184 return nil, fmt.Errorf("failed to parse created timestamp: %w", err)
185 }
186
187 nwe := &models.NotificationWithEntity{Notification: &n}
188
189 // populate repo if present
190 if rId.Valid {
191 repo.Id = rId.Int64
192 if rDid.Valid {
193 repo.Did = rDid.String
194 }
195 if rName.Valid {
196 repo.Name = rName.String
197 }
198 if rDescription.Valid {
199 repo.Description = rDescription.String
200 }
201 nwe.Repo = &repo
202 }
203
204 // populate issue if present
205 if iId.Valid {
206 issue.Id = iId.Int64
207 if iDid.Valid {
208 issue.Did = iDid.String
209 }
210 if iIssueId.Valid {
211 issue.IssueId = int(iIssueId.Int64)
212 }
213 if iTitle.Valid {
214 issue.Title = iTitle.String
215 }
216 if iOpen.Valid {
217 issue.Open = iOpen.Bool
218 }
219 nwe.Issue = &issue
220 }
221
222 // populate pull if present
223 if pId.Valid {
224 pull.ID = int(pId.Int64)
225 if pOwnerDid.Valid {
226 pull.OwnerDid = pOwnerDid.String
227 }
228 if pPullId.Valid {
229 pull.PullId = int(pPullId.Int64)
230 }
231 if pTitle.Valid {
232 pull.Title = pTitle.String
233 }
234 if pState.Valid {
235 pull.State = models.PullState(pState.Int64)
236 }
237 nwe.Pull = &pull
238 }
239
240 notifications = append(notifications, nwe)
241 }
242
243 return notifications, nil
244}
245
246// GetNotifications retrieves notifications with filters
247func GetNotifications(e Execer, filters ...filter) ([]*models.Notification, error) {
248 return GetNotificationsPaginated(e, pagination.FirstPage(), filters...)
249}
250
251// GetNotifications retrieves notifications for a user with pagination (legacy method for backward compatibility)
252func (d *DB) GetNotifications(ctx context.Context, userDID string, limit, offset int) ([]*models.Notification, error) {
253 page := pagination.Page{Limit: limit, Offset: offset}
254 return GetNotificationsPaginated(d.DB, page, FilterEq("recipient_did", userDID))
255}
256
257// GetNotificationsWithEntities retrieves notifications with entities for a user with pagination
258func (d *DB) GetNotificationsWithEntities(ctx context.Context, userDID string, limit, offset int) ([]*models.NotificationWithEntity, error) {
259 page := pagination.Page{Limit: limit, Offset: offset}
260 return GetNotificationsWithEntities(d.DB, page, FilterEq("recipient_did", userDID))
261}
262
263func (d *DB) GetUnreadNotificationCount(ctx context.Context, userDID string) (int, error) {
264 recipientFilter := FilterEq("recipient_did", userDID)
265 readFilter := FilterEq("read", 0)
266
267 query := fmt.Sprintf(`
268 SELECT COUNT(*)
269 FROM notifications
270 WHERE %s AND %s
271 `, recipientFilter.Condition(), readFilter.Condition())
272
273 args := append(recipientFilter.Arg(), readFilter.Arg()...)
274
275 var count int
276 err := d.DB.QueryRowContext(ctx, query, args...).Scan(&count)
277 if err != nil {
278 return 0, fmt.Errorf("failed to get unread count: %w", err)
279 }
280
281 return count, nil
282}
283
284func (d *DB) MarkNotificationRead(ctx context.Context, notificationID int64, userDID string) error {
285 idFilter := FilterEq("id", notificationID)
286 recipientFilter := FilterEq("recipient_did", userDID)
287
288 query := fmt.Sprintf(`
289 UPDATE notifications
290 SET read = 1
291 WHERE %s AND %s
292 `, idFilter.Condition(), recipientFilter.Condition())
293
294 args := append(idFilter.Arg(), recipientFilter.Arg()...)
295
296 result, err := d.DB.ExecContext(ctx, query, args...)
297 if err != nil {
298 return fmt.Errorf("failed to mark notification as read: %w", err)
299 }
300
301 rowsAffected, err := result.RowsAffected()
302 if err != nil {
303 return fmt.Errorf("failed to get rows affected: %w", err)
304 }
305
306 if rowsAffected == 0 {
307 return fmt.Errorf("notification not found or access denied")
308 }
309
310 return nil
311}
312
313func (d *DB) MarkAllNotificationsRead(ctx context.Context, userDID string) error {
314 recipientFilter := FilterEq("recipient_did", userDID)
315 readFilter := FilterEq("read", 0)
316
317 query := fmt.Sprintf(`
318 UPDATE notifications
319 SET read = 1
320 WHERE %s AND %s
321 `, recipientFilter.Condition(), readFilter.Condition())
322
323 args := append(recipientFilter.Arg(), readFilter.Arg()...)
324
325 _, err := d.DB.ExecContext(ctx, query, args...)
326 if err != nil {
327 return fmt.Errorf("failed to mark all notifications as read: %w", err)
328 }
329
330 return nil
331}
332
333func (d *DB) DeleteNotification(ctx context.Context, notificationID int64, userDID string) error {
334 idFilter := FilterEq("id", notificationID)
335 recipientFilter := FilterEq("recipient_did", userDID)
336
337 query := fmt.Sprintf(`
338 DELETE FROM notifications
339 WHERE %s AND %s
340 `, idFilter.Condition(), recipientFilter.Condition())
341
342 args := append(idFilter.Arg(), recipientFilter.Arg()...)
343
344 result, err := d.DB.ExecContext(ctx, query, args...)
345 if err != nil {
346 return fmt.Errorf("failed to delete notification: %w", err)
347 }
348
349 rowsAffected, err := result.RowsAffected()
350 if err != nil {
351 return fmt.Errorf("failed to get rows affected: %w", err)
352 }
353
354 if rowsAffected == 0 {
355 return fmt.Errorf("notification not found or access denied")
356 }
357
358 return nil
359}
360
361func (d *DB) GetNotificationPreferences(ctx context.Context, userDID string) (*models.NotificationPreferences, error) {
362 userFilter := FilterEq("user_did", userDID)
363
364 query := fmt.Sprintf(`
365 SELECT id, user_did, repo_starred, issue_created, issue_commented, pull_created,
366 pull_commented, followed, pull_merged, issue_closed, email_notifications
367 FROM notification_preferences
368 WHERE %s
369 `, userFilter.Condition())
370
371 var prefs models.NotificationPreferences
372 err := d.DB.QueryRowContext(ctx, query, userFilter.Arg()...).Scan(
373 &prefs.ID,
374 &prefs.UserDid,
375 &prefs.RepoStarred,
376 &prefs.IssueCreated,
377 &prefs.IssueCommented,
378 &prefs.PullCreated,
379 &prefs.PullCommented,
380 &prefs.Followed,
381 &prefs.PullMerged,
382 &prefs.IssueClosed,
383 &prefs.EmailNotifications,
384 )
385
386 if err != nil {
387 if err == sql.ErrNoRows {
388 return &models.NotificationPreferences{
389 UserDid: userDID,
390 RepoStarred: true,
391 IssueCreated: true,
392 IssueCommented: true,
393 PullCreated: true,
394 PullCommented: true,
395 Followed: true,
396 PullMerged: true,
397 IssueClosed: true,
398 EmailNotifications: false,
399 }, nil
400 }
401 return nil, fmt.Errorf("failed to get notification preferences: %w", err)
402 }
403
404 return &prefs, nil
405}
406
407func (d *DB) UpdateNotificationPreferences(ctx context.Context, prefs *models.NotificationPreferences) error {
408 query := `
409 INSERT OR REPLACE INTO notification_preferences
410 (user_did, repo_starred, issue_created, issue_commented, pull_created,
411 pull_commented, followed, pull_merged, issue_closed, email_notifications)
412 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
413 `
414
415 result, err := d.DB.ExecContext(ctx, query,
416 prefs.UserDid,
417 prefs.RepoStarred,
418 prefs.IssueCreated,
419 prefs.IssueCommented,
420 prefs.PullCreated,
421 prefs.PullCommented,
422 prefs.Followed,
423 prefs.PullMerged,
424 prefs.IssueClosed,
425 prefs.EmailNotifications,
426 )
427 if err != nil {
428 return fmt.Errorf("failed to update notification preferences: %w", err)
429 }
430
431 if prefs.ID == 0 {
432 id, err := result.LastInsertId()
433 if err != nil {
434 return fmt.Errorf("failed to get preferences ID: %w", err)
435 }
436 prefs.ID = id
437 }
438
439 return nil
440}
441
442func (d *DB) ClearOldNotifications(ctx context.Context, olderThan time.Duration) error {
443 cutoff := time.Now().Add(-olderThan)
444 createdFilter := FilterLte("created", cutoff)
445
446 query := fmt.Sprintf(`
447 DELETE FROM notifications
448 WHERE %s
449 `, createdFilter.Condition())
450
451 _, err := d.DB.ExecContext(ctx, query, createdFilter.Arg()...)
452 if err != nil {
453 return fmt.Errorf("failed to cleanup old notifications: %w", err)
454 }
455
456 return nil
457}