1package db
2
3import (
4 "context"
5 "database/sql"
6 "errors"
7 "fmt"
8 "strings"
9 "time"
10
11 "tangled.org/core/appview/models"
12 "tangled.org/core/appview/pagination"
13)
14
15func (d *DB) CreateNotification(ctx context.Context, notification *models.Notification) error {
16 query := `
17 INSERT INTO notifications (recipient_did, actor_did, type, entity_type, entity_id, read, repo_id, issue_id, pull_id)
18 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
19 `
20
21 result, err := d.DB.ExecContext(ctx, query,
22 notification.RecipientDid,
23 notification.ActorDid,
24 string(notification.Type),
25 notification.EntityType,
26 notification.EntityId,
27 notification.Read,
28 notification.RepoId,
29 notification.IssueId,
30 notification.PullId,
31 )
32 if err != nil {
33 return fmt.Errorf("failed to create notification: %w", err)
34 }
35
36 id, err := result.LastInsertId()
37 if err != nil {
38 return fmt.Errorf("failed to get notification ID: %w", err)
39 }
40
41 notification.ID = id
42 return nil
43}
44
45// GetNotificationsPaginated retrieves notifications with filters and pagination
46func GetNotificationsPaginated(e Execer, page pagination.Page, filters ...filter) ([]*models.Notification, error) {
47 var conditions []string
48 var args []any
49
50 for _, filter := range filters {
51 conditions = append(conditions, filter.Condition())
52 args = append(args, filter.Arg()...)
53 }
54
55 whereClause := ""
56 if len(conditions) > 0 {
57 whereClause = "WHERE " + conditions[0]
58 for _, condition := range conditions[1:] {
59 whereClause += " AND " + condition
60 }
61 }
62
63 query := fmt.Sprintf(`
64 select id, recipient_did, actor_did, type, entity_type, entity_id, read, created, repo_id, issue_id, pull_id
65 from notifications
66 %s
67 order by created desc
68 limit ? offset ?
69 `, whereClause)
70
71 args = append(args, page.Limit, page.Offset)
72
73 rows, err := e.QueryContext(context.Background(), query, args...)
74 if err != nil {
75 return nil, fmt.Errorf("failed to query notifications: %w", err)
76 }
77 defer rows.Close()
78
79 var notifications []*models.Notification
80 for rows.Next() {
81 var n models.Notification
82 var typeStr string
83 var createdStr string
84 err := rows.Scan(
85 &n.ID,
86 &n.RecipientDid,
87 &n.ActorDid,
88 &typeStr,
89 &n.EntityType,
90 &n.EntityId,
91 &n.Read,
92 &createdStr,
93 &n.RepoId,
94 &n.IssueId,
95 &n.PullId,
96 )
97 if err != nil {
98 return nil, fmt.Errorf("failed to scan notification: %w", err)
99 }
100 n.Type = models.NotificationType(typeStr)
101 n.Created, err = time.Parse(time.RFC3339, createdStr)
102 if err != nil {
103 return nil, fmt.Errorf("failed to parse created timestamp: %w", err)
104 }
105 notifications = append(notifications, &n)
106 }
107
108 return notifications, nil
109}
110
111// GetNotificationsWithEntities retrieves notifications with their related entities
112func GetNotificationsWithEntities(e Execer, page pagination.Page, filters ...filter) ([]*models.NotificationWithEntity, error) {
113 var conditions []string
114 var args []any
115
116 for _, filter := range filters {
117 conditions = append(conditions, filter.Condition())
118 args = append(args, filter.Arg()...)
119 }
120
121 whereClause := ""
122 if len(conditions) > 0 {
123 whereClause = "WHERE " + conditions[0]
124 for _, condition := range conditions[1:] {
125 whereClause += " AND " + condition
126 }
127 }
128
129 query := fmt.Sprintf(`
130 select
131 n.id, n.recipient_did, n.actor_did, n.type, n.entity_type, n.entity_id,
132 n.read, n.created, n.repo_id, n.issue_id, n.pull_id,
133 r.id as r_id, r.did as r_did, r.name as r_name, r.description as r_description,
134 i.id as i_id, i.did as i_did, i.issue_id as i_issue_id, i.title as i_title, i.open as i_open,
135 p.id as p_id, p.owner_did as p_owner_did, p.pull_id as p_pull_id, p.title as p_title, p.state as p_state
136 from notifications n
137 left join repos r on n.repo_id = r.id
138 left join issues i on n.issue_id = i.id
139 left join pulls p on n.pull_id = p.id
140 %s
141 order by n.created desc
142 limit ? offset ?
143 `, whereClause)
144
145 args = append(args, page.Limit, page.Offset)
146
147 rows, err := e.QueryContext(context.Background(), query, args...)
148 if err != nil {
149 return nil, fmt.Errorf("failed to query notifications with entities: %w", err)
150 }
151 defer rows.Close()
152
153 var notifications []*models.NotificationWithEntity
154 for rows.Next() {
155 var n models.Notification
156 var typeStr string
157 var createdStr string
158 var repo models.Repo
159 var issue models.Issue
160 var pull models.Pull
161 var rId, iId, pId sql.NullInt64
162 var rDid, rName, rDescription sql.NullString
163 var iDid sql.NullString
164 var iIssueId sql.NullInt64
165 var iTitle sql.NullString
166 var iOpen sql.NullBool
167 var pOwnerDid sql.NullString
168 var pPullId sql.NullInt64
169 var pTitle sql.NullString
170 var pState sql.NullInt64
171
172 err := rows.Scan(
173 &n.ID, &n.RecipientDid, &n.ActorDid, &typeStr, &n.EntityType, &n.EntityId,
174 &n.Read, &createdStr, &n.RepoId, &n.IssueId, &n.PullId,
175 &rId, &rDid, &rName, &rDescription,
176 &iId, &iDid, &iIssueId, &iTitle, &iOpen,
177 &pId, &pOwnerDid, &pPullId, &pTitle, &pState,
178 )
179 if err != nil {
180 return nil, fmt.Errorf("failed to scan notification with entities: %w", err)
181 }
182
183 n.Type = models.NotificationType(typeStr)
184 n.Created, err = time.Parse(time.RFC3339, createdStr)
185 if err != nil {
186 return nil, fmt.Errorf("failed to parse created timestamp: %w", err)
187 }
188
189 nwe := &models.NotificationWithEntity{Notification: &n}
190
191 // populate repo if present
192 if rId.Valid {
193 repo.Id = rId.Int64
194 if rDid.Valid {
195 repo.Did = rDid.String
196 }
197 if rName.Valid {
198 repo.Name = rName.String
199 }
200 if rDescription.Valid {
201 repo.Description = rDescription.String
202 }
203 nwe.Repo = &repo
204 }
205
206 // populate issue if present
207 if iId.Valid {
208 issue.Id = iId.Int64
209 if iDid.Valid {
210 issue.Did = iDid.String
211 }
212 if iIssueId.Valid {
213 issue.IssueId = int(iIssueId.Int64)
214 }
215 if iTitle.Valid {
216 issue.Title = iTitle.String
217 }
218 if iOpen.Valid {
219 issue.Open = iOpen.Bool
220 }
221 nwe.Issue = &issue
222 }
223
224 // populate pull if present
225 if pId.Valid {
226 pull.ID = int(pId.Int64)
227 if pOwnerDid.Valid {
228 pull.OwnerDid = pOwnerDid.String
229 }
230 if pPullId.Valid {
231 pull.PullId = int(pPullId.Int64)
232 }
233 if pTitle.Valid {
234 pull.Title = pTitle.String
235 }
236 if pState.Valid {
237 pull.State = models.PullState(pState.Int64)
238 }
239 nwe.Pull = &pull
240 }
241
242 notifications = append(notifications, nwe)
243 }
244
245 return notifications, nil
246}
247
248// GetNotifications retrieves notifications with filters
249func GetNotifications(e Execer, filters ...filter) ([]*models.Notification, error) {
250 return GetNotificationsPaginated(e, pagination.FirstPage(), filters...)
251}
252
253func CountNotifications(e Execer, filters ...filter) (int64, error) {
254 var conditions []string
255 var args []any
256 for _, filter := range filters {
257 conditions = append(conditions, filter.Condition())
258 args = append(args, filter.Arg()...)
259 }
260
261 whereClause := ""
262 if conditions != nil {
263 whereClause = " where " + strings.Join(conditions, " and ")
264 }
265
266 query := fmt.Sprintf(`select count(1) from notifications %s`, whereClause)
267 var count int64
268 err := e.QueryRow(query, args...).Scan(&count)
269
270 if !errors.Is(err, sql.ErrNoRows) && err != nil {
271 return 0, err
272 }
273
274 return count, nil
275}
276
277func (d *DB) MarkNotificationRead(ctx context.Context, notificationID int64, userDID string) error {
278 idFilter := FilterEq("id", notificationID)
279 recipientFilter := FilterEq("recipient_did", userDID)
280
281 query := fmt.Sprintf(`
282 UPDATE notifications
283 SET read = 1
284 WHERE %s AND %s
285 `, idFilter.Condition(), recipientFilter.Condition())
286
287 args := append(idFilter.Arg(), recipientFilter.Arg()...)
288
289 result, err := d.DB.ExecContext(ctx, query, args...)
290 if err != nil {
291 return fmt.Errorf("failed to mark notification as read: %w", err)
292 }
293
294 rowsAffected, err := result.RowsAffected()
295 if err != nil {
296 return fmt.Errorf("failed to get rows affected: %w", err)
297 }
298
299 if rowsAffected == 0 {
300 return fmt.Errorf("notification not found or access denied")
301 }
302
303 return nil
304}
305
306func (d *DB) MarkAllNotificationsRead(ctx context.Context, userDID string) error {
307 recipientFilter := FilterEq("recipient_did", userDID)
308 readFilter := FilterEq("read", 0)
309
310 query := fmt.Sprintf(`
311 UPDATE notifications
312 SET read = 1
313 WHERE %s AND %s
314 `, recipientFilter.Condition(), readFilter.Condition())
315
316 args := append(recipientFilter.Arg(), readFilter.Arg()...)
317
318 _, err := d.DB.ExecContext(ctx, query, args...)
319 if err != nil {
320 return fmt.Errorf("failed to mark all notifications as read: %w", err)
321 }
322
323 return nil
324}
325
326func (d *DB) DeleteNotification(ctx context.Context, notificationID int64, userDID string) error {
327 idFilter := FilterEq("id", notificationID)
328 recipientFilter := FilterEq("recipient_did", userDID)
329
330 query := fmt.Sprintf(`
331 DELETE FROM notifications
332 WHERE %s AND %s
333 `, idFilter.Condition(), recipientFilter.Condition())
334
335 args := append(idFilter.Arg(), recipientFilter.Arg()...)
336
337 result, err := d.DB.ExecContext(ctx, query, args...)
338 if err != nil {
339 return fmt.Errorf("failed to delete notification: %w", err)
340 }
341
342 rowsAffected, err := result.RowsAffected()
343 if err != nil {
344 return fmt.Errorf("failed to get rows affected: %w", err)
345 }
346
347 if rowsAffected == 0 {
348 return fmt.Errorf("notification not found or access denied")
349 }
350
351 return nil
352}
353
354func (d *DB) GetNotificationPreferences(ctx context.Context, userDID string) (*models.NotificationPreferences, error) {
355 userFilter := FilterEq("user_did", userDID)
356
357 query := fmt.Sprintf(`
358 SELECT id, user_did, repo_starred, issue_created, issue_commented, pull_created,
359 pull_commented, followed, pull_merged, issue_closed, email_notifications
360 FROM notification_preferences
361 WHERE %s
362 `, userFilter.Condition())
363
364 var prefs models.NotificationPreferences
365 err := d.DB.QueryRowContext(ctx, query, userFilter.Arg()...).Scan(
366 &prefs.ID,
367 &prefs.UserDid,
368 &prefs.RepoStarred,
369 &prefs.IssueCreated,
370 &prefs.IssueCommented,
371 &prefs.PullCreated,
372 &prefs.PullCommented,
373 &prefs.Followed,
374 &prefs.PullMerged,
375 &prefs.IssueClosed,
376 &prefs.EmailNotifications,
377 )
378
379 if err != nil {
380 if err == sql.ErrNoRows {
381 return &models.NotificationPreferences{
382 UserDid: userDID,
383 RepoStarred: true,
384 IssueCreated: true,
385 IssueCommented: true,
386 PullCreated: true,
387 PullCommented: true,
388 Followed: true,
389 PullMerged: true,
390 IssueClosed: true,
391 EmailNotifications: false,
392 }, nil
393 }
394 return nil, fmt.Errorf("failed to get notification preferences: %w", err)
395 }
396
397 return &prefs, nil
398}
399
400func (d *DB) UpdateNotificationPreferences(ctx context.Context, prefs *models.NotificationPreferences) error {
401 query := `
402 INSERT OR REPLACE INTO notification_preferences
403 (user_did, repo_starred, issue_created, issue_commented, pull_created,
404 pull_commented, followed, pull_merged, issue_closed, email_notifications)
405 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
406 `
407
408 result, err := d.DB.ExecContext(ctx, query,
409 prefs.UserDid,
410 prefs.RepoStarred,
411 prefs.IssueCreated,
412 prefs.IssueCommented,
413 prefs.PullCreated,
414 prefs.PullCommented,
415 prefs.Followed,
416 prefs.PullMerged,
417 prefs.IssueClosed,
418 prefs.EmailNotifications,
419 )
420 if err != nil {
421 return fmt.Errorf("failed to update notification preferences: %w", err)
422 }
423
424 if prefs.ID == 0 {
425 id, err := result.LastInsertId()
426 if err != nil {
427 return fmt.Errorf("failed to get preferences ID: %w", err)
428 }
429 prefs.ID = id
430 }
431
432 return nil
433}
434
435func (d *DB) ClearOldNotifications(ctx context.Context, olderThan time.Duration) error {
436 cutoff := time.Now().Add(-olderThan)
437 createdFilter := FilterLte("created", cutoff)
438
439 query := fmt.Sprintf(`
440 DELETE FROM notifications
441 WHERE %s
442 `, createdFilter.Condition())
443
444 _, err := d.DB.ExecContext(ctx, query, createdFilter.Arg()...)
445 if err != nil {
446 return fmt.Errorf("failed to cleanup old notifications: %w", err)
447 }
448
449 return nil
450}